Queues & job options
A Queue is a named producer. Creating one is local; add() and addBulk() send your jobs to NextMQ, which validates the job options and runs the jobs.
Creating a queue
import { Queue } from '@nextmq/sdk'
type NotifyJob = { userId: string; channel: 'push' | 'sms' }
const notifications = new Queue<NotifyJob>('notifications')
Adding jobs
// Single job
await notifications.add('digest', { userId, channel: 'push' })
// With options
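await notifications.add('digest', { userId, channel: 'push' }, {
  delay: 5000, // wait 5 s before the job becomes eligible to run
  attempts: 3, // total tries, including the first run
  backoff: { type: 'exponential', delay: 2000 }, // retry delay strategy
  priority: 1, // lower number runs first
})
addBulk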
await notifications.addBulk([
  { name: 'digest', data: { userId: 'u_1', channel: 'push' } },
  { name: 'digest', data: { userId: 'u_2', channel: 'sms' }, opts: { delay: 1000 } },
])
Supported job options
Pass any of these in the third argument to add(). Options that can't survive a remote queue are rejected with a clear error, never silently dropped.
| Option | Description |
|---|---|
| delay | Milliseconds to wait before the job becomes eligible to run. |
| attempts | Total times to try the job before it's marked failed. |
| backoff | Retry delay strategy: { type: 'fixed' \| 'exponential', delay }. |
| priority | Lower number runs first. |
| jobId | Custom job id (colon-free). Enables idempotent enqueues. |
| lifo | Push to the front of the queue instead of the back. |
| keepLogs | Max number of log lines to retain. |
| sizeLimit | Reject the job if its serialized data exceeds this many bytes. |
| removeOnComplete | Retention for completed jobs (see clamping below). |
| removeOnFail | Retention for failed jobs (see clamping below). |
| repeat | Limited repeatable config; prefer Job Schedulers. |
| deduplication | Collapse duplicate work (see Deduplication below). |
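To make the interaction between attempts and backoff concrete, the sketch below computes the wait before each retry. It assumes NextMQ follows BullMQ's built-in strategies (fixed: a constant delay; exponential: delay × 2^(attemptsMade − 1)); this page doesn't state the formula, and retryDelays is a hypothetical helper, not part of the SDK.

```typescript
// Hypothetical helper, not part of @nextmq/sdk.
// Assumes BullMQ's built-in backoff formulas; check NextMQ's behavior
// before relying on exact timings.
type Backoff = { type: 'fixed' | 'exponential'; delay: number }

function retryDelays(attempts: number, backoff: Backoff): number[] {
  const delays: number[] = []
  // `attempts` counts the first run too, so there are attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(
      backoff.type === 'fixed'
        ? backoff.delay
        : backoff.delay * 2 ** (attemptsMade - 1),
    )
  }
  return delays
}

console.log(retryDelays(3, { type: 'exponential', delay: 2000 })) // [ 2000, 4000 ]
console.log(retryDelays(4, { type: 'fixed', delay: 500 })) // [ 500, 500, 500 ]
```

Under these assumptions, the options from the add() example above (attempts: 3, exponential backoff with delay: 2000) would retry after roughly 2 s and then 4 s before the job is marked failed.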
Retention is clamped
Native BullMQ keeps completed and failed jobs forever by default. To keep your project's storage bounded, NextMQ clamps retention on every job:
- Unset or false ("keep forever") becomes a { count, age } cap.
- Explicit values are clamped to your project's caps.
- keepLogs is capped the same way.
The applied values are visible in the job's serialized opts, so you can always see what was stored.
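The clamping rules above can be sketched as a small pure function. This is an illustration, not NextMQ's implementation: clampRetention and the cap values are hypothetical, the option shapes are borrowed from BullMQ (boolean, number, or { count, age } with age in seconds), and the handling of true and of partially specified objects is an assumption this page doesn't confirm.

```typescript
// Hypothetical sketch of retention clamping; not the NextMQ implementation.
type KeepJobs = { count?: number; age?: number } // age in seconds (BullMQ convention)
type Retention = boolean | number | KeepJobs | undefined
type Caps = { count: number; age: number }

function clampRetention(value: Retention, caps: Caps): Required<KeepJobs> {
  // Unset or false ("keep forever") becomes the project's cap.
  if (value === undefined || value === false) return { ...caps }
  // Assumption: true keeps BullMQ's meaning, "remove as soon as finished".
  if (value === true) return { count: 0, age: caps.age }
  // A bare number is BullMQ shorthand for { count }.
  const requested: KeepJobs = typeof value === 'number' ? { count: value } : value
  // Assumption: a missing field falls back to the cap; explicit values
  // are never allowed to exceed it.
  return {
    count: Math.min(requested.count ?? caps.count, caps.count),
    age: Math.min(requested.age ?? caps.age, caps.age),
  }
}

// Made-up caps for illustration; your project's caps may differ.
const exampleCaps: Caps = { count: 1000, age: 86_400 }

console.log(clampRetention(undefined, exampleCaps)) // { count: 1000, age: 86400 }
console.log(clampRetention({ count: 50_000 }, exampleCaps)) // { count: 1000, age: 86400 }
console.log(clampRetention(100, exampleCaps)) // { count: 100, age: 86400 }
```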
Deduplication
Pass a deduplication id to collapse duplicate work within a TTL window — handy when a form can be double-submitted or a request gets retried.
await exports.add('csv', { reportId }, {
  deduplication: { id: `csv-${reportId}`, ttl: 60_000 },
})
Supported fields: id, ttl, extend, replace, and keepLastIfActive. The helpers getDeduplicationJobId and removeDeduplicationKey are available on the queue.
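To show what "collapse duplicate work within a TTL window" means in practice, here is a minimal in-memory model. It covers only id and ttl; extend, replace, and keepLastIfActive are left out because this page doesn't describe their semantics. DedupWindow is a hypothetical class for illustration, and the clock is passed in explicitly so the behavior is easy to follow.

```typescript
// Hypothetical in-memory model of TTL-window deduplication.
// Not the NextMQ implementation; it only illustrates the id + ttl behavior.
class DedupWindow {
  private readonly active = new Map<string, { jobId: string; expiresAt: number }>()
  private nextId = 1

  // Returns the id of the job that will do the work, and whether this
  // call was collapsed into an earlier enqueue.
  add(dedupId: string, ttl: number, now: number): { jobId: string; deduplicated: boolean } {
    const existing = this.active.get(dedupId)
    if (existing && now < existing.expiresAt) {
      return { jobId: existing.jobId, deduplicated: true }
    }
    const jobId = String(this.nextId++)
    this.active.set(dedupId, { jobId, expiresAt: now + ttl })
    return { jobId, deduplicated: false }
  }
}

const dedupDemo = new DedupWindow()
console.log(dedupDemo.add('csv-42', 60_000, 0)) // { jobId: '1', deduplicated: false }
console.log(dedupDemo.add('csv-42', 60_000, 30_000)) // { jobId: '1', deduplicated: true }
console.log(dedupDemo.add('csv-42', 60_000, 60_000)) // { jobId: '2', deduplicated: false }
```

The second call lands inside the 60-second window and is collapsed into the first job; the third arrives once the window has expired and creates a new one.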
Queue-wide throughput controls
These apply across every worker on the queue — distinct from per-worker concurrency and limiter. Useful for protecting a shared downstream like a third-party or AI API.
await ai.setGlobalConcurrency(50)
await ai.setGlobalRateLimit(100, 1000) // max, duration (ms)
await ai.getGlobalConcurrency() // number | null
await ai.getGlobalRateLimit() // { max, duration } | null
await ai.removeGlobalRateLimit()
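For intuition about what a queue-wide (max, duration) limit does, the sketch below models one shared limiter that every worker consults before starting a job. It uses a simple fixed-window counter; this page doesn't say which algorithm NextMQ uses, so treat FixedWindowLimiter as a hypothetical illustration of the semantics rather than the service's behavior.

```typescript
// Hypothetical fixed-window limiter shared by all workers on a queue.
// Illustrates "max jobs per duration"; not the NextMQ implementation.
class FixedWindowLimiter {
  private readonly max: number
  private readonly duration: number
  private windowStart = Number.NEGATIVE_INFINITY
  private used = 0

  constructor(max: number, duration: number) {
    this.max = max
    this.duration = duration
  }

  // Returns true if a job may start at time `now` (ms), false if the
  // shared budget for the current window is already spent.
  tryAcquire(now: number): boolean {
    if (now - this.windowStart >= this.duration) {
      // The previous window has elapsed: start a new one.
      this.windowStart = now
      this.used = 0
    }
    if (this.used >= this.max) return false
    this.used++
    return true
  }
}

// Two workers share one limiter: at most 2 jobs per 1000 ms in total.
const sharedLimiter = new FixedWindowLimiter(2, 1000)
console.log(sharedLimiter.tryAcquire(0)) // worker A: true
console.log(sharedLimiter.tryAcquire(10)) // worker B: true
console.log(sharedLimiter.tryAcquire(20)) // worker A: false, budget spent
console.log(sharedLimiter.tryAcquire(1000)) // worker B: true, new window
```

The point of the model is that the budget belongs to the queue, not to a worker: adding more workers doesn't raise the ceiling, which is what protects a shared downstream API.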