@workflow λ°μ½λ μ΄ν°λ μ₯κΈ° μ€ν μμ
, λ°±κ·ΈλΌμ΄λ μμ
, μ€μΌμ€λ§ μμ
μ μ μν©λλ€. OpenWorkflowλ₯Ό κΈ°λ°μΌλ‘ λ΄κ΅¬μ± μλ μ€ν, μ¬μλ, λͺ¨λν°λ§μ μ 곡ν©λλ€.
κΈ°λ³Έ μ¬μ©λ²
import { workflow } from "sonamu";
export const processOrder = workflow(
{
name: "process_order",
version: "1.0",
},
async ({ input, step, logger }) => {
logger.info("Processing order", { orderId: input.orderId });
// Step 1: κ²°μ μ²λ¦¬
const payment = await step
.define({ name: "process-payment" }, async () => {
return await PaymentService.charge(input.amount);
})
.run();
// Step 2: μ¬κ³ μ
λ°μ΄νΈ
await step
.define({ name: "update-inventory" }, async () => {
return await InventoryModel.decrease(input.productId, input.quantity);
})
.run();
// Step 3: λ°°μ‘ μμ
const shipping = await step
.define({ name: "start-shipping" }, async () => {
return await ShippingService.ship(input.address);
})
.run();
return {
orderId: input.orderId,
paymentId: payment.id,
trackingNumber: shipping.trackingNumber,
};
},
);
μ€μ (sonamu.config.ts)
μν¬νλ‘μ°λ₯Ό μ¬μ©νλ €λ©΄ PostgreSQL μ€μ μ΄ νμν©λλ€.import { defineConfig } from "sonamu";
export default defineConfig({
tasks: {
dbConfig: {
client: "pg",
connection: {
host: process.env.DB_HOST,
port: 5432,
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
},
},
worker: {
concurrency: 4, // λμ μ€ν μ
usePubSub: true, // Pub/Sub μ¬μ©
listenDelay: 500, // μμ μ§μ° (ms)
},
},
});
μ΅μ
name
μν¬νλ‘μ° μ΄λ¦μ μ§μ ν©λλ€. κΈ°λ³Έκ°: ν¨μ μ΄λ¦μ snake_caseλ‘ λ³ν// λͺ
μμ μ΄λ¦
export const myTask = workflow({ name: "process_order" }, async ({ input }) => {
/* ... */
});
// μλ μ΄λ¦ (ν¨μ μ΄λ¦ μ¬μ©)
export const processOrder = workflow(
{}, // name μλ΅ β "process_order"
async ({ input }) => {
/* ... */
},
);
version
μν¬νλ‘μ° λ²μ μ μ§μ ν©λλ€. κΈ°λ³Έκ°:null
export const processOrder = workflow(
{
name: "process_order",
version: "1.0",
},
async ({ input }) => {
/* ... */
},
);
// λ²μ μ
λ°μ΄νΈ μ
export const processOrderV2 = workflow(
{
name: "process_order",
version: "2.0",
},
async ({ input }) => {
/* ... */
},
);
κ°μ μ΄λ¦μ λ€λ₯Έ λ²μ μ μν¬νλ‘μ°λ₯Ό λ³λ ¬λ‘ μ€νν μ μμ΅λλ€.
schema
μ λ ₯ λ°μ΄ν°μ Zod μ€ν€λ§λ₯Ό μ μν©λλ€.import { z } from "zod";
const OrderInputSchema = z.object({
orderId: z.number(),
productId: z.number(),
quantity: z.number(),
amount: z.number(),
address: z.string(),
});
export const processOrder = workflow(
{
name: "process_order",
schema: OrderInputSchema,
},
async ({ input }) => {
// inputμ μλμΌλ‘ νμ
κ²μ¦λ¨
input.orderId; // number
input.address; // string
},
);
schedules
Cron ννμμΌλ‘ μ€μΌμ€μ μ μν©λλ€.export const dailyReport = workflow(
{
name: "daily_report",
schedules: [
{
name: "daily-at-midnight",
expression: "0 0 * * *", // λ§€μΌ μμ
input: { date: new Date().toISOString().split("T")[0] },
},
],
},
async ({ input, logger }) => {
logger.info("Generating daily report", { date: input.date });
// λ³΄κ³ μ μμ±
},
);
// λμ input
export const weeklyBackup = workflow(
{
name: "weekly_backup",
schedules: [
{
name: "weekly-sunday",
expression: "0 2 * * 0", // λ§€μ£Ό μΌμμΌ 2μ
input: () => ({
timestamp: new Date().toISOString(),
backupPath: `/backups/${Date.now()}`,
}),
},
],
},
async ({ input }) => {
// λ°±μ
μν
},
);
* * * * * - λ§€λΆ
0 * * * * - λ§€μκ°
0 0 * * * - λ§€μΌ μμ
0 0 * * 0 - λ§€μ£Ό μΌμμΌ μμ
0 0 1 * * - λ§€μ 1μΌ μμ
0 9 * * 1-5 - νμΌ μ€μ 9μ
*/5 * * * * - 5λΆλ§λ€
μν¬νλ‘μ° ν¨μ
μν¬νλ‘μ° ν¨μλ 4κ°μ λ§€κ°λ³μλ₯Ό λ°μ΅λλ€:async function workflowFn({ input, step, logger, version }) {
// input: μ
λ ₯ λ°μ΄ν°
// step: Step API
// logger: LogTape Logger
// version: μν¬νλ‘μ° λ²μ
}
input
μν¬νλ‘μ° μ€ν μ μ λ¬λ μ λ ₯ λ°μ΄ν°μ λλ€.export const processOrder = workflow({ name: "process_order" }, async ({ input }) => {
console.log("Order ID:", input.orderId);
console.log("Amount:", input.amount);
});
// μ€ν
await WorkflowManager.run({ name: "process_order" }, { orderId: 123, amount: 50000 });
step
Step APIλ μν¬νλ‘μ°μ κ° λ¨κ³λ₯Ό μ μν©λλ€. κ° stepμ λ 립μ μΌλ‘ μ¬μλλκ³ λ³΅κ΅¬λ μ μμ΅λλ€.export const processOrder = workflow({ name: "process_order" }, async ({ step }) => {
// Step 1
const payment = await step
.define({ name: "charge-payment" }, async () => {
return await PaymentService.charge(100);
})
.run();
// Step 2
const inventory = await step
.define({ name: "update-inventory" }, async () => {
return await InventoryModel.decrease(1);
})
.run();
// Step 3
await step
.define({ name: "send-email" }, async () => {
return await EmailService.send({
to: "user@example.com",
subject: "Order confirmed",
});
})
.run();
return { success: true };
});
- κ° stepμ ν λ²λ§ μ€νλ¨ (λ©±λ±μ±)
- μ€ν¨ μ ν΄λΉ stepλ§ μ¬μλ
- μ΄λ―Έ μλ£λ stepμ 건λλ
logger
LogTape Logger μΈμ€ν΄μ€μ λλ€.export const processData = workflow({ name: "process_data" }, async ({ logger }) => {
logger.info("Starting workflow");
logger.debug("Processing item", { itemId: 123 });
logger.warn("High memory usage", { usage: "80%" });
logger.error("Failed to process", { error: "Connection timeout" });
});
version
νμ¬ μν¬νλ‘μ°μ λ²μ μ λλ€.export const processOrder = workflow(
{
name: "process_order",
version: "2.0",
},
async ({ version }) => {
console.log("Running version:", version); // "2.0"
},
);
μν¬νλ‘μ° μ€ν
μλ μ€ν
// WorkflowManager μ¬μ©
import { Sonamu } from "sonamu";
const handle = await Sonamu.workflows.run(
{
name: "process_order",
version: "1.0",
},
{
orderId: 123,
amount: 50000,
},
);
// κ²°κ³Ό λκΈ°
const result = await handle.result();
console.log("Result:", result);
μ€μΌμ€ μ€ν
export const dailyReport = workflow(
{
name: "daily_report",
schedules: [
{
name: "daily-at-midnight",
expression: "0 0 * * *",
input: { date: new Date().toISOString().split("T")[0] },
},
],
},
async ({ input, logger }) => {
logger.info("Generating report", { date: input.date });
// μλμΌλ‘ λ§€μΌ μμ μ€νλ¨
},
);
Step API μμΈ
step.define().run()
ν¨μλ₯Ό stepμΌλ‘ κ°μΈμ μ€νν©λλ€.const result = await step
.define({ name: "step-name" }, async () => {
// μμ
μν
return { data: "result" };
})
.run();
export const processOrder = workflow({ name: "process_order" }, async ({ step }) => {
// κ²°μ μ²λ¦¬
const payment = await step
.define({ name: "process-payment" }, async () => {
return await PaymentService.charge(100);
})
.run();
// μ¬κ³ μ
λ°μ΄νΈ
await step
.define({ name: "update-inventory" }, async () => {
return await InventoryModel.decrease(1);
})
.run();
return { paymentId: payment.id };
});
step.get().run()
Model λ©μλλ₯Ό stepμΌλ‘ μ€νν©λλ€.// name λͺ
μ
const result = await step.get({ name: "fetch-users" }, UserModel, "list").run({ limit: 10 });
// name μλ΅ (λ©μλ μ΄λ¦ μλ μ¬μ©: "list")
const result = await step.get(UserModel, "list").run({ limit: 10 });
export const processUsers = workflow({ name: "process_users" }, async ({ step }) => {
// Model λ©μλλ₯Ό stepμΌλ‘ μ€ν
const users = await step.get({ name: "fetch-users" }, UserModel, "list").run({ limit: 100 });
// κ° μ¬μ©μ μ²λ¦¬
for (const user of users) {
await step
.define({ name: `process-user-${user.id}` }, async () => {
return await UserModel.updateStatus(user.id, "processed");
})
.run();
}
return { processed: users.length };
});
step.sleep()
μ§μ λ μκ° λμ λκΈ°ν©λλ€.await step.sleep("wait-1-hour", "1h");
await step.sleep("wait-30-minutes", "30m");
await step.sleep("wait-10-seconds", "10s");
"10s", "5m", "1h", "1d"
μ¬μ© μμ:
export const delayedTask = workflow({ name: "delayed_task" }, async ({ step, logger }) => {
logger.info("Starting task");
// 1μκ° λκΈ°
await step.sleep("wait-1-hour", "1h");
logger.info("Continuing after 1 hour");
// 30λΆ λκΈ°
await step.sleep("wait-30-minutes", "30m");
logger.info("Task complete");
});
μ¬μλ μ μ±
Stepμ μ¬μλλ OpenWorkflow λ°±μλμμ μλμΌλ‘ μ²λ¦¬λ©λλ€. λ³λμ retry μ΅μ
μ€μ μ΄ νμνμ§
μμ΅λλ€.
export const resilientTask = workflow({ name: "resilient_task" }, async ({ step }) => {
// Stepμ΄ μ€ν¨νλ©΄ μλμΌλ‘ μ¬μλλ¨
const result = await step
.define({ name: "flaky-operation" }, async () => {
const response = await fetch("https://api.example.com");
return response.json();
})
.run();
return result;
});
λ°μ΄ν°λ² μ΄μ€ μ κ·Ό
μν¬νλ‘μ° λ΄μμ Sonamu Modelμ μμ λ‘κ² μ¬μ©ν μ μμ΅λλ€.export const processUsers = workflow({ name: "process_users" }, async ({ step, logger }) => {
// Step 1: μ¬μ©μ μ‘°ν
const users = await step.get({ name: "fetch-users" }, UserModel, "list").run({ limit: 100 });
logger.info("Found users", { count: users.length });
// Step 2: κ° μ¬μ©μ μ²λ¦¬
for (const user of users) {
await step
.define({ name: `process-user-${user.id}` }, async () => {
return await UserModel.processUser(user.id);
})
.run();
}
return { processed: users.length };
});
μλ¬ μ²λ¦¬
μλ μ¬μλ
Stepμ΄ μ€ν¨νλ©΄ μλμΌλ‘ μ¬μλλ©λλ€.export const resilientTask = workflow({ name: "resilient_task" }, async ({ step }) => {
const result = await step
.define({ name: "flaky-operation" }, async () => {
// μ€ν¨ν μ μλ μμ
const response = await fetch("https://api.example.com");
return response.json();
})
.run();
return result;
});
μλ μλ¬ μ²λ¦¬
export const handleErrors = workflow({ name: "handle_errors" }, async ({ step, logger }) => {
try {
await step
.define({ name: "risky-operation" }, async () => {
throw new Error("Something went wrong");
})
.run();
} catch (error) {
logger.error("Operation failed", { error });
// λ체 λ‘μ§
await step
.define({ name: "fallback" }, async () => {
return await AlternativeService.process();
})
.run();
}
});
λ°μ½λ μ΄ν° μ€νμΌ (TypeScript Decorator)
ν¨μ μ€νμΌ λμ λ°μ½λ μ΄ν° μ€νμΌλ μ¬μ© κ°λ₯ν©λλ€:@workflow({
name: "process_order",
version: "1.0"
})
async function processOrder({ input, step, logger }) {
logger.info("Processing order", { orderId: input.orderId });
const payment = await step.define(
{ name: "charge" },
async () => {
return await PaymentService.charge(input.amount);
}
).run();
return { paymentId: payment.id };
}
μ£Όμμ¬ν
1. λ©±λ±μ±
Stepμ λ©±λ±ν΄μΌ ν©λλ€. κ°μ μ λ ₯μΌλ‘ μ¬λ¬ λ² μ€νν΄λ κ°μ κ²°κ³Όλ₯Ό 보μ₯ν΄μΌ ν©λλ€.// β λμ μ: λ©±λ±νμ§ μμ
await step
.define({ name: "increment-counter" }, async () => {
counter++; // μ¬μλ μ μ€λ³΅ μ¦κ°
})
.run();
// β
μ’μ μ: λ©±λ±ν¨
await step
.define({ name: "set-counter" }, async () => {
return await CounterModel.set(10); // νμ κ°μ κ°
})
.run();
2. κΈ΄ μμ μ stepμΌλ‘ λλκΈ°
// β λμ μ: κ±°λν step
await step
.define({ name: "process-all" }, async () => {
// 1μκ° κ±Έλ¦¬λ μμ
for (let i = 0; i < 1000000; i++) {
await processItem(i);
}
})
.run();
// β
μ’μ μ: μμ stepλ€
for (let i = 0; i < 1000; i += 100) {
await step
.define({ name: `process-batch-${i}` }, async () => {
const batch = items.slice(i, i + 100);
await processBatch(batch);
})
.run();
}
3. Step μ΄λ¦ κ³ μ μ±
Step μ΄λ¦μ μν¬νλ‘μ° λ΄μμ κ³ μ ν΄μΌ ν©λλ€.// β λμ μ: μ€λ³΅λ μ΄λ¦
await step
.define({ name: "process" }, async () => {
/* ... */
})
.run();
await step
.define({ name: "process" }, async () => {
/* ... */
})
.run(); // μλ¬!
// β
μ’μ μ: κ³ μ ν μ΄λ¦
await step
.define({ name: "process-payment" }, async () => {
/* ... */
})
.run();
await step
.define({ name: "process-shipping" }, async () => {
/* ... */
})
.run();
λͺ¨λν°λ§
μ€ν μν νμΈ
const handle = await Sonamu.workflows.run({ name: "process_order" }, { orderId: 123 });
// μν νμΈ
console.log("Status:", handle.status);
// κ²°κ³Ό λκΈ° (μλ£λ λκΉμ§)
const result = await handle.result();
console.log("Result:", result);
λ‘κ·Έ νμΈ
μν¬νλ‘μ°λ μλμΌλ‘ LogTapeλ₯Ό ν΅ν΄ λ‘κΉ λ©λλ€:[INFO] [workflow:process_order] Processing order {"orderId":123}
[DEBUG] [workflow:process_order] Step: charge-payment
[INFO] [workflow:process_order] Payment successful {"paymentId":"pay_123"}
μμ λͺ¨μ
- μ£Όλ¬Έ μ²λ¦¬
- λ°μ΄ν° λκΈ°ν
- λ³΄κ³ μ μμ±
- 리λ§μΈλ
import { workflow } from "sonamu";
import { z } from "zod";
const OrderSchema = z.object({
orderId: z.number(),
userId: z.number(),
items: z.array(z.object({
productId: z.number(),
quantity: z.number(),
price: z.number()
})),
totalAmount: z.number(),
shippingAddress: z.string()
});
export const processOrder = workflow(
{
name: "process_order",
version: "1.0",
schema: OrderSchema
},
async ({ input, step, logger }) => {
logger.info("Processing order", { orderId: input.orderId });
// Step 1: μ¬κ³ νμΈ
const inventory = await step.define(
{ name: "check-inventory" },
async () => {
for (const item of input.items) {
const available = await InventoryModel.check(
item.productId,
item.quantity
);
if (!available) {
throw new Error(`Product ${item.productId} out of stock`);
}
}
return { available: true };
}
).run();
// Step 2: κ²°μ μ²λ¦¬
const payment = await step.define(
{ name: "process-payment" },
async () => {
return await PaymentService.charge({
userId: input.userId,
amount: input.totalAmount,
orderId: input.orderId
});
}
).run();
logger.info("Payment successful", { paymentId: payment.id });
// Step 3: μ¬κ³ μ°¨κ°
await step.define(
{ name: "decrease-inventory" },
async () => {
for (const item of input.items) {
await InventoryModel.decrease(item.productId, item.quantity);
}
}
).run();
// Step 4: λ°°μ‘ μμ
const shipping = await step.define(
{ name: "start-shipping" },
async () => {
return await ShippingService.createShipment({
orderId: input.orderId,
address: input.shippingAddress,
items: input.items
});
}
).run();
// Step 5: μλ¦Ό μ μ‘
await step.define(
{ name: "send-notifications" },
async () => {
await NotificationService.send({
userId: input.userId,
type: "order_confirmed",
data: {
orderId: input.orderId,
trackingNumber: shipping.trackingNumber
}
});
}
).run();
return {
orderId: input.orderId,
paymentId: payment.id,
trackingNumber: shipping.trackingNumber,
status: "completed"
};
}
);
export const syncUserData = workflow(
{
name: "sync_user_data",
schedules: [
{
name: "hourly-sync",
expression: "0 * * * *", // λ§€μκ°
input: () => ({
timestamp: new Date().toISOString()
})
}
]
},
async ({ input, step, logger }) => {
logger.info("Starting user data sync", { timestamp: input.timestamp });
// Step 1: μΈλΆ APIμμ μ¬μ©μ κ°μ Έμ€κΈ°
const users = await step.define(
{ name: "fetch-external-users" },
async () => {
const response = await fetch("https://api.external.com/users");
return response.json();
}
).run();
logger.info("Fetched users", { count: users.length });
// Step 2: λ°°μΉλ‘ μ²λ¦¬
const batchSize = 100;
const batches = Math.ceil(users.length / batchSize);
for (let i = 0; i < batches; i++) {
await step.define(
{ name: `process-batch-${i}` },
async () => {
const batch = users.slice(i * batchSize, (i + 1) * batchSize);
for (const user of batch) {
await UserModel.upsert({
external_id: user.id,
email: user.email,
name: user.name,
synced_at: new Date()
});
}
return { processed: batch.length };
}
).run();
// λ°°μΉ κ° 1μ΄ λκΈ°
await step.sleep(`wait-after-batch-${i}`, "1s");
}
return {
totalUsers: users.length,
batches,
timestamp: input.timestamp
};
}
);
export const generateMonthlyReport = workflow(
{
name: "generate_monthly_report",
schedules: [
{
name: "first-of-month",
expression: "0 3 1 * *", // λ§€μ 1μΌ 3μ
input: () => {
const now = new Date();
const lastMonth = new Date(now.getFullYear(), now.getMonth() - 1);
return {
year: lastMonth.getFullYear(),
month: lastMonth.getMonth() + 1
};
}
}
]
},
async ({ input, step, logger }) => {
logger.info("Generating monthly report", input);
// Step 1: μ£Όλ¬Έ λ°μ΄ν° μ§κ³
const orderStats = await step.define(
{ name: "aggregate-orders" },
async () => {
const rdb = OrderModel.getPuri("r");
return rdb.table("orders")
.whereRaw("YEAR(created_at) = ?", [input.year])
.whereRaw("MONTH(created_at) = ?", [input.month])
.select(
rdb.raw("COUNT(*) as total_orders"),
rdb.raw("SUM(total_amount) as total_revenue"),
rdb.raw("AVG(total_amount) as avg_order_value")
)
.first();
}
).run();
// Step 2: μ¬μ©μ ν΅κ³
const userStats = await step.define(
{ name: "aggregate-users" },
async () => {
const rdb = UserModel.getPuri("r");
return rdb.table("users")
.whereRaw("YEAR(created_at) = ?", [input.year])
.whereRaw("MONTH(created_at) = ?", [input.month])
.count("* as new_users")
.first();
}
).run();
// Step 3: μν μμ
const topProducts = await step.define(
{ name: "rank-products" },
async () => {
const rdb = OrderModel.getPuri("r");
return rdb.table("order_items")
.join("orders", "orders.id", "order_items.order_id")
.whereRaw("YEAR(orders.created_at) = ?", [input.year])
.whereRaw("MONTH(orders.created_at) = ?", [input.month])
.groupBy("order_items.product_id")
.select(
"order_items.product_id",
rdb.raw("SUM(order_items.quantity) as total_quantity"),
rdb.raw("SUM(order_items.price * order_items.quantity) as total_sales")
)
.orderBy("total_sales", "desc")
.limit(10);
}
).run();
// Step 4: λ³΄κ³ μ μ μ₯
const report = await step.define(
{ name: "save-report" },
async () => {
const wdb = ReportModel.getDB("w");
return wdb.table("reports").insert({
type: "monthly",
year: input.year,
month: input.month,
data: JSON.stringify({
orders: orderStats,
users: userStats,
products: topProducts
}),
created_at: new Date()
}).returning("*");
}
).run();
// Step 5: κ΄λ¦¬μμκ² μ΄λ©μΌ μ μ‘
await step.define(
{ name: "send-email" },
async () => {
await EmailService.send({
to: "admin@example.com",
subject: `Monthly Report - ${input.year}/${input.month}`,
html: generateReportHTML({
orders: orderStats,
users: userStats,
products: topProducts
})
});
}
).run();
logger.info("Report generated", { reportId: report[0].id });
return {
reportId: report[0].id,
year: input.year,
month: input.month
};
}
);
export const sendReminders = workflow(
{
name: "send_reminders",
schedules: [
{
name: "daily-9am",
expression: "0 9 * * *", // λ§€μΌ μ€μ 9μ
input: () => ({
date: new Date().toISOString().split('T')[0]
})
}
]
},
async ({ input, step, logger }) => {
logger.info("Sending reminders", { date: input.date });
// Step 1: 리λ§μΈλ λμ μ‘°ν
const tasks = await step.get(
{ name: "fetch-tasks" },
TaskModel,
"findPendingReminders"
).run();
logger.info("Found tasks", { count: tasks.length });
if (tasks.length === 0) {
return { sent: 0 };
}
// Step 2: κ° μμ
μ λν΄ λ¦¬λ§μΈλ μ μ‘
for (const task of tasks) {
await step.define(
{ name: `send-reminder-${task.id}` },
async () => {
// μλ¦Ό μ μ‘
await NotificationService.send({
userId: task.user_id,
type: "task_reminder",
data: {
taskId: task.id,
title: task.title,
dueDate: task.due_date
}
});
// μν μ
λ°μ΄νΈ
const wdb = TaskModel.getDB("w");
await wdb.table("tasks")
.where("id", task.id)
.update({ reminder_sent: true });
return { taskId: task.id };
}
).run();
// κ° λ¦¬λ§μΈλ μ¬μ΄ 0.5μ΄ λκΈ° (rate limiting)
await step.sleep(`wait-after-${task.id}`, "500ms");
}
return { sent: tasks.length };
}
);
λ€μ λ¨κ³
@api
API μλν¬μΈνΈ λ§λ€κΈ°
@transactional
νΈλμμ
μ¬μ©νκΈ°
μ€μΌμ€λ§
μμ
μ€μΌμ€λ§ κ°μ΄λ
OpenWorkflow
OpenWorkflow λ¬Έμ