Skip to main content
The @workflow decorator defines long-running tasks, background jobs, and scheduled tasks. Built on OpenWorkflow, it provides durable execution, retries, and monitoring.

Basic Usage

import { workflow } from "sonamu";

export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0"
  },
  async ({ input, step, logger }) => {
    logger.info("Processing order", { orderId: input.orderId });

    // Step 1: Process payment
    const payment = await step.define(
      { name: "process-payment" },
      async () => {
        return await PaymentService.charge(input.amount);
      }
    ).run();

    // Step 2: Update inventory
    await step.define(
      { name: "update-inventory" },
      async () => {
        return await InventoryModel.decrease(input.productId, input.quantity);
      }
    ).run();

    // Step 3: Start shipping
    const shipping = await step.define(
      { name: "start-shipping" },
      async () => {
        return await ShippingService.ship(input.address);
      }
    ).run();

    return {
      orderId: input.orderId,
      paymentId: payment.id,
      trackingNumber: shipping.trackingNumber
    };
  }
);

Configuration (sonamu.config.ts)

Workflows require PostgreSQL configuration.
import { defineConfig } from "sonamu";

export default defineConfig({
  tasks: {
    dbConfig: {
      client: "pg",
      connection: {
        host: process.env.DB_HOST,
        port: 5432,
        database: process.env.DB_NAME,
        user: process.env.DB_USER,
        password: process.env.DB_PASSWORD
      }
    },
    worker: {
      concurrency: 4,      // Concurrent execution count
      usePubSub: true,     // Use Pub/Sub
      listenDelay: 500     // Listen delay (ms)
    }
  }
});

Options

name

Specifies the workflow name. Default: Function name converted to snake_case
// Explicit name
export const myTask = workflow(
  { name: "process_order" },
  async ({ input }) => { /* ... */ }
);

// Auto name (uses function name)
export const processOrder = workflow(
  {},  // name omitted → "process_order"
  async ({ input }) => { /* ... */ }
);

version

Specifies the workflow version. Default: null
export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0"
  },
  async ({ input }) => { /* ... */ }
);

// When updating version
export const processOrderV2 = workflow(
  {
    name: "process_order",
    version: "2.0"
  },
  async ({ input }) => { /* ... */ }
);
You can run workflows with the same name but different versions in parallel.

schema

Defines a Zod schema for input data.
import { z } from "zod";

const OrderInputSchema = z.object({
  orderId: z.number(),
  productId: z.number(),
  quantity: z.number(),
  amount: z.number(),
  address: z.string()
});

export const processOrder = workflow(
  {
    name: "process_order",
    schema: OrderInputSchema
  },
  async ({ input }) => {
    // input is automatically type-validated
    input.orderId;  // number
    input.address;  // string
  }
);

schedules

Defines schedules using Cron expressions.
export const dailyReport = workflow(
  {
    name: "daily_report",
    schedules: [
      {
        name: "daily-at-midnight",
        expression: "0 0 * * *",  // Daily at midnight
        input: { date: new Date().toISOString().split('T')[0] }
      }
    ]
  },
  async ({ input, logger }) => {
    logger.info("Generating daily report", { date: input.date });
    // Generate report
  }
);

// Dynamic input
export const weeklyBackup = workflow(
  {
    name: "weekly_backup",
    schedules: [
      {
        name: "weekly-sunday",
        expression: "0 2 * * 0",  // Sundays at 2am
        input: () => ({
          timestamp: new Date().toISOString(),
          backupPath: `/backups/${Date.now()}`
        })
      }
    ]
  },
  async ({ input }) => {
    // Perform backup
  }
);
Cron Expression Examples:
* * * * *    - Every minute
0 * * * *    - Every hour
0 0 * * *    - Daily at midnight
0 0 * * 0    - Sundays at midnight
0 0 1 * *    - 1st of month at midnight
0 9 * * 1-5  - Weekdays at 9am
*/5 * * * *  - Every 5 minutes

Workflow Function

The workflow function receives 4 parameters:
async function workflowFn({ input, step, logger, version }) {
  // input: Input data
  // step: Step API
  // logger: LogTape Logger
  // version: Workflow version
}

input

Input data passed when executing the workflow.
export const processOrder = workflow(
  { name: "process_order" },
  async ({ input }) => {
    console.log("Order ID:", input.orderId);
    console.log("Amount:", input.amount);
  }
);

// Execution
await WorkflowManager.run(
  { name: "process_order" },
  { orderId: 123, amount: 50000 }
);

step

The Step API defines each stage of the workflow. Each step can be retried and recovered independently.
export const processOrder = workflow(
  { name: "process_order" },
  async ({ step }) => {
    // Step 1
    const payment = await step.define(
      { name: "charge-payment" },
      async () => {
        return await PaymentService.charge(100);
      }
    ).run();

    // Step 2
    const inventory = await step.define(
      { name: "update-inventory" },
      async () => {
        return await InventoryModel.decrease(1);
      }
    ).run();

    // Step 3
    await step.define(
      { name: "send-email" },
      async () => {
        return await EmailService.send({
          to: "user@example.com",
          subject: "Order confirmed"
        });
      }
    ).run();

    return { success: true };
  }
);
Step Advantages:
  • Each step runs only once (idempotent)
  • On failure, only that step is retried
  • Already completed steps are skipped

logger

LogTape Logger instance.
export const processData = workflow(
  { name: "process_data" },
  async ({ logger }) => {
    logger.info("Starting workflow");
    logger.debug("Processing item", { itemId: 123 });
    logger.warn("High memory usage", { usage: "80%" });
    logger.error("Failed to process", { error: "Connection timeout" });
  }
);

version

Current workflow version.
export const processOrder = workflow(
  {
    name: "process_order",
    version: "2.0"
  },
  async ({ version }) => {
    console.log("Running version:", version);  // "2.0"
  }
);

Workflow Execution

Manual Execution

// Using WorkflowManager
import { Sonamu } from "sonamu";

const handle = await Sonamu.workflowManager.run(
  {
    name: "process_order",
    version: "1.0"
  },
  {
    orderId: 123,
    amount: 50000
  }
);

// Wait for result
const result = await handle.result();
console.log("Result:", result);

Scheduled Execution

export const dailyReport = workflow(
  {
    name: "daily_report",
    schedules: [
      {
        name: "daily-at-midnight",
        expression: "0 0 * * *",
        input: { date: new Date().toISOString().split('T')[0] }
      }
    ]
  },
  async ({ input, logger }) => {
    logger.info("Generating report", { date: input.date });
    // Automatically runs daily at midnight
  }
);

Step API Details

step.define().run()

Wraps a function as a step and executes it.
const result = await step.define(
  { name: "step-name" },
  async () => {
    // Perform task
    return { data: "result" };
  }
).run();
Usage Example:
export const processOrder = workflow(
  { name: "process_order" },
  async ({ step }) => {
    // Process payment
    const payment = await step.define(
      { name: "process-payment" },
      async () => {
        return await PaymentService.charge(100);
      }
    ).run();

    // Update inventory
    await step.define(
      { name: "update-inventory" },
      async () => {
        return await InventoryModel.decrease(1);
      }
    ).run();

    return { paymentId: payment.id };
  }
);

step.get().run()

Executes a Model method as a step.
// Explicit name
const result = await step.get(
  { name: "fetch-users" },
  UserModel,
  "list"
).run({ limit: 10 });

// Name omitted (auto-uses method name: "list")
const result = await step.get(
  UserModel,
  "list"
).run({ limit: 10 });
Usage Example:
export const processUsers = workflow(
  { name: "process_users" },
  async ({ step }) => {
    // Execute Model method as step
    const users = await step.get(
      { name: "fetch-users" },
      UserModel,
      "list"
    ).run({ limit: 100 });

    // Process each user
    for (const user of users) {
      await step.define(
        { name: `process-user-${user.id}` },
        async () => {
          return await UserModel.updateStatus(user.id, "processed");
        }
      ).run();
    }

    return { processed: users.length };
  }
);

step.sleep()

Waits for a specified duration.
await step.sleep("wait-1-hour", "1h");
await step.sleep("wait-30-minutes", "30m");
await step.sleep("wait-10-seconds", "10s");
Time Format: "10s", "5m", "1h", "1d" Usage Example:
export const delayedTask = workflow(
  { name: "delayed_task" },
  async ({ step, logger }) => {
    logger.info("Starting task");

    // Wait 1 hour
    await step.sleep("wait-1-hour", "1h");

    logger.info("Continuing after 1 hour");

    // Wait 30 minutes
    await step.sleep("wait-30-minutes", "30m");

    logger.info("Task complete");
  }
);

Retry Policy

Step retries are automatically handled by the OpenWorkflow backend. No separate retry options need to be configured.
export const resilientTask = workflow(
  { name: "resilient_task" },
  async ({ step }) => {
    // Step is automatically retried on failure
    const result = await step.define(
      { name: "flaky-operation" },
      async () => {
        const response = await fetch("https://api.example.com");
        return response.json();
      }
    ).run();

    return result;
  }
);

Database Access

You can freely use Sonamu Models within workflows.
export const processUsers = workflow(
  { name: "process_users" },
  async ({ step, logger }) => {
    // Step 1: Fetch users
    const users = await step.get(
      { name: "fetch-users" },
      UserModel,
      "list"
    ).run({ limit: 100 });

    logger.info("Found users", { count: users.length });

    // Step 2: Process each user
    for (const user of users) {
      await step.define(
        { name: `process-user-${user.id}` },
        async () => {
          return await UserModel.processUser(user.id);
        }
      ).run();
    }

    return { processed: users.length };
  }
);

Error Handling

Automatic Retry

Steps are automatically retried on failure.
export const resilientTask = workflow(
  { name: "resilient_task" },
  async ({ step }) => {
    const result = await step.define(
      { name: "flaky-operation" },
      async () => {
        // Task that may fail
        const response = await fetch("https://api.example.com");
        return response.json();
      }
    ).run();

    return result;
  }
);

Manual Error Handling

export const handleErrors = workflow(
  { name: "handle_errors" },
  async ({ step, logger }) => {
    try {
      await step.define(
        { name: "risky-operation" },
        async () => {
          throw new Error("Something went wrong");
        }
      ).run();
    } catch (error) {
      logger.error("Operation failed", { error });

      // Fallback logic
      await step.define(
        { name: "fallback" },
        async () => {
          return await AlternativeService.process();
        }
      ).run();
    }
  }
);

Decorator Style (TypeScript Decorator)

You can also use decorator style instead of function style:
@workflow({
  name: "process_order",
  version: "1.0"
})
async function processOrder({ input, step, logger }) {
  logger.info("Processing order", { orderId: input.orderId });

  const payment = await step.define(
    { name: "charge" },
    async () => {
      return await PaymentService.charge(input.amount);
    }
  ).run();

  return { paymentId: payment.id };
}

Cautions

1. Idempotence

Steps must be idempotent. Running multiple times with the same input should guarantee the same result.
// ❌ Bad: Not idempotent
await step.define(
  { name: "increment-counter" },
  async () => {
    counter++;  // Duplicate increment on retry
  }
).run();

// ✅ Good: Idempotent
await step.define(
  { name: "set-counter" },
  async () => {
    return await CounterModel.set(10);  // Always the same value
  }
).run();

2. Split Long Tasks into Steps

// ❌ Bad: Giant step
await step.define(
  { name: "process-all" },
  async () => {
    // Task taking 1 hour
    for (let i = 0; i < 1000000; i++) {
      await processItem(i);
    }
  }
).run();

// ✅ Good: Small steps
for (let i = 0; i < 1000; i += 100) {
  await step.define(
    { name: `process-batch-${i}` },
    async () => {
      const batch = items.slice(i, i + 100);
      await processBatch(batch);
    }
  ).run();
}

3. Step Name Uniqueness

Step names must be unique within a workflow.
// ❌ Bad: Duplicate names
await step.define({ name: "process" }, async () => { /* ... */ }).run();
await step.define({ name: "process" }, async () => { /* ... */ }).run();  // Error!

// ✅ Good: Unique names
await step.define({ name: "process-payment" }, async () => { /* ... */ }).run();
await step.define({ name: "process-shipping" }, async () => { /* ... */ }).run();

Monitoring

Check Execution Status

const handle = await Sonamu.workflowManager.run(
  { name: "process_order" },
  { orderId: 123 }
);

// Check status
console.log("Status:", handle.status);

// Wait for result (until complete)
const result = await handle.result();
console.log("Result:", result);

View Logs

Workflows automatically log through LogTape:
[INFO] [workflow:process_order] Processing order {"orderId":123}
[DEBUG] [workflow:process_order] Step: charge-payment
[INFO] [workflow:process_order] Payment successful {"paymentId":"pay_123"}

Example Collection

import { workflow } from "sonamu";
import { z } from "zod";

const OrderSchema = z.object({
  orderId: z.number(),
  userId: z.number(),
  items: z.array(z.object({
    productId: z.number(),
    quantity: z.number(),
    price: z.number()
  })),
  totalAmount: z.number(),
  shippingAddress: z.string()
});

export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0",
    schema: OrderSchema
  },
  async ({ input, step, logger }) => {
    logger.info("Processing order", { orderId: input.orderId });

    // Step 1: Check inventory
    const inventory = await step.define(
      { name: "check-inventory" },
      async () => {
        for (const item of input.items) {
          const available = await InventoryModel.check(
            item.productId,
            item.quantity
          );
          if (!available) {
            throw new Error(`Product ${item.productId} out of stock`);
          }
        }
        return { available: true };
      }
    ).run();

    // Step 2: Process payment
    const payment = await step.define(
      { name: "process-payment" },
      async () => {
        return await PaymentService.charge({
          userId: input.userId,
          amount: input.totalAmount,
          orderId: input.orderId
        });
      }
    ).run();

    logger.info("Payment successful", { paymentId: payment.id });

    // Step 3: Decrease inventory
    await step.define(
      { name: "decrease-inventory" },
      async () => {
        for (const item of input.items) {
          await InventoryModel.decrease(item.productId, item.quantity);
        }
      }
    ).run();

    // Step 4: Start shipping
    const shipping = await step.define(
      { name: "start-shipping" },
      async () => {
        return await ShippingService.createShipment({
          orderId: input.orderId,
          address: input.shippingAddress,
          items: input.items
        });
      }
    ).run();

    // Step 5: Send notifications
    await step.define(
      { name: "send-notifications" },
      async () => {
        await NotificationService.send({
          userId: input.userId,
          type: "order_confirmed",
          data: {
            orderId: input.orderId,
            trackingNumber: shipping.trackingNumber
          }
        });
      }
    ).run();

    return {
      orderId: input.orderId,
      paymentId: payment.id,
      trackingNumber: shipping.trackingNumber,
      status: "completed"
    };
  }
);

Next Steps