@workflow decorator defines long-running tasks, background jobs, and scheduled tasks. Built on OpenWorkflow, it provides durable execution, retries, and monitoring.
Basic Usage
Copy
import { workflow } from "sonamu";
export const processOrder = workflow(
{
name: "process_order",
version: "1.0"
},
async ({ input, step, logger }) => {
logger.info("Processing order", { orderId: input.orderId });
// Step 1: Process payment
const payment = await step.define(
{ name: "process-payment" },
async () => {
return await PaymentService.charge(input.amount);
}
).run();
// Step 2: Update inventory
await step.define(
{ name: "update-inventory" },
async () => {
return await InventoryModel.decrease(input.productId, input.quantity);
}
).run();
// Step 3: Start shipping
const shipping = await step.define(
{ name: "start-shipping" },
async () => {
return await ShippingService.ship(input.address);
}
).run();
return {
orderId: input.orderId,
paymentId: payment.id,
trackingNumber: shipping.trackingNumber
};
}
);
Configuration (sonamu.config.ts)
Workflows require PostgreSQL configuration.Copy
import { defineConfig } from "sonamu";
export default defineConfig({
tasks: {
dbConfig: {
client: "pg",
connection: {
host: process.env.DB_HOST,
port: 5432,
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD
}
},
worker: {
concurrency: 4, // Concurrent execution count
usePubSub: true, // Use Pub/Sub
listenDelay: 500 // Listen delay (ms)
}
}
});
Options
name
Specifies the workflow name. Default: Function name converted to snake_caseCopy
// Explicit name
export const myTask = workflow(
{ name: "process_order" },
async ({ input }) => { /* ... */ }
);
// Auto name (uses function name)
export const processOrder = workflow(
{}, // name omitted → "process_order"
async ({ input }) => { /* ... */ }
);
version
Specifies the workflow version. Default:null
Copy
export const processOrder = workflow(
{
name: "process_order",
version: "1.0"
},
async ({ input }) => { /* ... */ }
);
// When updating version
export const processOrderV2 = workflow(
{
name: "process_order",
version: "2.0"
},
async ({ input }) => { /* ... */ }
);
You can run workflows with the same name but different versions in parallel.
schema
Defines a Zod schema for input data.Copy
import { z } from "zod";
const OrderInputSchema = z.object({
orderId: z.number(),
productId: z.number(),
quantity: z.number(),
amount: z.number(),
address: z.string()
});
export const processOrder = workflow(
{
name: "process_order",
schema: OrderInputSchema
},
async ({ input }) => {
// input is automatically type-validated
input.orderId; // number
input.address; // string
}
);
schedules
Defines schedules using Cron expressions.Copy
export const dailyReport = workflow(
{
name: "daily_report",
schedules: [
{
name: "daily-at-midnight",
expression: "0 0 * * *", // Daily at midnight
input: { date: new Date().toISOString().split('T')[0] }
}
]
},
async ({ input, logger }) => {
logger.info("Generating daily report", { date: input.date });
// Generate report
}
);
// Dynamic input
export const weeklyBackup = workflow(
{
name: "weekly_backup",
schedules: [
{
name: "weekly-sunday",
expression: "0 2 * * 0", // Sundays at 2am
input: () => ({
timestamp: new Date().toISOString(),
backupPath: `/backups/${Date.now()}`
})
}
]
},
async ({ input }) => {
// Perform backup
}
);
Copy
* * * * * - Every minute
0 * * * * - Every hour
0 0 * * * - Daily at midnight
0 0 * * 0 - Sundays at midnight
0 0 1 * * - 1st of month at midnight
0 9 * * 1-5 - Weekdays at 9am
*/5 * * * * - Every 5 minutes
Workflow Function
The workflow function receives 4 parameters:Copy
async function workflowFn({ input, step, logger, version }) {
// input: Input data
// step: Step API
// logger: LogTape Logger
// version: Workflow version
}
input
Input data passed when executing the workflow.Copy
export const processOrder = workflow(
{ name: "process_order" },
async ({ input }) => {
console.log("Order ID:", input.orderId);
console.log("Amount:", input.amount);
}
);
// Execution
await WorkflowManager.run(
{ name: "process_order" },
{ orderId: 123, amount: 50000 }
);
step
The Step API defines each stage of the workflow. Each step can be retried and recovered independently.Copy
export const processOrder = workflow(
{ name: "process_order" },
async ({ step }) => {
// Step 1
const payment = await step.define(
{ name: "charge-payment" },
async () => {
return await PaymentService.charge(100);
}
).run();
// Step 2
const inventory = await step.define(
{ name: "update-inventory" },
async () => {
return await InventoryModel.decrease(1);
}
).run();
// Step 3
await step.define(
{ name: "send-email" },
async () => {
return await EmailService.send({
to: "user@example.com",
subject: "Order confirmed"
});
}
).run();
return { success: true };
}
);
- Each step runs only once (idempotent)
- On failure, only that step is retried
- Already completed steps are skipped
logger
LogTape Logger instance.Copy
export const processData = workflow(
{ name: "process_data" },
async ({ logger }) => {
logger.info("Starting workflow");
logger.debug("Processing item", { itemId: 123 });
logger.warn("High memory usage", { usage: "80%" });
logger.error("Failed to process", { error: "Connection timeout" });
}
);
version
Current workflow version.Copy
export const processOrder = workflow(
{
name: "process_order",
version: "2.0"
},
async ({ version }) => {
console.log("Running version:", version); // "2.0"
}
);
Workflow Execution
Manual Execution
Copy
// Using WorkflowManager
import { Sonamu } from "sonamu";
const handle = await Sonamu.workflowManager.run(
{
name: "process_order",
version: "1.0"
},
{
orderId: 123,
amount: 50000
}
);
// Wait for result
const result = await handle.result();
console.log("Result:", result);
Scheduled Execution
Copy
export const dailyReport = workflow(
{
name: "daily_report",
schedules: [
{
name: "daily-at-midnight",
expression: "0 0 * * *",
input: { date: new Date().toISOString().split('T')[0] }
}
]
},
async ({ input, logger }) => {
logger.info("Generating report", { date: input.date });
// Automatically runs daily at midnight
}
);
Step API Details
step.define().run()
Wraps a function as a step and executes it.Copy
const result = await step.define(
{ name: "step-name" },
async () => {
// Perform task
return { data: "result" };
}
).run();
Copy
export const processOrder = workflow(
{ name: "process_order" },
async ({ step }) => {
// Process payment
const payment = await step.define(
{ name: "process-payment" },
async () => {
return await PaymentService.charge(100);
}
).run();
// Update inventory
await step.define(
{ name: "update-inventory" },
async () => {
return await InventoryModel.decrease(1);
}
).run();
return { paymentId: payment.id };
}
);
step.get().run()
Executes a Model method as a step.Copy
// Explicit name
const result = await step.get(
{ name: "fetch-users" },
UserModel,
"list"
).run({ limit: 10 });
// Name omitted (auto-uses method name: "list")
const result = await step.get(
UserModel,
"list"
).run({ limit: 10 });
Copy
export const processUsers = workflow(
{ name: "process_users" },
async ({ step }) => {
// Execute Model method as step
const users = await step.get(
{ name: "fetch-users" },
UserModel,
"list"
).run({ limit: 100 });
// Process each user
for (const user of users) {
await step.define(
{ name: `process-user-${user.id}` },
async () => {
return await UserModel.updateStatus(user.id, "processed");
}
).run();
}
return { processed: users.length };
}
);
step.sleep()
Waits for a specified duration.Copy
await step.sleep("wait-1-hour", "1h");
await step.sleep("wait-30-minutes", "30m");
await step.sleep("wait-10-seconds", "10s");
"10s", "5m", "1h", "1d"
Usage Example:
Copy
export const delayedTask = workflow(
{ name: "delayed_task" },
async ({ step, logger }) => {
logger.info("Starting task");
// Wait 1 hour
await step.sleep("wait-1-hour", "1h");
logger.info("Continuing after 1 hour");
// Wait 30 minutes
await step.sleep("wait-30-minutes", "30m");
logger.info("Task complete");
}
);
Retry Policy
Step retries are automatically handled by the OpenWorkflow backend. No separate retry options need to be configured.
Copy
export const resilientTask = workflow(
{ name: "resilient_task" },
async ({ step }) => {
// Step is automatically retried on failure
const result = await step.define(
{ name: "flaky-operation" },
async () => {
const response = await fetch("https://api.example.com");
return response.json();
}
).run();
return result;
}
);
Database Access
You can freely use Sonamu Models within workflows.Copy
export const processUsers = workflow(
{ name: "process_users" },
async ({ step, logger }) => {
// Step 1: Fetch users
const users = await step.get(
{ name: "fetch-users" },
UserModel,
"list"
).run({ limit: 100 });
logger.info("Found users", { count: users.length });
// Step 2: Process each user
for (const user of users) {
await step.define(
{ name: `process-user-${user.id}` },
async () => {
return await UserModel.processUser(user.id);
}
).run();
}
return { processed: users.length };
}
);
Error Handling
Automatic Retry
Steps are automatically retried on failure.Copy
export const resilientTask = workflow(
{ name: "resilient_task" },
async ({ step }) => {
const result = await step.define(
{ name: "flaky-operation" },
async () => {
// Task that may fail
const response = await fetch("https://api.example.com");
return response.json();
}
).run();
return result;
}
);
Manual Error Handling
Copy
export const handleErrors = workflow(
{ name: "handle_errors" },
async ({ step, logger }) => {
try {
await step.define(
{ name: "risky-operation" },
async () => {
throw new Error("Something went wrong");
}
).run();
} catch (error) {
logger.error("Operation failed", { error });
// Fallback logic
await step.define(
{ name: "fallback" },
async () => {
return await AlternativeService.process();
}
).run();
}
}
);
Decorator Style (TypeScript Decorator)
You can also use decorator style instead of function style:Copy
@workflow({
name: "process_order",
version: "1.0"
})
async function processOrder({ input, step, logger }) {
logger.info("Processing order", { orderId: input.orderId });
const payment = await step.define(
{ name: "charge" },
async () => {
return await PaymentService.charge(input.amount);
}
).run();
return { paymentId: payment.id };
}
Cautions
1. Idempotence
Steps must be idempotent. Running multiple times with the same input should guarantee the same result.Copy
// ❌ Bad: Not idempotent
await step.define(
{ name: "increment-counter" },
async () => {
counter++; // Duplicate increment on retry
}
).run();
// ✅ Good: Idempotent
await step.define(
{ name: "set-counter" },
async () => {
return await CounterModel.set(10); // Always the same value
}
).run();
2. Split Long Tasks into Steps
Copy
// ❌ Bad: Giant step
await step.define(
{ name: "process-all" },
async () => {
// Task taking 1 hour
for (let i = 0; i < 1000000; i++) {
await processItem(i);
}
}
).run();
// ✅ Good: Small steps
for (let i = 0; i < 1000; i += 100) {
await step.define(
{ name: `process-batch-${i}` },
async () => {
const batch = items.slice(i, i + 100);
await processBatch(batch);
}
).run();
}
3. Step Name Uniqueness
Step names must be unique within a workflow.Copy
// ❌ Bad: Duplicate names
await step.define({ name: "process" }, async () => { /* ... */ }).run();
await step.define({ name: "process" }, async () => { /* ... */ }).run(); // Error!
// ✅ Good: Unique names
await step.define({ name: "process-payment" }, async () => { /* ... */ }).run();
await step.define({ name: "process-shipping" }, async () => { /* ... */ }).run();
Monitoring
Check Execution Status
Copy
const handle = await Sonamu.workflowManager.run(
{ name: "process_order" },
{ orderId: 123 }
);
// Check status
console.log("Status:", handle.status);
// Wait for result (until complete)
const result = await handle.result();
console.log("Result:", result);
View Logs
Workflows automatically log through LogTape:Copy
[INFO] [workflow:process_order] Processing order {"orderId":123}
[DEBUG] [workflow:process_order] Step: charge-payment
[INFO] [workflow:process_order] Payment successful {"paymentId":"pay_123"}
Example Collection
- Order Processing
- Data Sync
- Report Generation
- Reminders
Copy
import { workflow } from "sonamu";
import { z } from "zod";
const OrderSchema = z.object({
orderId: z.number(),
userId: z.number(),
items: z.array(z.object({
productId: z.number(),
quantity: z.number(),
price: z.number()
})),
totalAmount: z.number(),
shippingAddress: z.string()
});
export const processOrder = workflow(
{
name: "process_order",
version: "1.0",
schema: OrderSchema
},
async ({ input, step, logger }) => {
logger.info("Processing order", { orderId: input.orderId });
// Step 1: Check inventory
const inventory = await step.define(
{ name: "check-inventory" },
async () => {
for (const item of input.items) {
const available = await InventoryModel.check(
item.productId,
item.quantity
);
if (!available) {
throw new Error(`Product ${item.productId} out of stock`);
}
}
return { available: true };
}
).run();
// Step 2: Process payment
const payment = await step.define(
{ name: "process-payment" },
async () => {
return await PaymentService.charge({
userId: input.userId,
amount: input.totalAmount,
orderId: input.orderId
});
}
).run();
logger.info("Payment successful", { paymentId: payment.id });
// Step 3: Decrease inventory
await step.define(
{ name: "decrease-inventory" },
async () => {
for (const item of input.items) {
await InventoryModel.decrease(item.productId, item.quantity);
}
}
).run();
// Step 4: Start shipping
const shipping = await step.define(
{ name: "start-shipping" },
async () => {
return await ShippingService.createShipment({
orderId: input.orderId,
address: input.shippingAddress,
items: input.items
});
}
).run();
// Step 5: Send notifications
await step.define(
{ name: "send-notifications" },
async () => {
await NotificationService.send({
userId: input.userId,
type: "order_confirmed",
data: {
orderId: input.orderId,
trackingNumber: shipping.trackingNumber
}
});
}
).run();
return {
orderId: input.orderId,
paymentId: payment.id,
trackingNumber: shipping.trackingNumber,
status: "completed"
};
}
);
Copy
export const syncUserData = workflow(
{
name: "sync_user_data",
schedules: [
{
name: "hourly-sync",
expression: "0 * * * *", // Every hour
input: () => ({
timestamp: new Date().toISOString()
})
}
]
},
async ({ input, step, logger }) => {
logger.info("Starting user data sync", { timestamp: input.timestamp });
// Step 1: Fetch users from external API
const users = await step.define(
{ name: "fetch-external-users" },
async () => {
const response = await fetch("https://api.external.com/users");
return response.json();
}
).run();
logger.info("Fetched users", { count: users.length });
// Step 2: Process in batches
const batchSize = 100;
const batches = Math.ceil(users.length / batchSize);
for (let i = 0; i < batches; i++) {
await step.define(
{ name: `process-batch-${i}` },
async () => {
const batch = users.slice(i * batchSize, (i + 1) * batchSize);
for (const user of batch) {
await UserModel.upsert({
external_id: user.id,
email: user.email,
name: user.name,
synced_at: new Date()
});
}
return { processed: batch.length };
}
).run();
// Wait 1 second between batches
await step.sleep(`wait-after-batch-${i}`, "1s");
}
return {
totalUsers: users.length,
batches,
timestamp: input.timestamp
};
}
);
Copy
export const generateMonthlyReport = workflow(
{
name: "generate_monthly_report",
schedules: [
{
name: "first-of-month",
expression: "0 3 1 * *", // 1st of month at 3am
input: () => {
const now = new Date();
const lastMonth = new Date(now.getFullYear(), now.getMonth() - 1);
return {
year: lastMonth.getFullYear(),
month: lastMonth.getMonth() + 1
};
}
}
]
},
async ({ input, step, logger }) => {
logger.info("Generating monthly report", input);
// Step 1: Aggregate order data
const orderStats = await step.define(
{ name: "aggregate-orders" },
async () => {
const rdb = OrderModel.getPuri("r");
return rdb.table("orders")
.whereRaw("YEAR(created_at) = ?", [input.year])
.whereRaw("MONTH(created_at) = ?", [input.month])
.select(
rdb.raw("COUNT(*) as total_orders"),
rdb.raw("SUM(total_amount) as total_revenue"),
rdb.raw("AVG(total_amount) as avg_order_value")
)
.first();
}
).run();
// Step 2: User statistics
const userStats = await step.define(
{ name: "aggregate-users" },
async () => {
const rdb = UserModel.getPuri("r");
return rdb.table("users")
.whereRaw("YEAR(created_at) = ?", [input.year])
.whereRaw("MONTH(created_at) = ?", [input.month])
.count("* as new_users")
.first();
}
).run();
// Step 3: Product rankings
const topProducts = await step.define(
{ name: "rank-products" },
async () => {
const rdb = OrderModel.getPuri("r");
return rdb.table("order_items")
.join("orders", "orders.id", "order_items.order_id")
.whereRaw("YEAR(orders.created_at) = ?", [input.year])
.whereRaw("MONTH(orders.created_at) = ?", [input.month])
.groupBy("order_items.product_id")
.select(
"order_items.product_id",
rdb.raw("SUM(order_items.quantity) as total_quantity"),
rdb.raw("SUM(order_items.price * order_items.quantity) as total_sales")
)
.orderBy("total_sales", "desc")
.limit(10);
}
).run();
// Step 4: Save report
const report = await step.define(
{ name: "save-report" },
async () => {
const wdb = ReportModel.getDB("w");
return wdb.table("reports").insert({
type: "monthly",
year: input.year,
month: input.month,
data: JSON.stringify({
orders: orderStats,
users: userStats,
products: topProducts
}),
created_at: new Date()
}).returning("*");
}
).run();
// Step 5: Send email to admin
await step.define(
{ name: "send-email" },
async () => {
await EmailService.send({
to: "admin@example.com",
subject: `Monthly Report - ${input.year}/${input.month}`,
html: generateReportHTML({
orders: orderStats,
users: userStats,
products: topProducts
})
});
}
).run();
logger.info("Report generated", { reportId: report[0].id });
return {
reportId: report[0].id,
year: input.year,
month: input.month
};
}
);
Copy
export const sendReminders = workflow(
{
name: "send_reminders",
schedules: [
{
name: "daily-9am",
expression: "0 9 * * *", // Daily at 9am
input: () => ({
date: new Date().toISOString().split('T')[0]
})
}
]
},
async ({ input, step, logger }) => {
logger.info("Sending reminders", { date: input.date });
// Step 1: Fetch reminder targets
const tasks = await step.get(
{ name: "fetch-tasks" },
TaskModel,
"findPendingReminders"
).run();
logger.info("Found tasks", { count: tasks.length });
if (tasks.length === 0) {
return { sent: 0 };
}
// Step 2: Send reminder for each task
for (const task of tasks) {
await step.define(
{ name: `send-reminder-${task.id}` },
async () => {
// Send notification
await NotificationService.send({
userId: task.user_id,
type: "task_reminder",
data: {
taskId: task.id,
title: task.title,
dueDate: task.due_date
}
});
// Update status
const wdb = TaskModel.getDB("w");
await wdb.table("tasks")
.where("id", task.id)
.update({ reminder_sent: true });
return { taskId: task.id };
}
).run();
// Wait 0.5 seconds between reminders (rate limiting)
await step.sleep(`wait-after-${task.id}`, "500ms");
}
return { sent: tasks.length };
}
);