Skip to main content
The @workflow decorator defines long-running tasks, background jobs, and scheduled tasks. Built on OpenWorkflow, it provides durable execution, retries, and monitoring.

Basic Usage

import { workflow } from "sonamu";

export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0",
  },
  async ({ input, step, logger }) => {
    logger.info("Processing order", { orderId: input.orderId });

    // Step 1: Process payment
    const payment = await step
      .define({ name: "process-payment" }, async () => {
        return await PaymentService.charge(input.amount);
      })
      .run();

    // Step 2: Update inventory
    await step
      .define({ name: "update-inventory" }, async () => {
        return await InventoryModel.decrease(input.productId, input.quantity);
      })
      .run();

    // Step 3: Start shipping
    const shipping = await step
      .define({ name: "start-shipping" }, async () => {
        return await ShippingService.ship(input.address);
      })
      .run();

    return {
      orderId: input.orderId,
      paymentId: payment.id,
      trackingNumber: shipping.trackingNumber,
    };
  },
);

Configuration (sonamu.config.ts)

Workflows require PostgreSQL configuration.
import { defineConfig } from "sonamu";

export default defineConfig({
  tasks: {
    dbConfig: {
      client: "pg",
      connection: {
        host: process.env.DB_HOST,
        port: 5432,
        database: process.env.DB_NAME,
        user: process.env.DB_USER,
        password: process.env.DB_PASSWORD,
      },
    },
    worker: {
      concurrency: 4, // Concurrent execution count
      usePubSub: true, // Use Pub/Sub
      listenDelay: 500, // Listen delay (ms)
    },
  },
});

Options

name

Specifies the workflow name. Default: Function name converted to snake_case
// Explicit name
export const myTask = workflow({ name: "process_order" }, async ({ input }) => {
  /* ... */
});

// Auto name (uses function name)
export const processOrder = workflow(
  {}, // name omitted → "process_order"
  async ({ input }) => {
    /* ... */
  },
);

version

Specifies the workflow version. Default: null
export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0",
  },
  async ({ input }) => {
    /* ... */
  },
);

// When updating version
export const processOrderV2 = workflow(
  {
    name: "process_order",
    version: "2.0",
  },
  async ({ input }) => {
    /* ... */
  },
);
You can run workflows with the same name but different versions in parallel.

schema

Defines a Zod schema for input data.
import { z } from "zod";

const OrderInputSchema = z.object({
  orderId: z.number(),
  productId: z.number(),
  quantity: z.number(),
  amount: z.number(),
  address: z.string(),
});

export const processOrder = workflow(
  {
    name: "process_order",
    schema: OrderInputSchema,
  },
  async ({ input }) => {
    // input is automatically type-validated
    input.orderId; // number
    input.address; // string
  },
);

schedules

Defines schedules using Cron expressions.
export const dailyReport = workflow(
  {
    name: "daily_report",
    schedules: [
      {
        name: "daily-at-midnight",
        expression: "0 0 * * *", // Daily at midnight
        input: { date: new Date().toISOString().split("T")[0] },
      },
    ],
  },
  async ({ input, logger }) => {
    logger.info("Generating daily report", { date: input.date });
    // Generate report
  },
);

// Dynamic input
export const weeklyBackup = workflow(
  {
    name: "weekly_backup",
    schedules: [
      {
        name: "weekly-sunday",
        expression: "0 2 * * 0", // Sundays at 2am
        input: () => ({
          timestamp: new Date().toISOString(),
          backupPath: `/backups/${Date.now()}`,
        }),
      },
    ],
  },
  async ({ input }) => {
    // Perform backup
  },
);
Cron Expression Examples:
* * * * *    - Every minute
0 * * * *    - Every hour
0 0 * * *    - Daily at midnight
0 0 * * 0    - Sundays at midnight
0 0 1 * *    - 1st of month at midnight
0 9 * * 1-5  - Weekdays at 9am
*/5 * * * *  - Every 5 minutes

Workflow Function

The workflow function receives 4 parameters:
async function workflowFn({ input, step, logger, version }) {
  // input: Input data
  // step: Step API
  // logger: LogTape Logger
  // version: Workflow version
}

input

Input data passed when executing the workflow.
export const processOrder = workflow({ name: "process_order" }, async ({ input }) => {
  console.log("Order ID:", input.orderId);
  console.log("Amount:", input.amount);
});

// Execution
await WorkflowManager.run({ name: "process_order" }, { orderId: 123, amount: 50000 });

step

The Step API defines each stage of the workflow. Each step can be retried and recovered independently.
export const processOrder = workflow({ name: "process_order" }, async ({ step }) => {
  // Step 1
  const payment = await step
    .define({ name: "charge-payment" }, async () => {
      return await PaymentService.charge(100);
    })
    .run();

  // Step 2
  const inventory = await step
    .define({ name: "update-inventory" }, async () => {
      return await InventoryModel.decrease(1);
    })
    .run();

  // Step 3
  await step
    .define({ name: "send-email" }, async () => {
      return await EmailService.send({
        to: "user@example.com",
        subject: "Order confirmed",
      });
    })
    .run();

  return { success: true };
});
Step Advantages:
  • Each step runs only once (idempotent)
  • On failure, only that step is retried
  • Already completed steps are skipped

logger

LogTape Logger instance.
export const processData = workflow({ name: "process_data" }, async ({ logger }) => {
  logger.info("Starting workflow");
  logger.debug("Processing item", { itemId: 123 });
  logger.warn("High memory usage", { usage: "80%" });
  logger.error("Failed to process", { error: "Connection timeout" });
});

version

Current workflow version.
export const processOrder = workflow(
  {
    name: "process_order",
    version: "2.0",
  },
  async ({ version }) => {
    console.log("Running version:", version); // "2.0"
  },
);

Workflow Execution

Manual Execution

// Using WorkflowManager
import { Sonamu } from "sonamu";

const handle = await Sonamu.workflows.run(
  {
    name: "process_order",
    version: "1.0",
  },
  {
    orderId: 123,
    amount: 50000,
  },
);

// Wait for result
const result = await handle.result();
console.log("Result:", result);

Scheduled Execution

export const dailyReport = workflow(
  {
    name: "daily_report",
    schedules: [
      {
        name: "daily-at-midnight",
        expression: "0 0 * * *",
        input: { date: new Date().toISOString().split("T")[0] },
      },
    ],
  },
  async ({ input, logger }) => {
    logger.info("Generating report", { date: input.date });
    // Automatically runs daily at midnight
  },
);

Step API Details

step.define().run()

Wraps a function as a step and executes it.
const result = await step
  .define({ name: "step-name" }, async () => {
    // Perform task
    return { data: "result" };
  })
  .run();
Usage Example:
export const processOrder = workflow({ name: "process_order" }, async ({ step }) => {
  // Process payment
  const payment = await step
    .define({ name: "process-payment" }, async () => {
      return await PaymentService.charge(100);
    })
    .run();

  // Update inventory
  await step
    .define({ name: "update-inventory" }, async () => {
      return await InventoryModel.decrease(1);
    })
    .run();

  return { paymentId: payment.id };
});

step.get().run()

Executes a Model method as a step.
// Explicit name
const result = await step.get({ name: "fetch-users" }, UserModel, "list").run({ limit: 10 });

// Name omitted (auto-uses method name: "list")
const result = await step.get(UserModel, "list").run({ limit: 10 });
Usage Example:
export const processUsers = workflow({ name: "process_users" }, async ({ step }) => {
  // Execute Model method as step
  const users = await step.get({ name: "fetch-users" }, UserModel, "list").run({ limit: 100 });

  // Process each user
  for (const user of users) {
    await step
      .define({ name: `process-user-${user.id}` }, async () => {
        return await UserModel.updateStatus(user.id, "processed");
      })
      .run();
  }

  return { processed: users.length };
});

step.sleep()

Waits for a specified duration.
await step.sleep("wait-1-hour", "1h");
await step.sleep("wait-30-minutes", "30m");
await step.sleep("wait-10-seconds", "10s");
Time Format: "10s", "5m", "1h", "1d" Usage Example:
export const delayedTask = workflow({ name: "delayed_task" }, async ({ step, logger }) => {
  logger.info("Starting task");

  // Wait 1 hour
  await step.sleep("wait-1-hour", "1h");

  logger.info("Continuing after 1 hour");

  // Wait 30 minutes
  await step.sleep("wait-30-minutes", "30m");

  logger.info("Task complete");
});

Retry Policy

Step retries are automatically handled by the OpenWorkflow backend. No separate retry options need to be configured.
export const resilientTask = workflow({ name: "resilient_task" }, async ({ step }) => {
  // Step is automatically retried on failure
  const result = await step
    .define({ name: "flaky-operation" }, async () => {
      const response = await fetch("https://api.example.com");
      return response.json();
    })
    .run();

  return result;
});

Database Access

You can freely use Sonamu Models within workflows.
export const processUsers = workflow({ name: "process_users" }, async ({ step, logger }) => {
  // Step 1: Fetch users
  const users = await step.get({ name: "fetch-users" }, UserModel, "list").run({ limit: 100 });

  logger.info("Found users", { count: users.length });

  // Step 2: Process each user
  for (const user of users) {
    await step
      .define({ name: `process-user-${user.id}` }, async () => {
        return await UserModel.processUser(user.id);
      })
      .run();
  }

  return { processed: users.length };
});

Error Handling

Automatic Retry

Steps are automatically retried on failure.
export const resilientTask = workflow({ name: "resilient_task" }, async ({ step }) => {
  const result = await step
    .define({ name: "flaky-operation" }, async () => {
      // Task that may fail
      const response = await fetch("https://api.example.com");
      return response.json();
    })
    .run();

  return result;
});

Manual Error Handling

export const handleErrors = workflow({ name: "handle_errors" }, async ({ step, logger }) => {
  try {
    await step
      .define({ name: "risky-operation" }, async () => {
        throw new Error("Something went wrong");
      })
      .run();
  } catch (error) {
    logger.error("Operation failed", { error });

    // Fallback logic
    await step
      .define({ name: "fallback" }, async () => {
        return await AlternativeService.process();
      })
      .run();
  }
});

Decorator Style (TypeScript Decorator)

You can also use decorator style instead of function style:
@workflow({
  name: "process_order",
  version: "1.0"
})
async function processOrder({ input, step, logger }) {
  logger.info("Processing order", { orderId: input.orderId });

  const payment = await step.define(
    { name: "charge" },
    async () => {
      return await PaymentService.charge(input.amount);
    }
  ).run();

  return { paymentId: payment.id };
}

Cautions

1. Idempotence

Steps must be idempotent. Running multiple times with the same input should guarantee the same result.
// ❌ Bad: Not idempotent
await step
  .define({ name: "increment-counter" }, async () => {
    counter++; // Duplicate increment on retry
  })
  .run();

// ✅ Good: Idempotent
await step
  .define({ name: "set-counter" }, async () => {
    return await CounterModel.set(10); // Always the same value
  })
  .run();

2. Split Long Tasks into Steps

// ❌ Bad: Giant step
await step
  .define({ name: "process-all" }, async () => {
    // Task taking 1 hour
    for (let i = 0; i < 1000000; i++) {
      await processItem(i);
    }
  })
  .run();

// ✅ Good: Small steps
for (let i = 0; i < 1000; i += 100) {
  await step
    .define({ name: `process-batch-${i}` }, async () => {
      const batch = items.slice(i, i + 100);
      await processBatch(batch);
    })
    .run();
}

3. Step Name Uniqueness

Step names must be unique within a workflow.
// ❌ Bad: Duplicate names
await step
  .define({ name: "process" }, async () => {
    /* ... */
  })
  .run();
await step
  .define({ name: "process" }, async () => {
    /* ... */
  })
  .run(); // Error!

// ✅ Good: Unique names
await step
  .define({ name: "process-payment" }, async () => {
    /* ... */
  })
  .run();
await step
  .define({ name: "process-shipping" }, async () => {
    /* ... */
  })
  .run();

Monitoring

Check Execution Status

const handle = await Sonamu.workflows.run({ name: "process_order" }, { orderId: 123 });

// Check status
console.log("Status:", handle.status);

// Wait for result (until complete)
const result = await handle.result();
console.log("Result:", result);

View Logs

Workflows automatically log through LogTape:
[INFO] [workflow:process_order] Processing order {"orderId":123}
[DEBUG] [workflow:process_order] Step: charge-payment
[INFO] [workflow:process_order] Payment successful {"paymentId":"pay_123"}

Example Collection

import { workflow } from "sonamu";
import { z } from "zod";

const OrderSchema = z.object({
  orderId: z.number(),
  userId: z.number(),
  items: z.array(z.object({
    productId: z.number(),
    quantity: z.number(),
    price: z.number()
  })),
  totalAmount: z.number(),
  shippingAddress: z.string()
});

export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0",
    schema: OrderSchema
  },
  async ({ input, step, logger }) => {
    logger.info("Processing order", { orderId: input.orderId });

    // Step 1: Check inventory
    const inventory = await step.define(
      { name: "check-inventory" },
      async () => {
        for (const item of input.items) {
          const available = await InventoryModel.check(
            item.productId,
            item.quantity
          );
          if (!available) {
            throw new Error(`Product ${item.productId} out of stock`);
          }
        }
        return { available: true };
      }
    ).run();

    // Step 2: Process payment
    const payment = await step.define(
      { name: "process-payment" },
      async () => {
        return await PaymentService.charge({
          userId: input.userId,
          amount: input.totalAmount,
          orderId: input.orderId
        });
      }
    ).run();

    logger.info("Payment successful", { paymentId: payment.id });

    // Step 3: Decrease inventory
    await step.define(
      { name: "decrease-inventory" },
      async () => {
        for (const item of input.items) {
          await InventoryModel.decrease(item.productId, item.quantity);
        }
      }
    ).run();

    // Step 4: Start shipping
    const shipping = await step.define(
      { name: "start-shipping" },
      async () => {
        return await ShippingService.createShipment({
          orderId: input.orderId,
          address: input.shippingAddress,
          items: input.items
        });
      }
    ).run();

    // Step 5: Send notifications
    await step.define(
      { name: "send-notifications" },
      async () => {
        await NotificationService.send({
          userId: input.userId,
          type: "order_confirmed",
          data: {
            orderId: input.orderId,
            trackingNumber: shipping.trackingNumber
          }
        });
      }
    ).run();

    return {
      orderId: input.orderId,
      paymentId: payment.id,
      trackingNumber: shipping.trackingNumber,
      status: "completed"
    };
  }
);

Next Steps

@api

Create API endpoints

@transactional

Use transactions

Scheduling

Task scheduling guide

OpenWorkflow

OpenWorkflow documentation