GPU Dispatcher Queue

اَلسَلامُ عَلَيْكُم

(Peace be upon you)

Building a Production GPU Dispatcher

Distributed GPU orchestration, job reliability, and the bugs we shipped.

Part 1: Architecture

February 2026 • ~15 min read

What We Built

A dispatcher that routes AI inference requests across multiple GPU servers:

  • Models: zimg (image, ~21s), flux (image, 5-20min), trellis (3D, ~10min), ltx2 (video, 5-20min)
  • Capacity: Each GPU handles 2-4 concurrent jobs ("slots")
  • Patterns: Mix of sync and async response APIs

Requirements: Zero job loss, FIFO ordering with priority support, efficient GPU utilization, handle crashes/restarts/network failures.

Why Cloud Tasks + Redis

We needed reliable job delivery without adding infrastructure complexity.

Why not RabbitMQ/SQS? — Another service to operate, monitor, and scale. We already needed Redis for state management (locks, results, WAL). Adding a separate queue meant more moving parts.

Why Cloud Tasks? — Acts as a buffer between clients and our dispatcher. We can throw a million requests at it without overwhelming Azure App Service, which would straight up reject or lose requests if hit directly with that load. Cloud Tasks queues them and delivers at a controlled rate with built-in retries. No infrastructure to manage on our side — just HTTP endpoints.

Why Redis for queuing? — Already there for state. Sorted sets gave us exactly the priority logic and retry semantics we needed. Full control without fighting a framework.
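
For concreteness, here is roughly what enqueuing a job looks like with the google-cloud-tasks client — a sketch, with the project, region, and dispatcher URL as placeholder values:

import json
from google.cloud import tasks_v2

client = tasks_v2.CloudTasksClient()
parent = client.queue_path("my-project", "us-central1", "gpu-zimg")  # one queue per model

task = tasks_v2.Task(
    http_request=tasks_v2.HttpRequest(
        http_method=tasks_v2.HttpMethod.POST,
        url="https://dispatcher.example.com/tasks/handle",  # the dispatcher's task endpoint
        headers={"Content-Type": "application/json"},
        body=json.dumps({"job_id": "job_abc", "model": "zimg"}).encode(),
    )
)
client.create_task(parent=parent, task=task)  # Cloud Tasks handles delivery and retries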

GPU Server Priority Model

A key design decision: GPU servers differentiate between dispatcher traffic and direct calls.

| Caller | Header | Behavior |
|---|---|---|
| Dispatcher | X-Source: dispatcher | Returns 503 if busy or if the direct queue has pending jobs |
| Direct | None | Queued with priority (max 10), processed first |

Why this matters: When testing or running demos, direct API calls shouldn't compete with bulk dispatcher traffic. Direct calls go to a priority queue that drains completely before dispatcher traffic gets any slots.

@app.post("/generate")
def generate(request: Request, payload: GenerateRequest):
    is_dispatcher = request.headers.get("X-Source") == "dispatcher"

    if is_dispatcher:
        # Reject if busy OR if direct queue has pending work
        if len(active_jobs) >= MAX_CONCURRENT or len(direct_queue) > 0:
            raise HTTPException(503, "GPU busy")
        return process_job(payload)

    else:
        # Direct call - add to priority queue
        if len(direct_queue) >= MAX_DIRECT_QUEUE:
            raise HTTPException(503, "Queue full")
        job_id = generate_job_id()
        direct_queue.append(Job(id=job_id, payload=payload))
        return {"status": "queued", "job_id": job_id, "position": len(direct_queue)}

This creates a cooperative system: the dispatcher knows to route elsewhere on 503, and GPUs protect their capacity for high-priority direct access when needed.
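
A minimal sketch of the dispatcher side of that contract, assuming httpx; the helper name dispatch_to_gpu is illustrative:

import httpx

async def dispatch_to_gpu(instance: dict, payload: dict) -> dict | None:
    """Send a job to one GPU instance; None means 'try another instance'."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        resp = await client.post(
            f"{instance['endpoint']}/generate",
            json=payload,
            headers={"X-Source": "dispatcher"},  # marks this as bulk dispatcher traffic
        )
    if resp.status_code == 503:
        # GPU is busy or its direct queue has pending work:
        # let the caller try the next instance or requeue the job
        return None
    resp.raise_for_status()
    return resp.json()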

Sync vs Async GPU Patterns

Different models have different inference times. The dispatcher handles both patterns:

| Pattern | Inference Time | GPU Response | Dispatcher Behavior |
|---|---|---|---|
| Sync | < 30s | {"status": "success", "result": {...}} | Stores result immediately, releases lease |
| Async | > 30s | {"status": "processing", "job_id": "xxx"} | Spawns poller, holds lease until completion |

For async jobs, the dispatcher polls the GPU's /status/{job_id} endpoint every few seconds. The lease is held for the entire duration — not released after submission. This ensures the dispatcher's slot count matches actual GPU capacity.

# Model configuration determines behavior
MODEL_CONFIGS = {
    "zimg": ModelConfig(
        model_type=ModelType.SYNC,  # Returns result immediately
    ),
    "flux": ModelConfig(
        model_type=ModelType.ASYNC,
        poll_endpoint_template="/status/{id}",
        poll_interval=3.0,
        max_poll_time=1200.0,  # 20 minutes max
    ),
}

Callbacks & Webhooks

Instead of clients polling for results, they provide a callback_url:

POST /queue/submit
{
  "model": "zimg",
  "payload": {"prompt": "a sunset"},
  "callback_url": "https://your-backend.com/webhook/gpu-result"
}

When the job completes, the dispatcher POSTs the result:

POST https://your-backend.com/webhook/gpu-result
{
  "job_id": "job_123",
  "status": "completed",
  "result": {"url": "https://s3.../image.png"}
}

Key point: The GPU never sees the client's callback_url. The dispatcher is always the intermediary:

  1. Client → Dispatcher: Client provides callback_url, dispatcher stores it in job metadata
  2. Dispatcher → GPU: Dispatcher sends job to GPU (without client's callback)
  3. GPU → Dispatcher: For async jobs, dispatcher polls GPU's /status endpoint (or GPU can call dispatcher's /tasks/callback/{job_id})
  4. Dispatcher → callback_url: Dispatcher stores result, releases lock, then forwards to client's callback_url

The callback_url can be any endpoint — the original client, a different microservice, a Slack webhook, anything that accepts HTTP POST. The callback is fire-and-forget (with retries) after the slot is already freed for the next job.
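
A minimal sketch of that fire-and-forget delivery, assuming httpx; the function name send_callback and the retry count are illustrative:

import asyncio
import httpx

async def send_callback(callback_url: str, payload: dict, max_attempts: int = 5):
    """Best-effort webhook delivery; the GPU slot is already free at this point."""
    for attempt in range(max_attempts):
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                resp = await client.post(callback_url, json=payload)
            if resp.status_code < 500:
                return  # delivered (or a client error we won't retry)
        except httpx.HTTPError:
            pass
        await asyncio.sleep(2 ** attempt)  # simple backoff between attempts
    # Give up quietly: callback delivery never blocks the queue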

System Architecture

┌─────────────┐
│   Client    │
└──────┬──────┘
       │ POST /queue/submit
       ▼
┌──────────────┐  creates task   ┌─────────────────┐
│  Dispatcher  │────────────────▶│  GCP Cloud Tasks │
└──────────────┘                 └────────┬────────┘
       ▲                                  │
       │ pushes task via HTTP             │
       └──────────────────────────────────┘
       │
       │ acquire lease, store state
       ▼
┌─────────────┐
│    Redis    │
│  (Cluster)  │
└─────────────┘
       │
       │ forward request
       ▼
┌─────────┐  ┌─────────┐  ┌─────────┐
│  zimg   │  │  flux   │  │ trellis │
└─────────┘  └─────────┘  └─────────┘
| Component | Role |
|---|---|
| Dispatcher | FastAPI service. Receives requests, manages GPU leases, routes to appropriate GPU |
| Redis Cluster | Stores locks, job queues, results, metadata. All state lives here |
| Cloud Tasks | Reliable task delivery with built-in concurrency control |
| GPU Services | Model-specific inference servers on Azure |

Request Flow

1. Client POSTs to /queue/submit
   {"model": "zimg", "payload": {...}, "callback_url": "..."}

2. Dispatcher writes WAL entry
   Redis: SET pending:job_abc {...} EX 86400

3. Dispatcher creates Cloud Task → gpu-zimg queue

4. Cloud Tasks calls /tasks/handle

5. Dispatcher acquires GPU lease
   Redis: SET lock:zimg-api4-slot0 job_abc NX EX 300

6. Dispatcher POSTs to GPU

7. GPU returns response (sync or async job ID)

8. For async jobs: Dispatcher spawns poller
   - Polls /status every 2-5 seconds
   - Refreshes lease TTL every 60 seconds

9. On completion:
   - Store result, release lease, send callback
   - Delete WAL entry, trigger next job
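
Condensed into one handler, the middle of that flow looks roughly like this. acquire_lease, release_lease, and poll_async_job appear later in this post; forward_to_gpu and finish_job are illustrative stand-ins:

@app.post("/tasks/handle")
async def handle_task(task: dict):
    job_id, model = task["job_id"], task["model"]

    # Step 5: lease a GPU slot; a 503 tells Cloud Tasks to redeliver later
    instance_id = await acquire_lease(model, job_id)
    if instance_id is None:
        raise HTTPException(503, "No GPU capacity")

    try:
        # Steps 6-7: forward the job to the leased GPU
        response = await forward_to_gpu(instance_id, task["payload"])

        if response.get("status") == "processing":
            # Step 8: async model - keep the lease and poll the GPU-side job id
            asyncio.create_task(poll_async_job(response["job_id"], instance_id))
            return {"status": "accepted"}

        # Step 9: sync model - store result, release lease, send callback, clear WAL
        await finish_job(job_id, instance_id, response["result"])
        return {"status": "completed"}
    except Exception:
        await release_lease(instance_id)
        raise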

Redis Data Model

# GPU Instance Registry
models                          # SET: ["zimg", "flux", "trellis", ...]
model:{model}:instances         # SET: ["zimg-api4-slot0", ...]
instance:{instance_id}          # HASH: {endpoint, status, healthy}

# Atomic Locks (auto-expire)
lock:{instance_id}              # STRING: job_id (EX: 300 seconds)

# Job Queue (sorted set = FIFO order)
jobs:queue:{model}              # ZSET: {job_id: timestamp_score}
jobs:queue_data:{job_id}        # HASH: {payload, callback_url, attempts}

# Results (with TTL)
result:{job_id}                 # JSON: {status, result, completed_at}

# Write-Ahead Log (crash recovery)
pending:{job_id}                # JSON: {job_id, payload} (EX: 86400)

# Dead Letter Queue
jobs:dlq                        # LIST: [job_id, ...]

Dynamic Model Configuration

Models and instances can be registered and configured at runtime — no redeployment needed.

Registering GPU Instances:

POST /admin/register
{
  "model": "flux",
  "server": "flux-server-1",
  "endpoint": "https://flux.example.com/generate",
  "slots": 2
}

This automatically registers the instances in Redis and creates/updates the Cloud Tasks queue with correct concurrency limits.
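
On the Redis side, registration is just populating the keys from the data model below — a sketch, where the "status"/"healthy" values are assumptions and the instance IDs follow the {server}-slot{n} naming shown earlier:

async def register_instances(model: str, server: str, endpoint: str, slots: int):
    """Create one instance entry per GPU slot (e.g. flux-server-1-slot0)."""
    await redis.sadd("models", model)
    for slot in range(slots):
        instance_id = f"{server}-slot{slot}"
        await redis.sadd(f"model:{model}:instances", instance_id)
        await redis.hset(f"instance:{instance_id}", mapping={
            "endpoint": endpoint,
            "status": "active",
            "healthy": "true",
        })
    # The matching Cloud Tasks queue is then created/updated with
    # max concurrent dispatches equal to the total slot count.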

Runtime Model Config:

Model behavior can be tuned via API without code changes:

# Get current config
GET /admin/model-configs/flux

# Update at runtime (stored in Redis, overrides code defaults)
PUT /admin/model-configs/flux
{
  "model_type": "async",
  "poll_endpoint_template": "/status/{id}",
  "poll_interval": 5.0,
  "max_poll_time": 1800,
  "max_retries": 3
}

# Delete override (reverts to code default)
DELETE /admin/model-configs/flux

Configurable Fields:

| Field | Description | Default |
|---|---|---|
| model_type | "sync" or "async" | sync |
| poll_endpoint_template | URL template for polling (e.g., "/status/{id}") | null |
| id_field | Field name in GPU response containing job ID | null |
| poll_interval | Seconds between poll requests | 2.0 |
| max_poll_time | Max seconds to poll before timeout | 600 |
| max_retries | Retries before moving to DLQ | 50 |

This lets you add new GPU models, adjust timeouts for slow models, or change retry behavior — all without touching code or restarting the dispatcher.
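
Resolution is a simple two-level lookup — a sketch, where the Redis key name model_config:{model} is illustrative and MODEL_CONFIGS is the in-code defaults mapping from earlier:

async def get_model_config(model: str) -> ModelConfig:
    """Runtime override in Redis wins; otherwise fall back to the code default."""
    raw = await redis.get(f"model_config:{model}")
    if raw:
        return ModelConfig(**json.loads(raw))
    return MODEL_CONFIGS.get(model, ModelConfig(model_type=ModelType.SYNC))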


Part 2: Reliability

Core requirement: every submitted job must either complete or be explicitly marked as failed. No silent drops.

Write-Ahead Log (WAL)

The pattern: write to durable storage before doing anything else.

async def submit_job(request: JobSubmitRequest) -> str:
    job_id = generate_job_id()

    # Step 1: Write to WAL FIRST
    await redis.set(
        f"pending:{job_id}",
        json.dumps({...}),
        ex=86400  # 24 hour TTL
    )

    # Step 2: Now create Cloud Task
    await cloud_tasks.create_task(job_id, request)

    return job_id

A reconciler runs every 5 minutes, finds jobs in WAL for >5 min without completion, and re-queues them. WAL entry deleted only on definitive completion.
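
A sketch of that reconciler; the WAL entry fields (submitted_at, model, payload) are assumed names, since the entry body is elided above:

async def wal_reconciler():
    """Every 5 minutes, re-queue WAL entries that never reached completion."""
    while True:
        # In cluster mode this has to scan every node (see Part 3)
        for key in await redis.keys("pending:*"):
            raw = await redis.get(key)
            if raw is None:
                continue  # expired between KEYS and GET
            entry = json.loads(raw)
            job_id = entry["job_id"]

            stale = time.time() - entry["submitted_at"] > 300
            completed = await redis.exists(f"result:{job_id}")
            if stale and not completed:
                # A fuller check also skips jobs currently mid-processing
                await add_to_queue(job_id, entry["model"], entry["payload"])
                logger.info(f"Reconciler re-queued {job_id}")

        await asyncio.sleep(300)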

Retry Queue with Priority Support

Redis sorted set for the queue. Score determines order:

async def add_to_queue(job_id, model, payload, priority=False):
    if priority:
        score = time.time() - 1e12  # Negative scores go first
    else:
        score = time.time()

    await redis.zadd(f"jobs:queue:{model}", {job_id: score})

Bug: Priority LIFO — First implementation used score = -timestamp. Newer priority jobs had more negative scores, so LIFO not FIFO. Fix: score = timestamp - 1e12
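
A quick worked example of the difference:

# Two priority jobs submitted one second apart (t1 is older)
t1, t2 = 1_700_000_000.0, 1_700_000_001.0

# Buggy scheme: score = -timestamp  ->  the NEWER job gets the smaller score (LIFO)
assert -t2 < -t1

# Fixed scheme: score = timestamp - 1e12  ->  the older job still sorts first (FIFO),
# and both sort ahead of normal jobs whose score is a plain timestamp
assert (t1 - 1e12) < (t2 - 1e12) < t1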

Exponential Backoff

Failed jobs retry with increasing delays:

def calculate_retry_delay(attempt: int) -> float:
    base_delay = 1.0
    max_delay = 30.0
    delay = min(base_delay * (2 ** attempt), max_delay)

    # Add ±10% jitter to prevent thundering herd
    jitter = delay * 0.1 * (random.random() * 2 - 1)
    return delay + jitter

# Attempt 0: ~1s
# Attempt 1: ~2s
# Attempt 2: ~4s
# Attempt 3: ~8s
# Attempt 4: ~16s
# Attempt 5+: ~30s (capped)

The jitter prevents multiple failed jobs from retrying at exactly the same time (thundering herd).
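
One way to wire the delay in (a sketch — the exact mechanism isn't shown here): wait out the delay, bump the attempt counter, and put the job back on its model queue.

async def schedule_retry(job_id: str, model: str, attempt: int):
    if attempt >= 50:  # max_retries default from the config table in Part 1
        await move_to_dlq(job_id, "max retries exceeded")
        return

    await asyncio.sleep(calculate_retry_delay(attempt))
    await redis.hincrby(f"jobs:queue_data:{job_id}", "attempts", 1)
    await redis.zadd(f"jobs:queue:{model}", {job_id: time.time()})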

Dead Letter Queue

After 50 retries, jobs move to DLQ for manual inspection:

async def move_to_dlq(job_id: str, error: str):
    job_data = await redis.hgetall(f"jobs:queue_data:{job_id}")

    await redis.rpush("jobs:dlq", job_id)
    await redis.hset(f"jobs:dlq_data:{job_id}", mapping={
        **job_data,
        "error": error,
        "failed_at": datetime.utcnow().isoformat()
    })

    await update_result(job_id, status="max_retries_exceeded")

Admin endpoints let you inspect DLQ, retry specific jobs, or retry all:

GET  /admin/dlq                    # List all DLQ jobs
POST /admin/dlq/{job_id}/retry     # Retry specific job
POST /admin/dlq/retry-all          # Retry everything
DELETE /admin/dlq/{job_id}         # Delete from DLQ
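
The single-job retry is a small handler — a sketch, assuming the DLQ hash also carries the job's model:

@app.post("/admin/dlq/{job_id}/retry")
async def retry_dlq_job(job_id: str):
    job_data = await redis.hgetall(f"jobs:dlq_data:{job_id}")
    if not job_data:
        raise HTTPException(404, "Job not found in DLQ")

    await redis.lrem("jobs:dlq", 0, job_id)  # drop it from the DLQ list
    await redis.hset(f"jobs:queue_data:{job_id}", mapping={**job_data, "attempts": 0})
    await redis.zadd(f"jobs:queue:{job_data['model']}", {job_id: time.time()})
    return {"status": "requeued", "job_id": job_id}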

Orphan Detection

Sometimes jobs get stuck — GPU accepts the request but never responds (process crash, OOM, infinite loop). A background worker runs every 60 seconds:

async def orphan_detector():
    while True:
        processing_jobs = await get_jobs_by_status("processing")

        for job in processing_jobs:
            started_at = parse_datetime(job["started_at"])
            if now() - started_at > timedelta(minutes=10):
                await update_result(job["job_id"],
                    status="failed",
                    error="Exceeded max processing time")
                await release_lease(job["instance_id"])

        await asyncio.sleep(60)

Jobs stuck in "processing" for >10 minutes get marked failed and their lease released. The WAL reconciler can then pick them up for retry if attempts remain.

Event-Driven + Polling Fallback

When a job completes, we immediately trigger the next one:

async def release_lease(instance_id: str, trigger_next: bool = True):
    await redis.delete(f"lock:{instance_id}")

    if trigger_next:
        model = instance_id.split("-")[0]
        asyncio.create_task(process_next_job(model))

But we also run a background worker every 2 seconds that checks for available slots:

async def queue_worker():
    while True:
        for model in await get_all_models():
            available_slots = await count_available_slots(model)
            for _ in range(available_slots):
                asyncio.create_task(process_next_job(model))
        await asyncio.sleep(2)

The event-driven path handles the normal case quickly. The polling fallback catches anything missed (network blip, dispatcher restart, race condition).


Part 3: Distributed Systems

This is where subtle bugs live. Code that looks correct fails under concurrency.

Atomic Leasing

Each GPU slot protected by Redis lock:

async def acquire_lease(model: str, job_id: str) -> Optional[str]:
    for instance_id in await get_instances(model):
        # SET NX = only if not exists, EX = auto-expire
        acquired = await redis.set(
            f"lock:{instance_id}",
            job_id,
            nx=True,
            ex=300
        )
        if acquired:
            return instance_id
    return None

The NX flag makes this atomic. Two dispatchers trying same slot = only one wins.

Lua Scripts for True Atomicity

The loop above works, but it has a subtle weakness: the instance lookup and each lock attempt are separate round trips, another dispatcher can grab a slot in between, and everyone ends up racing down the list. For true atomicity, we use a Lua script that runs entirely within Redis:

-- Find a healthy instance and lock it in ONE atomic operation
-- ARGV[1] = model, ARGV[2] = job_id, ARGV[3] = lock TTL (seconds)
local instances = redis.call('SMEMBERS', 'model:' .. ARGV[1] .. ':instances')
for _, instance_id in ipairs(instances) do
    local healthy = redis.call('HGET', 'instance:' .. instance_id, 'healthy')
    if healthy == 'true' then
        -- SET NX = only set if not exists (atomic lock)
        local acquired = redis.call('SET', 'lock:' .. instance_id, ARGV[2], 'NX', 'EX', ARGV[3])
        if acquired then
            return instance_id  -- Got the lock!
        end
    end
end
return nil  -- No available instance

The entire script executes atomically — no other Redis commands can interleave. This eliminates race conditions between checking availability and acquiring the lock.
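
Calling it from redis-py is a one-liner once the script is registered (ACQUIRE_LEASE_LUA here stands for the script text above):

# Register once at startup; redis-py returns a callable Script object
acquire_lease_script = redis.register_script(ACQUIRE_LEASE_LUA)

async def acquire_lease_atomic(model: str, job_id: str, ttl: int = 300):
    # keys=[] because the script builds its own key names;
    # args map to ARGV[1]=model, ARGV[2]=job_id, ARGV[3]=ttl
    return await acquire_lease_script(keys=[], args=[model, job_id, ttl])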

Lease TTL During Long Jobs

Lock TTL is 5 minutes (safety net if dispatcher crashes). But async jobs can take 20+ minutes. The poller refreshes the TTL every 60 seconds:

async def poll_async_job(job_id: str, instance_id: str):
    try:
        while True:
            # Keep the lease alive
            await redis.expire(f"lock:{instance_id}", 300)

            status = await poll_gpu_status(job_id)

            if status["status"] in ["success", "completed", "done"]:
                return status["result"]
            elif status["status"] in ["failed", "error"]:
                raise GPUError(status.get("error"))

            await asyncio.sleep(polling_interval)
    finally:
        # Always release, even on exception
        await release_lease(instance_id)

The try/finally ensures the lease is released even if the poller crashes or gets an unexpected exception.

Bug: Non-Atomic Queue Pop

Original code:

async def pop_next_job(model: str):
    jobs = await redis.zrange(f"jobs:queue:{model}", 0, 0)  # Get first
    job_id = jobs[0]
    await redis.zrem(f"jobs:queue:{model}", job_id)  # Remove it
    return await redis.hgetall(f"jobs:queue_data:{job_id}")

The bug: zrange and zrem are separate commands. Not atomic. Concurrent workers could both get same job, one removes it, other gets stuck.

Fix: use ZPOPMIN which atomically pops and returns:

async def pop_next_job(model: str):
    result = await redis.zpopmin(f"jobs:queue:{model}", count=1)
    if not result:
        return None
    job_id = result[0][0]
    return await redis.hgetall(f"jobs:queue_data:{job_id}")

Bug: Burning Retries on Capacity

Original flow: pop job first, then try to acquire lease. If no capacity, put job back with incremented attempts.

async def process_next_job(model: str):
    job = await pop_next_job(model)  # Pop first
    if not job:
        return

    instance_id = await acquire_lease(model, job["job_id"])
    if not instance_id:
        # No GPU slots available, put it back
        await add_to_queue(job, increment_attempts=True)  # BUG
        return

    await execute_job(job, instance_id)

The bug: when no GPU capacity was available, we incremented the attempt count. Jobs burned through their retry budget just waiting for slots to free up.

Fix: acquire lease before popping:

async def process_next_job(model: str):
    # Try to get a slot first
    instance_id = await acquire_lease(model, "pending")
    if not instance_id:
        return  # No capacity, leave jobs in queue untouched

    # Now pop
    job = await pop_next_job(model)
    if not job:
        await release_lease(instance_id)
        return

    # Update lock with actual job_id
    await redis.set(f"lock:{instance_id}", job["job_id"], ex=300)

    await execute_job(job, instance_id)

Now retry count only increments on actual GPU failures, not capacity issues.

Syncing Locks with GPU Reality

Distributed state can diverge from reality. The dispatcher's view (Redis locks) and the GPU's actual state can get out of sync in several scenarios:

Scenario 1: GPU Restart

GPU server restarts (deployment, crash, Azure maintenance). It comes back with 0 active jobs. But dispatcher still holds locks from before the restart — slots appear "busy" when they're actually free.

Scenario 2: Dispatcher Crash Mid-Job

Dispatcher crashes while a job is running on GPU. The lock has TTL so it will eventually expire. But the GPU might finish the job — result sits there with no one polling for it. The WAL reconciler handles job recovery, but locks need separate cleanup.

Scenario 3: Network Partition

Dispatcher loses connectivity to GPU mid-job. From dispatcher's view, job failed. From GPU's view, job completed successfully. Lock might get released (job "failed"), but GPU slot is still occupied until inference finishes.

The Solution: Lock Sync Worker

A background worker runs every 30 seconds, comparing dispatcher's lock state with GPU's reported state:

async def lock_sync_worker():
    while True:
        for server in await get_all_servers():
            try:
                health = await fetch_gpu_health(server)

                if health["active_jobs"] == 0:
                    # GPU says it's idle, but we might have locks
                    locks = await get_locks_for_server(server)

                    for lock in locks:
                        ttl = await redis.ttl(f"lock:{lock}")
                        # Only clear old locks (TTL < 295 means lock is >5 sec old)
                        if ttl < 295:
                            await redis.delete(f"lock:{lock}")
                            logger.info(f"Cleared stale lock: {lock}")

            except Exception as e:
                logger.warning(f"Health check failed: {server}: {e}")

        await asyncio.sleep(30)

The TTL Check Race Condition

Why check ttl < 295? Consider this race:

  1. Lock sync worker calls GPU health → reports 0 active jobs
  2. Meanwhile, dispatcher acquires lock and submits new job
  3. Lock sync worker sees the lock, deletes it
  4. Another dispatcher instance acquires same slot → double-booking

The TTL check prevents this: fresh locks have TTL ~300. If TTL < 295, the lock is at least 5 seconds old — enough time for the job to show up in GPU's active_jobs count. We only clear locks that are definitively stale.

Why Not Just Use Lock TTL?

Lock TTL (5 min) is a safety net, not the primary mechanism. Without active sync:

  • GPU restarts → 5 minutes of wasted capacity waiting for locks to expire
  • Multiple GPUs restart → significant throughput loss
  • Lock sync recovers capacity in ~30 seconds instead of ~300

Redis Cluster: KEYS Only Hits One Node

In cluster mode, KEYS pattern only scans the node the client is connected to:

# This misses keys on other nodes!
keys = await redis.keys("result:*")

Fix: iterate all nodes:

async def keys_all_nodes(self, pattern: str) -> list:
    all_keys = []

    if self.cluster_mode:
        for node in self.redis.get_nodes():
            node_client = self.redis.get_redis_connection(node)
            keys = await node_client.keys(pattern)
            all_keys.extend(keys)
    else:
        all_keys = await self.redis.keys(pattern)

    return all_keys

Preliminary Load Test

[Screenshot: GPU Dispatcher Dashboard]

Load test with 50,000 jobs:

| Metric | Result |
|---|---|
| Completed + In Queue | ~49,500 (99%) |
| Lost | ~500 (1%) |

Throughput: ~18-22 jobs/min with 8 GPU slots.

Open Problem: Lost ~500 jobs to Azure App Service 502 errors during burst traffic. Those requests never enter the system, so the WAL can't save them. Evaluating an upstream queue buffer or horizontal scaling.

Key Takeaways

  1. Write-Ahead Log — Persist before processing. Reconcile periodically.
  2. Atomic operations — zrange + zrem looks atomic, isn't. Use ZPOPMIN.
  3. Lease lifecycle — For async jobs, hold lease for entire execution.
  4. Lease before pop — Acquire resources before claiming work.
  5. Event-driven + polling — Events for speed, polling for reliability.
  6. Sync with reality — Periodically reconcile state with actual GPU state.
  7. Cluster mode differs — KEYS only hits one node. Iterate all nodes.
  8. Negative timestamp ordering — -timestamp = LIFO. Use timestamp - 1e12 for priority FIFO.