๋ฉ”์ธ ์ฝ˜ํ…์ธ ๋กœ ๊ฑด๋„ˆ๋›ฐ๊ธฐ
Sonamu๋Š” ๋น„๋™๊ธฐ ๋ฐฑ๊ทธ๋ผ์šด๋“œ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋Š” Workflow ์‹œ์Šคํ…œ์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. ๊ธด ์ž‘์—…์„ ๋ฐฑ๊ทธ๋ผ์šด๋“œ๋กœ ๋ถ„๋ฆฌํ•˜์—ฌ API ์‘๋‹ต ์†๋„๋ฅผ ๊ฐœ์„ ํ•˜๊ณ , Worker ํ”„๋กœ์„ธ์Šค๋กœ ์ž‘์—…์„ ๋ถ„์‚ฐ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๊ธฐ๋ณธ ๊ตฌ์กฐ

import { defineConfig } from "sonamu";

export default defineConfig({
  tasks: {
    enableWorker: true,
    workerOptions: {
      concurrency: 1,
      usePubSub: true,
      listenDelay: 500,
    },
    contextProvider: (defaultContext) => ({
      ...defaultContext,
      ip: "127.0.0.1",
      session: {},
    }),
  },
  // ...
});

enableWorker

Worker ํ”„๋กœ์„ธ์Šค๋ฅผ ํ™œ์„ฑํ™”ํ• ์ง€ ๊ฒฐ์ •ํ•ฉ๋‹ˆ๋‹ค. ํƒ€์ž…: boolean (์„ ํƒ์ ) ๊ธฐ๋ณธ๊ฐ’: false
export default defineConfig({
  tasks: {
    enableWorker: true,  // Worker ํ™œ์„ฑํ™”
  },
});
Worker๋ž€?
  • ๋ณ„๋„ ํ”„๋กœ์„ธ์Šค์—์„œ ์ž‘์—…์„ ์ฒ˜๋ฆฌ
  • ๋ฉ”์ธ API ์„œ๋ฒ„์— ์˜ํ–ฅ ์—†์Œ
  • ์—ฌ๋Ÿฌ Worker๋ฅผ ์‹คํ–‰ํ•˜์—ฌ ๋ถ„์‚ฐ ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ
ํ”„๋กœ๋•์…˜ ํ™˜๊ฒฝ์—์„œ๋Š” enableWorker: true๋กœ ์„ค์ •ํ•˜์—ฌ Worker ์ „์šฉ ํ”„๋กœ์„ธ์Šค๋ฅผ ์‹คํ–‰ํ•˜๋Š” ๊ฒƒ์„ ๊ถŒ์žฅํ•ฉ๋‹ˆ๋‹ค.

ํ™˜๊ฒฝ ๋ณ€์ˆ˜๋กœ ์ œ์–ด

export default defineConfig({
  tasks: {
    enableWorker: !["true", "1"].includes(
      process.env.DISABLE_WORKER ?? "false"
    ),
  },
});
.env:
# Worker ๋น„ํ™œ์„ฑํ™” (๊ฐœ๋ฐœ)
DISABLE_WORKER=true

# Worker ํ™œ์„ฑํ™” (ํ”„๋กœ๋•์…˜)
DISABLE_WORKER=false

workerOptions

Worker์˜ ๋™์ž‘ ๋ฐฉ์‹์„ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค. ํƒ€์ž…: WorkflowOptions (์„ ํƒ์ )
type WorkflowOptions = {
  concurrency?: number;
  usePubSub?: boolean;
  listenDelay?: number;
};

concurrency

Worker๊ฐ€ ๋™์‹œ์— ์ฒ˜๋ฆฌํ•  ์ž‘์—… ์ˆ˜์ž…๋‹ˆ๋‹ค. ํƒ€์ž…: number ๊ธฐ๋ณธ๊ฐ’: 1
export default defineConfig({
  tasks: {
    workerOptions: {
      concurrency: 1,  // ํ•œ ๋ฒˆ์— 1๊ฐœ ์ž‘์—…๋งŒ
    },
  },
});
๊ถŒ์žฅ๊ฐ’:
  • 1 - ๋‹จ์ˆœํ•œ ์ž‘์—…, ์ˆœ์ฐจ ์ฒ˜๋ฆฌ ํ•„์š”
  • 2-5 - ์ผ๋ฐ˜์ ์ธ ๋ฐฑ๊ทธ๋ผ์šด๋“œ ์ž‘์—…
  • 10+ - ๊ฐ€๋ฒผ์šด I/O ์œ„์ฃผ ์ž‘์—…
concurrency๋ฅผ ๋†’์ด๋ฉด ๋” ๋งŽ์€ ์ž‘์—…์„ ๋™์‹œ์— ์ฒ˜๋ฆฌํ•˜์ง€๋งŒ, CPU์™€ ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ๋Ÿ‰์ด ์ฆ๊ฐ€ํ•ฉ๋‹ˆ๋‹ค.

usePubSub

๋‹ค์ค‘ Worker ๊ฐ„ ์ž‘์—… ๋ถ„์‚ฐ์„ ์œ„ํ•ด Pub/Sub์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ํƒ€์ž…: boolean ๊ธฐ๋ณธ๊ฐ’: false
export default defineConfig({
  tasks: {
    workerOptions: {
      usePubSub: true,  // Redis Pub/Sub ์‚ฌ์šฉ
    },
  },
});
๋™์ž‘ ๋ฐฉ์‹:
  • false: ๊ฐ Worker๊ฐ€ ๋…๋ฆฝ์ ์œผ๋กœ ์ž‘์—… ํ’€๋ง
  • true: Redis Pub/Sub์œผ๋กœ ์ž‘์—… ์•Œ๋ฆผ ์ „ํŒŒ
์—ฌ๋Ÿฌ Worker๋ฅผ ์‹คํ–‰ํ•˜๋Š” ๊ฒฝ์šฐ usePubSub: true๋ฅผ ๊ถŒ์žฅํ•ฉ๋‹ˆ๋‹ค. ์ž‘์—… ํ• ๋‹น์ด ๋” ํšจ์œจ์ ์ž…๋‹ˆ๋‹ค.

listenDelay

์ž‘์—…์„ ํ™•์ธํ•˜๋Š” ์ฃผ๊ธฐ(๋ฐ€๋ฆฌ์ดˆ)์ž…๋‹ˆ๋‹ค. ํƒ€์ž…: number (ms) ๊ธฐ๋ณธ๊ฐ’: 500 (0.5์ดˆ)
export default defineConfig({
  tasks: {
    workerOptions: {
      listenDelay: 500,  // 0.5์ดˆ๋งˆ๋‹ค ์ž‘์—… ํ™•์ธ
    },
  },
});
๊ถŒ์žฅ๊ฐ’:
  • 100-500ms - ๋น ๋ฅธ ์‘๋‹ต์ด ํ•„์š”ํ•œ ์ž‘์—…
  • 500-1000ms - ์ผ๋ฐ˜์ ์ธ ๋ฐฑ๊ทธ๋ผ์šด๋“œ ์ž‘์—…
  • 1000-5000ms - ๊ธด๊ธ‰ํ•˜์ง€ ์•Š์€ ์ž‘์—…

contextProvider

Worker์—์„œ ์‹คํ–‰๋˜๋Š” ์ž‘์—…์˜ Context๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค. ํƒ€์ž…: (defaultContext) => Context | Promise<Context>
export default defineConfig({
  tasks: {
    contextProvider: (defaultContext) => {
      return {
        ...defaultContext,
        ip: "127.0.0.1",
        session: {},
      };
    },
  },
});
defaultContext ํฌํ•จ ํ•„๋“œ:
  • reply - Fastify reply ๊ฐ์ฒด (null in worker)
  • request - Fastify request ๊ฐ์ฒด (null in worker)
  • headers - ์š”์ฒญ ํ—ค๋”
  • createSSE - SSE ์ŠคํŠธ๋ฆผ ์ƒ์„ฑ
  • naiteStore - Naite ๋กœ๊น… ์Šคํ† ์–ด

์ปค์Šคํ…€ Context

export default defineConfig({
  tasks: {
    contextProvider: (defaultContext) => {
      return {
        ...defaultContext,
        ip: "worker",
        session: {},
        config: {
          apiUrl: process.env.API_URL,
        },
        logger: console,
      };
    },
  },
});
Worker Context์—์„œ๋Š” request์™€ reply๊ฐ€ null์ž…๋‹ˆ๋‹ค. HTTP ๊ด€๋ จ ๊ธฐ๋Šฅ์€ ์‚ฌ์šฉํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.

๊ธฐ๋ณธ ์˜ˆ์‹œ

๋‹จ์ผ Worker (๊ฐœ๋ฐœ)

import { defineConfig } from "sonamu";

export default defineConfig({
  tasks: {
    enableWorker: process.env.NODE_ENV === "production",
    workerOptions: {
      concurrency: 1,
      usePubSub: false,  // ๋‹จ์ผ Worker
      listenDelay: 500,
    },
    contextProvider: (defaultContext) => ({
      ...defaultContext,
      ip: "127.0.0.1",
      session: {},
    }),
  },
});

๋‹ค์ค‘ Worker (ํ”„๋กœ๋•์…˜)

import { defineConfig } from "sonamu";

export default defineConfig({
  tasks: {
    enableWorker: true,
    workerOptions: {
      concurrency: 5,
      usePubSub: true,   // Redis Pub/Sub์œผ๋กœ ๋ถ„์‚ฐ
      listenDelay: 500,
    },
    contextProvider: (defaultContext) => ({
      ...defaultContext,
      ip: "worker",
      session: {},
    }),
  },
});

์‹ค์ „ ์˜ˆ์‹œ

ํ™˜๊ฒฝ๋ณ„ ์„ค์ •

import { defineConfig } from "sonamu";

const isDev = process.env.NODE_ENV === "development";
const isProd = process.env.NODE_ENV === "production";

export default defineConfig({
  tasks: {
    // ๊ฐœ๋ฐœ: Worker ์—†์Œ (๋ฉ”์ธ ํ”„๋กœ์„ธ์Šค์—์„œ ์ฒ˜๋ฆฌ)
    // ํ”„๋กœ๋•์…˜: Worker ํ™œ์„ฑํ™”
    enableWorker: isProd,
    
    workerOptions: {
      concurrency: isProd ? 5 : 1,
      usePubSub: isProd,
      listenDelay: isDev ? 100 : 500,  // ๊ฐœ๋ฐœ: ๋น ๋ฅธ ์‘๋‹ต
    },
    
    contextProvider: (defaultContext) => ({
      ...defaultContext,
      ip: isProd ? "worker" : "127.0.0.1",
      session: {},
    }),
  },
});

๊ณ ๋ถ€ํ•˜ ์›Œํฌํ”Œ๋กœ์šฐ

export default defineConfig({
  tasks: {
    enableWorker: true,
    workerOptions: {
      concurrency: 10,   // ๋™์‹œ์— 10๊ฐœ ์ž‘์—…
      usePubSub: true,   // ๋‹ค์ค‘ Worker ์ง€์›
      listenDelay: 1000, // 1์ดˆ ๊ฐ„๊ฒฉ
    },
    contextProvider: (defaultContext) => ({
      ...defaultContext,
      ip: "worker",
      session: {},
      logger: {
        info: (msg) => console.log(`[Worker] ${msg}`),
        error: (msg) => console.error(`[Worker Error] ${msg}`),
      },
    }),
  },
});

์ปค์Šคํ…€ Context

export default defineConfig({
  tasks: {
    enableWorker: true,
    workerOptions: {
      concurrency: 3,
      usePubSub: true,
      listenDelay: 500,
    },
    contextProvider: async (defaultContext) => {
      // ๋น„๋™๊ธฐ ์ดˆ๊ธฐํ™”
      const config = await loadConfig();
      
      return {
        ...defaultContext,
        ip: "worker",
        session: {},
        config,
        services: {
          email: new EmailService(),
          notification: new NotificationService(),
        },
      };
    },
  },
});

Workflow ์‚ฌ์šฉ

tasks ์„ค์ • ํ›„ @workflow ๋ฐ์ฝ”๋ ˆ์ดํ„ฐ๋กœ ๋ฐฑ๊ทธ๋ผ์šด๋“œ ์ž‘์—…์„ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค.
import { workflow, step } from "sonamu";

export class EmailModel {
  @workflow()
  static async sendWelcomeEmail(userId: number) {
    const user = await step("fetch-user", async () => {
      return UserModel.findById(userId);
    });
    
    await step("send-email", async () => {
      return sendEmail(user.email, "Welcome!");
    });
  }
}
ํ˜ธ์ถœ:
// ๋ฐฑ๊ทธ๋ผ์šด๋“œ์—์„œ ์‹คํ–‰
await EmailModel.sendWelcomeEmail(123);
โ†’ Workflow ์‚ฌ์šฉ๋ฒ•

Worker ์‹คํ–‰

๊ฐœ๋ฐœ ํ™˜๊ฒฝ

๊ฐœ๋ฐœ ํ™˜๊ฒฝ์—์„œ๋Š” enableWorker: false๋กœ ๋‘๊ณ  ๋ฉ”์ธ ํ”„๋กœ์„ธ์Šค์—์„œ ์ฒ˜๋ฆฌ:
pnpm dev

ํ”„๋กœ๋•์…˜ ํ™˜๊ฒฝ

Worker ์ „์šฉ ํ”„๋กœ์„ธ์Šค๋ฅผ ๋ณ„๋„๋กœ ์‹คํ–‰:
# API ์„œ๋ฒ„
pnpm start

# Worker ํ”„๋กœ์„ธ์Šค (๋ณ„๋„ ํ„ฐ๋ฏธ๋„)
pnpm start:worker
package.json:
{
  "scripts": {
    "start": "node dist/server.js",
    "start:worker": "ENABLE_WORKER=true node dist/worker.js"
  }
}

๋‹ค์ค‘ Worker

์—ฌ๋Ÿฌ Worker๋ฅผ ์‹คํ–‰ํ•˜์—ฌ ์ฒ˜๋ฆฌ๋Ÿ‰ ์ฆ๊ฐ€:
# Worker 1
ENABLE_WORKER=true node dist/worker.js

# Worker 2
ENABLE_WORKER=true node dist/worker.js

# Worker 3
ENABLE_WORKER=true node dist/worker.js
PM2๋‚˜ Docker๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ์—ฌ๋Ÿฌ Worker๋ฅผ ์‰ฝ๊ฒŒ ๊ด€๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Redis ์„ค์ • (usePubSub: true)

usePubSub: true๋ฅผ ์‚ฌ์šฉํ•˜๋ ค๋ฉด Redis๊ฐ€ ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค.

Redis ์—ฐ๊ฒฐ

Redis๋Š” cache ์„ค์ •์—์„œ ๊ตฌ์„ฑ๋ฉ๋‹ˆ๋‹ค:
import { createClient } from "redis";

const redisConnection = createClient({
  url: process.env.REDIS_URL ?? "redis://localhost:6379",
});

await redisConnection.connect();

export default defineConfig({
  server: {
    cache: {
      default: "main",
      stores: {
        main: store()
          .useL2Layer(drivers.redis({ connection: redisConnection }))
          .useBus(drivers.redisBus({ connection: redisConnection })),
      },
    },
  },
  
  tasks: {
    enableWorker: true,
    workerOptions: {
      usePubSub: true,  // Redis Pub/Sub ์‚ฌ์šฉ
    },
  },
});

์ฃผ์˜์‚ฌํ•ญ

1. Worker Context ์ œํ•œ

// โŒ ๋‚˜์œ ์˜ˆ: Worker์—์„œ HTTP ์‘๋‹ต ์‹œ๋„
@workflow()
static async processData() {
  const ctx = getContext();
  ctx.reply.send({ done: true });  // Worker์—์„œ๋Š” null!
}

// โœ… ์ข‹์€ ์˜ˆ: ๊ฒฐ๊ณผ๋ฅผ DB์— ์ €์žฅ
@workflow()
static async processData() {
  const result = await heavyProcessing();
  await ResultModel.save({ data: result });
}

2. enableWorker ์„ค์ •

// โŒ ๋‚˜์œ ์˜ˆ: ํ”„๋กœ๋•์…˜์—์„œ Worker ์—†์Œ
enableWorker: false  // ๋ชจ๋“  ์ž‘์—…์ด ๋ฉ”์ธ ํ”„๋กœ์„ธ์Šค์—์„œ!

// โœ… ์ข‹์€ ์˜ˆ: ํ™˜๊ฒฝ๋ณ„๋กœ ๋‹ค๋ฅด๊ฒŒ
enableWorker: process.env.NODE_ENV === "production"

3. concurrency ๊ณผ๋‹ค ์„ค์ •

// โŒ ๋‚˜์œ ์˜ˆ: ๋„ˆ๋ฌด ๋†’์€ ๋™์‹œ์„ฑ
workerOptions: {
  concurrency: 100,  // CPU์™€ ๋ฉ”๋ชจ๋ฆฌ ๋ถ€์กฑ ์œ„ํ—˜
}

// โœ… ์ข‹์€ ์˜ˆ: ์ ์ ˆํ•œ ์ˆ˜์ค€
workerOptions: {
  concurrency: 5,  // ์„œ๋ฒ„ ๋ฆฌ์†Œ์Šค์— ๋งž๊ฒŒ
}

๋‹ค์Œ ๋‹จ๊ณ„

Task ์„ค์ •์„ ์™„๋ฃŒํ–ˆ๋‹ค๋ฉด:
  • workflows - Workflow ์ •์˜ํ•˜๊ธฐ
  • steps - Step์œผ๋กœ ์ž‘์—… ๋ถ„๋ฆฌ
  • error-handling - ์—๋Ÿฌ ์ฒ˜๋ฆฌ ๋ฐ ์žฌ์‹œ๋„