Step은 Workflow를 작은 실행 단위로 나누는 핵심 개념입니다. 각 Step은 독립적으로 실행되고 실패 시 해당 Step부터 재시도할 수 있어, 긴 작업을 안전하게 처리할 수 있습니다.
Step이 필요한 이유
Workflow에서 Step을 사용하지 않으면, 작업 중간에 실패했을 때 처음부터 다시 실행해야 합니다. Step을 사용하면 실패한 지점부터 재개할 수 있습니다.
// ❌ Step 없이: 중간에 실패하면 처음부터
export const processOrder = workflow(
{ name: "process_order" },
async ({ input }) => {
await validatePayment(input.orderId); // 1. 결제 검증
await updateInventory(input.items); // 2. 재고 업데이트
await sendConfirmationEmail(input.email); // 3. 이메일 발송
// 3번에서 실패하면? → 1번부터 다시 실행
}
);
// ✅ Step으로 나누기: 실패한 step부터 재시도
export const processOrder = workflow(
{ name: "process_order" },
async ({ input, step }) => {
await step.define({ name: "validate_payment" }, async () => {
await validatePayment(input.orderId);
}).run();
await step.define({ name: "update_inventory" }, async () => {
await updateInventory(input.items);
}).run();
await step.define({ name: "send_email" }, async () => {
await sendConfirmationEmail(input.email);
}).run();
// 3번에서 실패하면? → 3번부터 재시도
}
);
Step 사용의 장점:
- 부분 재시도: 실패한 step부터 다시 실행
- 진행 상황 추적: 각 step의 실행 시간과 상태 확인
- 디버깅 용이: 어느 step에서 실패했는지 명확히 파악
step.define()
step.define()은 함수를 step으로 감싸는 가장 기본적인 방법입니다. 비동기 함수를 step으로 만들면 실행 이력이 데이터베이스에 저장됩니다.
export const sendEmail = workflow(
{ name: "send_email" },
async ({ input, step }) => {
// Step 정의 및 실행
await step.define(
{ name: "send" },
async () => {
await emailService.send({
to: input.email,
subject: "Hello",
body: "...",
});
}
).run();
}
);
핵심 개념:
name: Step 식별자 (workflow 내에서 고유해야 함)
- 비동기 함수: 실제 작업을 수행하는 코드
.run(): Step을 실행하고 결과를 반환
반환값 사용
Step의 반환값을 다음 step에서 사용할 수 있습니다. 각 step은 독립적으로 저장되지만, 데이터는 메모리에서 전달됩니다.
export const processData = workflow(
{ name: "process_data" },
async ({ input, step }) => {
// Step 1: 데이터 가져오기
const data = await step.define(
{ name: "fetch_data" },
async () => {
return await fetchDataFromAPI(input.userId);
}
).run();
// Step 2: 데이터 변환 (이전 step 결과 사용)
const transformed = await step.define(
{ name: "transform" },
async () => {
return transformData(data);
}
).run();
// Step 3: 저장
await step.define(
{ name: "save" },
async () => {
await saveToDatabase(transformed);
}
).run();
return { count: transformed.length };
}
);
데이터 흐름:
fetch_data step이 data 반환
- data가 메모리에서
transform step으로 전달
- transformed가 메모리에서
save step으로 전달
Step 재시도 시에는 이전 step의 결과가 데이터베이스에서 복원되어 사용됩니다.
step.get()
step.get()은 Model 메서드를 step으로 실행하는 편리한 방법입니다. 별도로 함수를 감싸지 않아도 되어 코드가 간결해집니다.
import { BaseModel, workflow } from "sonamu";
class UserModelClass extends BaseModel {
async sendWelcomeEmail(userId: number) {
const user = await this.findById(userId);
await emailService.send({
to: user.email,
subject: "Welcome!",
body: "...",
});
}
}
export const onboardUser = workflow(
{ name: "onboard_user" },
async ({ input, step }) => {
// Model 메서드를 step으로 실행
await step.get(
UserModel,
"sendWelcomeEmail"
).run(input.userId);
}
);
사용 시나리오:
- Model의 비즈니스 로직을 재사용
- 코드 중복 제거
- 타입 안전성 유지
커스텀 이름 지정
Step 이름을 명시적으로 지정하여 로그를 읽기 쉽게 만들 수 있습니다.
await step.get(
{ name: "send_email" }, // 커스텀 이름
UserModel,
"sendWelcomeEmail"
).run(userId);
반환값 활용
Model 메서드의 반환값도 동일하게 사용할 수 있습니다.
const user = await step.get(
UserModel,
"findById"
).run(userId);
console.log(user.email);
step.sleep()
step.sleep()은 일정 시간 대기하는 step입니다. 재시도 간 지연, API rate limit 회피, 정기 폴링 등에 활용합니다.
export const retryWithDelay = workflow(
{ name: "retry_with_delay" },
async ({ input, step, logger }) => {
for (let i = 0; i < 3; i++) {
try {
await step.define(
{ name: `attempt_${i + 1}` },
async () => {
await unreliableAPICall(input.data);
}
).run();
break; // 성공
} catch (error) {
logger.warn(`Attempt ${i + 1} failed`, { error });
if (i < 2) {
// 재시도 전 대기
await step.sleep("retry_delay", "5s");
} else {
throw error; // 마지막 시도 실패
}
}
}
}
);
지원하는 시간 형식:
| 형식 | 설명 | 예시 |
|---|
s | 초 | "5s" = 5초 |
m | 분 | "10m" = 10분 |
h | 시간 | "2h" = 2시간 |
d | 일 | "1d" = 1일 |
실전 활용:
- 재시도 지연: 실패 후 5초 대기
- Rate limiting: API 호출 간 1초 대기
- 배치 간격: 100개씩 처리 후 10초 휴식
step.sleep()도 하나의 step이므로 데이터베이스에 기록됩니다. 재시도 시 sleep을 다시 실행하지 않습니다.
실전 예제
1. 배치 처리
대량의 데이터를 처리할 때는 작은 배치로 나누어 처리합니다. 각 배치를 별도 step으로 만들면 중간에 실패해도 처리된 배치는 재실행하지 않습니다.
export const processBatch = workflow(
{ name: "process_batch" },
async ({ input, step, logger }) => {
const batchSize = 100;
const total = input.items.length;
for (let i = 0; i < total; i += batchSize) {
const batch = input.items.slice(i, i + batchSize);
await step.define(
{ name: `process_batch_${i / batchSize}` },
async () => {
await Promise.all(
batch.map(item => processItem(item))
);
}
).run();
logger.info("Batch processed", {
batch: i / batchSize,
progress: `${Math.min(i + batchSize, total)}/${total}`,
});
}
return { processed: total };
}
);
핵심 포인트:
- 100개씩 배치로 나누어 처리
- 각 배치가 독립적인 step
- 진행률을 로그로 기록
2. 외부 API 호출
외부 API 호출은 네트워크 문제로 실패할 수 있습니다. Step으로 나누면 실패한 API 호출만 재시도할 수 있습니다.
export const syncData = workflow(
{ name: "sync_data" },
async ({ input, step, logger }) => {
// Step 1: API에서 데이터 가져오기
const externalData = await step.define(
{ name: "fetch_from_api" },
async () => {
const response = await fetch(
`https://api.example.com/data?userId=${input.userId}`
);
return await response.json();
}
).run();
// Step 2: 데이터 정규화
const normalized = await step.define(
{ name: "normalize_data" },
async () => {
return normalizeData(externalData);
}
).run();
// Step 3: DB에 저장
await step.get(
{ name: "save_to_db" },
DataModel,
"saveMany"
).run(normalized);
logger.info("Sync completed", { count: normalized.length });
}
);
장점:
- API 호출 실패 시 데이터 정규화는 스킵
- 정규화 실패 시 API 재호출 안함
- 각 단계의 실행 시간 측정 가능
3. 파일 처리 파이프라인
파일을 다운로드하고 파싱하여 저장하는 파이프라인입니다. 각 단계를 step으로 나누면 디버깅과 재시도가 쉬워집니다.
export const processUploadedFile = workflow(
{ name: "process_uploaded_file" },
async ({ input, step, logger }) => {
// Step 1: 파일 다운로드
const fileContent = await step.define(
{ name: "download_file" },
async () => {
return await downloadFromS3(input.fileKey);
}
).run();
// Step 2: 파싱
const parsed = await step.define(
{ name: "parse_file" },
async () => {
return parseCSV(fileContent);
}
).run();
// Step 3: 검증
const validated = await step.define(
{ name: "validate" },
async () => {
return validateData(parsed);
}
).run();
// Step 4: DB에 저장
await step.define(
{ name: "save_to_db" },
async () => {
await bulkInsert(validated);
}
).run();
return { imported: validated.length };
}
);
파이프라인 구조:
- 다운로드 → 2. 파싱 → 3. 검증 → 4. 저장
- 각 단계가 실패하면 해당 step부터 재시도
- 파일은 한 번만 다운로드
4. Rate Limiting이 있는 API
API에 rate limit이 있을 때는 요청 간 지연을 추가합니다. step.sleep()을 사용하면 재시도 시 지연이 반복되지 않습니다.
export const sendNotifications = workflow(
{ name: "send_notifications" },
async ({ input, step, logger }) => {
const users = input.userIds;
for (const userId of users) {
// 각 사용자에게 알림
await step.define(
{ name: `notify_user_${userId}` },
async () => {
const user = await UserModel.findById(userId);
await sendPushNotification(user.deviceToken, input.message);
}
).run();
// 사용자 간 딜레이 (API 제한 회피)
if (userId !== users[users.length - 1]) {
await step.sleep("rate_limit_delay", "100ms");
}
}
return { sent: users.length };
}
);
Rate limiting 전략:
- 각 알림 사이에 100ms 대기
- API 서버 과부하 방지
- 재시도 시 이미 성공한 알림은 스킵
Step 명명 규칙
고유한 이름 사용
같은 workflow 내에서 step 이름은 고유해야 합니다. 중복된 이름을 사용하면 마지막 step만 실행됩니다.
// ❌ 같은 이름 (마지막 것만 실행됨)
await step.define({ name: "process" }, async () => { ... }).run();
await step.define({ name: "process" }, async () => { ... }).run();
// ✅ 다른 이름
await step.define({ name: "process_payment" }, async () => { ... }).run();
await step.define({ name: "process_shipping" }, async () => { ... }).run();
반복문에서 동적 이름
반복문에서 step을 생성할 때는 인덱스를 포함하여 이름을 고유하게 만듭니다.
// 반복문에서
for (let i = 0; i < items.length; i++) {
await step.define(
{ name: `process_item_${i}` }, // 각 step 다른 이름
async () => {
await processItem(items[i]);
}
).run();
}
명명 팁:
- 작업 내용을 나타내는 명사 사용:
fetch_user, send_email
- 인덱스 포함:
process_batch_0, process_batch_1
- Snake case 권장:
send_welcome_email
주의사항
Step 사용 시 주의사항:
-
고유한 이름: Step 이름은 workflow 내에서 고유해야 합니다.
{ name: "step_1" } // ✅
{ name: "step_1" } // ❌ 중복
-
실행 순서: Step은 코드에 작성된 순서대로 실행됩니다.
await step.define({ name: "first" }, ...).run();
await step.define({ name: "second" }, ...).run();
-
에러 전파: Step이 실패하면 workflow가 중단됩니다. 선택적 작업은 try-catch로 감싸세요.
try {
await step.define({ name: "optional" }, ...).run();
} catch (error) {
logger.warn("Optional step failed", { error });
}
-
타입 추론: TypeScript는 step의 반환 타입을 자동으로 추론합니다.
const data: UserData = await step.define(...).run();
-
Sleep 시간 제한: 너무 긴 sleep은 데이터베이스 연결을 유지하므로 권장하지 않습니다.
await step.sleep("delay", "1h"); // OK
await step.sleep("delay", "30d"); // 비추천
다음 단계