NextMQ

Workers & processing

A Worker declares the processor that runs your jobs. NextMQ runs the worker loop and calls your processor over a signed webhook — you just write the function.

Declaring a worker#

jobs/ai.ts
import { Worker } from '@nextmq/sdk'

export const summarizeWorker = new Worker('ai', async (job) => {
  await job.updateProgress(20)
  const summary = await llm.summarize(job.data.docId)
  return { summary }
}, {
  concurrency: 5,
  limiter: { max: 100, duration: 1000 }, // protect your model provider's quota
})
Note
One worker per queue, per project. A fresh Workeron cold start re-registers idempotently — you don't manage worker lifecycle by hand.

Mounting the route#

createNextMQHandler returns GET and POST handlers that verify signatures, dispatch jobs to the right worker, and serve a /health path. It must run on the Node.js runtime.

app/api/nextmq/[...path]/route.ts
import { createNextMQHandler } from '@nextmq/sdk/next'
import { summarizeWorker } from '@/jobs/ai'

export const runtime = 'nodejs'

export const { GET, POST } = createNextMQHandler({
  workers: [summarizeWorker],
})

Registration#

Registration publishes one deterministic webhook URL per queue. The handler registers its workers, but if producers can enqueue before the route is first hit, register explicitly from your bootstrap or producer path:

import { ensureWorkersRegistered, getWorkerRegistrationStatuses } from '@nextmq/sdk/next'
import { summarizeWorker } from '@/jobs/ai'

await ensureWorkersRegistered([summarizeWorker])

const statuses = await getWorkerRegistrationStatuses([summarizeWorker])

Liveness is inferred from webhook delivery, not an in-process heartbeat. A registered URL, secret, or worker option can't change silently — change it and the SDK throws WorkerRegistrationError until the old worker is unregistered.

Tip
Schedule a cron that pings the handler's /health path to re-assert registration for queues that go idle. See Deploying to Vercel.

Worker options#

NextMQ runs the real BullMQ worker, so it owns and enforces these:

OptionDescription
concurrencyMax jobs this worker processes in parallel.
limiterRate limit: { max, duration } per window.
timeoutMsMax time to wait for your webhook to respond before dispatch counts as failed.
lockDurationHow long a job's lock is held while processing.
lockRenewTimeHow often the lock is renewed.
stalledIntervalHow often stalled jobs are checked.
maxStalledCountTimes a job can stall before being failed.
drainDelayDelay used when the queue drains.

Signalling job outcome#

NextMQ maps your processor's behavior to job state:

  • Return a value → the job is completed with that value.
  • Throw → the job failed; retries and backoff apply.
  • Throw UnrecoverableJobError → fail immediately, skipping remaining attempts.
  • Throw RateLimitError → re-queue as rate-limited instead of failing.
import { Worker, UnrecoverableJobError, RateLimitError } from '@nextmq/sdk'

export const worker = new Worker('ai', async (job) => {
  if (!job.data.docId) throw new UnrecoverableJobError('missing docId')

  const res = await llm.summarize(job.data.docId)
  if (res.status === 429) throw new RateLimitError(res.retryAfterMs)

  return { summary: res.summary }
})

Pause & resume

Pause or resume processing for a queue (admin scope):

await ai.pause()
await ai.resume()
Heads up
Webhook delivery is at-least-once, so processors must be idempotent. See How it works for the delivery guarantees and the job.id idempotency pattern.