메인 μ½˜ν…μΈ λ‘œ κ±΄λ„ˆλ›°κΈ°
@workflow λ°μ½”λ ˆμ΄ν„°λŠ” μž₯κΈ° μ‹€ν–‰ μž‘μ—…, λ°±κ·ΈλΌμš΄λ“œ μž‘μ—…, μŠ€μΌ€μ€„λ§ μž‘μ—…μ„ μ •μ˜ν•©λ‹ˆλ‹€. OpenWorkflowλ₯Ό 기반으둜 내ꡬ성 μžˆλŠ” μ‹€ν–‰, μž¬μ‹œλ„, λͺ¨λ‹ˆν„°λ§μ„ μ œκ³΅ν•©λ‹ˆλ‹€.

κΈ°λ³Έ μ‚¬μš©λ²•

import { workflow } from "sonamu";

export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0",
  },
  async ({ input, step, logger }) => {
    logger.info("Processing order", { orderId: input.orderId });

    // Step 1: 결제 처리
    const payment = await step
      .define({ name: "process-payment" }, async () => {
        return await PaymentService.charge(input.amount);
      })
      .run();

    // Step 2: 재고 μ—…λ°μ΄νŠΈ
    await step
      .define({ name: "update-inventory" }, async () => {
        return await InventoryModel.decrease(input.productId, input.quantity);
      })
      .run();

    // Step 3: 배솑 μ‹œμž‘
    const shipping = await step
      .define({ name: "start-shipping" }, async () => {
        return await ShippingService.ship(input.address);
      })
      .run();

    return {
      orderId: input.orderId,
      paymentId: payment.id,
      trackingNumber: shipping.trackingNumber,
    };
  },
);

μ„€μ • (sonamu.config.ts)

μ›Œν¬ν”Œλ‘œμš°λ₯Ό μ‚¬μš©ν•˜λ €λ©΄ PostgreSQL 섀정이 ν•„μš”ν•©λ‹ˆλ‹€.
import { defineConfig } from "sonamu";

export default defineConfig({
  tasks: {
    dbConfig: {
      client: "pg",
      connection: {
        host: process.env.DB_HOST,
        port: 5432,
        database: process.env.DB_NAME,
        user: process.env.DB_USER,
        password: process.env.DB_PASSWORD,
      },
    },
    worker: {
      concurrency: 4, // λ™μ‹œ μ‹€ν–‰ 수
      usePubSub: true, // Pub/Sub μ‚¬μš©
      listenDelay: 500, // μˆ˜μ‹  μ§€μ—° (ms)
    },
  },
});

μ˜΅μ…˜

name

μ›Œν¬ν”Œλ‘œμš° 이름을 μ§€μ •ν•©λ‹ˆλ‹€. κΈ°λ³Έκ°’: ν•¨μˆ˜ 이름을 snake_case둜 λ³€ν™˜
// λͺ…μ‹œμ  이름
export const myTask = workflow({ name: "process_order" }, async ({ input }) => {
  /* ... */
});

// μžλ™ 이름 (ν•¨μˆ˜ 이름 μ‚¬μš©)
export const processOrder = workflow(
  {}, // name μƒλž΅ β†’ "process_order"
  async ({ input }) => {
    /* ... */
  },
);

version

μ›Œν¬ν”Œλ‘œμš° 버전을 μ§€μ •ν•©λ‹ˆλ‹€. κΈ°λ³Έκ°’: null
export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0",
  },
  async ({ input }) => {
    /* ... */
  },
);

// 버전 μ—…λ°μ΄νŠΈ μ‹œ
export const processOrderV2 = workflow(
  {
    name: "process_order",
    version: "2.0",
  },
  async ({ input }) => {
    /* ... */
  },
);
같은 이름에 λ‹€λ₯Έ λ²„μ „μ˜ μ›Œν¬ν”Œλ‘œμš°λ₯Ό λ³‘λ ¬λ‘œ μ‹€ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

schema

μž…λ ₯ λ°μ΄ν„°μ˜ Zod μŠ€ν‚€λ§ˆλ₯Ό μ •μ˜ν•©λ‹ˆλ‹€.
import { z } from "zod";

const OrderInputSchema = z.object({
  orderId: z.number(),
  productId: z.number(),
  quantity: z.number(),
  amount: z.number(),
  address: z.string(),
});

export const processOrder = workflow(
  {
    name: "process_order",
    schema: OrderInputSchema,
  },
  async ({ input }) => {
    // input은 μžλ™μœΌλ‘œ νƒ€μž… 검증됨
    input.orderId; // number
    input.address; // string
  },
);

schedules

Cron ν‘œν˜„μ‹μœΌλ‘œ μŠ€μΌ€μ€„μ„ μ •μ˜ν•©λ‹ˆλ‹€.
export const dailyReport = workflow(
  {
    name: "daily_report",
    schedules: [
      {
        name: "daily-at-midnight",
        expression: "0 0 * * *", // 맀일 μžμ •
        input: { date: new Date().toISOString().split("T")[0] },
      },
    ],
  },
  async ({ input, logger }) => {
    logger.info("Generating daily report", { date: input.date });
    // λ³΄κ³ μ„œ 생성
  },
);

// 동적 input
export const weeklyBackup = workflow(
  {
    name: "weekly_backup",
    schedules: [
      {
        name: "weekly-sunday",
        expression: "0 2 * * 0", // λ§€μ£Ό μΌμš”μΌ 2μ‹œ
        input: () => ({
          timestamp: new Date().toISOString(),
          backupPath: `/backups/${Date.now()}`,
        }),
      },
    ],
  },
  async ({ input }) => {
    // λ°±μ—… μˆ˜ν–‰
  },
);
Cron ν‘œν˜„μ‹ μ˜ˆμ‹œ:
* * * * *    - λ§€λΆ„
0 * * * *    - λ§€μ‹œκ°„
0 0 * * *    - 맀일 μžμ •
0 0 * * 0    - λ§€μ£Ό μΌμš”μΌ μžμ •
0 0 1 * *    - λ§€μ›” 1일 μžμ •
0 9 * * 1-5  - 평일 μ˜€μ „ 9μ‹œ
*/5 * * * *  - 5λΆ„λ§ˆλ‹€

μ›Œν¬ν”Œλ‘œμš° ν•¨μˆ˜

μ›Œν¬ν”Œλ‘œμš° ν•¨μˆ˜λŠ” 4개의 λ§€κ°œλ³€μˆ˜λ₯Ό λ°›μŠ΅λ‹ˆλ‹€:
async function workflowFn({ input, step, logger, version }) {
  // input: μž…λ ₯ 데이터
  // step: Step API
  // logger: LogTape Logger
  // version: μ›Œν¬ν”Œλ‘œμš° 버전
}

input

μ›Œν¬ν”Œλ‘œμš° μ‹€ν–‰ μ‹œ μ „λ‹¬λœ μž…λ ₯ λ°μ΄ν„°μž…λ‹ˆλ‹€.
export const processOrder = workflow({ name: "process_order" }, async ({ input }) => {
  console.log("Order ID:", input.orderId);
  console.log("Amount:", input.amount);
});

// μ‹€ν–‰
await WorkflowManager.run({ name: "process_order" }, { orderId: 123, amount: 50000 });

step

Step APIλŠ” μ›Œν¬ν”Œλ‘œμš°μ˜ 각 단계λ₯Ό μ •μ˜ν•©λ‹ˆλ‹€. 각 step은 λ…λ¦½μ μœΌλ‘œ μž¬μ‹œλ„λ˜κ³  볡ꡬ될 수 μžˆμŠ΅λ‹ˆλ‹€.
export const processOrder = workflow({ name: "process_order" }, async ({ step }) => {
  // Step 1
  const payment = await step
    .define({ name: "charge-payment" }, async () => {
      return await PaymentService.charge(100);
    })
    .run();

  // Step 2
  const inventory = await step
    .define({ name: "update-inventory" }, async () => {
      return await InventoryModel.decrease(1);
    })
    .run();

  // Step 3
  await step
    .define({ name: "send-email" }, async () => {
      return await EmailService.send({
        to: "user@example.com",
        subject: "Order confirmed",
      });
    })
    .run();

  return { success: true };
});
Step의 μž₯점:
  • 각 step은 ν•œ 번만 싀행됨 (λ©±λ“±μ„±)
  • μ‹€νŒ¨ μ‹œ ν•΄λ‹Ή step만 μž¬μ‹œλ„
  • 이미 μ™„λ£Œλœ step은 κ±΄λ„ˆλœ€

logger

LogTape Logger μΈμŠ€ν„΄μŠ€μž…λ‹ˆλ‹€.
export const processData = workflow({ name: "process_data" }, async ({ logger }) => {
  logger.info("Starting workflow");
  logger.debug("Processing item", { itemId: 123 });
  logger.warn("High memory usage", { usage: "80%" });
  logger.error("Failed to process", { error: "Connection timeout" });
});

version

ν˜„μž¬ μ›Œν¬ν”Œλ‘œμš°μ˜ λ²„μ „μž…λ‹ˆλ‹€.
export const processOrder = workflow(
  {
    name: "process_order",
    version: "2.0",
  },
  async ({ version }) => {
    console.log("Running version:", version); // "2.0"
  },
);

μ›Œν¬ν”Œλ‘œμš° μ‹€ν–‰

μˆ˜λ™ μ‹€ν–‰

// WorkflowManager μ‚¬μš©
import { Sonamu } from "sonamu";

const handle = await Sonamu.workflows.run(
  {
    name: "process_order",
    version: "1.0",
  },
  {
    orderId: 123,
    amount: 50000,
  },
);

// κ²°κ³Ό λŒ€κΈ°
const result = await handle.result();
console.log("Result:", result);

μŠ€μΌ€μ€„ μ‹€ν–‰

export const dailyReport = workflow(
  {
    name: "daily_report",
    schedules: [
      {
        name: "daily-at-midnight",
        expression: "0 0 * * *",
        input: { date: new Date().toISOString().split("T")[0] },
      },
    ],
  },
  async ({ input, logger }) => {
    logger.info("Generating report", { date: input.date });
    // μžλ™μœΌλ‘œ 맀일 μžμ • 싀행됨
  },
);

Step API 상세

step.define().run()

ν•¨μˆ˜λ₯Ό step으둜 κ°μ‹Έμ„œ μ‹€ν–‰ν•©λ‹ˆλ‹€.
const result = await step
  .define({ name: "step-name" }, async () => {
    // μž‘μ—… μˆ˜ν–‰
    return { data: "result" };
  })
  .run();
μ‚¬μš© μ˜ˆμ‹œ:
export const processOrder = workflow({ name: "process_order" }, async ({ step }) => {
  // 결제 처리
  const payment = await step
    .define({ name: "process-payment" }, async () => {
      return await PaymentService.charge(100);
    })
    .run();

  // 재고 μ—…λ°μ΄νŠΈ
  await step
    .define({ name: "update-inventory" }, async () => {
      return await InventoryModel.decrease(1);
    })
    .run();

  return { paymentId: payment.id };
});

step.get().run()

Model λ©”μ„œλ“œλ₯Ό step으둜 μ‹€ν–‰ν•©λ‹ˆλ‹€.
// name λͺ…μ‹œ
const result = await step.get({ name: "fetch-users" }, UserModel, "list").run({ limit: 10 });

// name μƒλž΅ (λ©”μ„œλ“œ 이름 μžλ™ μ‚¬μš©: "list")
const result = await step.get(UserModel, "list").run({ limit: 10 });
μ‚¬μš© μ˜ˆμ‹œ:
export const processUsers = workflow({ name: "process_users" }, async ({ step }) => {
  // Model λ©”μ„œλ“œλ₯Ό step으둜 μ‹€ν–‰
  const users = await step.get({ name: "fetch-users" }, UserModel, "list").run({ limit: 100 });

  // 각 μ‚¬μš©μž 처리
  for (const user of users) {
    await step
      .define({ name: `process-user-${user.id}` }, async () => {
        return await UserModel.updateStatus(user.id, "processed");
      })
      .run();
  }

  return { processed: users.length };
});

step.sleep()

μ§€μ •λœ μ‹œκ°„ λ™μ•ˆ λŒ€κΈ°ν•©λ‹ˆλ‹€.
await step.sleep("wait-1-hour", "1h");
await step.sleep("wait-30-minutes", "30m");
await step.sleep("wait-10-seconds", "10s");
μ‹œκ°„ ν˜•μ‹: "10s", "5m", "1h", "1d" μ‚¬μš© μ˜ˆμ‹œ:
export const delayedTask = workflow({ name: "delayed_task" }, async ({ step, logger }) => {
  logger.info("Starting task");

  // 1μ‹œκ°„ λŒ€κΈ°
  await step.sleep("wait-1-hour", "1h");

  logger.info("Continuing after 1 hour");

  // 30λΆ„ λŒ€κΈ°
  await step.sleep("wait-30-minutes", "30m");

  logger.info("Task complete");
});

μž¬μ‹œλ„ μ •μ±…

Step의 μž¬μ‹œλ„λŠ” OpenWorkflow λ°±μ—”λ“œμ—μ„œ μžλ™μœΌλ‘œ μ²˜λ¦¬λ©λ‹ˆλ‹€. λ³„λ„μ˜ retry μ˜΅μ…˜ 섀정이 ν•„μš”ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.
export const resilientTask = workflow({ name: "resilient_task" }, async ({ step }) => {
  // Step이 μ‹€νŒ¨ν•˜λ©΄ μžλ™μœΌλ‘œ μž¬μ‹œλ„λ¨
  const result = await step
    .define({ name: "flaky-operation" }, async () => {
      const response = await fetch("https://api.example.com");
      return response.json();
    })
    .run();

  return result;
});

λ°μ΄ν„°λ² μ΄μŠ€ μ ‘κ·Ό

μ›Œν¬ν”Œλ‘œμš° λ‚΄μ—μ„œ Sonamu Model을 자유둭게 μ‚¬μš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.
export const processUsers = workflow({ name: "process_users" }, async ({ step, logger }) => {
  // Step 1: μ‚¬μš©μž 쑰회
  const users = await step.get({ name: "fetch-users" }, UserModel, "list").run({ limit: 100 });

  logger.info("Found users", { count: users.length });

  // Step 2: 각 μ‚¬μš©μž 처리
  for (const user of users) {
    await step
      .define({ name: `process-user-${user.id}` }, async () => {
        return await UserModel.processUser(user.id);
      })
      .run();
  }

  return { processed: users.length };
});

μ—λŸ¬ 처리

μžλ™ μž¬μ‹œλ„

Step이 μ‹€νŒ¨ν•˜λ©΄ μžλ™μœΌλ‘œ μž¬μ‹œλ„λ©λ‹ˆλ‹€.
export const resilientTask = workflow({ name: "resilient_task" }, async ({ step }) => {
  const result = await step
    .define({ name: "flaky-operation" }, async () => {
      // μ‹€νŒ¨ν•  수 μžˆλŠ” μž‘μ—…
      const response = await fetch("https://api.example.com");
      return response.json();
    })
    .run();

  return result;
});

μˆ˜λ™ μ—λŸ¬ 처리

export const handleErrors = workflow({ name: "handle_errors" }, async ({ step, logger }) => {
  try {
    await step
      .define({ name: "risky-operation" }, async () => {
        throw new Error("Something went wrong");
      })
      .run();
  } catch (error) {
    logger.error("Operation failed", { error });

    // λŒ€μ²΄ 둜직
    await step
      .define({ name: "fallback" }, async () => {
        return await AlternativeService.process();
      })
      .run();
  }
});

λ°μ½”λ ˆμ΄ν„° μŠ€νƒ€μΌ (TypeScript Decorator)

ν•¨μˆ˜ μŠ€νƒ€μΌ λŒ€μ‹  λ°μ½”λ ˆμ΄ν„° μŠ€νƒ€μΌλ„ μ‚¬μš© κ°€λŠ₯ν•©λ‹ˆλ‹€:
@workflow({
  name: "process_order",
  version: "1.0"
})
async function processOrder({ input, step, logger }) {
  logger.info("Processing order", { orderId: input.orderId });

  const payment = await step.define(
    { name: "charge" },
    async () => {
      return await PaymentService.charge(input.amount);
    }
  ).run();

  return { paymentId: payment.id };
}

μ£Όμ˜μ‚¬ν•­

1. λ©±λ“±μ„±

Step은 λ©±λ“±ν•΄μ•Ό ν•©λ‹ˆλ‹€. 같은 μž…λ ₯으둜 μ—¬λŸ¬ 번 싀행해도 같은 κ²°κ³Όλ₯Ό 보μž₯ν•΄μ•Ό ν•©λ‹ˆλ‹€.
// ❌ λ‚˜μœ 예: λ©±λ“±ν•˜μ§€ μ•ŠμŒ
await step
  .define({ name: "increment-counter" }, async () => {
    counter++; // μž¬μ‹œλ„ μ‹œ 쀑볡 증가
  })
  .run();

// βœ… 쒋은 예: 멱등함
await step
  .define({ name: "set-counter" }, async () => {
    return await CounterModel.set(10); // 항상 같은 κ°’
  })
  .run();

2. κΈ΄ μž‘μ—…μ€ step으둜 λ‚˜λˆ„κΈ°

// ❌ λ‚˜μœ 예: κ±°λŒ€ν•œ step
await step
  .define({ name: "process-all" }, async () => {
    // 1μ‹œκ°„ κ±Έλ¦¬λŠ” μž‘μ—…
    for (let i = 0; i < 1000000; i++) {
      await processItem(i);
    }
  })
  .run();

// βœ… 쒋은 예: μž‘μ€ stepλ“€
for (let i = 0; i < 1000; i += 100) {
  await step
    .define({ name: `process-batch-${i}` }, async () => {
      const batch = items.slice(i, i + 100);
      await processBatch(batch);
    })
    .run();
}

3. Step 이름 κ³ μœ μ„±

Step 이름은 μ›Œν¬ν”Œλ‘œμš° λ‚΄μ—μ„œ κ³ μœ ν•΄μ•Ό ν•©λ‹ˆλ‹€.
// ❌ λ‚˜μœ 예: μ€‘λ³΅λœ 이름
await step
  .define({ name: "process" }, async () => {
    /* ... */
  })
  .run();
await step
  .define({ name: "process" }, async () => {
    /* ... */
  })
  .run(); // μ—λŸ¬!

// βœ… 쒋은 예: κ³ μœ ν•œ 이름
await step
  .define({ name: "process-payment" }, async () => {
    /* ... */
  })
  .run();
await step
  .define({ name: "process-shipping" }, async () => {
    /* ... */
  })
  .run();

λͺ¨λ‹ˆν„°λ§

μ‹€ν–‰ μƒνƒœ 확인

const handle = await Sonamu.workflows.run({ name: "process_order" }, { orderId: 123 });

// μƒνƒœ 확인
console.log("Status:", handle.status);

// κ²°κ³Ό λŒ€κΈ° (μ™„λ£Œλ  λ•ŒκΉŒμ§€)
const result = await handle.result();
console.log("Result:", result);

둜그 확인

μ›Œν¬ν”Œλ‘œμš°λŠ” μžλ™μœΌλ‘œ LogTapeλ₯Ό 톡해 λ‘œκΉ…λ©λ‹ˆλ‹€:
[INFO] [workflow:process_order] Processing order {"orderId":123}
[DEBUG] [workflow:process_order] Step: charge-payment
[INFO] [workflow:process_order] Payment successful {"paymentId":"pay_123"}

μ˜ˆμ‹œ λͺ¨μŒ

import { workflow } from "sonamu";
import { z } from "zod";

const OrderSchema = z.object({
  orderId: z.number(),
  userId: z.number(),
  items: z.array(z.object({
    productId: z.number(),
    quantity: z.number(),
    price: z.number()
  })),
  totalAmount: z.number(),
  shippingAddress: z.string()
});

export const processOrder = workflow(
  {
    name: "process_order",
    version: "1.0",
    schema: OrderSchema
  },
  async ({ input, step, logger }) => {
    logger.info("Processing order", { orderId: input.orderId });

    // Step 1: 재고 확인
    const inventory = await step.define(
      { name: "check-inventory" },
      async () => {
        for (const item of input.items) {
          const available = await InventoryModel.check(
            item.productId,
            item.quantity
          );
          if (!available) {
            throw new Error(`Product ${item.productId} out of stock`);
          }
        }
        return { available: true };
      }
    ).run();

    // Step 2: 결제 처리
    const payment = await step.define(
      { name: "process-payment" },
      async () => {
        return await PaymentService.charge({
          userId: input.userId,
          amount: input.totalAmount,
          orderId: input.orderId
        });
      }
    ).run();

    logger.info("Payment successful", { paymentId: payment.id });

    // Step 3: 재고 차감
    await step.define(
      { name: "decrease-inventory" },
      async () => {
        for (const item of input.items) {
          await InventoryModel.decrease(item.productId, item.quantity);
        }
      }
    ).run();

    // Step 4: 배솑 μ‹œμž‘
    const shipping = await step.define(
      { name: "start-shipping" },
      async () => {
        return await ShippingService.createShipment({
          orderId: input.orderId,
          address: input.shippingAddress,
          items: input.items
        });
      }
    ).run();

    // Step 5: μ•Œλ¦Ό 전솑
    await step.define(
      { name: "send-notifications" },
      async () => {
        await NotificationService.send({
          userId: input.userId,
          type: "order_confirmed",
          data: {
            orderId: input.orderId,
            trackingNumber: shipping.trackingNumber
          }
        });
      }
    ).run();

    return {
      orderId: input.orderId,
      paymentId: payment.id,
      trackingNumber: shipping.trackingNumber,
      status: "completed"
    };
  }
);

λ‹€μŒ 단계

@api

API μ—”λ“œν¬μΈνŠΈ λ§Œλ“€κΈ°

@transactional

νŠΈλžœμž­μ…˜ μ‚¬μš©ν•˜κΈ°

μŠ€μΌ€μ€„λ§

μž‘μ—… μŠ€μΌ€μ€„λ§ κ°€μ΄λ“œ

OpenWorkflow

OpenWorkflow λ¬Έμ„œ