@workflow ๋ฐ์ฝ๋ ์ดํฐ๋ ์ฅ๊ธฐ ์คํ ์์
, ๋ฐฑ๊ทธ๋ผ์ด๋ ์์
, ์ค์ผ์ค๋ง ์์
์ ์ ์ํฉ๋๋ค. OpenWorkflow๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๋ด๊ตฌ์ฑ ์๋ ์คํ, ์ฌ์๋, ๋ชจ๋ํฐ๋ง์ ์ ๊ณตํฉ๋๋ค.
๊ธฐ๋ณธ ์ฌ์ฉ๋ฒ
๋ณต์ฌ
import { workflow } from "sonamu";
/**
 * Order-processing workflow: charges the payment, decreases the inventory and
 * starts shipping, each as its own named step.
 *
 * Reads `orderId`, `amount`, `productId`, `quantity` and `address` from `input`.
 * Resolves to `{ orderId, paymentId, trackingNumber }`.
 */
export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0"
  },
  async ({ input, step, logger }) => {
    logger.info("Processing order", { orderId: input.orderId });

    // Step 1: process the payment
    const payment = await step.define(
      { name: "process-payment" },
      async () => {
        return await PaymentService.charge(input.amount);
      }
    ).run();

    // Step 2: update the inventory
    await step.define(
      { name: "update-inventory" },
      async () => {
        return await InventoryModel.decrease(input.productId, input.quantity);
      }
    ).run();

    // Step 3: start shipping
    const shipping = await step.define(
      { name: "start-shipping" },
      async () => {
        return await ShippingService.ship(input.address);
      }
    ).run();

    return {
      orderId: input.orderId,
      paymentId: payment.id,
      trackingNumber: shipping.trackingNumber
    };
  }
);
설정 (sonamu.config.ts)
워크플로우를 사용하려면 PostgreSQL 설정이 필요합니다.
복사
import { defineConfig } from "sonamu";
// Sonamu configuration: the `tasks` section holds the PostgreSQL ("pg") connection,
// read from environment variables with the port fixed at 5432, plus the worker settings.
export default defineConfig({
tasks: {
dbConfig: {
client: "pg",
connection: {
host: process.env.DB_HOST,
port: 5432,
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD
}
},
worker: {
concurrency: 4, // number of concurrent executions
usePubSub: true, // use Pub/Sub
listenDelay: 500 // listen delay (ms)
}
}
});
์ต์
name
์ํฌํ๋ก์ฐ ์ด๋ฆ์ ์ง์ ํฉ๋๋ค. ๊ธฐ๋ณธ๊ฐ: ํจ์ ์ด๋ฆ์ snake_case๋ก ๋ณํ๋ณต์ฌ
// Explicit name
export const myTask = workflow(
  { name: "process_order" },
  async ({ input }) => { /* ... */ }
);

// Automatic name (the function name is used)
export const processOrder = workflow(
  {}, // name omitted → "process_order"
  async ({ input }) => { /* ... */ }
);
version
워크플로우 버전을 지정합니다. 기본값: null
복사
// Version 1.0 of the "process_order" workflow.
export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0"
  },
  async ({ input }) => { /* ... */ }
);

// When updating the version: same workflow name, new version string.
export const processOrderV2 = workflow(
  {
    name: "process_order",
    version: "2.0"
  },
  async ({ input }) => { /* ... */ }
);
같은 이름의 다른 버전의 워크플로우를 병렬로 실행할 수 있습니다.
schema
입력 데이터의 Zod 스키마를 정의합니다.
복사
import { z } from "zod";
// Zod schema describing the workflow input.
const OrderInputSchema = z.object({
  orderId: z.number(),
  productId: z.number(),
  quantity: z.number(),
  amount: z.number(),
  address: z.string()
});

// Workflow that attaches the schema through the `schema` option.
export const processOrder = workflow(
  {
    name: "process_order",
    schema: OrderInputSchema
  },
  async ({ input }) => {
    // input is type-validated automatically
    input.orderId; // number
    input.address; // string
  }
);
schedules
Cron 표현식으로 스케줄을 정의합니다.
복사
/**
 * Daily report workflow, scheduled with a cron expression.
 *
 * `input` is given as a function so the date is computed for each scheduled
 * run. An object literal here would be evaluated once, when the module loads,
 * and every later run would receive that same stale date.
 */
export const dailyReport = workflow(
  {
    name: "daily_report",
    schedules: [
      {
        name: "daily-at-midnight",
        expression: "0 0 * * *", // every day at midnight
        // toISOString() is UTC-based, so this is the UTC calendar date.
        input: () => ({ date: new Date().toISOString().split('T')[0] })
      }
    ]
  },
  async ({ input, logger }) => {
    logger.info("Generating daily report", { date: input.date });
    // generate the report
  }
);
// Dynamic input: `input` is a function that builds a fresh timestamp and backup path.
export const weeklyBackup = workflow(
  {
    name: "weekly_backup",
    schedules: [
      {
        name: "weekly-sunday",
        expression: "0 2 * * 0", // every Sunday at 2 o'clock
        input: () => ({
          timestamp: new Date().toISOString(),
          backupPath: `/backups/${Date.now()}`
        })
      }
    ]
  },
  async ({ input }) => {
    // perform the backup
  }
);
복사
* * * * * - 매분
0 * * * * - 매시간
0 0 * * * - 매일 자정
0 0 * * 0 - 매주 일요일 자정
0 0 1 * * - 매월 1일 자정
0 9 * * 1-5 - 평일 오전 9시
*/5 * * * * - 5분마다
워크플로우 함수
워크플로우 함수는 4개의 매개변수를 받습니다:
복사
/**
 * Shape of a workflow function: a single object argument, destructured into
 * the four values listed below. The body is intentionally empty.
 */
async function workflowFn({ input, step, logger, version }) {
  // input: input data
  // step: Step API
  // logger: LogTape Logger
  // version: workflow version
}
input
워크플로우 실행 시 전달된 입력 데이터입니다.
복사
// Workflow that reads two fields from its input.
export const processOrder = workflow(
  { name: "process_order" },
  async ({ input }) => {
    console.log("Order ID:", input.orderId);
    console.log("Amount:", input.amount);
  }
);

// Run it: the second argument is the data the workflow receives as `input`.
// Uses Sonamu.workflowManager, the same entry point as the other run examples on this page.
import { Sonamu } from "sonamu";
await Sonamu.workflowManager.run(
  { name: "process_order" },
  { orderId: 123, amount: 50000 }
);
step
Step API๋ ์ํฌํ๋ก์ฐ์ ๊ฐ ๋จ๊ณ๋ฅผ ์ ์ํฉ๋๋ค. ๊ฐ step์ ๋ ๋ฆฝ์ ์ผ๋ก ์ฌ์๋๋๊ณ ๋ณต๊ตฌ๋ ์ ์์ต๋๋ค.๋ณต์ฌ
// Three named steps run one after another; each is defined with
// step.define(options, fn) and executed with .run().
export const processOrder = workflow(
  { name: "process_order" },
  async ({ step }) => {
    // Step 1
    const payment = await step.define(
      { name: "charge-payment" },
      async () => {
        return await PaymentService.charge(100);
      }
    ).run();

    // Step 2
    const inventory = await step.define(
      { name: "update-inventory" },
      async () => {
        return await InventoryModel.decrease(1);
      }
    ).run();

    // Step 3
    await step.define(
      { name: "send-email" },
      async () => {
        return await EmailService.send({
          // Placeholder address; the original literal was lost to e-mail obfuscation.
          to: "user@example.com",
          subject: "Order confirmed"
        });
      }
    ).run();

    return { success: true };
  }
);
- ๊ฐ step์ ํ ๋ฒ๋ง ์คํ๋จ (๋ฉฑ๋ฑ์ฑ)
- ์คํจ ์ ํด๋น step๋ง ์ฌ์๋
- ์ด๋ฏธ ์๋ฃ๋ step์ ๊ฑด๋๋
logger
LogTape Logger ์ธ์คํด์ค์ ๋๋ค.๋ณต์ฌ
// Example workflow that emits one record at each level (info, debug, warn, error)
// through the `logger` passed to the workflow function; the second argument
// of each call is an object of structured properties.
export const processData = workflow(
{ name: "process_data" },
async ({ logger }) => {
logger.info("Starting workflow");
logger.debug("Processing item", { itemId: 123 });
logger.warn("High memory usage", { usage: "80%" });
logger.error("Failed to process", { error: "Connection timeout" });
}
);
version
현재 워크플로우의 버전입니다.
복사
// Workflow declared with version "2.0" that logs the `version` value it receives.
export const processOrder = workflow(
{
name: "process_order",
version: "2.0"
},
async ({ version }) => {
console.log("Running version:", version); // "2.0"
}
);
워크플로우 실행
수동 실행
복사
// Using the WorkflowManager: run() takes the workflow name/version and the
// input object, and resolves to a handle for the started run.
import { Sonamu } from "sonamu";
const handle = await Sonamu.workflowManager.run(
{
name: "process_order",
version: "1.0"
},
{
orderId: 123,
amount: 50000
}
);
// Wait for the result
const result = await handle.result();
console.log("Result:", result);
스케줄 실행
복사
/**
 * Scheduled workflow: the cron expression "0 0 * * *" runs it every day at midnight.
 *
 * `input` is given as a function so the date is computed for each scheduled
 * run. An object literal here would be evaluated once, when the module loads,
 * and every later run would receive that same stale date.
 */
export const dailyReport = workflow(
  {
    name: "daily_report",
    schedules: [
      {
        name: "daily-at-midnight",
        expression: "0 0 * * *",
        // toISOString() is UTC-based, so this is the UTC calendar date.
        input: () => ({ date: new Date().toISOString().split('T')[0] })
      }
    ]
  },
  async ({ input, logger }) => {
    logger.info("Generating report", { date: input.date });
    // runs automatically every day at midnight
  }
);
Step API ์์ธ
step.define().run()
ํจ์๋ฅผ step์ผ๋ก ๊ฐ์ธ์ ์คํํฉ๋๋ค.๋ณต์ฌ
// Wrap a function as a named step and run it; `result` is what the function returns.
const result = await step.define(
  { name: "step-name" },
  async () => {
    // do the work
    return { data: "result" };
  }
).run();
복사
// Two sequential steps; the workflow resolves to the id of the payment step's result.
export const processOrder = workflow(
  { name: "process_order" },
  async ({ step }) => {
    // process the payment
    const payment = await step.define(
      { name: "process-payment" },
      async () => {
        return await PaymentService.charge(100);
      }
    ).run();

    // update the inventory
    await step.define(
      { name: "update-inventory" },
      async () => {
        return await InventoryModel.decrease(1);
      }
    ).run();

    return { paymentId: payment.id };
  }
);
step.get().run()
Model ๋ฉ์๋๋ฅผ step์ผ๋ก ์คํํฉ๋๋ค.๋ณต์ฌ
// Explicit name
const namedResult = await step.get(
  { name: "fetch-users" },
  UserModel,
  "list"
).run({ limit: 10 });

// Name omitted (the method name is used automatically: "list").
// A separate binding is used because `const` cannot be declared twice in one scope.
const defaultNamedResult = await step.get(
  UserModel,
  "list"
).run({ limit: 10 });
복사
// Fetches up to 100 users through a model-method step, then runs one step per
// user whose name embeds the user id, and resolves to the processed count.
export const processUsers = workflow(
{ name: "process_users" },
async ({ step }) => {
// Run a Model method as a step
const users = await step.get(
{ name: "fetch-users" },
UserModel,
"list"
).run({ limit: 100 });
// Process each user
for (const user of users) {
await step.define(
{ name: `process-user-${user.id}` },
async () => {
return await UserModel.updateStatus(user.id, "processed");
}
).run();
}
return { processed: users.length };
}
);
step.sleep()
지정된 시간 동안 대기합니다.
복사
// Each call waits for the given duration string ("1h", "30m", "10s").
// NOTE(review): the first argument looks like the step name — confirm against the API reference.
await step.sleep("wait-1-hour", "1h");
await step.sleep("wait-30-minutes", "30m");
await step.sleep("wait-10-seconds", "10s");
"10s", "5m", "1h", "1d"
사용 예시:
복사
// Workflow that logs, sleeps for an hour, logs again, sleeps for 30 minutes, then finishes.
export const delayedTask = workflow(
{ name: "delayed_task" },
async ({ step, logger }) => {
logger.info("Starting task");
// Wait 1 hour
await step.sleep("wait-1-hour", "1h");
logger.info("Continuing after 1 hour");
// Wait 30 minutes
await step.sleep("wait-30-minutes", "30m");
logger.info("Task complete");
}
);
์ฌ์๋ ์ ์ฑ
Step์ ์ฌ์๋๋ OpenWorkflow ๋ฐฑ์๋์์ ์๋์ผ๋ก ์ฒ๋ฆฌ๋ฉ๋๋ค. ๋ณ๋์ retry ์ต์
์ค์ ์ด ํ์ํ์ง ์์ต๋๋ค.
๋ณต์ฌ
/**
 * Workflow with a single step that calls an external HTTP API.
 * The step is retried automatically when it fails.
 */
export const resilientTask = workflow(
  { name: "resilient_task" },
  async ({ step }) => {
    // A failing step is retried automatically
    const result = await step.define(
      { name: "flaky-operation" },
      async () => {
        const response = await fetch("https://api.example.com");
        // fetch() only rejects on network errors; throw on HTTP error statuses
        // too, so that a failed request actually fails the step.
        if (!response.ok) {
          throw new Error(`Request failed with status ${response.status}`);
        }
        return response.json();
      }
    ).run();

    return result;
  }
);
๋ฐ์ดํฐ๋ฒ ์ด์ค ์ ๊ทผ
์ํฌํ๋ก์ฐ ๋ด์์ Sonamu Model์ ์์ ๋กญ๊ฒ ์ฌ์ฉํ ์ ์์ต๋๋ค.๋ณต์ฌ
// Uses Sonamu models inside a workflow: lists up to 100 users, logs the count,
// then runs one step per user and resolves to the processed count.
export const processUsers = workflow(
{ name: "process_users" },
async ({ step, logger }) => {
// Step 1: fetch users
const users = await step.get(
{ name: "fetch-users" },
UserModel,
"list"
).run({ limit: 100 });
logger.info("Found users", { count: users.length });
// Step 2: process each user
for (const user of users) {
await step.define(
{ name: `process-user-${user.id}` },
async () => {
return await UserModel.processUser(user.id);
}
).run();
}
return { processed: users.length };
}
);
์๋ฌ ์ฒ๋ฆฌ
์๋ ์ฌ์๋
Step์ด ์คํจํ๋ฉด ์๋์ผ๋ก ์ฌ์๋๋ฉ๋๋ค.๋ณต์ฌ
/**
 * Automatic retry example: the step below may fail, and a step that throws
 * is retried automatically.
 */
export const resilientTask = workflow(
  { name: "resilient_task" },
  async ({ step }) => {
    const result = await step.define(
      { name: "flaky-operation" },
      async () => {
        // an operation that can fail
        const response = await fetch("https://api.example.com");
        // fetch() only rejects on network errors; throw on HTTP error statuses
        // too, so that a failed request actually fails the step.
        if (!response.ok) {
          throw new Error(`Request failed with status ${response.status}`);
        }
        return response.json();
      }
    ).run();

    return result;
  }
);
수동 에러 처리
복사
// Manual error handling: the first step always throws; the catch block logs
// the error and runs a fallback step instead.
export const handleErrors = workflow(
{ name: "handle_errors" },
async ({ step, logger }) => {
try {
await step.define(
{ name: "risky-operation" },
async () => {
throw new Error("Something went wrong");
}
).run();
} catch (error) {
logger.error("Operation failed", { error });
// Fallback logic
await step.define(
{ name: "fallback" },
async () => {
return await AlternativeService.process();
}
).run();
}
}
);
데코레이터 스타일 (TypeScript Decorator)
함수 스타일 대신 데코레이터 스타일도 사용 가능합니다:
복사
// Decorator-style definition using the same options (name, version) as the function style.
// NOTE(review): standard TypeScript/ECMAScript decorators apply only to classes and
// class members, not to function declarations — confirm this form is actually supported.
@workflow({
name: "process_order",
version: "1.0"
})
async function processOrder({ input, step, logger }) {
logger.info("Processing order", { orderId: input.orderId });
// Charge the payment as a single named step
const payment = await step.define(
{ name: "charge" },
async () => {
return await PaymentService.charge(input.amount);
}
).run();
return { paymentId: payment.id };
}
์ฃผ์์ฌํญ
1. ๋ฉฑ๋ฑ์ฑ
Step์ ๋ฉฑ๋ฑํด์ผ ํฉ๋๋ค. ๊ฐ์ ์ ๋ ฅ์ผ๋ก ์ฌ๋ฌ ๋ฒ ์คํํด๋ ๊ฐ์ ๊ฒฐ๊ณผ๋ฅผ ๋ณด์ฅํด์ผ ํฉ๋๋ค.๋ณต์ฌ
// ❌ Bad: not idempotent
await step.define(
  { name: "increment-counter" },
  async () => {
    counter++; // incremented again on every retry
  }
).run();

// ✅ Good: idempotent
await step.define(
  { name: "set-counter" },
  async () => {
    return await CounterModel.set(10); // always the same value
  }
).run();
2. ๊ธด ์์ ์ step์ผ๋ก ๋๋๊ธฐ
๋ณต์ฌ
// ❌ Bad: one huge step
await step.define(
  { name: "process-all" },
  async () => {
    // a job that takes an hour
    for (let i = 0; i < 1000000; i++) {
      await processItem(i);
    }
  }
).run();

// ✅ Good: many small steps
// The bound follows the actual item count instead of a hard-coded 1000, and
// each batch gets a unique step name derived from its start offset.
const BATCH_SIZE = 100;
for (let start = 0; start < items.length; start += BATCH_SIZE) {
  await step.define(
    { name: `process-batch-${start}` },
    async () => {
      const batch = items.slice(start, start + BATCH_SIZE);
      await processBatch(batch);
    }
  ).run();
}
3. Step 이름 고유성
Step 이름은 워크플로우 내에서 고유해야 합니다.
복사
// ❌ Bad: duplicate names
await step.define({ name: "process" }, async () => { /* ... */ }).run();
await step.define({ name: "process" }, async () => { /* ... */ }).run(); // error!

// ✅ Good: unique names
await step.define({ name: "process-payment" }, async () => { /* ... */ }).run();
await step.define({ name: "process-shipping" }, async () => { /* ... */ }).run();
모니터링
실행 상태 확인
복사
// Start a run and keep the returned handle to inspect it.
const handle = await Sonamu.workflowManager.run(
{ name: "process_order" },
{ orderId: 123 }
);
// Check the status
console.log("Status:", handle.status);
// Wait for the result (until the run completes)
const result = await handle.result();
console.log("Result:", result);
๋ก๊ทธ ํ์ธ
์ํฌํ๋ก์ฐ๋ ์๋์ผ๋ก LogTape๋ฅผ ํตํด ๋ก๊น ๋ฉ๋๋ค:๋ณต์ฌ
[INFO] [workflow:process_order] Processing order {"orderId":123}
[DEBUG] [workflow:process_order] Step: charge-payment
[INFO] [workflow:process_order] Payment successful {"paymentId":"pay_123"}
예제 모음
- 주문 처리
- 데이터 동기화
- 보고서 생성
- 리마인더
복사
import { workflow } from "sonamu";
import { z } from "zod";
// Zod schema for the order-processing input.
const OrderSchema = z.object({
  orderId: z.number(),
  userId: z.number(),
  items: z.array(z.object({
    productId: z.number(),
    quantity: z.number(),
    price: z.number()
  })),
  totalAmount: z.number(),
  shippingAddress: z.string()
});

/**
 * Full order-processing workflow: checks stock, charges the payment,
 * decreases inventory, creates the shipment and notifies the user.
 *
 * Resolves to `{ orderId, paymentId, trackingNumber, status: "completed" }`.
 * Throws from the stock check when any item is not available.
 */
export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0",
    schema: OrderSchema
  },
  async ({ input, step, logger }) => {
    logger.info("Processing order", { orderId: input.orderId });

    // Step 1: check stock
    await step.define(
      { name: "check-inventory" },
      async () => {
        for (const item of input.items) {
          const available = await InventoryModel.check(
            item.productId,
            item.quantity
          );
          if (!available) {
            throw new Error(`Product ${item.productId} out of stock`);
          }
        }
        return { available: true };
      }
    ).run();

    // Step 2: process the payment
    const payment = await step.define(
      { name: "process-payment" },
      async () => {
        return await PaymentService.charge({
          userId: input.userId,
          amount: input.totalAmount,
          orderId: input.orderId
        });
      }
    ).run();
    logger.info("Payment successful", { paymentId: payment.id });

    // Step 3: decrease inventory — one step per line item.
    // A single step looping over every item would, on retry, decrease the
    // items that had already succeeded a second time. The index keeps step
    // names unique even if the same product appears twice.
    for (const [index, item] of input.items.entries()) {
      await step.define(
        { name: `decrease-inventory-${index}` },
        async () => {
          await InventoryModel.decrease(item.productId, item.quantity);
        }
      ).run();
    }

    // Step 4: start shipping
    const shipping = await step.define(
      { name: "start-shipping" },
      async () => {
        return await ShippingService.createShipment({
          orderId: input.orderId,
          address: input.shippingAddress,
          items: input.items
        });
      }
    ).run();

    // Step 5: send the notification
    await step.define(
      { name: "send-notifications" },
      async () => {
        await NotificationService.send({
          userId: input.userId,
          type: "order_confirmed",
          data: {
            orderId: input.orderId,
            trackingNumber: shipping.trackingNumber
          }
        });
      }
    ).run();

    return {
      orderId: input.orderId,
      paymentId: payment.id,
      trackingNumber: shipping.trackingNumber,
      status: "completed"
    };
  }
);
복사
/**
 * Hourly user-data sync: fetches users from an external API, then upserts
 * them in batches of 100, one step per batch, with a 1-second sleep after each.
 *
 * Resolves to `{ totalUsers, batches, timestamp }`.
 */
export const syncUserData = workflow(
  {
    name: "sync_user_data",
    schedules: [
      {
        name: "hourly-sync",
        expression: "0 * * * *", // every hour
        input: () => ({
          timestamp: new Date().toISOString()
        })
      }
    ]
  },
  async ({ input, step, logger }) => {
    logger.info("Starting user data sync", { timestamp: input.timestamp });

    // Step 1: fetch users from the external API
    const users = await step.define(
      { name: "fetch-external-users" },
      async () => {
        const response = await fetch("https://api.external.com/users");
        // fetch() only rejects on network errors; throw on HTTP error statuses
        // so an error body is never treated as the user list.
        if (!response.ok) {
          throw new Error(`Fetching users failed with status ${response.status}`);
        }
        return response.json();
      }
    ).run();
    logger.info("Fetched users", { count: users.length });

    // Step 2: process in batches
    const batchSize = 100;
    const batches = Math.ceil(users.length / batchSize);
    for (let i = 0; i < batches; i++) {
      await step.define(
        { name: `process-batch-${i}` },
        async () => {
          const batch = users.slice(i * batchSize, (i + 1) * batchSize);
          for (const user of batch) {
            await UserModel.upsert({
              external_id: user.id,
              email: user.email,
              name: user.name,
              synced_at: new Date()
            });
          }
          return { processed: batch.length };
        }
      ).run();

      // wait 1 second between batches
      await step.sleep(`wait-after-batch-${i}`, "1s");
    }

    return {
      totalUsers: users.length,
      batches,
      timestamp: input.timestamp
    };
  }
);
복사
/**
 * Monthly report workflow, scheduled for 3 o'clock on the 1st of each month.
 * Aggregates the previous month's orders, new users and top products, saves
 * the report row, and e-mails it.
 *
 * Month filters use EXTRACT(... FROM ...) rather than YEAR()/MONTH(): the
 * latter are MySQL-only, while the workflow setup on this page is PostgreSQL
 * and this block already relies on `.returning("*")`.
 *
 * Resolves to `{ reportId, year, month }`.
 */
export const generateMonthlyReport = workflow(
  {
    name: "generate_monthly_report",
    schedules: [
      {
        name: "first-of-month",
        expression: "0 3 1 * *", // 1st of every month at 3 o'clock
        input: () => {
          const now = new Date();
          // Date normalizes month -1 to December of the previous year.
          const lastMonth = new Date(now.getFullYear(), now.getMonth() - 1);
          return {
            year: lastMonth.getFullYear(),
            month: lastMonth.getMonth() + 1 // getMonth() is 0-based
          };
        }
      }
    ]
  },
  async ({ input, step, logger }) => {
    logger.info("Generating monthly report", input);

    // Step 1: aggregate order data
    const orderStats = await step.define(
      { name: "aggregate-orders" },
      async () => {
        const rdb = OrderModel.getPuri("r");
        return rdb.table("orders")
          .whereRaw("EXTRACT(YEAR FROM created_at) = ?", [input.year])
          .whereRaw("EXTRACT(MONTH FROM created_at) = ?", [input.month])
          .select(
            rdb.raw("COUNT(*) as total_orders"),
            rdb.raw("SUM(total_amount) as total_revenue"),
            rdb.raw("AVG(total_amount) as avg_order_value")
          )
          .first();
      }
    ).run();

    // Step 2: user statistics
    const userStats = await step.define(
      { name: "aggregate-users" },
      async () => {
        const rdb = UserModel.getPuri("r");
        return rdb.table("users")
          .whereRaw("EXTRACT(YEAR FROM created_at) = ?", [input.year])
          .whereRaw("EXTRACT(MONTH FROM created_at) = ?", [input.month])
          .count("* as new_users")
          .first();
      }
    ).run();

    // Step 3: product ranking (top 10 by sales)
    const topProducts = await step.define(
      { name: "rank-products" },
      async () => {
        const rdb = OrderModel.getPuri("r");
        return rdb.table("order_items")
          .join("orders", "orders.id", "order_items.order_id")
          .whereRaw("EXTRACT(YEAR FROM orders.created_at) = ?", [input.year])
          .whereRaw("EXTRACT(MONTH FROM orders.created_at) = ?", [input.month])
          .groupBy("order_items.product_id")
          .select(
            "order_items.product_id",
            rdb.raw("SUM(order_items.quantity) as total_quantity"),
            rdb.raw("SUM(order_items.price * order_items.quantity) as total_sales")
          )
          .orderBy("total_sales", "desc")
          .limit(10);
      }
    ).run();

    // Step 4: save the report
    const report = await step.define(
      { name: "save-report" },
      async () => {
        const wdb = ReportModel.getDB("w");
        return wdb.table("reports").insert({
          type: "monthly",
          year: input.year,
          month: input.month,
          data: JSON.stringify({
            orders: orderStats,
            users: userStats,
            products: topProducts
          }),
          created_at: new Date()
        }).returning("*");
      }
    ).run();

    // Step 5: e-mail the report to the administrator
    await step.define(
      { name: "send-email" },
      async () => {
        await EmailService.send({
          // Placeholder address; the original literal was lost to e-mail obfuscation.
          to: "admin@example.com",
          subject: `Monthly Report - ${input.year}/${input.month}`,
          html: generateReportHTML({
            orders: orderStats,
            users: userStats,
            products: topProducts
          })
        });
      }
    ).run();

    logger.info("Report generated", { reportId: report[0].id });
    return {
      reportId: report[0].id,
      year: input.year,
      month: input.month
    };
  }
);
복사
/**
 * Daily reminder workflow (9 a.m.): loads the tasks pending a reminder, then
 * for each task sends the notification and marks it as sent.
 *
 * Sending and marking are separate steps, so a failed status update is
 * retried on its own and does not send the notification a second time.
 *
 * Resolves to `{ sent }`, the number of tasks handled (0 when none are pending).
 */
export const sendReminders = workflow(
  {
    name: "send_reminders",
    schedules: [
      {
        name: "daily-9am",
        expression: "0 9 * * *", // every day at 9 a.m.
        input: () => ({
          date: new Date().toISOString().split('T')[0]
        })
      }
    ]
  },
  async ({ input, step, logger }) => {
    logger.info("Sending reminders", { date: input.date });

    // Step 1: look up the tasks that need a reminder
    const tasks = await step.get(
      { name: "fetch-tasks" },
      TaskModel,
      "findPendingReminders"
    ).run();
    logger.info("Found tasks", { count: tasks.length });

    if (tasks.length === 0) {
      return { sent: 0 };
    }

    // Step 2: send a reminder for each task
    for (const task of tasks) {
      // send the notification
      await step.define(
        { name: `send-reminder-${task.id}` },
        async () => {
          await NotificationService.send({
            userId: task.user_id,
            type: "task_reminder",
            data: {
              taskId: task.id,
              title: task.title,
              dueDate: task.due_date
            }
          });
          return { taskId: task.id };
        }
      ).run();

      // update the status
      await step.define(
        { name: `mark-reminder-sent-${task.id}` },
        async () => {
          const wdb = TaskModel.getDB("w");
          await wdb.table("tasks")
            .where("id", task.id)
            .update({ reminder_sent: true });
          return { taskId: task.id };
        }
      ).run();

      // wait 0.5 seconds between reminders (rate limiting)
      await step.sleep(`wait-after-${task.id}`, "500ms");
    }

    return { sent: tasks.length };
  }
);