@nolag/queue

Real-time job queues with lifecycle tracking, progress updates, and worker management.

Overview

Distribute work across real-time workers with full lifecycle visibility. Jobs follow a state machine: pending → claimed → active → completed or failed. Every state transition is broadcast instantly to all queue participants. Failed jobs are re-queued automatically until maxRetries is exhausted. Three roles participate in a queue: producers add jobs, workers claim and process them, and monitors observe queue state. A single client can fulfil multiple roles simultaneously.
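
The lifecycle above can be read as a small transition table. The TypeScript sketch below is an illustrative model only, not the library's internal code: the names JobStatus, TRANSITIONS, canTransition, and nextStatusAfterFailure are hypothetical, and it assumes that a failed job returns to pending while attempt < maxRetries and that a job only fails from the active state.

```typescript
// Hypothetical model of the job lifecycle; not taken from @nolag/queue itself.
type JobStatus = 'pending' | 'claimed' | 'active' | 'completed' | 'failed'

// Allowed transitions. 'failed' -> 'pending' models the automatic re-queue.
const TRANSITIONS: Record<JobStatus, JobStatus[]> = {
  pending: ['claimed'],
  claimed: ['active'],
  active: ['completed', 'failed'],
  completed: [],
  failed: ['pending'],
}

// True when the lifecycle permits moving directly from `from` to `to`.
function canTransition(from: JobStatus, to: JobStatus): boolean {
  return TRANSITIONS[from].indexOf(to) !== -1
}

// After a failure, re-queue while retries remain; otherwise stay failed.
// Assumption: `attempt` counts attempts made so far, starting at 1.
function nextStatusAfterFailure(attempt: number, maxRetries: number): JobStatus {
  return attempt < maxRetries ? 'pending' : 'failed'
}
```

With maxRetries set to 3, this model re-queues after the first and second failures and leaves the job in failed after the third.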

Key Features

  • State machine lifecycle: pending, claimed, active, completed, failed
  • Atomic job claiming, so exactly one worker receives each job
  • Configurable priority and automatic retry with backoff
  • Real-time progress reporting (0-100) broadcast to all participants
  • 7-day job history replay on reconnect
  • Ephemeral progress channel keeps durable storage lean
  • Worker online/offline presence via lobby
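
The feature list mentions retry with backoff without giving a schedule. Purely as an illustration, the sketch below assumes exponential backoff with an upper bound; backoffDelayMs, the 1-second base, and the 30-second cap are hypothetical values chosen for the example, not documented @nolag/queue settings.

```typescript
// Hypothetical backoff schedule. The base delay, the doubling factor, and the
// cap are illustrative assumptions, not documented library behaviour.
function backoffDelayMs(attempt: number, baseMs: number = 1000, capMs: number = 30000): number {
  // attempt 1 -> baseMs, attempt 2 -> 2 * baseMs, attempt 3 -> 4 * baseMs, ...
  const delay = baseMs * Math.pow(2, attempt - 1)
  // Never wait longer than the cap, however many attempts have failed.
  return Math.min(delay, capMs)
}
```

Capping the delay keeps a job with a large maxRetries from waiting indefinitely between attempts.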

How It Works

NoLagQueue wraps the @nolag/js-sdk client and maintains a lobby for worker presence. Calling joinQueue(name) returns a QueueRoom that subscribes to two topics: jobs for durable lifecycle events and _progress for ephemeral progress updates. Job claiming uses a server-side atomic operation so concurrent claim attempts from multiple workers resolve to exactly one winner.

Topic      Purpose                                                            Replay
---------  -----------------------------------------------------------------  ---------
jobs       Job lifecycle events: added, claimed, completed, failed, retrying  7 days
_progress  In-flight progress updates (0-100), not persisted                  Ephemeral
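
The atomic claim described in this section behaves like a compare-and-set on the job's status: the first claim that sees pending wins, and every later claim sees claimed and loses. The sketch below is an in-memory stand-in for that idea, not the server's implementation; ClaimRegistry, ClaimRecord, and raceForJob are hypothetical names.

```typescript
// In-memory stand-in for the server-side claim. All names are hypothetical.
type ClaimRecord = { status: 'pending' | 'claimed'; workerId: string | null }

class ClaimRegistry {
  private jobs = new Map<string, ClaimRecord>()

  // Register a job in the pending state.
  add(jobId: string): void {
    this.jobs.set(jobId, { status: 'pending', workerId: null })
  }

  // The status check and the update happen in one synchronous step, so no
  // other claim can slip in between them.
  claim(jobId: string, workerId: string): boolean {
    const record = this.jobs.get(jobId)
    if (record === undefined || record.status !== 'pending') return false
    this.jobs.set(jobId, { status: 'claimed', workerId })
    return true
  }

  // Worker that holds the job, or null if it is unclaimed or unknown.
  owner(jobId: string): string | null {
    const record = this.jobs.get(jobId)
    return record === undefined ? null : record.workerId
  }
}

// Several workers try to claim the same job; returns the ids that succeeded.
function raceForJob(workerIds: string[]): string[] {
  const registry = new ClaimRegistry()
  registry.add('job-101')
  return workerIds.filter((id) => registry.claim('job-101', id))
}
```

Calling raceForJob with three worker ids yields a single winner in this model, which mirrors the "exactly one winner" guarantee stated above.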

Installation

npm install @nolag/queue @nolag/js-sdk

Quick Start

import { NoLagQueue } from '@nolag/queue'

const client = new NoLagQueue('your-access-token')
await client.connect()

const queue = await client.joinQueue('video-encoding')

// ── Producer ─────────────────────────────────────────────────────────────────
const job = await queue.addJob({
  id: 'job-101',           // optional, auto-generated if omitted
  payload: { videoId: 'vid_xyz', preset: '1080p' },
  priority: 10,            // higher = processed sooner
  maxRetries: 3,
})
console.log('Added job:', job.id, 'status:', job.status) // 'pending'

// ── Worker ───────────────────────────────────────────────────────────────────
queue.on('jobAdded', async ({ job }) => {
  // Atomically claim the job (only one worker succeeds)
  const claimed = await queue.claimJob(job.id)
  if (!claimed) return  // another worker got it first

  console.log('Claimed job:', job.id)

  try {
    // Report progress (0–100)
    await queue.reportProgress(job.id, 25)
    // doExpensiveWork is a placeholder for your own processing function
    await doExpensiveWork(job.payload)
    await queue.reportProgress(job.id, 100)

    // Mark complete with result
    await queue.completeJob(job.id, { outputUrl: 'https://cdn.example.com/vid.mp4' })
  } catch (err) {
    // Mark failed (retried automatically if maxRetries not exhausted)
    await queue.failJob(job.id, err instanceof Error ? err.message : String(err))
  }
})

// ── Monitor ───────────────────────────────────────────────────────────────────
queue.on('jobProgress', ({ jobId, progress }) => {
  console.log(`Job ${jobId} is ${progress}% complete`)
})

queue.on('jobCompleted', ({ jobId, result }) => {
  console.log(`Job ${jobId} done:`, result)
})

queue.on('jobFailed', ({ jobId, error, attempt, maxRetries }) => {
  console.warn(`Job ${jobId} failed (attempt ${attempt}/${maxRetries}):`, error)
})

queue.on('jobRetrying', ({ jobId, attempt, maxRetries }) => {
  console.log(`Retrying job ${jobId} (${attempt}/${maxRetries})...`)
})

// Queue depth monitoring
console.log('Pending:', await queue.pendingCount)
console.log('Active:', await queue.activeCount)

API Reference

NoLagQueue

Method              Returns             Description
------------------  ------------------  -----------
connect()           Promise<void>       Establish the WebSocket connection and join the lobby.
disconnect()        void                Close the connection and leave the lobby.
joinQueue(name)     Promise<QueueRoom>  Subscribe to a named queue. Returns the room instance.
leaveQueue(name)    Promise<void>       Unsubscribe from a queue and release its resources.
getOnlineWorkers()  Worker[]            Return all workers currently present in the lobby.

NoLagQueue Events

Event          Payload         Description
-------------  --------------  -----------
connected      none            WebSocket connection established.
disconnected   reason: string  Connection closed.
reconnected    none            Connection restored; job history replay begins automatically.
error          error: Error    A transport or protocol error occurred.
workerOnline   worker: Worker  A worker joined the lobby.
workerOffline  worker: Worker  A worker left the lobby.

QueueRoom

Method                           Role      Returns              Description
-------------------------------  --------  -------------------  -----------
addJob(opts)                     Producer  Promise<Job>         Enqueue a new job. Accepts id?, payload, priority?, maxRetries?.
claimJob(jobId)                  Worker    Promise<Job | null>  Atomically claim a pending job. Returns null if another worker claimed it first.
reportProgress(jobId, progress)  Worker    Promise<void>        Broadcast a progress value (0-100) on the ephemeral channel.
completeJob(jobId, result?)      Worker    Promise<void>        Mark the job as completed with an optional result payload.
failJob(jobId, error)            Worker    Promise<void>        Mark the job as failed. Triggers retry if maxRetries has not been reached.
getJob(id)                       Monitor   Promise<Job | null>  Fetch the current state of a job by ID.
getJobs(filter?)                 Monitor   Promise<Job[]>       Fetch jobs filtered by status ('pending' | 'active' | 'completed' | 'failed').
pendingCount                     Monitor   Promise<number>      Number of jobs currently in the pending state.
activeCount                      Monitor   Promise<number>      Number of jobs currently being processed by workers.
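
A worker that joins a queue with an existing backlog might fetch the pending jobs and attempt claims in priority order, since a higher priority means the job is processed sooner. The helper below sketches that ordering on plain data; JobSummary and claimOrder are hypothetical names, and the three fields are an assumed subset of the Job shape inferred from the addJob options.

```typescript
// Assumed subset of the Job shape; the real type may carry more fields.
interface JobSummary {
  id: string
  status: 'pending' | 'claimed' | 'active' | 'completed' | 'failed'
  priority: number
}

// Return the pending jobs, highest priority first.
// filter() produces a new array, so sorting does not mutate the input, and
// sort() is stable, so jobs with equal priority keep their original order.
function claimOrder(jobs: JobSummary[]): JobSummary[] {
  return jobs
    .filter((job) => job.status === 'pending')
    .sort((a, b) => b.priority - a.priority)
}
```

Each job in the returned list would still need a claimJob call, because another worker may have claimed it between the fetch and the attempt.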

QueueRoom Events

Event         Payload                                          Description
------------  -----------------------------------------------  -----------
jobAdded      { job: Job }                                     A new job was added to the queue.
jobClaimed    { jobId, workerId }                              A worker successfully claimed a job.
jobProgress   { jobId, progress: number, workerId }            A worker reported a progress update. Delivered via the ephemeral channel.
jobCompleted  { jobId, result?, workerId }                     A job was completed successfully.
jobFailed     { jobId, error, attempt, maxRetries, workerId }  A job failed. If attempt < maxRetries the job will be retried.
jobRetrying   { jobId, attempt, maxRetries }                   A failed job has been re-queued for another attempt.
workerJoined  worker: Worker                                   A worker joined this queue.
workerLeft    worker: Worker                                   A worker left this queue.
replayStart   { count: number }                                Job history replay has begun.
replayEnd     { replayed: number }                             Job history replay has completed.
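
A monitor can fold these events into a local view of the queue, whether they arrive live or during replay. The reducer below is a sketch under stated assumptions: QueueEvent, JobView, applyEvent, and countByStatus are hypothetical names, the type tag is added for illustration, and treating a progress report as the move into active is a guess, since no separate "active" event is listed.

```typescript
type Status = 'pending' | 'claimed' | 'active' | 'completed' | 'failed'

// Payloads trimmed to fields listed in the events table. The `type` tag exists
// only in this sketch; the library delivers each event to its own handler.
type QueueEvent =
  | { type: 'jobAdded'; job: { id: string } }
  | { type: 'jobClaimed'; jobId: string }
  | { type: 'jobProgress'; jobId: string; progress: number }
  | { type: 'jobCompleted'; jobId: string }
  | { type: 'jobFailed'; jobId: string }
  | { type: 'jobRetrying'; jobId: string }

interface JobView {
  status: Status
  progress: number
}

// Apply one event to the local view, overwriting the job's previous entry.
function applyEvent(view: Map<string, JobView>, event: QueueEvent): void {
  switch (event.type) {
    case 'jobAdded':
      view.set(event.job.id, { status: 'pending', progress: 0 })
      break
    case 'jobClaimed':
      view.set(event.jobId, { status: 'claimed', progress: 0 })
      break
    case 'jobProgress': {
      // Assumption: a progress report implies the job is now active.
      const progress = Math.max(0, Math.min(100, event.progress))
      view.set(event.jobId, { status: 'active', progress })
      break
    }
    case 'jobCompleted':
      // Assumption: a completed job is shown at 100%.
      view.set(event.jobId, { status: 'completed', progress: 100 })
      break
    case 'jobFailed':
      view.set(event.jobId, { status: 'failed', progress: 0 })
      break
    case 'jobRetrying':
      // A retried job goes back to the pending pool.
      view.set(event.jobId, { status: 'pending', progress: 0 })
      break
  }
}

// Count the jobs in the view that currently have the given status.
function countByStatus(view: Map<string, JobView>, status: Status): number {
  let count = 0
  view.forEach((job) => {
    if (job.status === status) count += 1
  })
  return count
}
```

Because each event overwrites the job's entry rather than incrementing a counter, applying the same event twice leaves the view unchanged, which is convenient if replayed history overlaps with events already seen.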