Skip to main content
Step is a core concept for dividing Workflows into smaller execution units. Each Step executes independently and can be retried from the point of failure, enabling safe processing of long-running tasks.

Why Steps Are Needed

Without Steps in a Workflow, if something fails midway, you have to restart from the beginning. With Steps, you can resume from where it failed.
// Bad: Without Steps - restart from beginning on failure
export const processOrder = workflow(
  { name: "process_order" },
  async ({ input }) => {
    await validatePayment(input.orderId);      // 1. Validate payment
    await updateInventory(input.items);        // 2. Update inventory
    await sendConfirmationEmail(input.email);  // 3. Send email
    // What if step 3 fails? -> Restart from step 1
  }
);

// Good: With Steps - retry from failed step
export const processOrder = workflow(
  { name: "process_order" },
  async ({ input, step }) => {
    await step.define({ name: "validate_payment" }, async () => {
      await validatePayment(input.orderId);
    }).run();

    await step.define({ name: "update_inventory" }, async () => {
      await updateInventory(input.items);
    }).run();

    await step.define({ name: "send_email" }, async () => {
      await sendConfirmationEmail(input.email);
    }).run();
    // What if step 3 fails? -> Retry from step 3
  }
);
Benefits of Using Steps:
  • Partial Retry: Re-execute only from the failed step
  • Progress Tracking: Check execution time and status of each step
  • Easier Debugging: Clearly identify which step failed

step.define()

step.define() is the most basic way to wrap a function as a step. When you make an async function a step, its execution history is saved to the database.
export const sendEmail = workflow(
  { name: "send_email" },
  async ({ input, step }) => {
    // Define and run step
    await step.define(
      { name: "send" },
      async () => {
        await emailService.send({
          to: input.email,
          subject: "Hello",
          body: "...",
        });
      }
    ).run();
  }
);
Key Concepts:
  • name: Step identifier (must be unique within the workflow)
  • Async function: Code that performs the actual work
  • .run(): Executes the step and returns the result

Using Return Values

You can use the return value of a Step in subsequent steps. Each step is saved independently, but data is passed in memory.
export const processData = workflow(
  { name: "process_data" },
  async ({ input, step }) => {
    // Step 1: Fetch data
    const data = await step.define(
      { name: "fetch_data" },
      async () => {
        return await fetchDataFromAPI(input.userId);
      }
    ).run();

    // Step 2: Transform data (uses previous step result)
    const transformed = await step.define(
      { name: "transform" },
      async () => {
        return transformData(data);
      }
    ).run();

    // Step 3: Save
    await step.define(
      { name: "save" },
      async () => {
        await saveToDatabase(transformed);
      }
    ).run();

    return { count: transformed.length };
  }
);
Data Flow:
  1. fetch_data step returns data
  2. data is passed in memory to transform step
  3. transformed is passed in memory to save step
When a Step is retried, previous step results are restored from the database.

step.get()

step.get() is a convenient way to execute a Model method as a step. You don’t need to wrap the function separately, making the code more concise.
import { BaseModel, workflow } from "sonamu";

class UserModelClass extends BaseModel {
  async sendWelcomeEmail(userId: number) {
    const user = await this.findById(userId);
    await emailService.send({
      to: user.email,
      subject: "Welcome!",
      body: "...",
    });
  }
}

export const onboardUser = workflow(
  { name: "onboard_user" },
  async ({ input, step }) => {
    // Run Model method as a step
    await step.get(
      UserModel,
      "sendWelcomeEmail"
    ).run(input.userId);
  }
);
Use Cases:
  • Reuse Model business logic
  • Eliminate code duplication
  • Maintain type safety

Custom Name

You can explicitly specify a Step name to make logs more readable.
await step.get(
  { name: "send_email" },  // Custom name
  UserModel,
  "sendWelcomeEmail"
).run(userId);

Using Return Values

Model method return values work the same way.
const user = await step.get(
  UserModel,
  "findById"
).run(userId);

console.log(user.email);

step.sleep()

step.sleep() is a step that waits for a specified duration. Use it for delays between retries, avoiding API rate limits, periodic polling, and more.
export const retryWithDelay = workflow(
  { name: "retry_with_delay" },
  async ({ input, step, logger }) => {
    for (let i = 0; i < 3; i++) {
      try {
        await step.define(
          { name: `attempt_${i + 1}` },
          async () => {
            await unreliableAPICall(input.data);
          }
        ).run();

        break;  // Success
      } catch (error) {
        logger.warn(`Attempt ${i + 1} failed`, { error });

        if (i < 2) {
          // Wait before retry
          await step.sleep("retry_delay", "5s");
        } else {
          throw error;  // Final attempt failed
        }
      }
    }
  }
);
Supported Time Formats:
FormatDescriptionExample
sSeconds"5s" = 5 seconds
mMinutes"10m" = 10 minutes
hHours"2h" = 2 hours
dDays"1d" = 1 day
Practical Uses:
  • Retry delay: Wait 5 seconds after failure
  • Rate limiting: Wait 1 second between API calls
  • Batch interval: Rest 10 seconds after processing 100 items
step.sleep() is also a step, so it’s recorded in the database. On retry, the sleep won’t be re-executed.

Practical Examples

1. Batch Processing

When processing large amounts of data, divide into smaller batches. Making each batch a separate step means already processed batches won’t be re-run on failure.
export const processBatch = workflow(
  { name: "process_batch" },
  async ({ input, step, logger }) => {
    const batchSize = 100;
    const total = input.items.length;

    for (let i = 0; i < total; i += batchSize) {
      const batch = input.items.slice(i, i + batchSize);

      await step.define(
        { name: `process_batch_${i / batchSize}` },
        async () => {
          await Promise.all(
            batch.map(item => processItem(item))
          );
        }
      ).run();

      logger.info("Batch processed", {
        batch: i / batchSize,
        progress: `${Math.min(i + batchSize, total)}/${total}`,
      });
    }

    return { processed: total };
  }
);
Key Points:
  • Process in batches of 100
  • Each batch is an independent step
  • Progress logged

2. External API Calls

External API calls can fail due to network issues. With Steps, you can retry only the failed API call.
export const syncData = workflow(
  { name: "sync_data" },
  async ({ input, step, logger }) => {
    // Step 1: Fetch data from API
    const externalData = await step.define(
      { name: "fetch_from_api" },
      async () => {
        const response = await fetch(
          `https://api.example.com/data?userId=${input.userId}`
        );
        return await response.json();
      }
    ).run();

    // Step 2: Normalize data
    const normalized = await step.define(
      { name: "normalize_data" },
      async () => {
        return normalizeData(externalData);
      }
    ).run();

    // Step 3: Save to DB
    await step.get(
      { name: "save_to_db" },
      DataModel,
      "saveMany"
    ).run(normalized);

    logger.info("Sync completed", { count: normalized.length });
  }
);
Benefits:
  • On API call failure, skip data normalization
  • On normalization failure, don’t re-call the API
  • Can measure execution time for each stage

3. File Processing Pipeline

A pipeline that downloads a file, parses it, and saves it. Dividing each stage into steps makes debugging and retrying easier.
export const processUploadedFile = workflow(
  { name: "process_uploaded_file" },
  async ({ input, step, logger }) => {
    // Step 1: Download file
    const fileContent = await step.define(
      { name: "download_file" },
      async () => {
        return await downloadFromS3(input.fileKey);
      }
    ).run();

    // Step 2: Parse
    const parsed = await step.define(
      { name: "parse_file" },
      async () => {
        return parseCSV(fileContent);
      }
    ).run();

    // Step 3: Validate
    const validated = await step.define(
      { name: "validate" },
      async () => {
        return validateData(parsed);
      }
    ).run();

    // Step 4: Save to DB
    await step.define(
      { name: "save_to_db" },
      async () => {
        await bulkInsert(validated);
      }
    ).run();

    return { imported: validated.length };
  }
);
Pipeline Structure:
  1. Download -> 2. Parse -> 3. Validate -> 4. Save
  2. On failure, retry from that step
  3. File is downloaded only once

4. API with Rate Limiting

When an API has rate limits, add delays between requests. Using step.sleep() prevents the delay from repeating on retry.
export const sendNotifications = workflow(
  { name: "send_notifications" },
  async ({ input, step, logger }) => {
    const users = input.userIds;

    for (const userId of users) {
      // Notify each user
      await step.define(
        { name: `notify_user_${userId}` },
        async () => {
          const user = await UserModel.findById(userId);
          await sendPushNotification(user.deviceToken, input.message);
        }
      ).run();

      // Delay between users (avoid API limits)
      if (userId !== users[users.length - 1]) {
        await step.sleep("rate_limit_delay", "100ms");
      }
    }

    return { sent: users.length };
  }
);
Rate Limiting Strategy:
  • Wait 100ms between each notification
  • Prevent API server overload
  • On retry, skip already successful notifications

Step Naming Conventions

Use Unique Names

Step names must be unique within the same workflow. Using duplicate names will only execute the last step.
// Bad: Same name (only last one executes)
await step.define({ name: "process" }, async () => { ... }).run();
await step.define({ name: "process" }, async () => { ... }).run();

// Good: Different names
await step.define({ name: "process_payment" }, async () => { ... }).run();
await step.define({ name: "process_shipping" }, async () => { ... }).run();

Dynamic Names in Loops

When creating steps in loops, include the index to make names unique.
// In a loop
for (let i = 0; i < items.length; i++) {
  await step.define(
    { name: `process_item_${i}` },  // Each step has different name
    async () => {
      await processItem(items[i]);
    }
  ).run();
}
Naming Tips:
  • Use nouns describing the task: fetch_user, send_email
  • Include index: process_batch_0, process_batch_1
  • Snake case recommended: send_welcome_email

Important Notes

Step Best Practices:
  1. Unique Names: Step names must be unique within the workflow.
    { name: "step_1" }  // Good
    { name: "step_1" }  // Bad - duplicate
    
  2. Execution Order: Steps execute in the order written in code.
    await step.define({ name: "first" }, ...).run();
    await step.define({ name: "second" }, ...).run();
    
  3. Error Propagation: When a step fails, the workflow stops. Wrap optional tasks in try-catch.
    try {
      await step.define({ name: "optional" }, ...).run();
    } catch (error) {
      logger.warn("Optional step failed", { error });
    }
    
  4. Type Inference: TypeScript automatically infers step return types.
    const data: UserData = await step.define(...).run();
    
  5. Sleep Duration Limits: Very long sleeps are not recommended as they maintain database connections.
    await step.sleep("delay", "1h");   // OK
    await step.sleep("delay", "30d");  // Not recommended
    

Next Steps