메인 μ½˜ν…μΈ λ‘œ κ±΄λ„ˆλ›°κΈ°
Workflow μ‹€ν–‰ 쀑 λ°œμƒν•˜λŠ” μ—λŸ¬λ₯Ό μ²˜λ¦¬ν•˜λŠ” 방법을 λ‹€λ£Ήλ‹ˆλ‹€. μ™ΈλΆ€ API 호좜, λ„€νŠΈμ›Œν¬ 문제, μΌμ‹œμ  μž₯μ•  λ“±μœΌλ‘œ μΈν•œ μ‹€νŒ¨λ₯Ό μ•ˆμ „ν•˜κ²Œ μ²˜λ¦¬ν•˜κ³ , ν•„μš”ν•œ 경우 μžλ™μœΌλ‘œ μž¬μ‹œλ„ν•˜λŠ” νŒ¨ν„΄μ„ κ΅¬ν˜„ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

κΈ°λ³Έ μ—λŸ¬ 처리

Workflowμ—μ„œ λ°œμƒν•œ μ—λŸ¬λŠ” 일반적인 try-catch 문으둜 μ²˜λ¦¬ν•©λ‹ˆλ‹€. μ—λŸ¬λ₯Ό λ‘œκΉ…ν•˜κ³ , ν•„μš”ν•œ 경우 μž¬λ°œμƒμ‹œμΌœ workflowλ₯Ό μ‹€νŒ¨ μƒνƒœλ‘œ λ§Œλ“­λ‹ˆλ‹€.
import { workflow } from "sonamu";

export const processPayment = workflow(
  { name: "process_payment" },
  async ({ input, step, logger }) => {
    try {
      await step.define({ name: "charge" }, async () => {
        await paymentGateway.charge(input.amount);
      }).run();
      
      return { success: true };
    } catch (error) {
      logger.error("Payment failed", { error, input });
      
      // μ—λŸ¬ μž¬λ°œμƒ (workflow μ‹€νŒ¨)
      throw error;
    }
  }
);
핡심 κ°œλ…:
  • logger.error()둜 μ—λŸ¬ 기둝
  • throw error둜 workflow μ‹€νŒ¨ 처리
  • μ‹€νŒ¨ν•œ workflowλŠ” Workerκ°€ μžλ™μœΌλ‘œ μž¬μ‹œλ„

Step별 μ—λŸ¬ 처리

λͺ¨λ“  step이 ν•„μˆ˜λŠ” μ•„λ‹™λ‹ˆλ‹€. 일뢀 step은 μ‹€νŒ¨ν•΄λ„ workflowλ₯Ό 계속 μ§„ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€. 이런 선택적 step은 try-catch둜 κ°μ‹Έμ„œ μ—λŸ¬λ₯Ό ν‘μˆ˜ν•©λ‹ˆλ‹€.
export const syncData = workflow(
  { name: "sync_data" },
  async ({ input, step, logger }) => {
    // Step 1: ν•„μˆ˜ μž‘μ—…
    const data = await step.define({ name: "fetch_data" }, async () => {
      return await fetchFromAPI(input.url);
    }).run();
    
    // Step 2: 선택적 μž‘μ—… (μ‹€νŒ¨ν•΄λ„ 계속 μ§„ν–‰)
    let cached = false;
    try {
      await step.define({ name: "cache_data" }, async () => {
        await cacheData(data);
      }).run();
      cached = true;
    } catch (error) {
      logger.warn("Cache failed, continuing...", { error });
    }
    
    // Step 3: μ΅œμ’… μ €μž₯
    await step.define({ name: "save_data" }, async () => {
      await saveToDatabase(data);
    }).run();
    
    return { saved: true, cached };
  }
);
μ‚¬μš© μ‹œλ‚˜λ¦¬μ˜€:
  • 캐싱 μ‹€νŒ¨ (λ°μ΄ν„°λŠ” μ €μž₯λ˜μ–΄μ•Ό 함)
  • μ•Œλ¦Ό λ°œμ†‘ μ‹€νŒ¨ (주문은 μ™„λ£Œλ˜μ–΄μ•Ό 함)
  • 둜그 기둝 μ‹€νŒ¨ (λΉ„μ¦ˆλ‹ˆμŠ€ λ‘œμ§μ€ 계속)

μž¬μ‹œλ„ νŒ¨ν„΄

μˆ˜λ™ μž¬μ‹œλ„

μ™ΈλΆ€ APIλ‚˜ λ„€νŠΈμ›Œν¬ μš”μ²­μ€ μΌμ‹œμ μœΌλ‘œ μ‹€νŒ¨ν•  수 μžˆμŠ΅λ‹ˆλ‹€. 이런 경우 μ—¬λŸ¬ 번 μž¬μ‹œλ„ν•˜μ—¬ 성곡λ₯ μ„ 높일 수 μžˆμŠ΅λ‹ˆλ‹€.
export const fetchWithRetry = workflow(
  { name: "fetch_with_retry" },
  async ({ input, step, logger }) => {
    const maxRetries = 3;
    let lastError: Error | null = null;
    
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        const data = await step.define(
          { name: `fetch_attempt_${attempt}` },
          async () => {
            return await unstableAPICall(input.url);
          }
        ).run();
        
        logger.info("Fetch succeeded", { attempt });
        return { data, attempts: attempt };
      } catch (error) {
        lastError = error;
        logger.warn("Fetch failed", { attempt, error });
        
        // λ§ˆμ§€λ§‰ μ‹œλ„ μ „μ—λŠ” λŒ€κΈ°
        if (attempt < maxRetries) {
          await step.sleep("retry_delay", "5s");
        }
      }
    }
    
    // λͺ¨λ“  μ‹œλ„ μ‹€νŒ¨
    throw new Error(`Failed after ${maxRetries} attempts: ${lastError?.message}`);
  }
);
μž¬μ‹œλ„ μ „λž΅:
  1. μ΅œλŒ€ 3번 μ‹œλ„
  2. μ‹œλ„ 사이에 5초 λŒ€κΈ°
  3. λͺ¨λ“  μ‹œλ„ μ‹€νŒ¨ μ‹œ μ—λŸ¬ λ°œμƒ

μ§€μˆ˜ λ°±μ˜€ν”„

μž¬μ‹œλ„ 간격을 점점 λŠ˜λ¦¬λŠ” μ§€μˆ˜ λ°±μ˜€ν”„(Exponential Backoff) μ „λž΅μ€ μ„œλ²„ κ³ΌλΆ€ν•˜λ₯Ό λ°©μ§€ν•˜λ©΄μ„œ μž¬μ‹œλ„ 성곡λ₯ μ„ λ†’μž…λ‹ˆλ‹€.
export const fetchWithBackoff = workflow(
  { name: "fetch_with_backoff" },
  async ({ input, step, logger }) => {
    const maxRetries = 5;
    
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        return await step.define(
          { name: `attempt_${attempt}` },
          async () => {
            return await externalAPI.fetch(input.url);
          }
        ).run();
      } catch (error) {
        logger.warn("Attempt failed", { attempt, error });
        
        if (attempt < maxRetries) {
          // μ§€μˆ˜ λ°±μ˜€ν”„: 2초, 4초, 8초, 16초
          const delay = Math.pow(2, attempt);
          await step.sleep(`backoff_${attempt}`, `${delay}s`);
        } else {
          throw error;
        }
      }
    }
  }
);
λ°±μ˜€ν”„ 간격:
  • 1μ°¨ μ‹€νŒ¨ β†’ 2초 λŒ€κΈ°
  • 2μ°¨ μ‹€νŒ¨ β†’ 4초 λŒ€κΈ°
  • 3μ°¨ μ‹€νŒ¨ β†’ 8초 λŒ€κΈ°
  • 4μ°¨ μ‹€νŒ¨ β†’ 16초 λŒ€κΈ°
μž₯점:
  • μΌμ‹œμ  κ³ΌλΆ€ν•˜μ—μ„œ νšŒλ³΅ν•  μ‹œκ°„ 제곡
  • μ„œλ²„ λΆ€ν•˜ λΆ„μ‚°
  • μž¬μ‹œλ„ 성곡λ₯  ν–₯상

보상 νŠΈλžœμž­μ…˜

λΆ„μ‚° νŠΈλžœμž­μ…˜μ—μ„œλŠ” 일뢀 μž‘μ—…μ΄ μ‹€νŒ¨ν•˜λ©΄ 이미 μ™„λ£Œλœ μž‘μ—…μ„ λ˜λŒλ €μ•Ό ν•©λ‹ˆλ‹€. 이λ₯Ό 보상 νŠΈλžœμž­μ…˜(Compensating Transaction)이라고 ν•©λ‹ˆλ‹€.
export const processOrder = workflow(
  { name: "process_order" },
  async ({ input, step, logger }) => {
    let paymentId: string | null = null;
    let inventoryReserved = false;
    
    try {
      // Step 1: 결제
      paymentId = await step.define({ name: "charge_payment" }, async () => {
        return await paymentService.charge(input.amount);
      }).run();
      
      // Step 2: 재고 확보
      await step.define({ name: "reserve_inventory" }, async () => {
        await inventoryService.reserve(input.items);
      }).run();
      inventoryReserved = true;
      
      // Step 3: μ£Όλ¬Έ 생성
      const orderId = await step.define({ name: "create_order" }, async () => {
        return await orderService.create({
          paymentId,
          items: input.items,
        });
      }).run();
      
      return { orderId, success: true };
    } catch (error) {
      logger.error("Order processing failed, rolling back...", { error });
      
      // 보상: 재고 ν•΄μ œ
      if (inventoryReserved) {
        await step.define({ name: "rollback_inventory" }, async () => {
          await inventoryService.release(input.items);
        }).run();
      }
      
      // 보상: 결제 μ·¨μ†Œ
      if (paymentId) {
        await step.define({ name: "rollback_payment" }, async () => {
          await paymentService.refund(paymentId);
        }).run();
      }
      
      throw error;
    }
  }
);
보상 νŠΈλžœμž­μ…˜ νŒ¨ν„΄:
  1. 각 μž‘μ—…μ˜ μ™„λ£Œ μƒνƒœ 좔적
  2. μ‹€νŒ¨ λ°œμƒ μ‹œ μ™„λ£Œλœ μž‘μ—… 확인
  3. μ—­μˆœμœΌλ‘œ μž‘μ—… μ·¨μ†Œ
  4. μ—λŸ¬ μž¬λ°œμƒ
μ‹€μ „ μ‚¬μš©:
  • 결제 μ·¨μ†Œ
  • 재고 볡원
  • μ˜ˆμ•½ μ·¨μ†Œ
  • 파일 μ‚­μ œ

νƒ€μž„μ•„μ›ƒ 처리

μ™ΈλΆ€ APIκ°€ μ‘λ‹΅ν•˜μ§€ μ•ŠμœΌλ©΄ workflowκ°€ λ¬΄ν•œμ • λŒ€κΈ°ν•  수 μžˆμŠ΅λ‹ˆλ‹€. νƒ€μž„μ•„μ›ƒμ„ μ„€μ •ν•˜μ—¬ 일정 μ‹œκ°„ ν›„ μ‹€νŒ¨ μ²˜λ¦¬ν•©λ‹ˆλ‹€.
async function withTimeout<T>(
  promise: Promise<T>,
  timeoutMs: number
): Promise<T> {
  return Promise.race([
    promise,
    new Promise<T>((_, reject) =>
      setTimeout(() => reject(new Error("Timeout")), timeoutMs)
    ),
  ]);
}

export const fetchWithTimeout = workflow(
  { name: "fetch_with_timeout" },
  async ({ input, step, logger }) => {
    try {
      const data = await step.define({ name: "fetch" }, async () => {
        return await withTimeout(
          externalAPI.fetch(input.url),
          30000  // 30초
        );
      }).run();
      
      return { data };
    } catch (error) {
      if (error.message === "Timeout") {
        logger.error("Request timed out");
      }
      throw error;
    }
  }
);
νƒ€μž„μ•„μ›ƒ μ „λž΅:
  • 짧은 νƒ€μž„μ•„μ›ƒ (5-10초): λΉ λ₯Έ API
  • 쀑간 νƒ€μž„μ•„μ›ƒ (30-60초): 일반 API
  • κΈ΄ νƒ€μž„μ•„μ›ƒ (5-10λΆ„): 파일 처리

μ—λŸ¬ νƒ€μž…λ³„ 처리

μ—λŸ¬μ˜ μ’…λ₯˜μ— 따라 λ‹€λ₯Έ 처리 μ „λž΅μ„ μ‚¬μš©ν•©λ‹ˆλ‹€. λ„€νŠΈμ›Œν¬ μ—λŸ¬λŠ” μž¬μ‹œλ„ν•˜μ§€λ§Œ, 데이터 검증 μ—λŸ¬λŠ” μ¦‰μ‹œ μ‹€νŒ¨ μ²˜λ¦¬ν•©λ‹ˆλ‹€.
export const processData = workflow(
  { name: "process_data" },
  async ({ input, step, logger }) => {
    try {
      await step.define({ name: "process" }, async () => {
        await dataService.process(input.data);
      }).run();
    } catch (error) {
      // λ„€νŠΈμ›Œν¬ μ—λŸ¬: μž¬μ‹œλ„
      if (error.code === 'ECONNREFUSED') {
        logger.warn("Connection refused, retrying...");
        await step.sleep("retry_delay", "10s");
        throw error;  // μž¬μ‹œλ„
      }
      
      // 검증 μ—λŸ¬: μ¦‰μ‹œ μ‹€νŒ¨
      if (error.code === 'VALIDATION_ERROR') {
        logger.error("Validation failed", { error });
        throw new Error("Invalid data, not retrying");
      }
      
      // 기타 μ—λŸ¬
      throw error;
    }
  }
);
μ—λŸ¬ λΆ„λ₯˜:
μ—λŸ¬ νƒ€μž…μ²˜λ¦¬ λ°©λ²•μ˜ˆμ‹œ
μΌμ‹œμ  μ—λŸ¬μž¬μ‹œλ„λ„€νŠΈμ›Œν¬, νƒ€μž„μ•„μ›ƒ, 503
영ꡬ적 μ—λŸ¬μ¦‰μ‹œ μ‹€νŒ¨κ²€μ¦, 인증, 404
λΆ€λΆ„ μ‹€νŒ¨μ„ νƒμ  μ²˜λ¦¬μΌλΆ€ 데이터 손상

μ‹€μ „ 예제

1. 이메일 λ°œμ†‘ with Dead Letter Queue

이메일 λ°œμ†‘μ΄ 3번 μ‹€νŒ¨ν•˜λ©΄ Dead Letter Queue에 μΆ”κ°€ν•˜μ—¬ λ‚˜μ€‘μ— μˆ˜λ™μœΌλ‘œ μ²˜λ¦¬ν•©λ‹ˆλ‹€.
export const sendEmail = workflow(
  { name: "send_email" },
  async ({ input, step, logger }) => {
    const maxRetries = 3;
    
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        await step.define(
          { name: `send_attempt_${attempt}` },
          async () => {
            await emailService.send({
              to: input.email,
              subject: input.subject,
              body: input.body,
            });
          }
        ).run();
        
        logger.info("Email sent", { attempt });
        return { success: true, attempts: attempt };
      } catch (error) {
        logger.warn("Email send failed", { attempt, error });
        
        if (attempt < maxRetries) {
          await step.sleep("retry_delay", "30s");
        } else {
          // μ΅œμ’… μ‹€νŒ¨: Dead Letter Queue둜
          await step.define({ name: "move_to_dlq" }, async () => {
            await deadLetterQueue.add({
              type: "email",
              data: input,
              error: error.message,
            });
          }).run();
          
          throw error;
        }
      }
    }
  }
);
DLQ νŒ¨ν„΄:
  • 3번 μž¬μ‹œλ„ μ‹€νŒ¨
  • DLQ에 μΆ”κ°€ (λ‚˜μ€‘μ— 처리)
  • κ΄€λ¦¬μžμ—κ²Œ μ•Œλ¦Ό

2. API 호좜 with Circuit Breaker

μ™ΈλΆ€ APIκ°€ 계속 μ‹€νŒ¨ν•˜λ©΄ **회둜 차단기(Circuit Breaker)**둜 μš”μ²­μ„ μ°¨λ‹¨ν•˜μ—¬ μ‹œμŠ€ν…œμ„ λ³΄ν˜Έν•©λ‹ˆλ‹€.
class CircuitBreaker {
  private failures = 0;
  private lastFailureTime = 0;
  private readonly threshold = 5;
  private readonly timeout = 60000;  // 1λΆ„
  
  async call<T>(fn: () => Promise<T>): Promise<T> {
    // νšŒλ‘œκ°€ μ—΄λ¦° μƒνƒœ
    if (this.failures >= this.threshold) {
      const elapsed = Date.now() - this.lastFailureTime;
      if (elapsed < this.timeout) {
        throw new Error("Circuit breaker is open");
      }
      // νƒ€μž„μ•„μ›ƒ ν›„ μž¬μ‹œλ„
      this.failures = 0;
    }
    
    try {
      const result = await fn();
      this.failures = 0;  // 성곡 μ‹œ 리셋
      return result;
    } catch (error) {
      this.failures++;
      this.lastFailureTime = Date.now();
      throw error;
    }
  }
}

const circuitBreaker = new CircuitBreaker();

export const callAPI = workflow(
  { name: "call_api" },
  async ({ input, step, logger }) => {
    try {
      const data = await step.define({ name: "api_call" }, async () => {
        return await circuitBreaker.call(() =>
          fetch(input.url).then(r => r.json())
        );
      }).run();
      
      return { data };
    } catch (error) {
      if (error.message === "Circuit breaker is open") {
        logger.error("Circuit breaker open, API unavailable");
      }
      throw error;
    }
  }
);
Circuit Breaker μƒνƒœ:
  • Closed: 정상 μž‘λ™
  • Open: 5번 μ‹€νŒ¨ ν›„ 차단 (1λΆ„κ°„)
  • Half-Open: 1λΆ„ ν›„ μž¬μ‹œλ„

3. 파일 μ—…λ‘œλ“œ with λΆ€λΆ„ μž¬μ‹œλ„

μ—¬λŸ¬ νŒŒμΌμ„ μ—…λ‘œλ“œν•  λ•Œ, 각 νŒŒμΌμ„ λ…λ¦½μ μœΌλ‘œ μž¬μ‹œλ„ν•˜μ—¬ 일뢀 μ‹€νŒ¨ν•΄λ„ λ‚˜λ¨Έμ§€λŠ” μ„±κ³΅ν•˜λ„λ‘ ν•©λ‹ˆλ‹€.
export const uploadFiles = workflow(
  { name: "upload_files" },
  async ({ input, step, logger }) => {
    const results = [];
    
    for (let i = 0; i < input.files.length; i++) {
      const file = input.files[i];
      let uploaded = false;
      
      // 각 νŒŒμΌλ³„λ‘œ μž¬μ‹œλ„
      for (let attempt = 1; attempt <= 3; attempt++) {
        try {
          await step.define(
            { name: `upload_${i}_attempt_${attempt}` },
            async () => {
              await s3.upload(file.key, file.content);
            }
          ).run();
          
          uploaded = true;
          results.push({ file: file.key, success: true });
          break;
        } catch (error) {
          logger.warn("Upload failed", { file: file.key, attempt, error });
          
          if (attempt < 3) {
            await step.sleep(`retry_${i}_${attempt}`, "5s");
          }
        }
      }
      
      if (!uploaded) {
        results.push({ file: file.key, success: false });
      }
    }
    
    const failed = results.filter(r => !r.success);
    if (failed.length > 0) {
      logger.error("Some uploads failed", { failed });
    }
    
    return {
      total: input.files.length,
      succeeded: results.filter(r => r.success).length,
      failed: failed.length,
    };
  }
);
λΆ€λΆ„ μž¬μ‹œλ„ μ „λž΅:
  • 각 파일이 독립적인 stepλ“€
  • νŒŒμΌλ³„ 3번 μž¬μ‹œλ„
  • 일뢀 μ‹€νŒ¨ν•΄λ„ workflow 성곡
  • μ‹€νŒ¨ν•œ 파일 λͺ©λ‘ λ°˜ν™˜

μ£Όμ˜μ‚¬ν•­

μ—λŸ¬ 처리 μ‹œ μ£Όμ˜μ‚¬ν•­:
  1. μ—λŸ¬ λ‘œκΉ…: 항상 μ—λŸ¬λ₯Ό λ‘œκΉ…ν•˜μ—¬ 문제λ₯Ό 좔적할 수 μžˆμ–΄μ•Ό ν•©λ‹ˆλ‹€.
    catch (error) {
      logger.error("Operation failed", { error, context });
    }
    
  2. μž¬μ‹œλ„ μ œν•œ: λ¬΄ν•œ μž¬μ‹œλ„λ₯Ό λ°©μ§€ν•˜κ³  μ΅œλŒ€ 횟수λ₯Ό μ„€μ •ν•˜μ„Έμš”.
    const maxRetries = 3;  // λͺ…ν™•ν•œ μ œν•œ
    
  3. 보상 νŠΈλžœμž­μ…˜: μ‹€νŒ¨ μ‹œ 이미 μ™„λ£Œλœ μž‘μ—…μ„ μ •λ¦¬ν•˜μ„Έμš”.
    catch (error) {
      await rollbackCompletedSteps();
      throw error;
    }
    
  4. νƒ€μž„μ•„μ›ƒ μ„€μ •: λ¬΄ν•œ λŒ€κΈ°λ₯Ό λ°©μ§€ν•˜μ„Έμš”.
    await withTimeout(promise, 30000);
    
  5. Dead Letter Queue: μ΅œμ’… μ‹€νŒ¨ μ‹œ μˆ˜λ™ 처리λ₯Ό μœ„ν•΄ λ³„λ„λ‘œ μ €μž₯ν•˜μ„Έμš”.
    await deadLetterQueue.add(failedItem);
    
  6. μ—λŸ¬ νƒ€μž… ꡬ뢄: μž¬μ‹œλ„ κ°€λŠ₯ν•œ μ—λŸ¬μ™€ λΆˆκ°€λŠ₯ν•œ μ—λŸ¬λ₯Ό κ΅¬λΆ„ν•˜μ„Έμš”.
    if (isRetryable(error)) {
      throw error;  // μž¬μ‹œλ„
    } else {
      throw new NonRetryableError();  // μ¦‰μ‹œ μ‹€νŒ¨
    }
    

λ‹€μŒ 단계

@workflow λ°μ½”λ ˆμ΄ν„°

Workflow μ •μ˜μ™€ μŠ€μΌ€μ€„λ§ 배우기

Step

Step으둜 μž‘μ—…μ„ λ‚˜λˆ„κ³  κ΄€λ¦¬ν•˜κΈ°

Worker μ„€μ •

Worker ν”„λ‘œμ„ΈμŠ€ μ„€μ •ν•˜κΈ°