Workers & processing
A Worker declares the processor that runs your jobs. NextMQ runs the worker loop and calls your processor over a signed webhook — you just write the function.
Declaring a worker#
import { Worker } from '@nextmq/sdk'
export const summarizeWorker = new Worker('ai', async (job) => {
await job.updateProgress(20)
const summary = await llm.summarize(job.data.docId)
return { summary }
}, {
concurrency: 5,
limiter: { max: 100, duration: 1000 }, // protect your model provider's quota
})Workeron cold start re-registers idempotently — you don't manage worker lifecycle by hand.Mounting the route#
createNextMQHandler returns GET and POST handlers that verify signatures, dispatch jobs to the right worker, and serve a /health path. It must run on the Node.js runtime.
import { createNextMQHandler } from '@nextmq/sdk/next'
import { summarizeWorker } from '@/jobs/ai'
export const runtime = 'nodejs'
export const { GET, POST } = createNextMQHandler({
workers: [summarizeWorker],
})Registration#
Registration publishes one deterministic webhook URL per queue. The handler registers its workers, but if producers can enqueue before the route is first hit, register explicitly from your bootstrap or producer path:
import { ensureWorkersRegistered, getWorkerRegistrationStatuses } from '@nextmq/sdk/next'
import { summarizeWorker } from '@/jobs/ai'
await ensureWorkersRegistered([summarizeWorker])
const statuses = await getWorkerRegistrationStatuses([summarizeWorker])Liveness is inferred from webhook delivery, not an in-process heartbeat. A registered URL, secret, or worker option can't change silently — change it and the SDK throws WorkerRegistrationError until the old worker is unregistered.
/health path to re-assert registration for queues that go idle. See Deploying to Vercel.Worker options#
NextMQ runs the real BullMQ worker, so it owns and enforces these:
| Option | Description |
|---|---|
concurrency | Max jobs this worker processes in parallel. |
limiter | Rate limit: { max, duration } per window. |
timeoutMs | Max time to wait for your webhook to respond before dispatch counts as failed. |
lockDuration | How long a job's lock is held while processing. |
lockRenewTime | How often the lock is renewed. |
stalledInterval | How often stalled jobs are checked. |
maxStalledCount | Times a job can stall before being failed. |
drainDelay | Delay used when the queue drains. |
Signalling job outcome#
NextMQ maps your processor's behavior to job state:
- Return a value → the job is
completedwith that value. - Throw → the job
failed; retries and backoff apply. - Throw
UnrecoverableJobError→ fail immediately, skipping remaining attempts. - Throw
RateLimitError→ re-queue as rate-limited instead of failing.
import { Worker, UnrecoverableJobError, RateLimitError } from '@nextmq/sdk'
export const worker = new Worker('ai', async (job) => {
if (!job.data.docId) throw new UnrecoverableJobError('missing docId')
const res = await llm.summarize(job.data.docId)
if (res.status === 429) throw new RateLimitError(res.retryAfterMs)
return { summary: res.summary }
})Pause & resume
Pause or resume processing for a queue (admin scope):
await ai.pause()
await ai.resume()job.id idempotency pattern.