Skip to main content
Sonamu provides a Workflow system for handling asynchronous background jobs. You can separate long-running tasks to the background to improve API response times and distribute work across Worker processes.

Basic Structure

import { defineConfig } from "sonamu";

export default defineConfig({
  tasks: {
    enableWorker: true,
    workerOptions: {
      concurrency: 1,
      usePubSub: true,
      listenDelay: 500,
    },
    contextProvider: (defaultContext) => ({
      ...defaultContext,
      ip: "127.0.0.1",
      session: {},
    }),
  },
  // ...
});

enableWorker

Determines whether to enable the Worker process. Type: boolean (optional) Default: false
export default defineConfig({
  tasks: {
    enableWorker: true,  // Enable Worker
  },
});
What is a Worker?
  • Processes tasks in a separate process
  • No impact on main API server
  • Can run multiple Workers for distributed processing
In production environments, we recommend setting enableWorker: true and running a dedicated Worker process.

Control via Environment Variable

export default defineConfig({
  tasks: {
    enableWorker: !["true", "1"].includes(
      process.env.DISABLE_WORKER ?? "false"
    ),
  },
});
.env:
# Disable Worker (development)
DISABLE_WORKER=true

# Enable Worker (production)
DISABLE_WORKER=false

workerOptions

Configures how the Worker operates. Type: WorkflowOptions (optional)
type WorkflowOptions = {
  concurrency?: number;
  usePubSub?: boolean;
  listenDelay?: number;
};

concurrency

The number of tasks the Worker will process simultaneously. Type: number Default: 1
export default defineConfig({
  tasks: {
    workerOptions: {
      concurrency: 1,  // Process 1 task at a time
    },
  },
});
Recommended values:
  • 1 - Simple tasks, sequential processing required
  • 2-5 - General background tasks
  • 10+ - Lightweight I/O-bound tasks
Higher concurrency processes more tasks simultaneously but increases CPU and memory usage.

usePubSub

Uses Pub/Sub for distributing work among multiple Workers. Type: boolean Default: false
export default defineConfig({
  tasks: {
    workerOptions: {
      usePubSub: true,  // Use Redis Pub/Sub
    },
  },
});
Behavior:
  • false: Each Worker polls independently
  • true: Task notifications propagated via Redis Pub/Sub
When running multiple Workers, we recommend usePubSub: true for more efficient task distribution.

listenDelay

The interval (in milliseconds) for checking tasks. Type: number (ms) Default: 500 (0.5 seconds)
export default defineConfig({
  tasks: {
    workerOptions: {
      listenDelay: 500,  // Check every 0.5 seconds
    },
  },
});
Recommended values:
  • 100-500ms - Tasks requiring fast response
  • 500-1000ms - General background tasks
  • 1000-5000ms - Non-urgent tasks

contextProvider

Creates the Context for tasks running in the Worker. Type: (defaultContext) => Context | Promise<Context>
export default defineConfig({
  tasks: {
    contextProvider: (defaultContext) => {
      return {
        ...defaultContext,
        ip: "127.0.0.1",
        session: {},
      };
    },
  },
});
defaultContext includes:
  • reply - Fastify reply object (null in worker)
  • request - Fastify request object (null in worker)
  • headers - Request headers
  • createSSE - SSE stream creator
  • naiteStore - Naite logging store

Custom Context

export default defineConfig({
  tasks: {
    contextProvider: (defaultContext) => {
      return {
        ...defaultContext,
        ip: "worker",
        session: {},
        config: {
          apiUrl: process.env.API_URL,
        },
        logger: console,
      };
    },
  },
});
In Worker Context, request and reply are null. HTTP-related features cannot be used.

Basic Examples

Single Worker (Development)

import { defineConfig } from "sonamu";

export default defineConfig({
  tasks: {
    enableWorker: process.env.NODE_ENV === "production",
    workerOptions: {
      concurrency: 1,
      usePubSub: false,  // Single Worker
      listenDelay: 500,
    },
    contextProvider: (defaultContext) => ({
      ...defaultContext,
      ip: "127.0.0.1",
      session: {},
    }),
  },
});

Multiple Workers (Production)

import { defineConfig } from "sonamu";

export default defineConfig({
  tasks: {
    enableWorker: true,
    workerOptions: {
      concurrency: 5,
      usePubSub: true,   // Distribute via Redis Pub/Sub
      listenDelay: 500,
    },
    contextProvider: (defaultContext) => ({
      ...defaultContext,
      ip: "worker",
      session: {},
    }),
  },
});

Practical Examples

Environment-based Configuration

import { defineConfig } from "sonamu";

const isDev = process.env.NODE_ENV === "development";
const isProd = process.env.NODE_ENV === "production";

export default defineConfig({
  tasks: {
    // Development: No Worker (process in main)
    // Production: Enable Worker
    enableWorker: isProd,

    workerOptions: {
      concurrency: isProd ? 5 : 1,
      usePubSub: isProd,
      listenDelay: isDev ? 100 : 500,  // Development: fast response
    },

    contextProvider: (defaultContext) => ({
      ...defaultContext,
      ip: isProd ? "worker" : "127.0.0.1",
      session: {},
    }),
  },
});

High-Load Workflow

export default defineConfig({
  tasks: {
    enableWorker: true,
    workerOptions: {
      concurrency: 10,   // 10 concurrent tasks
      usePubSub: true,   // Multiple Worker support
      listenDelay: 1000, // 1 second interval
    },
    contextProvider: (defaultContext) => ({
      ...defaultContext,
      ip: "worker",
      session: {},
      logger: {
        info: (msg) => console.log(`[Worker] ${msg}`),
        error: (msg) => console.error(`[Worker Error] ${msg}`),
      },
    }),
  },
});

Custom Context

export default defineConfig({
  tasks: {
    enableWorker: true,
    workerOptions: {
      concurrency: 3,
      usePubSub: true,
      listenDelay: 500,
    },
    contextProvider: async (defaultContext) => {
      // Async initialization
      const config = await loadConfig();

      return {
        ...defaultContext,
        ip: "worker",
        session: {},
        config,
        services: {
          email: new EmailService(),
          notification: new NotificationService(),
        },
      };
    },
  },
});

Using Workflows

After tasks configuration, define background jobs with the @workflow decorator.
import { workflow, step } from "sonamu";

export class EmailModel {
  @workflow()
  static async sendWelcomeEmail(userId: number) {
    const user = await step("fetch-user", async () => {
      return UserModel.findById(userId);
    });

    await step("send-email", async () => {
      return sendEmail(user.email, "Welcome!");
    });
  }
}
Calling:
// Runs in background
await EmailModel.sendWelcomeEmail(123);
Workflow Usage

Running Workers

Development Environment

In development, keep enableWorker: false and process in main:
pnpm dev

Production Environment

Run a dedicated Worker process:
# API server
pnpm start

# Worker process (separate terminal)
pnpm start:worker
package.json:
{
  "scripts": {
    "start": "node dist/server.js",
    "start:worker": "ENABLE_WORKER=true node dist/worker.js"
  }
}

Multiple Workers

Run multiple Workers to increase throughput:
# Worker 1
ENABLE_WORKER=true node dist/worker.js

# Worker 2
ENABLE_WORKER=true node dist/worker.js

# Worker 3
ENABLE_WORKER=true node dist/worker.js
Using PM2 or Docker makes it easy to manage multiple Workers.

Redis Setup (usePubSub: true)

Redis is required to use usePubSub: true.

Redis Connection

Redis is configured in the cache settings:
import { createClient } from "redis";

const redisConnection = createClient({
  url: process.env.REDIS_URL ?? "redis://localhost:6379",
});

await redisConnection.connect();

export default defineConfig({
  server: {
    cache: {
      default: "main",
      stores: {
        main: store()
          .useL2Layer(drivers.redis({ connection: redisConnection }))
          .useBus(drivers.redisBus({ connection: redisConnection })),
      },
    },
  },

  tasks: {
    enableWorker: true,
    workerOptions: {
      usePubSub: true,  // Use Redis Pub/Sub
    },
  },
});

Cautions

1. Worker Context Limitations

// ❌ Bad: Trying HTTP response in Worker
@workflow()
static async processData() {
  const ctx = getContext();
  ctx.reply.send({ done: true });  // null in Worker!
}

// ✅ Good: Save results to DB
@workflow()
static async processData() {
  const result = await heavyProcessing();
  await ResultModel.save({ data: result });
}

2. enableWorker Setting

// ❌ Bad: No Worker in production
enableWorker: false  // All tasks run in main process!

// ✅ Good: Different per environment
enableWorker: process.env.NODE_ENV === "production"

3. Excessive concurrency

// ❌ Bad: Too high concurrency
workerOptions: {
  concurrency: 100,  // Risk of CPU and memory shortage
}

// ✅ Good: Appropriate level
workerOptions: {
  concurrency: 5,  // Match server resources
}

Next Steps

After completing Task configuration: