A durable task primitive built on BullMQ
Building reliable task systems with BullMQ requires juggling queues, workers, and events, turning simple functions into scattered configuration. DurableTask solves this by wrapping BullMQ primitives into a single abstraction that gives any function automatic archiving, retries, scheduling, and complete execution history.
I'm working on a project that requires a lot of different tasks to be executed reliably. Some tasks run on schedules, some tasks call other tasks. Most importantly: tasks should keep a good history trail, and when things fail the whole system must continue where it left off.
Queues are a natural choice with these requirements. I used RabbitMQ in the past, but I decided to explore BullMQ this time. It provides more than just a message queue. BullMQ builds on messages and queues to create concepts such as workers, jobs, and scheduling timers. There are some good UIs too, like QueueDash, which gives a complete overview of all the queues and jobs.
Here's a simple example showing Worker, Job, and Queue in action:
import { Worker, Job, Queue } from 'bullmq';

const queue = new Queue('Cars');
const job = await queue.add('paint', { color: 'red' });

const worker = new Worker('Cars', async (job: Job) => {
  // job.log() is async: it appends a line to the job's log in Redis
  await job.log("Applying paint");
  await job.updateProgress(20);
  await job.log("Watching paint dry");
  await job.updateProgress(99);
  await job.log(`The car is now ${job.data.color}!`);
  return { newColor: job.data.color };
});
Now suppose I want to immediately get the result of a job I just dispatched. I have to introduce the QueueEvents class:
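import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('Cars');
const job = await queue.add('paint', { color: 'red' });
// Resolves with the worker's return value; rejects if the job fails
// or no result arrives within the timeout (in milliseconds)
const result = await job.waitUntilFinished(queueEvents, 3000); // 3 seconds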
You can already see these are powerful tools which are well separated. This strong separation allows for great flexibility in the way you architect your application. A single process can run workers for several queues. Multiple workers can pull from a single queue. Workers can push jobs into another queue, and so on.
The pain points
However, there are quite a few references to juggle in the air (queue, queueEvents, job, worker) and your codebase may quickly get into a state of scattered configuration where queues live in one file, workers in another, and overall orchestration in yet another.
There's a lot of manual wiring involved and no simple way to just call a function and get a result. Execution history lives on the queue, while job results and logs live on the individual job.
So before you know it you have made spaghetti and your business data flow has become opaque through the magic of decoupling.
Intentional coupling
At least for my use case, I observed:
- A queue without a worker is useless and vice versa
- They often exist in a 1:1 relationship in practice
What if we can take all of these durable properties and magically bestow them upon a simple function?
One function = One queue + One worker, automatically configured and lifecycle-managed together.
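To make the coupling idea concrete, here is a minimal sketch of a class that owns exactly one queue and one worker under a shared name. Everything in it is an assumption for illustration: `CoupledTask`, `Backend`, `QueueLike`, and `WorkerLike` are hypothetical names, not the DurableTask implementation, and the backend is injected so the wiring can be read (and exercised) without Redis. With BullMQ, the two factories could plausibly be thin wrappers around `new Queue(name)` and `new Worker(name, processor)`.

```typescript
// Structural stand-ins for the parts of BullMQ this sketch touches.
interface QueueLike<I> {
  add(jobName: string, data: I): Promise<{ id?: string }>;
}
interface WorkerLike {
  close(): Promise<void>;
}
type Processor<I, O> = (job: { data: I }) => Promise<O>;

// Hypothetical factory pair. With BullMQ it could look like:
//   createQueue:  (name) => new Queue(name)
//   createWorker: (name, processor) => new Worker(name, processor)
interface Backend<I, O> {
  createQueue(name: string): QueueLike<I>;
  createWorker(name: string, processor: Processor<I, O>): WorkerLike;
}

// One function = one queue + one worker, created and stopped together.
class CoupledTask<I, O> {
  private readonly name: string;
  private readonly handler: (input: I) => Promise<O>;
  private readonly backend: Backend<I, O>;
  private readonly queue: QueueLike<I>;
  private worker: WorkerLike | undefined;

  constructor(name: string, handler: (input: I) => Promise<O>, backend: Backend<I, O>) {
    this.name = name;
    this.handler = handler;
    this.backend = backend;
    // The queue takes the task's name, so queue and worker can never drift apart.
    this.queue = backend.createQueue(name);
  }

  // Idempotent: a second call does not create a second worker.
  startWorker(): void {
    if (this.worker) return;
    this.worker = this.backend.createWorker(this.name, (job) => this.handler(job.data));
  }

  // Enqueue the input and return the job id without waiting for the result.
  async call(input: I): Promise<string | undefined> {
    const job = await this.queue.add(this.name, input);
    return job.id;
  }

  // Close the worker, if one was started.
  async stop(): Promise<void> {
    await this.worker?.close();
    this.worker = undefined;
  }
}
```

Starting the worker separately from construction means a process that only produces jobs can import the task without also consuming them.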
The benefits
- Your business logic is just a function, no boilerplate.
- Every execution is recorded with inputs, outputs, logs, and errors.
- Time travel debugging: Query executions from days, weeks, or months ago.
- Zero configuration
- Built-in scheduling if needed
The DurableTask API
Here's the complete transformation. A simple car painting function, but with durability:
// tasks/paintCar.ts
import { DurableTask } from './durable-task.js';

export const paintCar = new DurableTask<
  { color: string },
  { newColor: string }
>(
  'paint-car',
  // `job` is assumed here to be the underlying BullMQ job,
  // passed to the handler as a third argument
  async (input, log, job) => {
    const { color } = input;
    log.info("Applying paint");
    await job.updateProgress(20);
    log.info("Watching paint dry");
    await job.updateProgress(99);
    return { newColor: color };
  },
);
That's it. No manual queue creation. No worker registration. Just a function.
Three simple usage patterns
Fire-and-forget (async call)
Need to queue work without waiting? Just call it:
paintCar.startWorker();
// Queue the job and continue immediately
const jobId = await paintCar.call({ color: 'red' });
console.log(`Job queued: ${jobId}`);
Wait for result (sync call)
Need the result right away? Use callSync:
const result = await paintCar.callSync(
  { color: 'red' },
  undefined, // optional name for the job
  120000 // 2 minute timeout
);
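A synchronous call like this can be built from the two BullMQ pieces shown earlier: `queue.add` followed by `job.waitUntilFinished(queueEvents, ttl)`. The sketch below is one possible shape, not the actual callSync implementation. `callAndWait`, `AddableQueue`, and `WaitableJob` are hypothetical names, and the default job name and timeout are assumptions. The interfaces describe only the methods used, so the function can be tried with a fake queue.

```typescript
// Only the methods this sketch relies on, modelled on BullMQ's Job and Queue.
interface WaitableJob<O> {
  waitUntilFinished(queueEvents: unknown, ttl?: number): Promise<O>;
}
interface AddableQueue<I, O> {
  add(jobName: string, data: I): Promise<WaitableJob<O>>;
}

// Enqueue a job, then wait for its return value.
// The promise rejects if the job fails or the timeout elapses first.
async function callAndWait<I, O>(
  queue: AddableQueue<I, O>,
  queueEvents: unknown,
  input: I,
  jobName: string = 'default', // assumed fallback name
  timeoutMs: number = 30_000,  // assumed default timeout
): Promise<O> {
  const job = await queue.add(jobName, input);
  return job.waitUntilFinished(queueEvents, timeoutMs);
}
```

Passing `undefined` for the job name falls back to the default, which mirrors the argument order in the callSync example above.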
Schedule recurring execution
// Run every hour (cron: minute 0 of every hour)
await paintCar.schedule(
  '0 * * * *',
  { color: 'blue' }
);
// Remove the schedule when done
await paintCar.unschedule();
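One way `schedule` and `unschedule` could map onto BullMQ is through its Job Schedulers API (`queue.upsertJobScheduler` and `queue.removeJobScheduler`, available in recent BullMQ 5.x releases). The sketch below is an assumption about how that might look, not the DurableTask source. `TaskSchedule` and `SchedulableQueue` are hypothetical names, and the `<task name>:schedule` id convention is invented for illustration.

```typescript
// Only the scheduler methods this sketch relies on, modelled on BullMQ's Queue.
interface SchedulableQueue<I> {
  upsertJobScheduler(
    schedulerId: string,
    repeat: { pattern: string },
    template: { name: string; data: I },
  ): Promise<unknown>;
  removeJobScheduler(schedulerId: string): Promise<boolean>;
}

class TaskSchedule<I> {
  private readonly taskName: string;
  private readonly queue: SchedulableQueue<I>;
  private readonly schedulerId: string;

  constructor(taskName: string, queue: SchedulableQueue<I>) {
    this.taskName = taskName;
    this.queue = queue;
    // One fixed scheduler id per task (hypothetical naming convention).
    this.schedulerId = `${taskName}:schedule`;
  }

  // Create or replace the task's recurring schedule.
  async schedule(pattern: string, input: I): Promise<void> {
    await this.queue.upsertJobScheduler(
      this.schedulerId,
      { pattern },
      { name: this.taskName, data: input },
    );
  }

  // Remove the schedule; resolves to whatever the queue reports.
  async unschedule(): Promise<boolean> {
    return this.queue.removeJobScheduler(this.schedulerId);
  }
}
```

Because the scheduler id is derived from the task name, calling `schedule` twice replaces the earlier schedule rather than stacking a second one, and `unschedule` needs no arguments.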
Always available history
Every execution is automatically archived. Query it anytime:
// Get recent executions
const history = await paintCar.getHistory(50);
history.forEach(exec => {
  console.log(`${exec.id}: ${exec.status}`);
  console.log(` Input: ${JSON.stringify(exec.input)}`);
  console.log(` Output: ${JSON.stringify(exec.output)}`);
  console.log(` Duration: ${exec.duration}ms`);
  console.log(` Logs: ${exec.logs.join('\n')}`);
});

// Get failed executions
const failed = await paintCar.getFailed();
console.log(`Found ${failed.length} failures`);

// Retry all failures
const retried = await paintCar.retryAllFailed();
console.log(`Retried ${retried} jobs`);

// Get queue statistics
const stats = await paintCar.getStats();
console.log(`Waiting: ${stats.waiting}, Active: ${stats.active}`);
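The execution records above could plausibly be derived from fields BullMQ already stores on each finished job: `data`, `returnvalue`, `failedReason`, `processedOn`, and `finishedOn`. The sketch below shows one such mapping. `toExecution`, `FinishedJobLike`, and `Execution` are hypothetical names, and the record shape is inferred from the usage example rather than taken from the DurableTask source. Log lines are passed in separately, since BullMQ returns them from the queue (for example via `queue.getJobLogs(jobId)`) rather than from the job object.

```typescript
// The subset of a finished BullMQ job that this mapping reads.
interface FinishedJobLike<I, O> {
  id?: string;
  data: I;
  returnvalue?: O | null;
  failedReason?: string;
  processedOn?: number; // epoch ms when a worker picked the job up
  finishedOn?: number;  // epoch ms when the job completed or failed
}

// Assumed shape of one history entry.
interface Execution<I, O> {
  id: string;
  status: 'completed' | 'failed';
  input: I;
  output?: O;
  error?: string;
  duration?: number; // milliseconds spent processing
  logs: string[];
}

function toExecution<I, O>(
  job: FinishedJobLike<I, O>,
  logs: string[] = [],
): Execution<I, O> {
  const failed = Boolean(job.failedReason);
  // Duration is only known when both timestamps were recorded.
  const duration =
    job.processedOn !== undefined && job.finishedOn !== undefined
      ? job.finishedOn - job.processedOn
      : undefined;
  return {
    id: job.id ?? 'unknown',
    status: failed ? 'failed' : 'completed',
    input: job.data,
    output: failed ? undefined : job.returnvalue ?? undefined,
    error: job.failedReason,
    duration,
    logs,
  };
}
```

History built this way only reaches as far back as the jobs kept in Redis, so querying executions from weeks or months ago depends on `removeOnComplete` and `removeOnFail` not being set to discard finished jobs.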