@nolag/queue Real-time job queues with lifecycle tracking, progress updates, and worker management.
Overview Distribute work across real-time workers with full lifecycle visibility. Jobs follow a state machine: pending → claimed → active → completed or failed. Every state transition is broadcast instantly to all queue participants. Failed jobs are re-queued automatically until maxRetries is exhausted. Three roles participate in a queue: producers add jobs, workers claim and process them, and monitors observe queue state. A single client can fulfil multiple roles simultaneously.
Key Features State machine lifecycle: pending, claimed, active, completed, failed Atomic job claiming, so exactly one worker receives each job Configurable priority and automatic retry with backoff Real-time progress reporting (0-100) broadcast to all participants 7-day job history replay on reconnect Ephemeral progress channel keeps durable storage lean Worker online/offline presence via lobby How It Works NoLagQueue wraps the @nolag/js-sdk client and maintains a lobby for worker presence. Calling joinQueue(name) returns a QueueRoom that subscribes to two topics: jobs for durable lifecycle events and _progress for ephemeral progress updates. Job claiming uses a server-side atomic operation so concurrent claim attempts from multiple workers resolve to exactly one winner.
Topic Purpose Replay jobsJob lifecycle events: added, claimed, completed, failed, retrying 7 days _progressIn-flight progress updates (0-100), not persisted Ephemeral
Installation Terminal
npm install @nolag/queue @nolag/js-sdkQuick Start import { NoLagQueue } from '@nolag/queue'
const client = new NoLagQueue ('your-access-token' )
await client.connect ()
const queue = await client.joinQueue ('video-encoding' )
// ── Producer ─────────────────────────────────────────────────────────────────
const job = await queue.addJob ({
id: 'job-101' , // optional, auto-generated if omitted
payload: { videoId: 'vid_xyz' , preset: '1080p' },
priority: 10, // higher = processed sooner
maxRetries: 3,
})
console.log ('Added job:' , job.id, 'status:' , job.status) // 'pending'
// ── Worker ───────────────────────────────────────────────────────────────────
queue.on ('jobAdded' , async ({ job }) => {
// Atomically claim the job (only one worker succeeds)
const claimed = await queue.claimJob (job.id)
if (!claimed) return // another worker got it first
console.log ('Claimed job:' , job.id)
try {
// Report progress (0–100)
await queue.reportProgress (job.id, 25)
await doExpensiveWork (job.payload)
await queue.reportProgress (job.id, 100)
// Mark complete with result
await queue.completeJob (job.id, { outputUrl: 'https://cdn.example.com/vid.mp4' })
} catch (err) {
// Mark failed (retried automatically if maxRetries not exhausted)
await queue.failJob (job.id, err instanceof Error ? err.message : String (err))
}
})
// ── Monitor ───────────────────────────────────────────────────────────────────
queue.on ('jobProgress' , ({ jobId, progress }) => {
console.log (`Job ${jobId} is ${progress}% complete` )
})
queue.on ('jobCompleted' , ({ jobId, result }) => {
console.log (`Job ${jobId} done:` , result)
})
queue.on ('jobFailed' , ({ jobId, error, attempt, maxRetries }) => {
console.warn (`Job ${jobId} failed (attempt ${attempt}/${maxRetries}):` , error)
})
queue.on ('jobRetrying' , ({ jobId, attempt, maxRetries }) => {
console.log (`Retrying job ${jobId} (${attempt}/${maxRetries})...` )
})
// Queue depth monitoring
console.log ('Pending:' , await queue.pendingCount)
console.log ('Active:' , await queue.activeCount)API Reference NoLagQueue Method Returns Description connect()Promise<void>Establish the WebSocket connection and join the lobby. disconnect()voidClose the connection and leave the lobby. joinQueue(name)Promise<QueueRoom>Subscribe to a named queue. Returns the room instance. leaveQueue(name)Promise<void>Unsubscribe from a queue and release its resources. getOnlineWorkers()Worker[]Return all workers currently present in the lobby.
NoLagQueue Events Event Payload Description connectednone WebSocket connection established. disconnectedreason: stringConnection closed. reconnectednone Connection restored; job history replay begins automatically. errorerror: ErrorA transport or protocol error occurred. workerOnlineworker: WorkerA worker joined the lobby. workerOfflineworker: WorkerA worker left the lobby.
QueueRoom Method Role Returns Description addJob(opts)Producer Promise<Job>Enqueue a new job. Accepts id?, payload, priority?, maxRetries?. claimJob(jobId)Worker Promise<Job | null>Atomically claim a pending job. Returns null if another worker claimed it first. reportProgress(jobId, progress)Worker Promise<void>Broadcast a progress value (0-100) on the ephemeral channel. completeJob(jobId, result?)Worker Promise<void>Mark the job as completed with an optional result payload. failJob(jobId, error)Worker Promise<void>Mark the job as failed. Triggers retry if maxRetries has not been reached. getJob(id)Monitor Promise<Job | null>Fetch the current state of a job by ID. getJobs(filter?)Monitor Promise<Job[]>Fetch jobs filtered by status ('pending' | 'active' | 'completed' | 'failed'). pendingCountMonitor Promise<number>Number of jobs currently in the pending state. activeCountMonitor Promise<number>Number of jobs currently being processed by workers.
QueueRoom Events Event Payload Description jobAdded{ job: Job }A new job was added to the queue. jobClaimed{ jobId, workerId }A worker successfully claimed a job. jobProgress{ jobId, progress: number, workerId }A worker reported a progress update. Delivered via the ephemeral channel. jobCompleted{ jobId, result?, workerId }A job was completed successfully. jobFailed{ jobId, error, attempt, maxRetries, workerId }A job failed. If attempt < maxRetries the job will be retried. jobRetrying{ jobId, attempt, maxRetries }A failed job has been re-queued for another attempt. workerJoinedworker: WorkerA worker joined this queue. workerLeftworker: WorkerA worker left this queue. replayStart{ count: number }Job history replay has begun. replayEnd{ replayed: number }Job history replay has completed.