A Step is the core concept for dividing a Workflow into smaller execution units. Each Step executes and is recorded independently, so a failed Workflow can be retried from the Step that failed rather than from the beginning. This makes long-running tasks safe to process.
Why Steps Are Needed
Without Steps, a Workflow that fails midway has to restart from the beginning. With Steps, it resumes from the Step that failed.
// Bad: Without Steps - restart from beginning on failure
export const processOrder = workflow(
{ name: "process_order" },
async ({ input }) => {
await validatePayment(input.orderId); // 1. Validate payment
await updateInventory(input.items); // 2. Update inventory
await sendConfirmationEmail(input.email); // 3. Send email
// What if step 3 fails? -> Restart from step 1
}
);
// Good: With Steps - retry from failed step
export const processOrder = workflow(
{ name: "process_order" },
async ({ input, step }) => {
await step.define({ name: "validate_payment" }, async () => {
await validatePayment(input.orderId);
}).run();
await step.define({ name: "update_inventory" }, async () => {
await updateInventory(input.items);
}).run();
await step.define({ name: "send_email" }, async () => {
await sendConfirmationEmail(input.email);
}).run();
// What if step 3 fails? -> Retry from step 3
}
);
Benefits of Using Steps:
- Partial Retry: Re-execute only from the failed step
- Progress Tracking: Check execution time and status of each step
- Easier Debugging: Clearly identify which step failed
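To make "retry from the failed step" concrete, here is a minimal in-memory model of the idea. It is a sketch under stated assumptions, not Sonamu's implementation: a `Map` stands in for the database, step functions are synchronous for brevity, and `StepJournal`, `runStep`, and `processOrderSketch` are hypothetical names.

```typescript
// Hypothetical stand-in for the database table that stores step results.
class StepJournal {
  private results = new Map<string, unknown>();
  // Names of steps whose function body actually ran, in order.
  readonly executions: string[] = [];

  // Run a named step once; later calls with the same name return the stored result.
  runStep<T>(name: string, fn: () => T): T {
    if (this.results.has(name)) {
      return this.results.get(name) as T; // replay: the body is skipped
    }
    this.executions.push(name);
    const result = fn(); // if fn throws, nothing is stored for this step
    this.results.set(name, result);
    return result;
  }
}

// Hypothetical three-step workflow; `emailDown` simulates a transient outage.
function processOrderSketch(journal: StepJournal, emailDown: boolean): string {
  journal.runStep("validate_payment", () => "paid");
  journal.runStep("update_inventory", () => "reserved");
  return journal.runStep("send_email", () => {
    if (emailDown) throw new Error("SMTP timeout");
    return "sent";
  });
}

const journal = new StepJournal();
try {
  processOrderSketch(journal, true); // first attempt fails at send_email
} catch {
  // The workflow stops here; the first two results are already stored.
}
const retryResult = processOrderSketch(journal, false); // retry
// journal.executions is now:
// ["validate_payment", "update_inventory", "send_email", "send_email"]
```

Only `send_email` runs twice; on the retry, `validate_payment` and `update_inventory` are answered from the journal without executing their bodies.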
step.define()
step.define() is the most basic way to wrap a function as a step. When you make an async function a step, its execution history is saved to the database.
export const sendEmail = workflow(
{ name: "send_email" },
async ({ input, step }) => {
// Define and run step
await step.define(
{ name: "send" },
async () => {
await emailService.send({
to: input.email,
subject: "Hello",
body: "...",
});
}
).run();
}
);
Key Concepts:
- name: Step identifier (must be unique within the workflow)
- Async function: Code that performs the actual work
- .run(): Executes the step and returns the result
Using Return Values
You can use the return value of a Step in subsequent steps. Each step's result is saved to the database independently, but within a single run the value is passed to later steps as an ordinary in-memory variable.
export const processData = workflow(
{ name: "process_data" },
async ({ input, step }) => {
// Step 1: Fetch data
const data = await step.define(
{ name: "fetch_data" },
async () => {
return await fetchDataFromAPI(input.userId);
}
).run();
// Step 2: Transform data (uses previous step result)
const transformed = await step.define(
{ name: "transform" },
async () => {
return transformData(data);
}
).run();
// Step 3: Save
await step.define(
{ name: "save" },
async () => {
await saveToDatabase(transformed);
}
).run();
return { count: transformed.length };
}
);
Data Flow:
- The fetch_data step returns data
- data is passed in memory to the transform step
- transformed is passed in memory to the save step
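When the workflow is retried after a failure, the results of steps that already completed are restored from the database, so values such as data and transformed are available again without re-running the steps that produced them.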
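Because a restored value comes from storage rather than from the original function call, it is safest for steps to return plain data. The sketch below assumes, without confirmation from this page, that step results are persisted as JSON; under that assumption a `Date` comes back as an ISO string. `FetchedUser`, `roundTrip`, and `toPlain` are hypothetical names.

```typescript
// Hypothetical value returned by a step.
interface FetchedUser {
  id: number;
  createdAt: Date;
}

// Simulates storing a step result and restoring it on retry,
// ASSUMING the result is persisted as JSON.
function roundTrip(value: unknown): unknown {
  return JSON.parse(JSON.stringify(value));
}

const fresh: FetchedUser = { id: 1, createdAt: new Date("2024-01-01T00:00:00.000Z") };
const restored = roundTrip(fresh) as { id: number; createdAt: unknown };

// First run: createdAt is a Date. Restored: it is the string
// "2024-01-01T00:00:00.000Z", so Date methods on it would fail.
const restoredType = typeof restored.createdAt; // "string"

// Safer: return JSON-friendly data from the step in the first place.
function toPlain(user: FetchedUser): { id: number; createdAt: string } {
  return { id: user.id, createdAt: user.createdAt.toISOString() };
}
const plain = toPlain(fresh);
const plainSurvives = JSON.stringify(roundTrip(plain)) === JSON.stringify(plain); // true
```

If Sonamu uses a different storage format, the specific conversions differ, but keeping step results to plain objects, strings, and numbers avoids depending on them.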
step.get()
step.get() is a convenient way to execute a Model method as a step. You don’t need to wrap the function separately, making the code more concise.
import { BaseModel, workflow } from "sonamu";
class UserModelClass extends BaseModel {
async sendWelcomeEmail(userId: number) {
const user = await this.findById(userId);
await emailService.send({
to: user.email,
subject: "Welcome!",
body: "...",
});
}
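}
// step.get() below receives an instance of the model class
export const UserModel = new UserModelClass();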
export const onboardUser = workflow(
{ name: "onboard_user" },
async ({ input, step }) => {
// Run Model method as a step
await step.get(
UserModel,
"sendWelcomeEmail"
).run(input.userId);
}
);
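The "maintain type safety" point can be illustrated with TypeScript's indexed access types plus the `Parameters` and `ReturnType` utilities. The signature below is a hypothetical sketch of how a `step.get`-style helper could tie a method name to its argument and return types. It is not Sonamu's actual declaration, `getStep` and `FakeUserModel` are invented for illustration, and the sketch simply calls the method without recording anything.

```typescript
type AnyFn = (...args: any[]) => any;

// Hypothetical signature: `method` must name a function on `model`,
// and .run() accepts exactly that method's parameters.
function getStep<K extends PropertyKey, M extends Record<K, AnyFn>>(model: M, method: K) {
  return {
    run(...args: Parameters<M[K]>): ReturnType<M[K]> {
      // A real step helper would record this call; the sketch just invokes it.
      return model[method](...args);
    },
  };
}

// Hypothetical model used only for this illustration.
class FakeUserModel {
  private names: Record<number, string> = { 1: "alice" };

  greeting(userId: number): string {
    return `Welcome, ${this.names[userId] ?? "guest"}!`;
  }
}

const model = new FakeUserModel();
const message = getStep(model, "greeting").run(1); // "Welcome, alice!"
// getStep(model, "greeting").run("1"); // type error: "1" is not a number
// getStep(model, "missing");           // type error: no such method on the model
```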
Use Cases:
- Reuse Model business logic
- Eliminate code duplication
- Maintain type safety
Custom Name
You can explicitly specify a Step name to make logs more readable.
await step.get(
{ name: "send_email" }, // Custom name
UserModel,
"sendWelcomeEmail"
).run(userId);
Using Return Values
Return values from Model methods work the same way: .run() resolves to whatever the method returns.
const user = await step.get(
UserModel,
"findById"
).run(userId);
console.log(user.email);
step.sleep()
step.sleep() is a step that waits for a specified duration. Use it for delays between retries, avoiding API rate limits, periodic polling, and more.
export const retryWithDelay = workflow(
{ name: "retry_with_delay" },
async ({ input, step, logger }) => {
for (let i = 0; i < 3; i++) {
try {
await step.define(
{ name: `attempt_${i + 1}` },
async () => {
await unreliableAPICall(input.data);
}
).run();
break; // Success
} catch (error) {
logger.warn(`Attempt ${i + 1} failed`, { error });
if (i < 2) {
// Wait before retry (one uniquely named sleep step per attempt)
await step.sleep(`retry_delay_${i + 1}`, "5s");
} else {
throw error; // Final attempt failed
}
}
}
}
);
Supported Time Formats:
| Format | Description | Example |
|---|---|---|
| s | Seconds | "5s" = 5 seconds |
| m | Minutes | "10m" = 10 minutes |
| h | Hours | "2h" = 2 hours |
| d | Days | "1d" = 1 day |
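Each format converts to a number of milliseconds. The parser below is a hypothetical sketch of that conversion, not Sonamu's parser. It covers the four units in the table and also accepts `ms`, an assumption based only on the `"100ms"` used in the rate-limiting example further down.

```typescript
// Milliseconds per unit. s/m/h/d come from the table; ms is assumed from the
// "100ms" used in the rate-limiting example.
const UNIT_MS: Record<string, number> = {
  ms: 1,
  s: 1000,
  m: 60 * 1000,
  h: 60 * 60 * 1000,
  d: 24 * 60 * 60 * 1000,
};

// Hypothetical parser: "<integer><unit>" -> milliseconds, e.g. "5s" -> 5000.
function parseDuration(input: string): number {
  const match = /^(\d+)(ms|s|m|h|d)$/.exec(input.trim());
  if (match === null) {
    throw new Error(`Unsupported duration: "${input}"`);
  }
  return Number(match[1]) * UNIT_MS[match[2]];
}
```

Rejecting unknown input such as `"5w"` or `"1.5h"` with an error, instead of guessing, keeps a typo from silently turning into a zero-length sleep.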
Practical Uses:
- Retry delay: Wait 5 seconds after failure
- Rate limiting: Wait 1 second between API calls
- Batch interval: Rest 10 seconds after processing 100 items
step.sleep() is also a step, so it’s recorded in the database. On retry, a sleep that has already completed won’t be re-executed.
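This page only states that a completed sleep is not repeated. One hypothetical way a durable sleep could also handle a workflow that resumes mid-sleep is to persist the wake-up time when the sleep starts and, on replay, wait only for whatever time is left. `SleepRecord`, `startSleep`, and `remainingSleepMs` are invented for illustration and do not describe how Sonamu stores sleeps.

```typescript
// Hypothetical record persisted the first time a sleep step starts.
interface SleepRecord {
  name: string;
  wakeAt: number; // epoch milliseconds
}

function startSleep(name: string, durationMs: number, now: number): SleepRecord {
  return { name, wakeAt: now + durationMs };
}

// On replay: 0 means the sleep already elapsed and can be skipped;
// otherwise wait only for the remaining time, not the full duration.
function remainingSleepMs(record: SleepRecord, now: number): number {
  return Math.max(0, record.wakeAt - now);
}

const sleepRecord = startSleep("retry_delay_1", 5000, 1_000_000);
const afterCrash = remainingSleepMs(sleepRecord, 1_003_000); // 2000
const muchLater = remainingSleepMs(sleepRecord, 2_000_000); // 0
```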
Practical Examples
1. Batch Processing
When processing large amounts of data, divide into smaller batches. Making each batch a separate step means already processed batches won’t be re-run on failure.
export const processBatch = workflow(
{ name: "process_batch" },
async ({ input, step, logger }) => {
const batchSize = 100;
const total = input.items.length;
for (let i = 0; i < total; i += batchSize) {
const batch = input.items.slice(i, i + batchSize);
await step.define(
{ name: `process_batch_${i / batchSize}` },
async () => {
await Promise.all(
batch.map(item => processItem(item))
);
}
).run();
logger.info("Batch processed", {
batch: i / batchSize,
progress: `${Math.min(i + batchSize, total)}/${total}`,
});
}
return { processed: total };
}
);
Key Points:
- Process in batches of 100
- Each batch is an independent step
- Progress logged
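The batch arithmetic in the loop above can be pulled into a small pure helper, which makes the step names and batch sizes easy to check in isolation. `planBatches` and `BatchPlan` are hypothetical names; the naming mirrors the `process_batch_${i / batchSize}` pattern from the example.

```typescript
interface BatchPlan<T> {
  stepName: string;
  items: T[];
}

// Split items into fixed-size batches, each paired with a unique step name.
function planBatches<T>(items: T[], batchSize: number): BatchPlan<T>[] {
  if (batchSize <= 0) throw new Error("batchSize must be positive");
  const plans: BatchPlan<T>[] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    plans.push({
      stepName: `process_batch_${i / batchSize}`,
      items: items.slice(i, i + batchSize),
    });
  }
  return plans;
}

const items = Array.from({ length: 250 }, (_, i) => i);
const plans = planBatches(items, 100);
// 3 batches: process_batch_0 (100), process_batch_1 (100), process_batch_2 (50)
```

Because each name is derived from the batch's position, a given name refers to the same items only while `input.items` is identical across retries, which holds as long as the workflow input itself does not change.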
2. External API Calls
External API calls can fail due to network issues. With Steps, you can retry only the failed API call.
export const syncData = workflow(
{ name: "sync_data" },
async ({ input, step, logger }) => {
// Step 1: Fetch data from API
const externalData = await step.define(
{ name: "fetch_from_api" },
async () => {
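const response = await fetch(
`https://api.example.com/data?userId=${input.userId}`
);
// fetch resolves even on HTTP errors, so throw to make the step fail and be retryable
if (!response.ok) {
throw new Error(`API request failed with status ${response.status}`);
}
return await response.json();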
}
).run();
// Step 2: Normalize data
const normalized = await step.define(
{ name: "normalize_data" },
async () => {
return normalizeData(externalData);
}
).run();
// Step 3: Save to DB
await step.get(
{ name: "save_to_db" },
DataModel,
"saveMany"
).run(normalized);
logger.info("Sync completed", { count: normalized.length });
}
);
Benefits:
- If the API call fails, the retry starts from the API call; normalization and saving have not run yet
- If normalization fails, the API is not called again on retry
- Execution time can be measured for each stage
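`fetch` only rejects on network-level failures; an HTTP error such as 500 or 503 still resolves normally. A step that goes straight to `response.json()` could therefore record an error payload as a successful result, and that result would be reused on retry. A small guard that throws on non-2xx responses makes the step fail instead. `ensureOk` and `ResponseLike` are hypothetical; the interface models only the two `Response` fields the guard reads.

```typescript
// Only the parts of the Fetch API Response this sketch relies on.
interface ResponseLike {
  ok: boolean;
  status: number;
}

// Throw so the surrounding step fails (and can be retried) instead of
// recording an HTTP error body as a successful result.
function ensureOk<R extends ResponseLike>(response: R): R {
  if (!response.ok) {
    throw new Error(`Request failed with HTTP ${response.status}`);
  }
  return response;
}

// Inside a step body, for example:
// return await ensureOk(await fetch(url)).json();
```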
3. File Processing Pipeline
A pipeline that downloads a file, parses it, and saves it. Dividing each stage into steps makes debugging and retrying easier.
export const processUploadedFile = workflow(
{ name: "process_uploaded_file" },
async ({ input, step, logger }) => {
// Step 1: Download file
const fileContent = await step.define(
{ name: "download_file" },
async () => {
return await downloadFromS3(input.fileKey);
}
).run();
// Step 2: Parse
const parsed = await step.define(
{ name: "parse_file" },
async () => {
return parseCSV(fileContent);
}
).run();
// Step 3: Validate
const validated = await step.define(
{ name: "validate" },
async () => {
return validateData(parsed);
}
).run();
// Step 4: Save to DB
await step.define(
{ name: "save_to_db" },
async () => {
await bulkInsert(validated);
}
).run();
return { imported: validated.length };
}
);
Pipeline Structure:
- Stages: Download -> Parse -> Validate -> Save
- On failure, the retry starts from the failed stage
- Once the download succeeds, the file is not downloaded again on retry
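`parseCSV` and `validateData` are not defined in the example. The hypothetical versions below make the data shapes between stages concrete. The parser assumes a header row and no quoted or escaped commas, so a real import would call for a proper CSV library. `Row`, `ImportRow`, and the `email`/`age` columns are invented for illustration.

```typescript
type Row = Record<string, string>;

// Naive CSV parser: first non-empty line is the header; quoted fields are not supported.
function parseCSV(content: string): Row[] {
  const lines = content.split(/\r?\n/).filter((line) => line.trim() !== "");
  if (lines.length === 0) return [];
  const header = lines[0].split(",").map((h) => h.trim());
  return lines.slice(1).map((line) => {
    const cells = line.split(",");
    const row: Row = {};
    header.forEach((key, i) => {
      row[key] = (cells[i] ?? "").trim();
    });
    return row;
  });
}

interface ImportRow {
  email: string;
  age: number;
}

// Keep rows whose email contains "@" and whose age is a non-negative integer.
function validateData(rows: Row[]): ImportRow[] {
  return rows
    .filter((r) => (r.email ?? "").includes("@") && /^\d+$/.test(r.age ?? ""))
    .map((r) => ({ email: r.email, age: Number(r.age) }));
}
```

Both functions are pure, so a retried parse_file or validate step recomputes the same output from the same downloaded content.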
4. API with Rate Limiting
When an API has rate limits, add delays between requests. Using step.sleep() prevents the delay from repeating on retry.
export const sendNotifications = workflow(
{ name: "send_notifications" },
async ({ input, step, logger }) => {
const users = input.userIds;
for (const userId of users) {
// Notify each user
await step.define(
{ name: `notify_user_${userId}` },
async () => {
const user = await UserModel.findById(userId);
await sendPushNotification(user.deviceToken, input.message);
}
).run();
// Delay between users (avoid API limits)
if (userId !== users[users.length - 1]) {
await step.sleep(`rate_limit_delay_${userId}`, "100ms"); // unique name per user
}
}
return { sent: users.length };
}
);
Rate Limiting Strategy:
- Wait 100ms between each notification
- Prevent API server overload
- On retry, skip already successful notifications
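Naming each notification step after `userId` assumes the IDs in `input.userIds` are distinct; a repeated ID would give two steps the same name. A hypothetical alternative, sketched as a pure planning function, derives names from the loop index and places a uniquely named delay between consecutive notifications only. `planNotificationSteps` and `PlannedStep` are invented for illustration.

```typescript
type PlannedStep =
  | { kind: "notify"; name: string; userId: number }
  | { kind: "sleep"; name: string; duration: string };

// Build the ordered steps: notify, delay, notify, delay, ..., notify.
function planNotificationSteps(userIds: number[], delay = "100ms"): PlannedStep[] {
  const steps: PlannedStep[] = [];
  userIds.forEach((userId, i) => {
    steps.push({ kind: "notify", name: `notify_user_${i}`, userId });
    if (i < userIds.length - 1) {
      // One uniquely named delay between users, none after the last one.
      steps.push({ kind: "sleep", name: `rate_limit_delay_${i}`, duration: delay });
    }
  });
  return steps;
}

const plan = planNotificationSteps([7, 7, 9]);
// notify_user_0, rate_limit_delay_0, notify_user_1, rate_limit_delay_1, notify_user_2
```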
Step Naming Conventions
Use Unique Names
Step names must be unique within the same workflow. Using duplicate names will only execute the last step.
// Bad: Same name (only last one executes)
await step.define({ name: "process" }, async () => { /* ... */ }).run();
await step.define({ name: "process" }, async () => { /* ... */ }).run();
// Good: Different names
await step.define({ name: "process_payment" }, async () => { /* ... */ }).run();
await step.define({ name: "process_shipping" }, async () => { /* ... */ }).run();
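Duplicate names are easy to introduce when copying a step. A check over the names you intend to use, for example in a unit test, can catch them early. `findDuplicateStepNames` is a hypothetical helper, not part of Sonamu.

```typescript
// Return each step name that appears more than once, in the order the repeats are found.
function findDuplicateStepNames(names: string[]): string[] {
  const seen = new Set<string>();
  const duplicates = new Set<string>();
  for (const stepName of names) {
    if (seen.has(stepName)) duplicates.add(stepName);
    seen.add(stepName);
  }
  return Array.from(duplicates);
}
```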
Dynamic Names in Loops
When creating steps in loops, include the index to make names unique.
// In a loop
for (let i = 0; i < items.length; i++) {
await step.define(
{ name: `process_item_${i}` }, // Each step has different name
async () => {
await processItem(items[i]);
}
).run();
}
Naming Tips:
- Describe the task with a verb phrase: fetch_user, send_email
- Include an index in loops: process_batch_0, process_batch_1
- Use snake_case (recommended): send_welcome_email
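To enforce the snake_case recommendation, a regular expression is enough. The pattern below is one possible definition (lowercase letters and digits, single underscores between words, starting with a letter). It is a hypothetical convention check; this page does not say that Sonamu validates step names this way.

```typescript
// Lowercase words of letters/digits joined by single underscores, starting with a letter.
const SNAKE_CASE = /^[a-z][a-z0-9]*(?:_[a-z0-9]+)*$/;

function isSnakeCaseStepName(stepName: string): boolean {
  return SNAKE_CASE.test(stepName);
}
```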
Important Notes
Step Best Practices:
- Unique Names: Step names must be unique within the workflow.
  { name: "step_1" }, { name: "step_2" } // Good
  { name: "step_1" }, { name: "step_1" } // Bad - duplicate
- Execution Order: Steps execute in the order written in code.
  await step.define({ name: "first" }, ...).run();
  await step.define({ name: "second" }, ...).run();
- Error Propagation: When a step fails, the workflow stops. Wrap optional tasks in try-catch so their failure is logged instead of stopping the workflow.
  try {
    await step.define({ name: "optional" }, ...).run();
  } catch (error) {
    logger.warn("Optional step failed", { error });
  }
- Type Inference: TypeScript infers a step's return type from its function, so no explicit annotation is needed.
  const data = await step.define(...).run(); // data has the step function's return type
- Sleep Duration Limits: Very long sleeps are not recommended because they maintain database connections.
  await step.sleep("delay", "1h"); // OK
  await step.sleep("delay", "30d"); // Not recommended
Next Steps