메인 콘텐츠로 건너뛰기
Step은 Workflow를 작은 실행 단위로 나누는 핵심 개념입니다. 각 Step은 독립적으로 실행되고 실패 시 해당 Step부터 재시도할 수 있어, 긴 작업을 안전하게 처리할 수 있습니다.

Step이 필요한 이유

Workflow에서 Step을 사용하지 않으면, 작업 중간에 실패했을 때 처음부터 다시 실행해야 합니다. Step을 사용하면 실패한 지점부터 재개할 수 있습니다.
// ❌ Step 없이: 중간에 실패하면 처음부터
export const processOrder = workflow(
  { name: "process_order" },
  async ({ input }) => {
    await validatePayment(input.orderId);      // 1. 결제 검증
    await updateInventory(input.items);        // 2. 재고 업데이트
    await sendConfirmationEmail(input.email);  // 3. 이메일 발송
    // 3번에서 실패하면? → 1번부터 다시 실행
  }
);

// ✅ Step으로 나누기: 실패한 step부터 재시도
export const processOrder = workflow(
  { name: "process_order" },
  async ({ input, step }) => {
    await step.define({ name: "validate_payment" }, async () => {
      await validatePayment(input.orderId);
    }).run();
    
    await step.define({ name: "update_inventory" }, async () => {
      await updateInventory(input.items);
    }).run();
    
    await step.define({ name: "send_email" }, async () => {
      await sendConfirmationEmail(input.email);
    }).run();
    // 3번에서 실패하면? → 3번부터 재시도
  }
);
Step 사용의 장점:
  • 부분 재시도: 실패한 step부터 다시 실행
  • 진행 상황 추적: 각 step의 실행 시간과 상태 확인
  • 디버깅 용이: 어느 step에서 실패했는지 명확히 파악

step.define()

step.define()은 함수를 step으로 감싸는 가장 기본적인 방법입니다. 비동기 함수를 step으로 만들면 실행 이력이 데이터베이스에 저장됩니다.
export const sendEmail = workflow(
  { name: "send_email" },
  async ({ input, step }) => {
    // Step 정의 및 실행
    await step.define(
      { name: "send" },
      async () => {
        await emailService.send({
          to: input.email,
          subject: "Hello",
          body: "...",
        });
      }
    ).run();
  }
);
핵심 개념:
  • name: Step 식별자 (workflow 내에서 고유해야 함)
  • 비동기 함수: 실제 작업을 수행하는 코드
  • .run(): Step을 실행하고 결과를 반환

반환값 사용

Step의 반환값을 다음 step에서 사용할 수 있습니다. 각 step은 독립적으로 저장되지만, 데이터는 메모리에서 전달됩니다.
export const processData = workflow(
  { name: "process_data" },
  async ({ input, step }) => {
    // Step 1: 데이터 가져오기
    const data = await step.define(
      { name: "fetch_data" },
      async () => {
        return await fetchDataFromAPI(input.userId);
      }
    ).run();
    
    // Step 2: 데이터 변환 (이전 step 결과 사용)
    const transformed = await step.define(
      { name: "transform" },
      async () => {
        return transformData(data);
      }
    ).run();
    
    // Step 3: 저장
    await step.define(
      { name: "save" },
      async () => {
        await saveToDatabase(transformed);
      }
    ).run();
    
    return { count: transformed.length };
  }
);
데이터 흐름:
  1. fetch_data step이 data 반환
  2. data가 메모리에서 transform step으로 전달
  3. transformed가 메모리에서 save step으로 전달
Step 재시도 시에는 이전 step의 결과가 데이터베이스에서 복원되어 사용됩니다.

step.get()

step.get()은 Model 메서드를 step으로 실행하는 편리한 방법입니다. 별도로 함수를 감싸지 않아도 되어 코드가 간결해집니다.
import { BaseModel, workflow } from "sonamu";

class UserModelClass extends BaseModel {
  async sendWelcomeEmail(userId: number) {
    const user = await this.findById(userId);
    await emailService.send({
      to: user.email,
      subject: "Welcome!",
      body: "...",
    });
  }
}

export const onboardUser = workflow(
  { name: "onboard_user" },
  async ({ input, step }) => {
    // Model 메서드를 step으로 실행
    await step.get(
      UserModel,
      "sendWelcomeEmail"
    ).run(input.userId);
  }
);
사용 시나리오:
  • Model의 비즈니스 로직을 재사용
  • 코드 중복 제거
  • 타입 안전성 유지

커스텀 이름 지정

Step 이름을 명시적으로 지정하여 로그를 읽기 쉽게 만들 수 있습니다.
await step.get(
  { name: "send_email" },  // 커스텀 이름
  UserModel,
  "sendWelcomeEmail"
).run(userId);

반환값 활용

Model 메서드의 반환값도 동일하게 사용할 수 있습니다.
const user = await step.get(
  UserModel,
  "findById"
).run(userId);

console.log(user.email);

step.sleep()

step.sleep()은 일정 시간 대기하는 step입니다. 재시도 간 지연, API rate limit 회피, 정기 폴링 등에 활용합니다.
export const retryWithDelay = workflow(
  { name: "retry_with_delay" },
  async ({ input, step, logger }) => {
    for (let i = 0; i < 3; i++) {
      try {
        await step.define(
          { name: `attempt_${i + 1}` },
          async () => {
            await unreliableAPICall(input.data);
          }
        ).run();
        
        break;  // 성공
      } catch (error) {
        logger.warn(`Attempt ${i + 1} failed`, { error });
        
        if (i < 2) {
          // 재시도 전 대기
          await step.sleep("retry_delay", "5s");
        } else {
          throw error;  // 마지막 시도 실패
        }
      }
    }
  }
);
지원하는 시간 형식:
형식설명예시
s"5s" = 5초
m"10m" = 10분
h시간"2h" = 2시간
d"1d" = 1일
실전 활용:
  • 재시도 지연: 실패 후 5초 대기
  • Rate limiting: API 호출 간 1초 대기
  • 배치 간격: 100개씩 처리 후 10초 휴식
step.sleep()도 하나의 step이므로 데이터베이스에 기록됩니다. 재시도 시 sleep을 다시 실행하지 않습니다.

실전 예제

1. 배치 처리

대량의 데이터를 처리할 때는 작은 배치로 나누어 처리합니다. 각 배치를 별도 step으로 만들면 중간에 실패해도 처리된 배치는 재실행하지 않습니다.
export const processBatch = workflow(
  { name: "process_batch" },
  async ({ input, step, logger }) => {
    const batchSize = 100;
    const total = input.items.length;
    
    for (let i = 0; i < total; i += batchSize) {
      const batch = input.items.slice(i, i + batchSize);
      
      await step.define(
        { name: `process_batch_${i / batchSize}` },
        async () => {
          await Promise.all(
            batch.map(item => processItem(item))
          );
        }
      ).run();
      
      logger.info("Batch processed", {
        batch: i / batchSize,
        progress: `${Math.min(i + batchSize, total)}/${total}`,
      });
    }
    
    return { processed: total };
  }
);
핵심 포인트:
  • 100개씩 배치로 나누어 처리
  • 각 배치가 독립적인 step
  • 진행률을 로그로 기록

2. 외부 API 호출

외부 API 호출은 네트워크 문제로 실패할 수 있습니다. Step으로 나누면 실패한 API 호출만 재시도할 수 있습니다.
export const syncData = workflow(
  { name: "sync_data" },
  async ({ input, step, logger }) => {
    // Step 1: API에서 데이터 가져오기
    const externalData = await step.define(
      { name: "fetch_from_api" },
      async () => {
        const response = await fetch(
          `https://api.example.com/data?userId=${input.userId}`
        );
        return await response.json();
      }
    ).run();
    
    // Step 2: 데이터 정규화
    const normalized = await step.define(
      { name: "normalize_data" },
      async () => {
        return normalizeData(externalData);
      }
    ).run();
    
    // Step 3: DB에 저장
    await step.get(
      { name: "save_to_db" },
      DataModel,
      "saveMany"
    ).run(normalized);
    
    logger.info("Sync completed", { count: normalized.length });
  }
);
장점:
  • API 호출 실패 시 데이터 정규화는 스킵
  • 정규화 실패 시 API 재호출 안함
  • 각 단계의 실행 시간 측정 가능

3. 파일 처리 파이프라인

파일을 다운로드하고 파싱하여 저장하는 파이프라인입니다. 각 단계를 step으로 나누면 디버깅과 재시도가 쉬워집니다.
export const processUploadedFile = workflow(
  { name: "process_uploaded_file" },
  async ({ input, step, logger }) => {
    // Step 1: 파일 다운로드
    const fileContent = await step.define(
      { name: "download_file" },
      async () => {
        return await downloadFromS3(input.fileKey);
      }
    ).run();
    
    // Step 2: 파싱
    const parsed = await step.define(
      { name: "parse_file" },
      async () => {
        return parseCSV(fileContent);
      }
    ).run();
    
    // Step 3: 검증
    const validated = await step.define(
      { name: "validate" },
      async () => {
        return validateData(parsed);
      }
    ).run();
    
    // Step 4: DB에 저장
    await step.define(
      { name: "save_to_db" },
      async () => {
        await bulkInsert(validated);
      }
    ).run();
    
    return { imported: validated.length };
  }
);
파이프라인 구조:
  1. 다운로드 → 2. 파싱 → 3. 검증 → 4. 저장
  2. 각 단계가 실패하면 해당 step부터 재시도
  3. 파일은 한 번만 다운로드

4. Rate Limiting이 있는 API

API에 rate limit이 있을 때는 요청 간 지연을 추가합니다. step.sleep()을 사용하면 재시도 시 지연이 반복되지 않습니다.
export const sendNotifications = workflow(
  { name: "send_notifications" },
  async ({ input, step, logger }) => {
    const users = input.userIds;
    
    for (const userId of users) {
      // 각 사용자에게 알림
      await step.define(
        { name: `notify_user_${userId}` },
        async () => {
          const user = await UserModel.findById(userId);
          await sendPushNotification(user.deviceToken, input.message);
        }
      ).run();
      
      // 사용자 간 딜레이 (API 제한 회피)
      if (userId !== users[users.length - 1]) {
        await step.sleep("rate_limit_delay", "100ms");
      }
    }
    
    return { sent: users.length };
  }
);
Rate limiting 전략:
  • 각 알림 사이에 100ms 대기
  • API 서버 과부하 방지
  • 재시도 시 이미 성공한 알림은 스킵

Step 명명 규칙

고유한 이름 사용

같은 workflow 내에서 step 이름은 고유해야 합니다. 중복된 이름을 사용하면 마지막 step만 실행됩니다.
// ❌ 같은 이름 (마지막 것만 실행됨)
await step.define({ name: "process" }, async () => { ... }).run();
await step.define({ name: "process" }, async () => { ... }).run();

// ✅ 다른 이름
await step.define({ name: "process_payment" }, async () => { ... }).run();
await step.define({ name: "process_shipping" }, async () => { ... }).run();

반복문에서 동적 이름

반복문에서 step을 생성할 때는 인덱스를 포함하여 이름을 고유하게 만듭니다.
// 반복문에서
for (let i = 0; i < items.length; i++) {
  await step.define(
    { name: `process_item_${i}` },  // 각 step 다른 이름
    async () => {
      await processItem(items[i]);
    }
  ).run();
}
명명 팁:
  • 작업 내용을 나타내는 명사 사용: fetch_user, send_email
  • 인덱스 포함: process_batch_0, process_batch_1
  • Snake case 권장: send_welcome_email

주의사항

Step 사용 시 주의사항:
  1. 고유한 이름: Step 이름은 workflow 내에서 고유해야 합니다.
    { name: "step_1" }  // ✅
    { name: "step_1" }  // ❌ 중복
    
  2. 실행 순서: Step은 코드에 작성된 순서대로 실행됩니다.
    await step.define({ name: "first" }, ...).run();
    await step.define({ name: "second" }, ...).run();
    
  3. 에러 전파: Step이 실패하면 workflow가 중단됩니다. 선택적 작업은 try-catch로 감싸세요.
    try {
      await step.define({ name: "optional" }, ...).run();
    } catch (error) {
      logger.warn("Optional step failed", { error });
    }
    
  4. 타입 추론: TypeScript는 step의 반환 타입을 자동으로 추론합니다.
    const data: UserData = await step.define(...).run();
    
  5. Sleep 시간 제한: 너무 긴 sleep은 데이터베이스 연결을 유지하므로 권장하지 않습니다.
    await step.sleep("delay", "1h");   // OK
    await step.sleep("delay", "30d");  // 비추천
    

다음 단계