Workflow is Sonamu's system for running background jobs, such as sending email, processing data, and running scheduled tasks.

Tasks configuration

// sonamu.config.ts
import { defineConfig } from "sonamu";

export default defineConfig({
  // ...
  tasks: {
    enableWorker: true,  // Enable the worker
    workerOptions: {
      concurrency: 4,      // Number of tasks to run concurrently
      usePubSub: true,     // Use Pub/Sub
      listenDelay: 500,    // Listen delay (ms)
    },
    contextProvider: async (baseContext) => {
      // Build the Context used inside workflows
      return {
        ...baseContext,
        // Custom data can be added here
      };
    },
  },
});
Options:
  • enableWorker: whether to enable the worker (default: true only in daemon mode)
  • workerOptions.concurrency: number of tasks executed concurrently (default: CPU cores - 1)
  • workerOptions.usePubSub: use PostgreSQL LISTEN/NOTIFY (default: true)
  • workerOptions.listenDelay: time to wait after receiving a Pub/Sub message (default: 500 ms)
  • contextProvider: function that creates the Context used inside workflows
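
The defaults listed above can be pictured as a small resolution step. The sketch below is only an illustration and not Sonamu's actual code: `WorkerOptions` and `resolveWorkerOptions` are hypothetical names, and clamping the concurrency to at least 1 on a single-core machine is an assumption.

```typescript
import { cpus } from "node:os";

// Hypothetical shape mirroring the workerOptions keys documented above.
interface WorkerOptions {
  concurrency: number;
  usePubSub: boolean;
  listenDelay: number; // milliseconds
}

// Fill in the documented default for every option the caller leaves out.
function resolveWorkerOptions(partial: Partial<WorkerOptions> = {}): WorkerOptions {
  // The documented default is "CPU cores - 1"; the floor of 1 is an assumption
  // so that a single-core machine still runs one task at a time.
  const defaultConcurrency = Math.max(1, cpus().length - 1);
  return {
    concurrency: partial.concurrency ?? defaultConcurrency,
    usePubSub: partial.usePubSub ?? true,
    listenDelay: partial.listenDelay ?? 500,
  };
}

// Only concurrency is overridden; the other two fall back to their defaults.
console.log(resolveWorkerOptions({ concurrency: 2 }));
```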

WorkflowManager

Sonamu creates and manages the WorkflowManager automatically. It is available as Sonamu.workflows.
import { Sonamu } from "sonamu";

// Run a workflow
const workflowManager = Sonamu.workflows;
const handle = await workflowManager.run(
  {
    name: "send_email",
    version: null,
  },
  { email: "[email protected]" }
);

// Wait for the result
const result = await handle.result();
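
Because handle.result() waits until the workflow has finished, a caller such as a request handler may want to bound that wait. The following is a minimal sketch under assumptions: `withTimeout` and `ResultHandle` are hypothetical names that are not part of Sonamu, and the fake handle stands in for the object returned by workflowManager.run().

```typescript
// Reject if `promise` does not settle within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
    promise.then(
      (value) => {
        clearTimeout(timer); // settled in time, so cancel the timeout
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}

// Hypothetical stand-in for a workflow handle; only result() comes from the text above.
interface ResultHandle<T> {
  result(): Promise<T>;
}

async function demo(): Promise<void> {
  const fakeHandle: ResultHandle<string> = {
    result: () => new Promise<string>((resolve) => setTimeout(() => resolve("sent"), 10)),
  };
  const value = await withTimeout(fakeHandle.result(), 1000);
  console.log(value);
}

demo();
```

Note that a timeout in this sketch only stops the caller from waiting; it does not cancel the workflow itself.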

Worker modes

1. Daemon mode (enabled automatically)

# The worker starts automatically
pnpm sonamu daemon
In daemon mode, enableWorker defaults to true.

2. Separate worker process

// src/worker.ts
import { Sonamu } from "sonamu";

async function startWorker() {
  await Sonamu.init(true, false);
  
  await Sonamu.workflows.startWorker();
  
  console.log("Worker started");
  
  // Graceful shutdown
  const shutdown = async () => {
    console.log("Stopping worker...");
    await Sonamu.workflows.stopWorker();
    process.exit(0);
  };
  
  process.on("SIGTERM", shutdown);
  process.on("SIGINT", shutdown);
}

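startWorker().catch((error) => {
  // Exit with a non-zero code if initialization or worker startup fails
  console.error("Failed to start worker", error);
  process.exit(1);
});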
// package.json
{
  "scripts": {
    "worker": "tsx src/worker.ts",
    "worker:dev": "tsx watch src/worker.ts"
  }
}
# Run the worker
pnpm worker
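
In the worker script above, a second signal (for example pressing Ctrl+C twice) runs the shutdown function again. Whether stopWorker() tolerates repeated calls is not stated here, so a defensive guard can help. The sketch below assumes a hypothetical helper named `createShutdownHandler`; in a real worker the wrapped function would be `() => Sonamu.workflows.stopWorker()`.

```typescript
// Wrap an async stop function so that repeated signals trigger it only once.
function createShutdownHandler(stop: () => Promise<void>): () => Promise<void> {
  let pending: Promise<void> | undefined;
  return () => {
    // Start the shutdown on the first call; later calls reuse the same promise.
    if (pending === undefined) {
      pending = stop();
    }
    return pending;
  };
}

// Demo with a counter standing in for the real stop call.
let stopCalls = 0;
const shutdown = createShutdownHandler(async () => {
  stopCalls += 1;
});

Promise.all([shutdown(), shutdown()]).then(() => {
  console.log(stopCalls); // 1
});
```

The returned function can be registered for both SIGTERM and SIGINT, with process.exit(0) called once the promise resolves.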

3. API server + Worker

// sonamu.config.ts
import { defineConfig } from "sonamu";

export default defineConfig({
  tasks: {
    enableWorker: true,  // Run the worker in the API server as well
    workerOptions: {
      concurrency: 2,  // Keep concurrency low on the API server
    },
    contextProvider: async (baseContext) => baseContext,
  },
});

Distributed Workers

Workers can run on multiple servers.
// Server 1, 2, 3...
tasks: {
  enableWorker: true,
  workerOptions: {
    concurrency: 4,
    usePubSub: true,
  },
}
PostgreSQL serves as the job queue, and the workers communicate through Pub/Sub.
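
To see why a shared queue lets several workers cooperate without processing the same job twice, consider the toy in-memory model below. It is not how Sonamu or PostgreSQL implements the queue: `ToyQueue`, `runWorker`, and `simulate` are hypothetical names, and the synchronous `claim` method merely stands in for whatever atomic claim a database-backed queue performs.

```typescript
type Job = { id: number; claimedBy?: string };

class ToyQueue {
  private readonly pending: Job[];

  constructor(jobCount: number) {
    this.pending = Array.from({ length: jobCount }, (_, i) => ({ id: i + 1 }));
  }

  // Hand out each job at most once. JavaScript runs this synchronously,
  // so two workers can never receive the same job from this method.
  claim(worker: string): Job | undefined {
    const job = this.pending.shift();
    if (job) job.claimedBy = worker;
    return job;
  }
}

// Keep claiming jobs until the queue is empty.
async function runWorker(name: string, queue: ToyQueue, done: Job[]): Promise<void> {
  for (let job = queue.claim(name); job; job = queue.claim(name)) {
    await new Promise((resolve) => setTimeout(resolve, 1)); // simulate work
    done.push(job);
  }
}

// Run all workers against one shared queue and collect the finished jobs.
async function simulate(jobCount: number, workers: string[]): Promise<Job[]> {
  const queue = new ToyQueue(jobCount);
  const done: Job[] = [];
  await Promise.all(workers.map((worker) => runWorker(worker, queue, done)));
  return done;
}

simulate(10, ["server-1", "server-2", "server-3"]).then((done) => {
  console.log(done.map((job) => `${job.id}:${job.claimedBy}`).join(" "));
});
```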

Context Provider

Customize the Context that workflows use.
// sonamu.config.ts
tasks: {
  enableWorker: true,
  contextProvider: async (baseContext) => {
    // Look up the system user (UserModel is the application's own model and must be imported)
    const systemUser = await UserModel.findOne({
      wq: [['email', '=', '[email protected]']],
    });
    
    return {
      ...baseContext,
      user: systemUser,
      customData: "...",
    };
  },
}
Inside a workflow function, access it with Sonamu.getContext():
import { Sonamu, workflow } from "sonamu";

export const myWorkflow = workflow(
  { name: "my_workflow" },
  async ({ input, step }) => {
    const context = Sonamu.getContext();
    console.log(context.user);  // the systemUser set by contextProvider
  }
);

Table structure

Sonamu creates the following tables automatically:
-- Workflow definitions
CREATE TABLE workflows (
  id UUID PRIMARY KEY,
  name TEXT NOT NULL,
  version TEXT,
  ...
);

-- Workflow run history
CREATE TABLE workflow_runs (
  id UUID PRIMARY KEY,
  workflow_id UUID REFERENCES workflows(id),
  status TEXT,
  input JSONB,
  output JSONB,
  started_at TIMESTAMP,
  completed_at TIMESTAMP,
  ...
);

-- Step run history
CREATE TABLE workflow_steps (
  id UUID PRIMARY KEY,
  workflow_run_id UUID REFERENCES workflow_runs(id),
  name TEXT,
  status TEXT,
  ...
);
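
When reading these tables from application code, it can help to give the rows a type. The sketch below covers only the workflow_runs columns shown above and leaves out the elided ones. `WorkflowRunRow` and `runDurationMs` are hypothetical names, the nullability of each field is an assumption, and the "completed" status value in the sample is made up for illustration.

```typescript
// Only the columns listed above; the elided ("...") columns are left out on purpose.
interface WorkflowRunRow {
  id: string;
  workflow_id: string;
  status: string | null;
  input: unknown;
  output: unknown;
  started_at: Date | null;
  completed_at: Date | null;
}

// Elapsed time of a run in milliseconds, or null if it has not started or finished.
function runDurationMs(run: WorkflowRunRow): number | null {
  if (!run.started_at || !run.completed_at) return null;
  return run.completed_at.getTime() - run.started_at.getTime();
}

const sample: WorkflowRunRow = {
  id: "run-1", // placeholder, not a real UUID
  workflow_id: "workflow-1", // placeholder
  status: "completed", // hypothetical status value
  input: { email: "user@example.com" },
  output: null,
  started_at: new Date("2024-01-01T00:00:00Z"),
  completed_at: new Date("2024-01-01T00:00:05Z"),
};

console.log(runDurationMs(sample)); // 5000
```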

Monitoring

Checking running workflows

// Query the database directly
import { DB } from "sonamu";

const knex = DB.getDB();
const runs = await knex('workflow_runs')
  .where('status', 'running')
  .select('*');
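
Rows returned by a query like the one above can be filtered further in application code, for example to flag runs that have been running for a suspiciously long time. This is a sketch under assumptions: `RunRow` and `findLongRunning` are hypothetical names, and the only status value relied on is 'running', taken from the query above.

```typescript
// Minimal row shape needed for this check; real rows carry more columns.
interface RunRow {
  id: string;
  status: string;
  started_at: Date | null;
}

// Return the runs that are still running and started more than thresholdMs ago.
function findLongRunning(runs: RunRow[], thresholdMs: number, now: Date = new Date()): RunRow[] {
  return runs.filter(
    (run) =>
      run.status === "running" &&
      run.started_at !== null &&
      now.getTime() - run.started_at.getTime() > thresholdMs,
  );
}

const now = new Date("2024-01-01T01:00:00Z");
const rows: RunRow[] = [
  { id: "a", status: "running", started_at: new Date("2024-01-01T00:00:00Z") }, // 60 min old
  { id: "b", status: "running", started_at: new Date("2024-01-01T00:55:00Z") }, // 5 min old
];

// Flag runs older than 30 minutes.
console.log(findLongRunning(rows, 30 * 60 * 1000, now).map((run) => run.id));
```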

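Logging

import { workflow } from "sonamu";

export const myWorkflow = workflow(
  { name: "my_workflow" },
  async ({ logger, input }) => {
    logger.info("Workflow started", { input });
    try {
      // ... workflow body
    } catch (error) {
      logger.error("Error occurred", { error });
      throw error;  // rethrow so the failure is not swallowed
    }
  }
);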
