메인 콘텐츠로 건너뛰기
Workflow는 백그라운드에서 실행되는 장기 실행 작업을 정의하는 시스템입니다. @workflow 데코레이터를 사용하면 이메일 발송, 데이터 처리, 정기 작업 등을 안정적으로 실행할 수 있습니다.

기본 개념

Workflow는 비동기 작업을 안전하게 실행하는 것이 목적입니다. API 요청과 달리 Workflow는:
  • 장시간 실행: 몇 분에서 몇 시간까지 실행 가능
  • 재시도 가능: 실패 시 자동으로 재시도
  • 모니터링: 실행 상태를 데이터베이스에 기록
  • 스케줄링: Cron 표현식으로 정기 실행

기본 사용법

Workflow를 정의할 때는 이름과 실행 함수를 제공합니다. 실행 함수는 input, step, logger 등의 파라미터를 받습니다.
// src/workflows/email.workflow.ts
import { workflow } from "sonamu";

export const sendWelcomeEmail = workflow(
  { name: "send_welcome_email" },
  async ({ input, step, logger }) => {
    logger.info("Sending welcome email", { email: input.email });
    
    // 이메일 발송
    await step.define({ name: "send_email" }, async () => {
      await sendEmail({
        to: input.email,
        subject: "Welcome!",
        body: "...",
      });
    }).run();
    
    return { success: true };
  }
);
주요 파라미터:
  • name: Workflow 식별자 (고유해야 함)
  • input: Workflow에 전달되는 입력 데이터
  • step: Step 실행 객체 (작업을 나누어 관리)
  • logger: 로깅 객체 (실행 상태 기록)
Workflow 이름을 생략하면 함수명을 snake_case로 변환하여 자동 설정합니다. 예: sendWelcomeEmailsend_welcome_email

Workflow 실행

API에서 실행

API 엔드포인트에서 Workflow를 실행하면 즉시 응답을 반환하고 백그라운드에서 작업이 진행됩니다. 사용자는 긴 작업을 기다리지 않아도 됩니다.
import { BaseModel, api, Sonamu } from "sonamu";
import { sendWelcomeEmail } from "../workflows/email.workflow";

class UserModelClass extends BaseModel {
  @api({ httpMethod: 'POST' })
  async register(email: string, password: string) {
    // 사용자 저장
    const wdb = this.getPuri("w");
    wdb.ubRegister("users", { email, password });
    
    const [userId] = await wdb.transaction(async (trx) => {
      return trx.ubUpsert("users");
    });
    
    // Workflow 실행 (백그라운드)
    await Sonamu.workflows.run(
      {
        name: "send_welcome_email",
        version: null,
      },
      { email }
    );
    
    return userId;
  }
}
실행 흐름:
  1. API가 Sonamu.workflows.run()을 호출
  2. Workflow가 큐에 등록되고 즉시 리턴
  3. Worker가 큐에서 Workflow를 가져와 실행
  4. 실행 결과는 데이터베이스에 저장

직접 실행

스크립트나 다른 Workflow에서 직접 실행할 수도 있습니다. handle.result()를 사용하면 완료될 때까지 기다릴 수 있습니다.
import { Sonamu } from "sonamu";

const handle = await Sonamu.workflows.run(
  {
    name: "send_welcome_email",
    version: null,
  },
  { email: "[email protected]" }
);

// 결과 대기 (선택)
const result = await handle.result();
console.log(result);  // { success: true }
handle.result()는 Workflow가 완료될 때까지 대기합니다. API 응답에서는 사용하지 마세요!

스키마 검증

Zod 스키마를 사용하면 입력과 출력 데이터를 자동으로 검증할 수 있습니다. 잘못된 데이터로 인한 런타임 에러를 방지하고, TypeScript 타입 추론도 정확해집니다.
import { workflow } from "sonamu";
import { z } from "zod";

const inputSchema = z.object({
  userId: z.number(),
  amount: z.number().positive(),
});

const outputSchema = z.object({
  orderId: z.number(),
  status: z.enum(["success", "failed"]),
});

export const processPayment = workflow(
  {
    name: "process_payment",
    schema: {
      input: inputSchema,
      output: outputSchema,
    },
  },
  async ({ input, step, logger }) => {
    // input은 자동으로 검증됨
    logger.info("Processing payment", { userId: input.userId });
    
    const orderId = await step.define({ name: "create_order" }, async () => {
      return await createOrder(input.userId, input.amount);
    }).run();
    
    // output도 자동으로 검증됨
    return {
      orderId,
      status: "success" as const,
    };
  }
);
스키마 검증 장점:
  • 타입 안전성: TypeScript가 input/output 타입을 정확히 추론
  • 런타임 검증: 실행 전에 데이터 형식 확인
  • 명확한 계약: Workflow의 인터페이스가 명확해짐

버전 관리

Workflow 로직을 변경해야 할 때, 버전을 지정하면 기존 실행 중인 작업은 이전 로직으로 완료되고 새로운 실행은 새 로직을 사용합니다.
export const sendWelcomeEmail = workflow(
  {
    name: "send_welcome_email",
    version: "v2",
  },
  async ({ input, step, version, logger }) => {
    logger.info("Version", { version });  // "v2"
    
    // 버전별 로직
    if (version === "v2") {
      // 새로운 로직: HTML 이메일
      await sendHTMLEmail(input.email);
    } else {
      // 이전 로직: Plain text
      await sendPlainEmail(input.email);
    }
  }
);
사용 시나리오:
  • 이메일 템플릿 변경
  • 데이터 처리 로직 개선
  • 외부 API 연동 방식 변경
버전을 변경하면 기존 실행 중인 Workflow는 코드에 이전 버전 로직이 있어야 완료됩니다. 이전 버전 코드를 삭제하기 전에 모든 실행이 완료되었는지 확인하세요.

스케줄링

Cron 표현식을 사용하면 Workflow를 정기적으로 자동 실행할 수 있습니다. 매일 리포트 생성, 주기적 데이터 백업 등에 활용합니다.
export const dailyReport = workflow(
  {
    name: "daily_report",
    schedules: [
      {
        name: "daily_at_9am",
        expression: "0 9 * * *",  // 매일 오전 9시
        input: () => ({
          date: new Date().toISOString().split('T')[0],
        }),
      },
    ],
  },
  async ({ input, step, logger }) => {
    logger.info("Generating daily report", { date: input.date });
    
    // 리포트 생성
    const report = await step.define({ name: "generate_report" }, async () => {
      return await generateReport(input.date);
    }).run();
    
    // 이메일 발송
    await step.define({ name: "send_report" }, async () => {
      await sendReportEmail(report);
    }).run();
    
    return { success: true };
  }
);
Cron 표현식 가이드:
표현식실행 시기사용 예시
* * * * *매 분테스트용
0 * * * *매 시간시간별 집계
0 9 * * *매일 오전 9시일일 리포트
0 9 * * 1매주 월요일 9시주간 리포트
0 0 1 * *매월 1일 자정월간 정산
input 파라미터에 함수를 제공하면 실행 시점의 데이터를 동적으로 생성할 수 있습니다.

여러 스케줄 등록

하나의 Workflow에 여러 스케줄을 등록할 수 있습니다. 각 스케줄은 다른 입력 데이터를 전달할 수 있어, 같은 로직으로 다른 작업을 수행할 수 있습니다.
export const backupData = workflow(
  {
    name: "backup_data",
    schedules: [
      {
        name: "hourly_backup",
        expression: "0 * * * *",  // 매 시간
        input: { type: "incremental" },
      },
      {
        name: "daily_full_backup",
        expression: "0 2 * * *",  // 매일 오전 2시
        input: { type: "full" },
      },
    ],
  },
  async ({ input, step, logger }) => {
    logger.info("Backup started", { type: input.type });
    
    if (input.type === "full") {
      // 전체 백업
      await fullBackup();
    } else {
      // 증분 백업
      await incrementalBackup();
    }
  }
);
실전 활용:
  • 증분 백업: 매 시간 변경된 데이터만
  • 전체 백업: 매일 전체 데이터
  • 다른 타임존: 지역별로 다른 시간에 실행

실전 예제

1. 대량 이메일 발송

수천 명의 사용자에게 이메일을 보낼 때, Workflow를 사용하면 API 요청 타임아웃 없이 안전하게 처리할 수 있습니다.
// src/workflows/email.workflow.ts
import { workflow } from "sonamu";
import { z } from "zod";

export const sendBulkEmail = workflow(
  {
    name: "send_bulk_email",
    schema: {
      input: z.object({
        userIds: z.array(z.number()),
        subject: z.string(),
        body: z.string(),
      }),
    },
  },
  async ({ input, step, logger }) => {
    logger.info("Sending bulk email", { count: input.userIds.length });
    
    // 사용자 조회
    const users = await step.define({ name: "fetch_users" }, async () => {
      return await UserModel.findMany({
        wq: [['id', 'IN', input.userIds]],
      });
    }).run();
    
    // 이메일 발송
    for (const user of users) {
      await step.define(
        { name: `send_email_${user.id}` },
        async () => {
          await sendEmail({
            to: user.email,
            subject: input.subject,
            body: input.body,
          });
        }
      ).run();
    }
    
    return { sent: users.length };
  }
);
핵심 포인트:
  • 사용자 조회와 이메일 발송을 별도 Step으로 분리
  • 각 이메일마다 Step을 생성하여 개별 재시도 가능
  • 진행 상황을 DB에 기록하여 모니터링 가능

2. 데이터 파이프라인

외부 API에서 데이터를 가져와 변환하고 저장하는 파이프라인을 구성할 수 있습니다.
// src/workflows/data-processing.workflow.ts
import { workflow } from "sonamu";

export const processUserData = workflow(
  {
    name: "process_user_data",
  },
  async ({ input, step, logger }) => {
    // 1단계: 데이터 수집
    const rawData = await step.define({ name: "collect_data" }, async () => {
      return await fetchRawData();
    }).run();
    
    // 2단계: 변환
    const transformed = await step.define({ name: "transform" }, async () => {
      return transformData(rawData);
    }).run();
    
    // 3단계: 저장
    await step.define({ name: "save" }, async () => {
      await saveToDatabase(transformed);
    }).run();
    
    logger.info("Processing completed", { count: transformed.length });
    
    return { processed: transformed.length };
  }
);
장점:
  • 각 단계가 실패해도 처음부터 다시 시작하지 않음
  • 각 단계의 실행 시간을 측정하여 병목 구간 파악
  • 변환 로직 변경 시 수집 단계는 스킵 가능

3. 정기 정리 작업

오래된 데이터를 자동으로 삭제하는 작업을 스케줄링합니다.
// src/workflows/cleanup.workflow.ts
import { workflow } from "sonamu";

export const cleanupOldData = workflow(
  {
    name: "cleanup_old_data",
    schedules: [
      {
        name: "daily_cleanup",
        expression: "0 3 * * *",  // 매일 오전 3시
        input: {
          daysToKeep: 90,
        },
      },
    ],
  },
  async ({ input, step, logger }) => {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - input.daysToKeep);
    
    // 오래된 데이터 삭제
    const deleted = await step.define({ name: "delete_old_records" }, async () => {
      return await knex('logs')
        .where('created_at', '<', cutoffDate)
        .delete();
    }).run();
    
    logger.info("Cleanup completed", { deleted });
    
    return { deleted };
  }
);
사용 시나리오:
  • 로그 데이터 정리
  • 임시 파일 삭제
  • 만료된 세션 제거

주의사항

Workflow 사용 시 주의사항:
  1. Worker 필수: Workflow를 실행하려면 Worker 프로세스가 실행 중이어야 합니다.
    pnpm worker
    
  2. Step으로 나누기: 긴 작업은 여러 Step으로 분할하세요. 실패 시 전체를 재실행하지 않아도 됩니다.
    // ❌ Step 없이 긴 작업
    await longRunningTask();
    
    // ✅ Step으로 분할
    await step.define({ name: "task" }, longRunningTask).run();
    
  3. 스케줄 이름 중복 방지: 같은 Workflow 내에서 스케줄 이름은 고유해야 합니다.
    schedules: [
      { name: "backup_hourly", ... },
      { name: "backup_daily", ... },  // 다른 이름
    ]
    
  4. 타임존 설정: 스케줄은 Sonamu.config.api.timezone을 따릅니다.
    // sonamu.config.ts
    api: {
      timezone: "Asia/Seoul",
    }
    
  5. 에러 처리: Workflow가 실패하면 자동으로 재시도됩니다. 중요한 작업은 try-catch로 보호하세요.
    await step.define({ name: "task" }, async () => {
      try {
        await riskyOperation();
      } catch (error) {
        logger.error("Operation failed", { error });
        throw error;
      }
    }).run();
    

다음 단계