Skip to main content
Workflow is a system for executing background tasks. Use it for email sending, data processing, scheduled jobs, and more.

Tasks Configuration

// sonamu.config.ts
import { defineConfig } from "sonamu";

export default defineConfig({
  // ...
  tasks: {
    enableWorker: true,  // Enable Worker
    workerOptions: {
      concurrency: 4,      // Concurrent execution count
      usePubSub: true,     // Use Pub/Sub
      listenDelay: 500,    // Listen delay (ms)
    },
    contextProvider: async (baseContext) => {
      // Configure Context used in Workflows
      return {
        ...baseContext,
        // Custom data can be added
      };
    },
  },
});
Options:
  • enableWorker: Whether to enable Worker (default: true only in daemon mode)
  • workerOptions.concurrency: Number of concurrent tasks (default: CPU cores - 1)
  • workerOptions.usePubSub: Use PostgreSQL LISTEN/NOTIFY (default: true)
  • workerOptions.listenDelay: Wait time after receiving Pub/Sub message (default: 500ms)
  • contextProvider: Function to create Context used within Workflows

WorkflowManager

Sonamu automatically creates and manages the WorkflowManager. Access it via Sonamu.workflows.
import { Sonamu } from "sonamu";

// Run Workflow
const workflowManager = Sonamu.workflows;
const handle = await workflowManager.run(
  {
    name: "send_email",
    version: null,
  },
  { email: "user@example.com" }
);

// Wait for result
const result = await handle.result();

Worker Modes

1. Daemon Mode (Auto-enabled)

# Worker starts automatically
pnpm sonamu daemon
In daemon mode, enableWorker: true is the default.

2. Separate Worker Process

// src/worker.ts
import { Sonamu } from "sonamu";

async function startWorker() {
  await Sonamu.init(true, false);

  await Sonamu.workflows.startWorker();

  console.log("Worker started");

  // Graceful shutdown
  const shutdown = async () => {
    console.log("Stopping worker...");
    await Sonamu.workflows.stopWorker();
    process.exit(0);
  };

  process.on("SIGTERM", shutdown);
  process.on("SIGINT", shutdown);
}

startWorker();
// package.json
{
  "scripts": {
    "worker": "tsx src/worker.ts",
    "worker:dev": "tsx watch src/worker.ts"
  }
}
# Run Worker
pnpm worker

3. API Server + Worker

// sonamu.config.ts
export default defineConfig({
  tasks: {
    enableWorker: true,  // Run Worker in API server too
    workerOptions: {
      concurrency: 2,  // Lower concurrency for API server
    },
    contextProvider: async (baseContext) => baseContext,
  },
});

Distributed Workers

You can run Workers on multiple servers.
// Server 1, 2, 3...
tasks: {
  enableWorker: true,
  workerOptions: {
    concurrency: 4,
    usePubSub: true,
  },
}
PostgreSQL acts as the task queue, and Workers communicate via Pub/Sub.

Context Provider

Customize the Context used in Workflows.
// sonamu.config.ts
tasks: {
  enableWorker: true,
  contextProvider: async (baseContext) => {
    // Fetch system user
    const systemUser = await UserModel.findOne({
      wq: [['email', '=', 'system@example.com']],
    });

    return {
      ...baseContext,
      user: systemUser,
      customData: "...",
    };
  },
}
Access via Sonamu.getContext() within a Workflow function:
import { workflow } from "sonamu";
import { Sonamu } from "sonamu";

export const myWorkflow = workflow(
  { name: "my_workflow" },
  async ({ input, step }) => {
    const context = Sonamu.getContext();
    console.log(context.user);  // systemUser
  }
);

Table Structure

Sonamu automatically creates the following tables:
-- Workflow definitions
CREATE TABLE workflows (
  id UUID PRIMARY KEY,
  name TEXT NOT NULL,
  version TEXT,
  ...
);

-- Workflow execution records
CREATE TABLE workflow_runs (
  id UUID PRIMARY KEY,
  workflow_id UUID REFERENCES workflows(id),
  status TEXT,
  input JSONB,
  output JSONB,
  started_at TIMESTAMP,
  completed_at TIMESTAMP,
  ...
);

-- Step execution records
CREATE TABLE workflow_steps (
  id UUID PRIMARY KEY,
  workflow_run_id UUID REFERENCES workflow_runs(id),
  name TEXT,
  status TEXT,
  ...
);

Monitoring

Check Running Workflows

// Direct DB query
import { DB } from "sonamu";

const knex = DB.getDB();
const runs = await knex('workflow_runs')
  .where('status', 'running')
  .select('*');

Logging

export const myWorkflow = workflow(
  { name: "my_workflow" },
  async ({ logger, input }) => {
    logger.info("Workflow started", { input });
    logger.error("Error occurred", { error });
  }
);

Next Steps