메인 콘텐츠로 건너뛰기
Workflow는 백그라운드 작업을 실행하는 시스템입니다. 이메일 발송, 데이터 처리, 스케줄 작업 등에 사용합니다.

Tasks 설정

// sonamu.config.ts
import { defineConfig } from "sonamu";

export default defineConfig({
  // ...
  tasks: {
    enableWorker: true,  // Worker 활성화
    workerOptions: {
      concurrency: 4,      // 동시 실행 수
      usePubSub: true,     // Pub/Sub 사용
      listenDelay: 500,    // 수신 지연 (ms)
    },
    contextProvider: async (baseContext) => {
      // Workflow에서 사용할 Context 설정
      return {
        ...baseContext,
        // 커스텀 데이터 추가 가능
      };
    },
  },
});
옵션:
  • enableWorker: Worker 활성화 여부 (기본: daemon 모드에서만 true)
  • workerOptions.concurrency: 동시 실행 작업 수 (기본: CPU 코어 - 1)
  • workerOptions.usePubSub: PostgreSQL LISTEN/NOTIFY 사용 (기본: true)
  • workerOptions.listenDelay: Pub/Sub 메시지 수신 후 대기 시간 (기본: 500ms)
  • contextProvider: Workflow 내에서 사용할 Context 생성 함수

WorkflowManager

Sonamu는 WorkflowManager를 자동으로 생성하고 관리합니다. Sonamu.workflows로 접근할 수 있습니다.
import { Sonamu } from "sonamu";

// Workflow 실행
const workflowManager = Sonamu.workflows;
const handle = await workflowManager.run(
  {
    name: "send_email",
    version: null,
  },
  { email: "[email protected]" }
);

// 결과 대기
const result = await handle.result();

Worker 모드

1. Daemon 모드 (자동 활성화)

# Worker가 자동으로 시작됨
pnpm sonamu daemon
Daemon 모드에서는 enableWorker: true가 기본값입니다.

2. 별도 Worker 프로세스

// src/worker.ts
import { Sonamu } from "sonamu";

async function startWorker() {
  await Sonamu.init(true, false);
  
  await Sonamu.workflows.startWorker();
  
  console.log("Worker started");
  
  // Graceful shutdown
  const shutdown = async () => {
    console.log("Stopping worker...");
    await Sonamu.workflows.stopWorker();
    process.exit(0);
  };
  
  process.on("SIGTERM", shutdown);
  process.on("SIGINT", shutdown);
}

startWorker();
// package.json
{
  "scripts": {
    "worker": "tsx src/worker.ts",
    "worker:dev": "tsx watch src/worker.ts"
  }
}
# Worker 실행
pnpm worker

3. API 서버 + Worker

// sonamu.config.ts
export default defineConfig({
  tasks: {
    enableWorker: true,  // API 서버에서도 Worker 실행
    workerOptions: {
      concurrency: 2,  // API 서버는 적은 concurrency
    },
    contextProvider: async (baseContext) => baseContext,
  },
});

분산 Worker

여러 서버에서 Worker를 실행할 수 있습니다.
// Server 1, 2, 3...
tasks: {
  enableWorker: true,
  workerOptions: {
    concurrency: 4,
    usePubSub: true,
  },
}
PostgreSQL이 작업 큐 역할을 하며, Pub/Sub으로 Worker 간 통신합니다.

Context Provider

Workflow에서 사용할 Context를 커스터마이징합니다.
// sonamu.config.ts
tasks: {
  enableWorker: true,
  contextProvider: async (baseContext) => {
    // 시스템 사용자 조회
    const systemUser = await UserModel.findOne({
      wq: [['email', '=', '[email protected]']],
    });
    
    return {
      ...baseContext,
      user: systemUser,
      customData: "...",
    };
  },
}
Workflow 함수 내에서 Sonamu.getContext()로 접근:
import { workflow } from "sonamu";
import { Sonamu } from "sonamu";

export const myWorkflow = workflow(
  { name: "my_workflow" },
  async ({ input, step }) => {
    const context = Sonamu.getContext();
    console.log(context.user);  // systemUser
  }
);

테이블 구조

Sonamu는 다음 테이블을 자동 생성합니다:
-- Workflow 정의
CREATE TABLE workflows (
  id UUID PRIMARY KEY,
  name TEXT NOT NULL,
  version TEXT,
  ...
);

-- Workflow 실행 기록
CREATE TABLE workflow_runs (
  id UUID PRIMARY KEY,
  workflow_id UUID REFERENCES workflows(id),
  status TEXT,
  input JSONB,
  output JSONB,
  started_at TIMESTAMP,
  completed_at TIMESTAMP,
  ...
);

-- Step 실행 기록
CREATE TABLE workflow_steps (
  id UUID PRIMARY KEY,
  workflow_run_id UUID REFERENCES workflow_runs(id),
  name TEXT,
  status TEXT,
  ...
);

모니터링

실행 중인 Workflow 확인

// DB 직접 조회
import { DB } from "sonamu";

const knex = DB.getDB();
const runs = await knex('workflow_runs')
  .where('status', 'running')
  .select('*');

로그

export const myWorkflow = workflow(
  { name: "my_workflow" },
  async ({ logger, input }) => {
    logger.info("Workflow started", { input });
    logger.error("Error occurred", { error });
  }
);

다음 단계