Sonamu provides a Workflow system for handling asynchronous background jobs. You can separate long-running tasks to the background to improve API response times and distribute work across Worker processes.
Basic Structure
import { defineConfig } from "sonamu";
export default defineConfig({
tasks: {
enableWorker: true,
workerOptions: {
concurrency: 1,
usePubSub: true,
listenDelay: 500,
},
contextProvider: (defaultContext) => ({
...defaultContext,
ip: "127.0.0.1",
session: {},
}),
},
// ...
});
enableWorker
Determines whether to enable the Worker process.
Type: boolean (optional)
Default: false
export default defineConfig({
tasks: {
enableWorker: true, // Enable Worker
},
});
What is a Worker?
- Processes tasks in a separate process
- No impact on main API server
- Can run multiple Workers for distributed processing
In production environments, we recommend setting enableWorker: true and running a dedicated Worker process.
Control via Environment Variable
export default defineConfig({
tasks: {
enableWorker: !["true", "1"].includes(
process.env.DISABLE_WORKER ?? "false"
),
},
});
.env:
# Disable Worker (development)
DISABLE_WORKER=true
# Enable Worker (production)
DISABLE_WORKER=false
workerOptions
Configures how the Worker operates.
Type: WorkflowOptions (optional)
type WorkflowOptions = {
concurrency?: number;
usePubSub?: boolean;
listenDelay?: number;
};
concurrency
The number of tasks the Worker will process simultaneously.
Type: number
Default: 1
export default defineConfig({
tasks: {
workerOptions: {
concurrency: 1, // Process 1 task at a time
},
},
});
Recommended values:
1 - Simple tasks, sequential processing required
2-5 - General background tasks
10+ - Lightweight I/O-bound tasks
Higher concurrency processes more tasks simultaneously but increases CPU and memory usage.
usePubSub
Uses Pub/Sub for distributing work among multiple Workers.
Type: boolean
Default: false
export default defineConfig({
tasks: {
workerOptions: {
usePubSub: true, // Use Redis Pub/Sub
},
},
});
Behavior:
false: Each Worker polls independently
true: Task notifications propagated via Redis Pub/Sub
When running multiple Workers, we recommend usePubSub: true for more efficient task distribution.
listenDelay
The interval (in milliseconds) for checking tasks.
Type: number (ms)
Default: 500 (0.5 seconds)
export default defineConfig({
tasks: {
workerOptions: {
listenDelay: 500, // Check every 0.5 seconds
},
},
});
Recommended values:
100-500ms - Tasks requiring fast response
500-1000ms - General background tasks
1000-5000ms - Non-urgent tasks
contextProvider
Creates the Context for tasks running in the Worker.
Type: (defaultContext) => Context | Promise<Context>
export default defineConfig({
tasks: {
contextProvider: (defaultContext) => {
return {
...defaultContext,
ip: "127.0.0.1",
session: {},
};
},
},
});
defaultContext includes:
reply - Fastify reply object (null in worker)
request - Fastify request object (null in worker)
headers - Request headers
createSSE - SSE stream creator
naiteStore - Naite logging store
Custom Context
export default defineConfig({
tasks: {
contextProvider: (defaultContext) => {
return {
...defaultContext,
ip: "worker",
session: {},
config: {
apiUrl: process.env.API_URL,
},
logger: console,
};
},
},
});
In Worker Context, request and reply are null. HTTP-related features cannot be used.
Basic Examples
Single Worker (Development)
import { defineConfig } from "sonamu";
export default defineConfig({
tasks: {
enableWorker: process.env.NODE_ENV === "production",
workerOptions: {
concurrency: 1,
usePubSub: false, // Single Worker
listenDelay: 500,
},
contextProvider: (defaultContext) => ({
...defaultContext,
ip: "127.0.0.1",
session: {},
}),
},
});
Multiple Workers (Production)
import { defineConfig } from "sonamu";
export default defineConfig({
tasks: {
enableWorker: true,
workerOptions: {
concurrency: 5,
usePubSub: true, // Distribute via Redis Pub/Sub
listenDelay: 500,
},
contextProvider: (defaultContext) => ({
...defaultContext,
ip: "worker",
session: {},
}),
},
});
Practical Examples
Environment-based Configuration
import { defineConfig } from "sonamu";
const isDev = process.env.NODE_ENV === "development";
const isProd = process.env.NODE_ENV === "production";
export default defineConfig({
tasks: {
// Development: No Worker (process in main)
// Production: Enable Worker
enableWorker: isProd,
workerOptions: {
concurrency: isProd ? 5 : 1,
usePubSub: isProd,
listenDelay: isDev ? 100 : 500, // Development: fast response
},
contextProvider: (defaultContext) => ({
...defaultContext,
ip: isProd ? "worker" : "127.0.0.1",
session: {},
}),
},
});
High-Load Workflow
export default defineConfig({
tasks: {
enableWorker: true,
workerOptions: {
concurrency: 10, // 10 concurrent tasks
usePubSub: true, // Multiple Worker support
listenDelay: 1000, // 1 second interval
},
contextProvider: (defaultContext) => ({
...defaultContext,
ip: "worker",
session: {},
logger: {
info: (msg) => console.log(`[Worker] ${msg}`),
error: (msg) => console.error(`[Worker Error] ${msg}`),
},
}),
},
});
Custom Context
export default defineConfig({
tasks: {
enableWorker: true,
workerOptions: {
concurrency: 3,
usePubSub: true,
listenDelay: 500,
},
contextProvider: async (defaultContext) => {
// Async initialization
const config = await loadConfig();
return {
...defaultContext,
ip: "worker",
session: {},
config,
services: {
email: new EmailService(),
notification: new NotificationService(),
},
};
},
},
});
Using Workflows
After tasks configuration, define background jobs with the @workflow decorator.
import { workflow, step } from "sonamu";
export class EmailModel {
@workflow()
static async sendWelcomeEmail(userId: number) {
const user = await step("fetch-user", async () => {
return UserModel.findById(userId);
});
await step("send-email", async () => {
return sendEmail(user.email, "Welcome!");
});
}
}
Calling:
// Runs in background
await EmailModel.sendWelcomeEmail(123);
→ Workflow Usage
Running Workers
Development Environment
In development, keep enableWorker: false and process in main:
Production Environment
Run a dedicated Worker process:
# API server
pnpm start
# Worker process (separate terminal)
pnpm start:worker
package.json:
{
"scripts": {
"start": "node dist/server.js",
"start:worker": "ENABLE_WORKER=true node dist/worker.js"
}
}
Multiple Workers
Run multiple Workers to increase throughput:
# Worker 1
ENABLE_WORKER=true node dist/worker.js
# Worker 2
ENABLE_WORKER=true node dist/worker.js
# Worker 3
ENABLE_WORKER=true node dist/worker.js
Using PM2 or Docker makes it easy to manage multiple Workers.
Redis Setup (usePubSub: true)
Redis is required to use usePubSub: true.
Redis Connection
Redis is configured in the cache settings:
import { createClient } from "redis";
const redisConnection = createClient({
url: process.env.REDIS_URL ?? "redis://localhost:6379",
});
await redisConnection.connect();
export default defineConfig({
server: {
cache: {
default: "main",
stores: {
main: store()
.useL2Layer(drivers.redis({ connection: redisConnection }))
.useBus(drivers.redisBus({ connection: redisConnection })),
},
},
},
tasks: {
enableWorker: true,
workerOptions: {
usePubSub: true, // Use Redis Pub/Sub
},
},
});
Cautions
1. Worker Context Limitations
// ❌ Bad: Trying HTTP response in Worker
@workflow()
static async processData() {
const ctx = getContext();
ctx.reply.send({ done: true }); // null in Worker!
}
// ✅ Good: Save results to DB
@workflow()
static async processData() {
const result = await heavyProcessing();
await ResultModel.save({ data: result });
}
2. enableWorker Setting
// ❌ Bad: No Worker in production
enableWorker: false // All tasks run in main process!
// ✅ Good: Different per environment
enableWorker: process.env.NODE_ENV === "production"
3. Excessive concurrency
// ❌ Bad: Too high concurrency
workerOptions: {
concurrency: 100, // Risk of CPU and memory shortage
}
// ✅ Good: Appropriate level
workerOptions: {
concurrency: 5, // Match server resources
}
Next Steps
After completing Task configuration: