Workflow is a system for defining long-running tasks that run in the background. With the workflow function, you can reliably run email delivery, data processing, scheduled jobs, and more.

Basic Concepts

The purpose of a Workflow is to safely execute asynchronous tasks. Unlike API requests, Workflows:
  • Run for extended periods: Can execute from minutes to hours
  • Are retryable: Automatically retry on failure
  • Are monitored: Execution state is recorded in the database
  • Are schedulable: Run periodically using Cron expressions

Basic Usage

When defining a Workflow, provide a name and an execution function. The execution function receives parameters like input, step, and logger.
// src/workflows/email.workflow.ts
import { workflow } from "sonamu";

export const sendWelcomeEmail = workflow(
  { name: "send_welcome_email" },
  async ({ input, step, logger }) => {
    logger.info("Sending welcome email", { email: input.email });

    // Send email
    await step.define({ name: "send_email" }, async () => {
      await sendEmail({
        to: input.email,
        subject: "Welcome!",
        body: "...",
      });
    }).run();

    return { success: true };
  }
);
Key Parameters:
  • name: Workflow identifier (must be unique)
  • input: Input data passed to the Workflow
  • step: Step execution object (divides and manages tasks)
  • logger: Logging object (records execution state)
If you omit the workflow name, it will be automatically set by converting the function name to snake_case. Example: sendWelcomeEmail -> send_welcome_email
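The name derivation described above can be sketched as a small string conversion. This is only an illustration: toSnakeCase is a hypothetical helper, and Sonamu's actual conversion may treat acronyms or digits differently.

```typescript
// Hypothetical helper (not part of Sonamu): converts a camelCase
// function name to snake_case, as in the example above.
function toSnakeCase(name: string): string {
  return name
    // Insert "_" between a lowercase letter or digit and the uppercase letter after it.
    .replace(/([a-z0-9])([A-Z])/g, "$1_$2")
    .toLowerCase();
}

const derivedName = toSnakeCase("sendWelcomeEmail"); // "send_welcome_email"
```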

Running Workflows

Running from API

When running a Workflow from an API endpoint, it returns immediately and the task proceeds in the background. Users don’t have to wait for long-running tasks.
import { BaseModel, api, Sonamu } from "sonamu";
import { sendWelcomeEmail } from "../workflows/email.workflow";

class UserModelClass extends BaseModel {
  @api({ httpMethod: 'POST' })
  async register(email: string, password: string) {
    // Save user
    const wdb = this.getPuri("w");
    wdb.ubRegister("users", { email, password });

    const [userId] = await wdb.transaction(async (trx) => {
      return trx.ubUpsert("users");
    });

    // Run Workflow (background)
    await Sonamu.workflows.run(
      {
        name: "send_welcome_email",
        version: null,
      },
      { email }
    );

    return userId;
  }
}
Execution Flow:
  1. API calls Sonamu.workflows.run()
  2. Workflow is added to the queue and returns immediately
  3. Worker picks up the Workflow from the queue and executes it
  4. Execution results are stored in the database
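The four steps above can be modeled with a toy in-memory queue. This is a sketch of the general enqueue-then-work pattern, not Sonamu's implementation: InMemoryQueue, workOnce, and the synchronous handlers are all assumptions made to keep the example short, and a real system would persist jobs in a database.

```typescript
type Job = { id: number; name: string; input: unknown };
type Handler = (input: unknown) => unknown;

// Hypothetical in-memory stand-in for the queue and result store.
class InMemoryQueue {
  private nextId = 1;
  private pending: Job[] = [];
  private handlers: Record<string, Handler>;
  readonly results = new Map<number, unknown>();

  constructor(handlers: Record<string, Handler>) {
    this.handlers = handlers;
  }

  // Steps 1-2: enqueue the job and return its id without running it.
  run(name: string, input: unknown): number {
    const id = this.nextId++;
    this.pending.push({ id, name, input });
    return id;
  }

  // Steps 3-4: a worker takes one job off the queue, runs it, and stores the result.
  // Returns false when there was nothing to do.
  workOnce(): boolean {
    const job = this.pending.shift();
    if (!job) return false;
    const handler = this.handlers[job.name];
    if (!handler) throw new Error(`Unknown workflow: ${job.name}`);
    this.results.set(job.id, handler(job.input));
    return true;
  }
}
```

The caller of run() gets an id back before any work happens, which is why the API endpoint can respond immediately.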

Direct Execution

You can also run directly from scripts or other Workflows. Use handle.result() to wait for completion.
import { Sonamu } from "sonamu";

const handle = await Sonamu.workflows.run(
  {
    name: "send_welcome_email",
    version: null,
  },
  { email: "user@example.com" }
);

// Wait for result (optional)
const result = await handle.result();
console.log(result);  // { success: true }
handle.result() waits until the Workflow completes. Don’t call it inside an API handler, because the response would be blocked until the Workflow finishes.

Schema Validation

Using Zod schemas, you can automatically validate input and output data. This prevents runtime errors from invalid data and improves TypeScript type inference.
import { workflow } from "sonamu";
import { z } from "zod";

const inputSchema = z.object({
  userId: z.number(),
  amount: z.number().positive(),
});

const outputSchema = z.object({
  orderId: z.number(),
  status: z.enum(["success", "failed"]),
});

export const processPayment = workflow(
  {
    name: "process_payment",
    schema: {
      input: inputSchema,
      output: outputSchema,
    },
  },
  async ({ input, step, logger }) => {
    // input is automatically validated
    logger.info("Processing payment", { userId: input.userId });

    const orderId = await step.define({ name: "create_order" }, async () => {
      return await createOrder(input.userId, input.amount);
    }).run();

    // output is also automatically validated
    return {
      orderId,
      status: "success" as const,
    };
  }
);
Schema Validation Benefits:
  • Type Safety: TypeScript accurately infers input/output types
  • Runtime Validation: Verifies data format before execution
  • Clear Contract: Makes the Workflow interface explicit

Version Management

When you need to change Workflow logic, specifying a version ensures existing running tasks complete with the old logic while new executions use the new logic.
export const sendWelcomeEmail = workflow(
  {
    name: "send_welcome_email",
    version: "v2",
  },
  async ({ input, step, version, logger }) => {
    logger.info("Version", { version });  // "v2"

    // Version-specific logic
    if (version === "v2") {
      // New logic: HTML email
      await sendHTMLEmail(input.email);
    } else {
      // Old logic: Plain text
      await sendPlainEmail(input.email);
    }
  }
);
Use Cases:
  • Email template changes
  • Data processing logic improvements
  • External API integration changes
When you change versions, Workflows that are already running still need the old version’s logic in the code to complete. Verify that all old-version executions have finished before deleting that code.
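To see why the old logic has to stay in the code, consider a toy registry that looks up handlers by name and version. This is a sketch under assumptions: register, resolve, and the "name@version" key are hypothetical and do not describe how Sonamu stores or resolves versions.

```typescript
type VersionedHandler = (email: string) => string;

// Hypothetical registry keyed by "name@version".
const registry = new Map<string, VersionedHandler>();

function keyOf(name: string, version: string | null): string {
  return `${name}@${version ?? "default"}`;
}

function register(name: string, version: string | null, handler: VersionedHandler): void {
  registry.set(keyOf(name, version), handler);
}

// A run started under a given version must find that version's handler to finish.
function resolve(name: string, version: string | null): VersionedHandler {
  const handler = registry.get(keyOf(name, version));
  if (!handler) throw new Error(`No handler for ${keyOf(name, version)}`);
  return handler;
}

register("send_welcome_email", "v1", (email) => `plain:${email}`);
register("send_welcome_email", "v2", (email) => `html:${email}`);
```

If the v1 entry were removed while a v1 run was still in flight, resolve would throw for that run, which is the situation the warning above is meant to prevent.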

Scheduling

Using Cron expressions, you can automatically run Workflows on a schedule. Use this for daily report generation, periodic data backups, and more.
export const dailyReport = workflow(
  {
    name: "daily_report",
    schedules: [
      {
        name: "daily_at_9am",
        expression: "0 9 * * *",  // Every day at 9 AM
        input: () => ({
          date: new Date().toISOString().split('T')[0],
        }),
      },
    ],
  },
  async ({ input, step, logger }) => {
    logger.info("Generating daily report", { date: input.date });

    // Generate report
    const report = await step.define({ name: "generate_report" }, async () => {
      return await generateReport(input.date);
    }).run();

    // Send email
    await step.define({ name: "send_report" }, async () => {
      await sendReportEmail(report);
    }).run();

    return { success: true };
  }
);
Cron Expression Guide:
Expression   | Execution Time                 | Use Case
* * * * *    | Every minute                   | Testing
0 * * * *    | Every hour                     | Hourly aggregation
0 9 * * *    | Daily at 9 AM                  | Daily reports
0 9 * * 1    | Every Monday at 9 AM           | Weekly reports
0 0 1 * *    | First day of month at midnight | Monthly settlements
Providing a function for the input parameter allows you to dynamically generate data at execution time.
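The expressions in the guide above all use five fields: minute, hour, day of month, month, and day of week (0 is Sunday). The following minimal matcher shows how such an expression is read. It is a simplified sketch, not the scheduler Sonamu uses: cronMatches is a hypothetical helper that only understands "*" and single numbers (no ranges, lists, or step values) and evaluates the date in the local timezone of the process.

```typescript
// Returns true when `date` (read in local time) satisfies a 5-field expression
// whose fields are each "*" or a single number.
function cronMatches(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`Expected 5 fields, got ${fields.length}`);
  }
  // Same order as the cron fields: minute, hour, day of month, month, day of week.
  const values = [
    date.getMinutes(),
    date.getHours(),
    date.getDate(),
    date.getMonth() + 1, // JavaScript months are 0-based, cron months are 1-based
    date.getDay(), // 0 = Sunday ... 6 = Saturday
  ];
  return fields.every((field, i) => field === "*" || Number(field) === values[i]);
}

// January 1, 2024 was a Monday.
const mondayNineAm = new Date(2024, 0, 1, 9, 0);
const matchesWeekly = cronMatches("0 9 * * 1", mondayNineAm);
```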

Multiple Schedules

You can register multiple schedules for a single Workflow. Each schedule can pass different input data, allowing the same logic to perform different tasks.
export const backupData = workflow(
  {
    name: "backup_data",
    schedules: [
      {
        name: "hourly_backup",
        expression: "0 * * * *",  // Every hour
        input: { type: "incremental" },
      },
      {
        name: "daily_full_backup",
        expression: "0 2 * * *",  // Daily at 2 AM
        input: { type: "full" },
      },
    ],
  },
  async ({ input, step, logger }) => {
    logger.info("Backup started", { type: input.type });

    if (input.type === "full") {
      // Full backup
      await fullBackup();
    } else {
      // Incremental backup
      await incrementalBackup();
    }
  }
);
Practical Applications:
  • Incremental backup: Only changed data every hour
  • Full backup: All data daily
  • Different timezones: Run at different times by region
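Since each schedule carries its own name and input, two small checks are useful when assembling a schedules array: names should not repeat, and an input may be either a fixed value or a function evaluated when the schedule fires. The sketch below illustrates both. Schedule, assertUniqueScheduleNames, and resolveInput are hypothetical names for illustration, not Sonamu exports.

```typescript
type Schedule<T> = {
  name: string;
  expression: string;
  // Either a fixed value or a function evaluated when the schedule fires.
  input: T | (() => T);
};

// Throws if two schedules of the same workflow share a name.
function assertUniqueScheduleNames<T>(schedules: Schedule<T>[]): void {
  const seen = new Set<string>();
  for (const { name } of schedules) {
    if (seen.has(name)) throw new Error(`Duplicate schedule name: ${name}`);
    seen.add(name);
  }
}

// Resolves a static or function input to a concrete value.
// Assumes T itself is not a function type.
function resolveInput<T>(input: T | (() => T)): T {
  return typeof input === "function" ? (input as () => T)() : (input as T);
}

const backupSchedules: Schedule<{ type: string }>[] = [
  { name: "hourly_backup", expression: "0 * * * *", input: { type: "incremental" } },
  { name: "daily_full_backup", expression: "0 2 * * *", input: () => ({ type: "full" }) },
];
assertUniqueScheduleNames(backupSchedules);
```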

Practical Examples

1. Bulk Email Sending

When sending emails to thousands of users, Workflows let you process safely without API request timeouts.
// src/workflows/email.workflow.ts
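import { workflow } from "sonamu";
import { z } from "zod";
// UserModel and sendEmail are assumed to be defined in your application; import them from their own modules.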

export const sendBulkEmail = workflow(
  {
    name: "send_bulk_email",
    schema: {
      input: z.object({
        userIds: z.array(z.number()),
        subject: z.string(),
        body: z.string(),
      }),
    },
  },
  async ({ input, step, logger }) => {
    logger.info("Sending bulk email", { count: input.userIds.length });

    // Fetch users
    const users = await step.define({ name: "fetch_users" }, async () => {
      return await UserModel.findMany({
        wq: [['id', 'IN', input.userIds]],
      });
    }).run();

    // Send emails
    for (const user of users) {
      await step.define(
        { name: `send_email_${user.id}` },
        async () => {
          await sendEmail({
            to: user.email,
            subject: input.subject,
            body: input.body,
          });
        }
      ).run();
    }

    return { sent: users.length };
  }
);
Key Points:
  • User fetching and email sending are separate Steps
  • Each email gets its own Step, so a failed send can be retried individually
  • Progress is recorded in DB for monitoring

2. Data Pipeline

You can build pipelines that fetch data from external APIs, transform it, and store it.
// src/workflows/data-processing.workflow.ts
import { workflow } from "sonamu";

export const processUserData = workflow(
  {
    name: "process_user_data",
  },
  async ({ input, step, logger }) => {
    // Stage 1: Collect data
    const rawData = await step.define({ name: "collect_data" }, async () => {
      return await fetchRawData();
    }).run();

    // Stage 2: Transform
    const transformed = await step.define({ name: "transform" }, async () => {
      return transformData(rawData);
    }).run();

    // Stage 3: Save
    await step.define({ name: "save" }, async () => {
      await saveToDatabase(transformed);
    }).run();

    logger.info("Processing completed", { count: transformed.length });

    return { processed: transformed.length };
  }
);
Benefits:
  • If a stage fails, the Workflow does not have to restart from the beginning
  • Measure execution time of each stage to identify bottlenecks
  • When transform logic changes, skip the collection stage
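The first benefit relies on completed Steps not being executed again when a run is retried. A minimal way to picture this is a cache of Step results keyed by Step name. The sketch below is an in-memory illustration of that idea only: StepCache and pipeline are hypothetical, and a durable engine would persist the results per run in a database rather than in a Map.

```typescript
// Hypothetical in-memory store of completed step results, keyed by step name.
class StepCache {
  private results = new Map<string, unknown>();

  async run<T>(name: string, fn: () => Promise<T>): Promise<T> {
    if (this.results.has(name)) {
      return this.results.get(name) as T; // completed earlier: skip re-execution
    }
    const value = await fn();
    this.results.set(name, value); // recorded only after the step succeeds
    return value;
  }
}

// Three stages mirroring collect, transform, save. `calls` records which stages really ran.
async function pipeline(cache: StepCache, calls: string[], failSave: boolean): Promise<number> {
  const raw = await cache.run("collect_data", async () => {
    calls.push("collect");
    return [1, 2, 3];
  });
  const transformed = await cache.run("transform", async () => {
    calls.push("transform");
    return raw.map((n) => n * 2);
  });
  await cache.run("save", async () => {
    calls.push("save");
    if (failSave) throw new Error("database unavailable");
  });
  return transformed.length;
}
```

Running pipeline once with failSave set to true and then again with the same cache re-executes only the save stage, because the first two results are already recorded.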

3. Scheduled Cleanup Task

Schedule automatic deletion of old data.
// src/workflows/cleanup.workflow.ts
import { workflow } from "sonamu";
// knex is assumed to be a configured Knex instance provided by your application.

export const cleanupOldData = workflow(
  {
    name: "cleanup_old_data",
    schedules: [
      {
        name: "daily_cleanup",
        expression: "0 3 * * *",  // Daily at 3 AM
        input: {
          daysToKeep: 90,
        },
      },
    ],
  },
  async ({ input, step, logger }) => {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - input.daysToKeep);

    // Delete old data
    const deleted = await step.define({ name: "delete_old_records" }, async () => {
      return await knex('logs')
        .where('created_at', '<', cutoffDate)
        .delete();
    }).run();

    logger.info("Cleanup completed", { deleted });

    return { deleted };
  }
);
Use Cases:
  • Log data cleanup
  • Temporary file deletion
  • Expired session removal
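The cutoff calculation in the cleanup example is easier to unit test when it is pulled into a pure function that receives the current time as an argument. This is a small refactoring sketch; cutoffDate is a hypothetical helper name.

```typescript
// Returns the date `daysToKeep` days before `now`, without mutating `now`.
function cutoffDate(now: Date, daysToKeep: number): Date {
  const cutoff = new Date(now.getTime());
  // setDate handles month and year boundaries when the value goes below 1.
  cutoff.setDate(cutoff.getDate() - daysToKeep);
  return cutoff;
}

// 90 days before March 31, 2024 (local time) is January 1, 2024.
const exampleCutoff = cutoffDate(new Date(2024, 2, 31), 90);
```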

Pause and Resume

You can pause a running Workflow and resume it later. This feature is useful when managing resources or dealing with external dependency issues.

State Transitions

Workflow states transition as follows:
Current Status              | Available Action | Result Status
pending                     | pause            | paused
running                     | pause            | paused
sleeping                    | pause            | paused
paused                      | resume           | pending
completed, failed, canceled | -                | Cannot change (terminal states)
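The transition table can be written down as a lookup, which is handy when validating an action before sending it. This sketch encodes only the rows listed above; Status, Action, and nextStatus are hypothetical names, not Sonamu types.

```typescript
type Status =
  | "pending" | "running" | "sleeping" | "paused"
  | "completed" | "failed" | "canceled";
type Action = "pause" | "resume";

// One entry per row of the table; terminal states have no entries.
const TRANSITIONS: Record<Action, Partial<Record<Status, Status>>> = {
  pause: { pending: "paused", running: "paused", sleeping: "paused" },
  resume: { paused: "pending" },
};

// Returns the resulting status, or null when the table lists no transition.
function nextStatus(current: Status, action: Action): Status | null {
  return TRANSITIONS[action][current] ?? null;
}
```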

Using Sonamu UI

You can directly pause or resume running Workflows from the Tasks tab in Sonamu UI.
  1. Pause: Click the “Pause” button on Workflow cards with pending, running, or sleeping status
  2. Resume: Click the “Resume” button on Workflow cards with paused status

Using the API

You can programmatically control Workflows from the backend.
import { Sonamu } from "sonamu";

// Run a Workflow
const handle = await Sonamu.workflows.run(
  { name: "data_processing", version: null },
  { dataId: 123 }
);

// Pause (using workflowRunId)
await Sonamu.workflows.backend.pauseWorkflowRun({
  workflowRunId: handle.workflowRunId,
});

// Resume
await Sonamu.workflows.backend.resumeWorkflowRun({
  workflowRunId: handle.workflowRunId,
});

Key Features

  • Idempotency Guaranteed: Calling pause on an already paused Workflow doesn’t throw an error. The same applies to resume.
  • Terminal State Protection: Workflows with completed, failed, or canceled status cannot be paused/resumed.
  • Immediate Resume: When resume is called, available_at is set to the current time, so the Worker picks up the task immediately.
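The three properties above can be illustrated with a toy run record. This is a sketch under assumptions, not Sonamu's backend: WorkflowRun, pause, resume, and isClaimable are hypothetical, the availableAt field stands in for the available_at column, and treating a call on a terminal run as a silent no-op is an assumption (the source only says such runs cannot be paused or resumed).

```typescript
type RunStatus =
  | "pending" | "running" | "sleeping" | "paused"
  | "completed" | "failed" | "canceled";

interface WorkflowRun {
  status: RunStatus;
  availableAt: Date; // stands in for the available_at column
}

const TERMINAL_STATUSES: readonly RunStatus[] = ["completed", "failed", "canceled"];

// Returns true if the run changed, false if the call was a no-op.
function pause(run: WorkflowRun): boolean {
  if (run.status === "paused" || TERMINAL_STATUSES.includes(run.status)) return false;
  run.status = "paused";
  return true;
}

function resume(run: WorkflowRun, now: Date): boolean {
  if (run.status !== "paused") return false; // repeated resume and terminal runs are untouched
  run.status = "pending";
  run.availableAt = now; // due immediately
  return true;
}

// In this model a worker claims only pending runs whose availableAt has passed.
function isClaimable(run: WorkflowRun, now: Date): boolean {
  return run.status === "pending" && run.availableAt.getTime() <= now.getTime();
}
```

Because resume moves availableAt to the current time, a run that was originally scheduled for later becomes claimable on the worker's next poll.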

Use Cases

// Example: Pause related Workflows during external API outage
export async function handleExternalApiOutage() {
  const { data: runs } = await Sonamu.workflows.backend.listWorkflowRuns({
    status: ["pending", "running", "sleeping"],
    workflowName: "sync_external_data",
  });

  for (const run of runs) {
    await Sonamu.workflows.backend.pauseWorkflowRun({
      workflowRunId: run.id,
    });
  }

  console.log(`Paused ${runs.length} Workflows.`);
}

// Example: Batch resume after external API recovery
export async function handleExternalApiRecovery() {
  const { data: runs } = await Sonamu.workflows.backend.listWorkflowRuns({
    status: ["paused"],
    workflowName: "sync_external_data",
  });

  for (const run of runs) {
    await Sonamu.workflows.backend.resumeWorkflowRun({
      workflowRunId: run.id,
    });
  }

  console.log(`Resumed ${runs.length} Workflows.`);
}
When a Workflow is paused, the currently running Step continues until completion. Before starting the next Step, the Worker checks the paused status and stops the work.
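The pause behavior described above, where the flag is checked between Steps and never in the middle of one, can be sketched as a simple loop. runSteps, StepFn, and RunState are hypothetical names for illustration; the real Worker reads the paused status from the database.

```typescript
type StepFn = () => Promise<void>;

interface RunState {
  paused: boolean;
}

// Runs steps in order, checking the pause flag only before starting each step.
// Returns the index of the next step to run, so a later resume can continue from there.
async function runSteps(steps: StepFn[], state: RunState, startAt = 0): Promise<number> {
  let next = startAt;
  for (const step of steps.slice(startAt)) {
    if (state.paused) break; // stop before the next step, never mid-step
    await step();
    next++;
  }
  return next;
}
```

A pause request that arrives while a Step is running therefore takes effect only after that Step finishes.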

Important Notes

Workflow Best Practices:
  1. Worker Required: A Worker process must be running to execute Workflows.
    pnpm worker
    
  2. Divide into Steps: Split long tasks into multiple Steps. On failure, you don’t need to re-run everything.
    // Bad: Long task without Steps
    await longRunningTask();
    
    // Good: Divided into Steps
    await step.define({ name: "task" }, longRunningTask).run();
    
  3. Avoid Duplicate Schedule Names: Schedule names must be unique within the same Workflow.
    schedules: [
      { name: "backup_hourly", ... },
      { name: "backup_daily", ... },  // Different name
    ]
    
  4. Timezone Configuration: Schedules follow Sonamu.config.api.timezone.
    // sonamu.config.ts
    api: {
      timezone: "Asia/Seoul",
    }
    
  5. Error Handling: Failed Workflows are automatically retried. Protect important tasks with try-catch.
    await step.define({ name: "task" }, async () => {
      try {
        await riskyOperation();
      } catch (error) {
        logger.error("Operation failed", { error });
        throw error;
      }
    }).run();
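The throw error line in the last example matters: a retry mechanism can only react to failures it can see. The sketch below uses a generic retry loop to show the difference between rethrowing and swallowing an error. runWithRetry and makeFlaky are hypothetical helpers, and Sonamu's actual retry count and backoff policy are not described here.

```typescript
// Generic retry loop: re-runs `fn` until it succeeds or attempts run out.
async function runWithRetry<T>(fn: () => Promise<T>, maxAttempts: number): Promise<T> {
  if (maxAttempts < 1) throw new RangeError("maxAttempts must be at least 1");
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error; // remember the failure and try again
    }
  }
  throw lastError;
}

// Builds an operation that fails `failures` times, then returns "ok".
function makeFlaky(failures: number): () => Promise<string> {
  let calls = 0;
  return async () => {
    calls++;
    if (calls <= failures) throw new Error(`attempt ${calls} failed`);
    return "ok";
  };
}

// Rethrowing keeps the failure visible, so the loop retries until success.
async function rethrowing(op: () => Promise<string>): Promise<string> {
  try {
    return await op();
  } catch (error) {
    throw error; // log here if needed, then rethrow
  }
}

// Swallowing hides the failure: the first attempt "succeeds" with undefined.
async function swallowing(op: () => Promise<string>): Promise<string | undefined> {
  try {
    return await op();
  } catch {
    return undefined;
  }
}
```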
    

Next Steps