Workflow is a system for defining long-running tasks that execute in the background. Using the `workflow` definition function, you can reliably run email sending, data processing, scheduled tasks, and more.
## Basic Concepts
The purpose of a Workflow is to safely execute asynchronous tasks. Unlike API requests, Workflows:
- Run for extended periods: Can execute from minutes to hours
- Are retryable: Automatically retry on failure
- Are monitored: Execution state is recorded in the database
- Are schedulable: Run periodically using Cron expressions
## Basic Usage
When defining a Workflow, provide a name and an execution function. The execution function receives parameters such as `input`, `step`, and `logger`.
```ts
// src/workflows/email.workflow.ts
import { workflow } from "sonamu";

export const sendWelcomeEmail = workflow(
  { name: "send_welcome_email" },
  async ({ input, step, logger }) => {
    logger.info("Sending welcome email", { email: input.email });

    // Send the email as a named step
    await step.define({ name: "send_email" }, async () => {
      await sendEmail({
        to: input.email,
        subject: "Welcome!",
        body: "...",
      });
    }).run();

    return { success: true };
  }
);
```
Key Parameters:

- `name`: Workflow identifier (must be unique)
- `input`: input data passed to the Workflow
- `step`: step execution object (divides and manages tasks)
- `logger`: logging object (records execution state)

If you omit the Workflow name, it is set automatically by converting the function name to snake_case, e.g. `sendWelcomeEmail` -> `send_welcome_email`.
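A minimal sketch of that conversion, assuming the usual camelCase-to-snake_case rule (`toSnakeCase` is an illustrative helper, not part of the Sonamu API):

```ts
// Illustrative only: Sonamu's actual converter may handle edge cases differently.
function toSnakeCase(name: string): string {
  // Insert an underscore before each uppercase letter, then lowercase the result
  return name.replace(/([A-Z])/g, "_$1").toLowerCase();
}

console.log(toSnakeCase("sendWelcomeEmail")); // "send_welcome_email"
```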
## Running Workflows
### Running from API
When running a Workflow from an API endpoint, it returns immediately and the task proceeds in the background. Users don't have to wait for long-running tasks.
```ts
import { BaseModel, api, Sonamu } from "sonamu";
import { sendWelcomeEmail } from "../workflows/email.workflow";

class UserModelClass extends BaseModel {
  @api({ httpMethod: "POST" })
  async register(email: string, password: string) {
    // Save the user
    const wdb = this.getPuri("w");
    wdb.ubRegister("users", { email, password });
    const [userId] = await wdb.transaction(async (trx) => {
      return trx.ubUpsert("users");
    });

    // Run the Workflow in the background
    await Sonamu.workflows.run(
      {
        name: "send_welcome_email",
        version: null,
      },
      { email }
    );

    return userId;
  }
}
```
Execution Flow:

1. The API calls `Sonamu.workflows.run()`
2. The Workflow is added to the queue and the call returns immediately
3. A Worker picks up the Workflow from the queue and executes it
4. Execution results are stored in the database
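The flow can be pictured with a toy in-memory queue; this is only a sketch of the enqueue-and-return pattern, not Sonamu's actual implementation (which persists runs in the database):

```ts
// Toy sketch: run() only enqueues and returns a handle; a separate
// worker tick drains the queue and records results (standing in for the DB).
type Job = { name: string; input: unknown };
type Handler = (input: any) => Promise<unknown>;

const queue: Job[] = [];
const results: Record<string, unknown> = {};

function run(name: string, input: unknown): { name: string } {
  queue.push({ name, input }); // enqueue only: the caller does not wait
  return { name };
}

async function workerTick(handlers: Record<string, Handler>): Promise<void> {
  const job = queue.shift();
  if (!job) return;
  results[job.name] = await handlers[job.name](job.input); // record the result
}

// Usage: the handle comes back immediately; the worker runs later
const handle = run("send_welcome_email", { email: "user@example.com" });
await workerTick({ send_welcome_email: async (i) => ({ success: true, to: i.email }) });
console.log(results[handle.name]);
```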
### Direct Execution

You can also run Workflows directly from scripts or from other Workflows. Use `handle.result()` to wait for completion.
```ts
import { Sonamu } from "sonamu";

const handle = await Sonamu.workflows.run(
  {
    name: "send_welcome_email",
    version: null,
  },
  { email: "user@example.com" }
);

// Wait for the result (optional)
const result = await handle.result();
console.log(result); // { success: true }
```
`handle.result()` waits until the Workflow completes. Don't use it in API responses!
## Schema Validation
Using Zod schemas, you can automatically validate input and output data. This prevents runtime errors from invalid data and improves TypeScript type inference.
```ts
import { workflow } from "sonamu";
import { z } from "zod";

const inputSchema = z.object({
  userId: z.number(),
  amount: z.number().positive(),
});

const outputSchema = z.object({
  orderId: z.number(),
  status: z.enum(["success", "failed"]),
});

export const processPayment = workflow(
  {
    name: "process_payment",
    schema: {
      input: inputSchema,
      output: outputSchema,
    },
  },
  async ({ input, step, logger }) => {
    // input is automatically validated against inputSchema
    logger.info("Processing payment", { userId: input.userId });

    const orderId = await step.define({ name: "create_order" }, async () => {
      return await createOrder(input.userId, input.amount);
    }).run();

    // the return value is validated against outputSchema
    return {
      orderId,
      status: "success" as const,
    };
  }
);
```
Schema Validation Benefits:
- Type Safety: TypeScript accurately infers input/output types
- Runtime Validation: Verifies data format before execution
- Clear Contract: Makes the Workflow interface explicit
## Version Management

When you need to change a Workflow's logic, specify a version: tasks already running complete with the old logic, while new executions use the new logic.
```ts
export const sendWelcomeEmail = workflow(
  {
    name: "send_welcome_email",
    version: "v2",
  },
  async ({ input, step, version, logger }) => {
    logger.info("Version", { version }); // "v2"

    // Version-specific logic
    if (version === "v2") {
      // New logic: HTML email
      await sendHTMLEmail(input.email);
    } else {
      // Old logic: plain text
      await sendPlainEmail(input.email);
    }
  }
);
```
Use Cases:
- Email template changes
- Data processing logic improvements
- External API integration changes
When you change the version, the old version's logic must remain in the code until every Workflow still running on it has completed. Verify that all such executions have finished before deleting the old version's code.
## Scheduling
Using Cron expressions, you can automatically run Workflows on a schedule. Use this for daily report generation, periodic data backups, and more.
```ts
export const dailyReport = workflow(
  {
    name: "daily_report",
    schedules: [
      {
        name: "daily_at_9am",
        expression: "0 9 * * *", // Every day at 9 AM
        input: () => ({
          date: new Date().toISOString().split("T")[0],
        }),
      },
    ],
  },
  async ({ input, step, logger }) => {
    logger.info("Generating daily report", { date: input.date });

    // Generate the report
    const report = await step.define({ name: "generate_report" }, async () => {
      return await generateReport(input.date);
    }).run();

    // Send it by email
    await step.define({ name: "send_report" }, async () => {
      await sendReportEmail(report);
    }).run();

    return { success: true };
  }
);
```
Cron Expression Guide:

| Expression | Execution Time | Use Case |
|---|---|---|
| `* * * * *` | Every minute | Testing |
| `0 * * * *` | Every hour | Hourly aggregation |
| `0 9 * * *` | Daily at 9 AM | Daily reports |
| `0 9 * * 1` | Every Monday at 9 AM | Weekly reports |
| `0 0 1 * *` | First day of month at midnight | Monthly settlements |
Providing a function for the `input` parameter allows you to generate data dynamically at execution time.
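As a standalone sketch of that idiom, the `daily_at_9am` schedule derives a `YYYY-MM-DD` string at run time (note that `toISOString()` is UTC, while the schedule itself fires in the configured timezone):

```ts
// Produce today's date as "YYYY-MM-DD", as the schedule's input() does.
function todayAsIsoDate(now: Date = new Date()): string {
  return now.toISOString().split("T")[0];
}

console.log(todayAsIsoDate(new Date("2024-05-01T09:00:00Z"))); // "2024-05-01"
```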
### Multiple Schedules
You can register multiple schedules for a single Workflow. Each schedule can pass different input data, allowing the same logic to perform different tasks.
```ts
export const backupData = workflow(
  {
    name: "backup_data",
    schedules: [
      {
        name: "hourly_backup",
        expression: "0 * * * *", // Every hour
        input: { type: "incremental" },
      },
      {
        name: "daily_full_backup",
        expression: "0 2 * * *", // Daily at 2 AM
        input: { type: "full" },
      },
    ],
  },
  async ({ input, step, logger }) => {
    logger.info("Backup started", { type: input.type });

    if (input.type === "full") {
      // Full backup
      await fullBackup();
    } else {
      // Incremental backup
      await incrementalBackup();
    }
  }
);
```
Practical Applications:
- Incremental backup: Only changed data every hour
- Full backup: All data daily
- Different timezones: Run at different times by region
## Practical Examples

### 1. Bulk Email Sending
When sending emails to thousands of users, Workflows let you process safely without API request timeouts.
```ts
// src/workflows/email.workflow.ts
import { workflow } from "sonamu";
import { z } from "zod";

export const sendBulkEmail = workflow(
  {
    name: "send_bulk_email",
    schema: {
      input: z.object({
        userIds: z.array(z.number()),
        subject: z.string(),
        body: z.string(),
      }),
    },
  },
  async ({ input, step, logger }) => {
    logger.info("Sending bulk email", { count: input.userIds.length });

    // Fetch the users
    const users = await step.define({ name: "fetch_users" }, async () => {
      return await UserModel.findMany({
        wq: [["id", "IN", input.userIds]],
      });
    }).run();

    // Send the emails, one Step per user
    for (const user of users) {
      await step.define(
        { name: `send_email_${user.id}` },
        async () => {
          await sendEmail({
            to: user.email,
            subject: input.subject,
            body: input.body,
          });
        }
      ).run();
    }

    return { sent: users.length };
  }
);
```
Key Points:
- User fetching and email sending are separate Steps
- Each email creates a Step enabling individual retries
- Progress is recorded in DB for monitoring
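For very large recipient lists, one Step per user can mean thousands of Steps; an alternative is one Step per batch. A chunking helper for that (illustrative, not a Sonamu API):

```ts
// Split an array into fixed-size chunks so each chunk can become one Step
// (e.g. `send_batch_0`, `send_batch_1`, ...) instead of one Step per user.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

console.log(JSON.stringify(chunk([1, 2, 3, 4, 5], 2))); // [[1,2],[3,4],[5]]
```

The trade-off: fewer Steps means less bookkeeping, but retrying a failed batch resends to every user in that batch.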
### 2. Data Pipeline
You can build pipelines that fetch data from external APIs, transform it, and store it.
```ts
// src/workflows/data-processing.workflow.ts
import { workflow } from "sonamu";

export const processUserData = workflow(
  {
    name: "process_user_data",
  },
  async ({ input, step, logger }) => {
    // Stage 1: Collect data
    const rawData = await step.define({ name: "collect_data" }, async () => {
      return await fetchRawData();
    }).run();

    // Stage 2: Transform
    const transformed = await step.define({ name: "transform" }, async () => {
      return transformData(rawData);
    }).run();

    // Stage 3: Save
    await step.define({ name: "save" }, async () => {
      await saveToDatabase(transformed);
    }).run();

    logger.info("Processing completed", { count: transformed.length });
    return { processed: transformed.length };
  }
);
```
Benefits:

- If a stage fails, you don't have to restart from the beginning
- Measuring each stage's execution time helps identify bottlenecks
- When the transform logic changes, you can skip the collection stage
### 3. Scheduled Cleanup Task
Schedule automatic deletion of old data.
```ts
// src/workflows/cleanup.workflow.ts
import { workflow } from "sonamu";

export const cleanupOldData = workflow(
  {
    name: "cleanup_old_data",
    schedules: [
      {
        name: "daily_cleanup",
        expression: "0 3 * * *", // Daily at 3 AM
        input: {
          daysToKeep: 90,
        },
      },
    ],
  },
  async ({ input, step, logger }) => {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - input.daysToKeep);

    // Delete records older than the cutoff
    const deleted = await step.define({ name: "delete_old_records" }, async () => {
      return await knex("logs")
        .where("created_at", "<", cutoffDate)
        .delete();
    }).run();

    logger.info("Cleanup completed", { deleted });
    return { deleted };
  }
);
```
Use Cases:
- Log data cleanup
- Temporary file deletion
- Expired session removal
## Pause and Resume

You can pause a running Workflow and resume it later. This is useful for managing resources or dealing with external dependency issues.
### State Transitions

Workflow states transition as follows:

| Current Status | Available Action | Result Status |
|---|---|---|
| `pending` | pause | `paused` |
| `running` | pause | `paused` |
| `sleeping` | pause | `paused` |
| `paused` | resume | `pending` |
| `completed`, `failed`, `canceled` | - | Cannot change (terminal states) |
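The table can be encoded as a small transition map. A hedged sketch (Sonamu's internal state machine may differ in detail, but the allowed moves follow the table):

```ts
type Status =
  | "pending" | "running" | "sleeping" | "paused"
  | "completed" | "failed" | "canceled";

// Allowed transitions, mirroring the table above.
const TRANSITIONS: Record<"pause" | "resume", Partial<Record<Status, Status>>> = {
  pause: { pending: "paused", running: "paused", sleeping: "paused" },
  resume: { paused: "pending" },
};

function transition(status: Status, action: "pause" | "resume"): Status {
  // Unlisted combinations are no-ops: pausing an already-paused run does
  // nothing, and terminal states never change.
  return TRANSITIONS[action][status] ?? status;
}

console.log(transition("running", "pause"));   // "paused"
console.log(transition("completed", "pause")); // "completed" (terminal)
```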
### Using Sonamu UI

You can pause or resume running Workflows directly from the Tasks tab in Sonamu UI.

- Pause: click the "Pause" button on Workflow cards with `pending`, `running`, or `sleeping` status
- Resume: click the "Resume" button on Workflow cards with `paused` status
### Using the API
You can programmatically control Workflows from the backend.
```ts
import { Sonamu } from "sonamu";

// Run a Workflow
const handle = await Sonamu.workflows.run(
  { name: "data_processing", version: null },
  { dataId: 123 }
);

// Pause (using workflowRunId)
await Sonamu.workflows.backend.pauseWorkflowRun({
  workflowRunId: handle.workflowRunId,
});

// Resume
await Sonamu.workflows.backend.resumeWorkflowRun({
  workflowRunId: handle.workflowRunId,
});
```
### Key Features

- Idempotency guaranteed: calling pause on an already-paused Workflow doesn't throw an error. The same applies to resume.
- Terminal state protection: Workflows with `completed`, `failed`, or `canceled` status cannot be paused or resumed.
- Immediate resume: when resume is called, `available_at` is set to the current time, so the Worker picks up the task immediately.
### Use Cases
```ts
// Example: Pause related Workflows during an external API outage
export async function handleExternalApiOutage() {
  const { data: runs } = await Sonamu.workflows.backend.listWorkflowRuns({
    status: ["pending", "running", "sleeping"],
    workflowName: "sync_external_data",
  });

  for (const run of runs) {
    await Sonamu.workflows.backend.pauseWorkflowRun({
      workflowRunId: run.id,
    });
  }

  console.log(`Paused ${runs.length} Workflows.`);
}

// Example: Batch-resume after the external API recovers
export async function handleExternalApiRecovery() {
  const { data: runs } = await Sonamu.workflows.backend.listWorkflowRuns({
    status: ["paused"],
    workflowName: "sync_external_data",
  });

  for (const run of runs) {
    await Sonamu.workflows.backend.resumeWorkflowRun({
      workflowRunId: run.id,
    });
  }

  console.log(`Resumed ${runs.length} Workflows.`);
}
```
When a Workflow is paused, the currently running Step continues until it completes. The Worker checks the `paused` status before starting the next Step and stops there.
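That between-Steps check can be sketched with a toy runner (illustrative only; the real Worker reads the run's status from the database):

```ts
// Toy runner: the paused flag is consulted only between steps, so a step
// that has already started always runs to completion.
type StepFn = () => Promise<void>;

async function runSteps(steps: StepFn[], isPaused: () => boolean): Promise<number> {
  let completed = 0;
  for (const stepFn of steps) {
    if (isPaused()) break; // checked before starting the next step
    await stepFn();        // an in-flight step is never interrupted
    completed++;
  }
  return completed; // a later resume would continue from this index
}

// Usage: pausing during step 1 still lets step 1 finish; step 2 never starts
let paused = false;
const done = await runSteps(
  [
    async () => { paused = true; }, // step 1 requests a pause mid-run
    async () => {},                 // step 2 is skipped
  ],
  () => paused
);
console.log(done); // 1
```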
## Important Notes

Workflow Best Practices:

1. Worker Required: a Worker process must be running to execute Workflows.

2. Divide into Steps: split long tasks into multiple Steps so that a failure doesn't force you to re-run everything.

   ```ts
   // Bad: one long task without Steps
   await longRunningTask();

   // Good: divided into Steps
   await step.define({ name: "task" }, longRunningTask).run();
   ```

3. Avoid Duplicate Schedule Names: schedule names must be unique within the same Workflow.

   ```ts
   schedules: [
     { name: "backup_hourly", ... },
     { name: "backup_daily", ... }, // Different name
   ]
   ```

4. Timezone Configuration: schedules follow `Sonamu.config.api.timezone`.

   ```ts
   // sonamu.config.ts
   api: {
     timezone: "Asia/Seoul",
   }
   ```

5. Error Handling: failed Workflows are retried automatically. Protect important tasks with try-catch.

   ```ts
   await step.define({ name: "task" }, async () => {
     try {
       await riskyOperation();
     } catch (error) {
       logger.error("Operation failed", { error });
       throw error;
     }
   }).run();
   ```
## Next Steps