Step์ Workflow๋ฅผ ์์ ์คํ ๋จ์๋ก ๋๋๋ ํต์ฌ ๊ฐ๋
์
๋๋ค. ๊ฐ Step์ ๋
๋ฆฝ์ ์ผ๋ก ์คํ๋๊ณ ์คํจ ์ ํด๋น Step๋ถํฐ ์ฌ์๋ํ ์ ์์ด, ๊ธด ์์
์ ์์ ํ๊ฒ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
Step์ด ํ์ํ ์ด์
Workflow์์ Step์ ์ฌ์ฉํ์ง ์์ผ๋ฉด, ์์
์ค๊ฐ์ ์คํจํ์ ๋ ์ฒ์๋ถํฐ ๋ค์ ์คํํด์ผ ํฉ๋๋ค. Step์ ์ฌ์ฉํ๋ฉด ์คํจํ ์ง์ ๋ถํฐ ์ฌ๊ฐํ ์ ์์ต๋๋ค.
// โ Step ์์ด: ์ค๊ฐ์ ์คํจํ๋ฉด ์ฒ์๋ถํฐ
export const processOrder = workflow(
{ name: "process_order" },
async ({ input }) => {
await validatePayment(input.orderId); // 1. ๊ฒฐ์ ๊ฒ์ฆ
await updateInventory(input.items); // 2. ์ฌ๊ณ ์
๋ฐ์ดํธ
await sendConfirmationEmail(input.email); // 3. ์ด๋ฉ์ผ ๋ฐ์ก
// 3๋ฒ์์ ์คํจํ๋ฉด? โ 1๋ฒ๋ถํฐ ๋ค์ ์คํ
}
);
// โ
Step์ผ๋ก ๋๋๊ธฐ: ์คํจํ step๋ถํฐ ์ฌ์๋
export const processOrder = workflow(
{ name: "process_order" },
async ({ input, step }) => {
await step.define({ name: "validate_payment" }, async () => {
await validatePayment(input.orderId);
}).run();
await step.define({ name: "update_inventory" }, async () => {
await updateInventory(input.items);
}).run();
await step.define({ name: "send_email" }, async () => {
await sendConfirmationEmail(input.email);
}).run();
// 3๋ฒ์์ ์คํจํ๋ฉด? โ 3๋ฒ๋ถํฐ ์ฌ์๋
}
);
Step ์ฌ์ฉ์ ์ฅ์ :
- ๋ถ๋ถ ์ฌ์๋: ์คํจํ step๋ถํฐ ๋ค์ ์คํ
- ์งํ ์ํฉ ์ถ์ : ๊ฐ step์ ์คํ ์๊ฐ๊ณผ ์ํ ํ์ธ
- ๋๋ฒ๊น
์ฉ์ด: ์ด๋ step์์ ์คํจํ๋์ง ๋ช
ํํ ํ์
step.define()
step.define()์ ํจ์๋ฅผ step์ผ๋ก ๊ฐ์ธ๋ ๊ฐ์ฅ ๊ธฐ๋ณธ์ ์ธ ๋ฐฉ๋ฒ์
๋๋ค. ๋น๋๊ธฐ ํจ์๋ฅผ step์ผ๋ก ๋ง๋ค๋ฉด ์คํ ์ด๋ ฅ์ด ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ์ฅ๋ฉ๋๋ค.
export const sendEmail = workflow(
{ name: "send_email" },
async ({ input, step }) => {
// Step ์ ์ ๋ฐ ์คํ
await step.define(
{ name: "send" },
async () => {
await emailService.send({
to: input.email,
subject: "Hello",
body: "...",
});
}
).run();
}
);
ํต์ฌ ๊ฐ๋
:
name: Step ์๋ณ์ (workflow ๋ด์์ ๊ณ ์ ํด์ผ ํจ)
- ๋น๋๊ธฐ ํจ์: ์ค์ ์์
์ ์ํํ๋ ์ฝ๋
.run(): Step์ ์คํํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํ
๋ฐํ๊ฐ ์ฌ์ฉ
Step์ ๋ฐํ๊ฐ์ ๋ค์ step์์ ์ฌ์ฉํ ์ ์์ต๋๋ค. ๊ฐ step์ ๋
๋ฆฝ์ ์ผ๋ก ์ ์ฅ๋์ง๋ง, ๋ฐ์ดํฐ๋ ๋ฉ๋ชจ๋ฆฌ์์ ์ ๋ฌ๋ฉ๋๋ค.
export const processData = workflow(
{ name: "process_data" },
async ({ input, step }) => {
// Step 1: ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ
const data = await step.define(
{ name: "fetch_data" },
async () => {
return await fetchDataFromAPI(input.userId);
}
).run();
// Step 2: ๋ฐ์ดํฐ ๋ณํ (์ด์ step ๊ฒฐ๊ณผ ์ฌ์ฉ)
const transformed = await step.define(
{ name: "transform" },
async () => {
return transformData(data);
}
).run();
// Step 3: ์ ์ฅ
await step.define(
{ name: "save" },
async () => {
await saveToDatabase(transformed);
}
).run();
return { count: transformed.length };
}
);
๋ฐ์ดํฐ ํ๋ฆ:
fetch_data step์ด data ๋ฐํ
- data๊ฐ ๋ฉ๋ชจ๋ฆฌ์์
transform step์ผ๋ก ์ ๋ฌ
- transformed๊ฐ ๋ฉ๋ชจ๋ฆฌ์์
save step์ผ๋ก ์ ๋ฌ
Step ์ฌ์๋ ์์๋ ์ด์ step์ ๊ฒฐ๊ณผ๊ฐ ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ๋ณต์๋์ด ์ฌ์ฉ๋ฉ๋๋ค.
step.get()
step.get()์ Model ๋ฉ์๋๋ฅผ step์ผ๋ก ์คํํ๋ ํธ๋ฆฌํ ๋ฐฉ๋ฒ์
๋๋ค. ๋ณ๋๋ก ํจ์๋ฅผ ๊ฐ์ธ์ง ์์๋ ๋์ด ์ฝ๋๊ฐ ๊ฐ๊ฒฐํด์ง๋๋ค.
import { BaseModel, workflow } from "sonamu";
class UserModelClass extends BaseModel {
async sendWelcomeEmail(userId: number) {
const user = await this.findById(userId);
await emailService.send({
to: user.email,
subject: "Welcome!",
body: "...",
});
}
}
export const onboardUser = workflow(
{ name: "onboard_user" },
async ({ input, step }) => {
// Model ๋ฉ์๋๋ฅผ step์ผ๋ก ์คํ
await step.get(
UserModel,
"sendWelcomeEmail"
).run(input.userId);
}
);
์ฌ์ฉ ์๋๋ฆฌ์ค:
- Model์ ๋น์ฆ๋์ค ๋ก์ง์ ์ฌ์ฌ์ฉ
- ์ฝ๋ ์ค๋ณต ์ ๊ฑฐ
- ํ์
์์ ์ฑ ์ ์ง
์ปค์คํ
์ด๋ฆ ์ง์
Step ์ด๋ฆ์ ๋ช
์์ ์ผ๋ก ์ง์ ํ์ฌ ๋ก๊ทธ๋ฅผ ์ฝ๊ธฐ ์ฝ๊ฒ ๋ง๋ค ์ ์์ต๋๋ค.
await step.get(
{ name: "send_email" }, // ์ปค์คํ
์ด๋ฆ
UserModel,
"sendWelcomeEmail"
).run(userId);
๋ฐํ๊ฐ ํ์ฉ
Model ๋ฉ์๋์ ๋ฐํ๊ฐ๋ ๋์ผํ๊ฒ ์ฌ์ฉํ ์ ์์ต๋๋ค.
const user = await step.get(
UserModel,
"findById"
).run(userId);
console.log(user.email);
step.sleep()
step.sleep()์ ์ผ์ ์๊ฐ ๋๊ธฐํ๋ step์
๋๋ค. ์ฌ์๋ ๊ฐ ์ง์ฐ, API rate limit ํํผ, ์ ๊ธฐ ํด๋ง ๋ฑ์ ํ์ฉํฉ๋๋ค.
export const retryWithDelay = workflow(
{ name: "retry_with_delay" },
async ({ input, step, logger }) => {
for (let i = 0; i < 3; i++) {
try {
await step.define(
{ name: `attempt_${i + 1}` },
async () => {
await unreliableAPICall(input.data);
}
).run();
break; // ์ฑ๊ณต
} catch (error) {
logger.warn(`Attempt ${i + 1} failed`, { error });
if (i < 2) {
// ์ฌ์๋ ์ ๋๊ธฐ
await step.sleep("retry_delay", "5s");
} else {
throw error; // ๋ง์ง๋ง ์๋ ์คํจ
}
}
}
}
);
์ง์ํ๋ ์๊ฐ ํ์:
| ํ์ | ์ค๋ช
| ์์ |
|---|
s | ์ด | "5s" = 5์ด |
m | ๋ถ | "10m" = 10๋ถ |
h | ์๊ฐ | "2h" = 2์๊ฐ |
d | ์ผ | "1d" = 1์ผ |
์ค์ ํ์ฉ:
- ์ฌ์๋ ์ง์ฐ: ์คํจ ํ 5์ด ๋๊ธฐ
- Rate limiting: API ํธ์ถ ๊ฐ 1์ด ๋๊ธฐ
- ๋ฐฐ์น ๊ฐ๊ฒฉ: 100๊ฐ์ฉ ์ฒ๋ฆฌ ํ 10์ด ํด์
step.sleep()๋ ํ๋์ step์ด๋ฏ๋ก ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๊ธฐ๋ก๋ฉ๋๋ค. ์ฌ์๋ ์ sleep์ ๋ค์ ์คํํ์ง ์์ต๋๋ค.
์ค์ ์์
1. ๋ฐฐ์น ์ฒ๋ฆฌ
๋๋์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ๋๋ ์์ ๋ฐฐ์น๋ก ๋๋์ด ์ฒ๋ฆฌํฉ๋๋ค. ๊ฐ ๋ฐฐ์น๋ฅผ ๋ณ๋ step์ผ๋ก ๋ง๋ค๋ฉด ์ค๊ฐ์ ์คํจํด๋ ์ฒ๋ฆฌ๋ ๋ฐฐ์น๋ ์ฌ์คํํ์ง ์์ต๋๋ค.
export const processBatch = workflow(
{ name: "process_batch" },
async ({ input, step, logger }) => {
const batchSize = 100;
const total = input.items.length;
for (let i = 0; i < total; i += batchSize) {
const batch = input.items.slice(i, i + batchSize);
await step.define(
{ name: `process_batch_${i / batchSize}` },
async () => {
await Promise.all(
batch.map(item => processItem(item))
);
}
).run();
logger.info("Batch processed", {
batch: i / batchSize,
progress: `${Math.min(i + batchSize, total)}/${total}`,
});
}
return { processed: total };
}
);
ํต์ฌ ํฌ์ธํธ:
- 100๊ฐ์ฉ ๋ฐฐ์น๋ก ๋๋์ด ์ฒ๋ฆฌ
- ๊ฐ ๋ฐฐ์น๊ฐ ๋
๋ฆฝ์ ์ธ step
- ์งํ๋ฅ ์ ๋ก๊ทธ๋ก ๊ธฐ๋ก
2. ์ธ๋ถ API ํธ์ถ
์ธ๋ถ API ํธ์ถ์ ๋คํธ์ํฌ ๋ฌธ์ ๋ก ์คํจํ ์ ์์ต๋๋ค. Step์ผ๋ก ๋๋๋ฉด ์คํจํ API ํธ์ถ๋ง ์ฌ์๋ํ ์ ์์ต๋๋ค.
export const syncData = workflow(
{ name: "sync_data" },
async ({ input, step, logger }) => {
// Step 1: API์์ ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ
const externalData = await step.define(
{ name: "fetch_from_api" },
async () => {
const response = await fetch(
`https://api.example.com/data?userId=${input.userId}`
);
return await response.json();
}
).run();
// Step 2: ๋ฐ์ดํฐ ์ ๊ทํ
const normalized = await step.define(
{ name: "normalize_data" },
async () => {
return normalizeData(externalData);
}
).run();
// Step 3: DB์ ์ ์ฅ
await step.get(
{ name: "save_to_db" },
DataModel,
"saveMany"
).run(normalized);
logger.info("Sync completed", { count: normalized.length });
}
);
์ฅ์ :
- API ํธ์ถ ์คํจ ์ ๋ฐ์ดํฐ ์ ๊ทํ๋ ์คํต
- ์ ๊ทํ ์คํจ ์ API ์ฌํธ์ถ ์ํจ
- ๊ฐ ๋จ๊ณ์ ์คํ ์๊ฐ ์ธก์ ๊ฐ๋ฅ
3. ํ์ผ ์ฒ๋ฆฌ ํ์ดํ๋ผ์ธ
ํ์ผ์ ๋ค์ด๋ก๋ํ๊ณ ํ์ฑํ์ฌ ์ ์ฅํ๋ ํ์ดํ๋ผ์ธ์
๋๋ค. ๊ฐ ๋จ๊ณ๋ฅผ step์ผ๋ก ๋๋๋ฉด ๋๋ฒ๊น
๊ณผ ์ฌ์๋๊ฐ ์ฌ์์ง๋๋ค.
export const processUploadedFile = workflow(
{ name: "process_uploaded_file" },
async ({ input, step, logger }) => {
// Step 1: ํ์ผ ๋ค์ด๋ก๋
const fileContent = await step.define(
{ name: "download_file" },
async () => {
return await downloadFromS3(input.fileKey);
}
).run();
// Step 2: ํ์ฑ
const parsed = await step.define(
{ name: "parse_file" },
async () => {
return parseCSV(fileContent);
}
).run();
// Step 3: ๊ฒ์ฆ
const validated = await step.define(
{ name: "validate" },
async () => {
return validateData(parsed);
}
).run();
// Step 4: DB์ ์ ์ฅ
await step.define(
{ name: "save_to_db" },
async () => {
await bulkInsert(validated);
}
).run();
return { imported: validated.length };
}
);
ํ์ดํ๋ผ์ธ ๊ตฌ์กฐ:
- ๋ค์ด๋ก๋ โ 2. ํ์ฑ โ 3. ๊ฒ์ฆ โ 4. ์ ์ฅ
- ๊ฐ ๋จ๊ณ๊ฐ ์คํจํ๋ฉด ํด๋น step๋ถํฐ ์ฌ์๋
- ํ์ผ์ ํ ๋ฒ๋ง ๋ค์ด๋ก๋
4. Rate Limiting์ด ์๋ API
API์ rate limit์ด ์์ ๋๋ ์์ฒญ ๊ฐ ์ง์ฐ์ ์ถ๊ฐํฉ๋๋ค. step.sleep()์ ์ฌ์ฉํ๋ฉด ์ฌ์๋ ์ ์ง์ฐ์ด ๋ฐ๋ณต๋์ง ์์ต๋๋ค.
export const sendNotifications = workflow(
{ name: "send_notifications" },
async ({ input, step, logger }) => {
const users = input.userIds;
for (const userId of users) {
// ๊ฐ ์ฌ์ฉ์์๊ฒ ์๋ฆผ
await step.define(
{ name: `notify_user_${userId}` },
async () => {
const user = await UserModel.findById(userId);
await sendPushNotification(user.deviceToken, input.message);
}
).run();
// ์ฌ์ฉ์ ๊ฐ ๋๋ ์ด (API ์ ํ ํํผ)
if (userId !== users[users.length - 1]) {
await step.sleep("rate_limit_delay", "100ms");
}
}
return { sent: users.length };
}
);
Rate limiting ์ ๋ต:
- ๊ฐ ์๋ฆผ ์ฌ์ด์ 100ms ๋๊ธฐ
- API ์๋ฒ ๊ณผ๋ถํ ๋ฐฉ์ง
- ์ฌ์๋ ์ ์ด๋ฏธ ์ฑ๊ณตํ ์๋ฆผ์ ์คํต
Step ๋ช
๋ช
๊ท์น
๊ณ ์ ํ ์ด๋ฆ ์ฌ์ฉ
๊ฐ์ workflow ๋ด์์ step ์ด๋ฆ์ ๊ณ ์ ํด์ผ ํฉ๋๋ค. ์ค๋ณต๋ ์ด๋ฆ์ ์ฌ์ฉํ๋ฉด ๋ง์ง๋ง step๋ง ์คํ๋ฉ๋๋ค.
// โ ๊ฐ์ ์ด๋ฆ (๋ง์ง๋ง ๊ฒ๋ง ์คํ๋จ)
await step.define({ name: "process" }, async () => { ... }).run();
await step.define({ name: "process" }, async () => { ... }).run();
// โ
๋ค๋ฅธ ์ด๋ฆ
await step.define({ name: "process_payment" }, async () => { ... }).run();
await step.define({ name: "process_shipping" }, async () => { ... }).run();
๋ฐ๋ณต๋ฌธ์์ ๋์ ์ด๋ฆ
๋ฐ๋ณต๋ฌธ์์ step์ ์์ฑํ ๋๋ ์ธ๋ฑ์ค๋ฅผ ํฌํจํ์ฌ ์ด๋ฆ์ ๊ณ ์ ํ๊ฒ ๋ง๋ญ๋๋ค.
// ๋ฐ๋ณต๋ฌธ์์
for (let i = 0; i < items.length; i++) {
await step.define(
{ name: `process_item_${i}` }, // ๊ฐ step ๋ค๋ฅธ ์ด๋ฆ
async () => {
await processItem(items[i]);
}
).run();
}
๋ช
๋ช
ํ:
- ์์
๋ด์ฉ์ ๋ํ๋ด๋ ๋ช
์ฌ ์ฌ์ฉ:
fetch_user, send_email
- ์ธ๋ฑ์ค ํฌํจ:
process_batch_0, process_batch_1
- Snake case ๊ถ์ฅ:
send_welcome_email
์ฃผ์์ฌํญ
Step ์ฌ์ฉ ์ ์ฃผ์์ฌํญ:
-
๊ณ ์ ํ ์ด๋ฆ: Step ์ด๋ฆ์ workflow ๋ด์์ ๊ณ ์ ํด์ผ ํฉ๋๋ค.
{ name: "step_1" } // โ
{ name: "step_1" } // โ ์ค๋ณต
-
์คํ ์์: Step์ ์ฝ๋์ ์์ฑ๋ ์์๋๋ก ์คํ๋ฉ๋๋ค.
await step.define({ name: "first" }, ...).run();
await step.define({ name: "second" }, ...).run();
-
์๋ฌ ์ ํ: Step์ด ์คํจํ๋ฉด workflow๊ฐ ์ค๋จ๋ฉ๋๋ค. ์ ํ์ ์์
์ try-catch๋ก ๊ฐ์ธ์ธ์.
try {
await step.define({ name: "optional" }, ...).run();
} catch (error) {
logger.warn("Optional step failed", { error });
}
-
ํ์
์ถ๋ก : TypeScript๋ step์ ๋ฐํ ํ์
์ ์๋์ผ๋ก ์ถ๋ก ํฉ๋๋ค.
const data: UserData = await step.define(...).run();
-
Sleep ์๊ฐ ์ ํ: ๋๋ฌด ๊ธด sleep์ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ ์ ์งํ๋ฏ๋ก ๊ถ์ฅํ์ง ์์ต๋๋ค.
await step.sleep("delay", "1h"); // OK
await step.sleep("delay", "30d"); // ๋น์ถ์ฒ
๋ค์ ๋จ๊ณ