Sonamu๋ ๋น๋๊ธฐ ๋ฐฑ๊ทธ๋ผ์ด๋ ์์
์ ์ฒ๋ฆฌํ๋ Workflow ์์คํ
์ ์ ๊ณตํฉ๋๋ค. ๊ธด ์์
์ ๋ฐฑ๊ทธ๋ผ์ด๋๋ก ๋ถ๋ฆฌํ์ฌ API ์๋ต ์๋๋ฅผ ๊ฐ์ ํ๊ณ , Worker ํ๋ก์ธ์ค๋ก ์์
์ ๋ถ์ฐ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
๊ธฐ๋ณธ ๊ตฌ์กฐ
import { defineConfig } from "sonamu";
export default defineConfig({
tasks: {
enableWorker: true,
workerOptions: {
concurrency: 1,
usePubSub: true,
listenDelay: 500,
},
contextProvider: (defaultContext) => ({
...defaultContext,
ip: "127.0.0.1",
session: {},
}),
},
// ...
});
enableWorker
Worker ํ๋ก์ธ์ค๋ฅผ ํ์ฑํํ ์ง ๊ฒฐ์ ํฉ๋๋ค.
ํ์
: boolean (์ ํ์ )
๊ธฐ๋ณธ๊ฐ: false
export default defineConfig({
tasks: {
enableWorker: true, // Worker ํ์ฑํ
},
});
Worker๋?
- ๋ณ๋ ํ๋ก์ธ์ค์์ ์์
์ ์ฒ๋ฆฌ
- ๋ฉ์ธ API ์๋ฒ์ ์ํฅ ์์
- ์ฌ๋ฌ Worker๋ฅผ ์คํํ์ฌ ๋ถ์ฐ ์ฒ๋ฆฌ ๊ฐ๋ฅ
ํ๋ก๋์
ํ๊ฒฝ์์๋ enableWorker: true๋ก ์ค์ ํ์ฌ Worker ์ ์ฉ ํ๋ก์ธ์ค๋ฅผ ์คํํ๋ ๊ฒ์ ๊ถ์ฅํฉ๋๋ค.
ํ๊ฒฝ ๋ณ์๋ก ์ ์ด
export default defineConfig({
tasks: {
enableWorker: !["true", "1"].includes(
process.env.DISABLE_WORKER ?? "false"
),
},
});
.env:
# Worker ๋นํ์ฑํ (๊ฐ๋ฐ)
DISABLE_WORKER=true
# Worker ํ์ฑํ (ํ๋ก๋์
)
DISABLE_WORKER=false
workerOptions
Worker์ ๋์ ๋ฐฉ์์ ์ค์ ํฉ๋๋ค.
ํ์
: WorkflowOptions (์ ํ์ )
type WorkflowOptions = {
concurrency?: number;
usePubSub?: boolean;
listenDelay?: number;
};
concurrency
Worker๊ฐ ๋์์ ์ฒ๋ฆฌํ ์์
์์
๋๋ค.
ํ์
: number
๊ธฐ๋ณธ๊ฐ: 1
export default defineConfig({
tasks: {
workerOptions: {
concurrency: 1, // ํ ๋ฒ์ 1๊ฐ ์์
๋ง
},
},
});
๊ถ์ฅ๊ฐ:
1 - ๋จ์ํ ์์
, ์์ฐจ ์ฒ๋ฆฌ ํ์
2-5 - ์ผ๋ฐ์ ์ธ ๋ฐฑ๊ทธ๋ผ์ด๋ ์์
10+ - ๊ฐ๋ฒผ์ด I/O ์์ฃผ ์์
concurrency๋ฅผ ๋์ด๋ฉด ๋ ๋ง์ ์์
์ ๋์์ ์ฒ๋ฆฌํ์ง๋ง, CPU์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ด ์ฆ๊ฐํฉ๋๋ค.
usePubSub
๋ค์ค Worker ๊ฐ ์์
๋ถ์ฐ์ ์ํด Pub/Sub์ ์ฌ์ฉํฉ๋๋ค.
ํ์
: boolean
๊ธฐ๋ณธ๊ฐ: false
export default defineConfig({
tasks: {
workerOptions: {
usePubSub: true, // Redis Pub/Sub ์ฌ์ฉ
},
},
});
๋์ ๋ฐฉ์:
false: ๊ฐ Worker๊ฐ ๋
๋ฆฝ์ ์ผ๋ก ์์
ํ๋ง
true: Redis Pub/Sub์ผ๋ก ์์
์๋ฆผ ์ ํ
์ฌ๋ฌ Worker๋ฅผ ์คํํ๋ ๊ฒฝ์ฐ usePubSub: true๋ฅผ ๊ถ์ฅํฉ๋๋ค. ์์
ํ ๋น์ด ๋ ํจ์จ์ ์
๋๋ค.
listenDelay
์์
์ ํ์ธํ๋ ์ฃผ๊ธฐ(๋ฐ๋ฆฌ์ด)์
๋๋ค.
ํ์
: number (ms)
๊ธฐ๋ณธ๊ฐ: 500 (0.5์ด)
export default defineConfig({
tasks: {
workerOptions: {
listenDelay: 500, // 0.5์ด๋ง๋ค ์์
ํ์ธ
},
},
});
๊ถ์ฅ๊ฐ:
100-500ms - ๋น ๋ฅธ ์๋ต์ด ํ์ํ ์์
500-1000ms - ์ผ๋ฐ์ ์ธ ๋ฐฑ๊ทธ๋ผ์ด๋ ์์
1000-5000ms - ๊ธด๊ธํ์ง ์์ ์์
contextProvider
Worker์์ ์คํ๋๋ ์์
์ Context๋ฅผ ์์ฑํฉ๋๋ค.
ํ์
: (defaultContext) => Context | Promise<Context>
export default defineConfig({
tasks: {
contextProvider: (defaultContext) => {
return {
...defaultContext,
ip: "127.0.0.1",
session: {},
};
},
},
});
defaultContext ํฌํจ ํ๋:
reply - Fastify reply ๊ฐ์ฒด (null in worker)
request - Fastify request ๊ฐ์ฒด (null in worker)
headers - ์์ฒญ ํค๋
createSSE - SSE ์คํธ๋ฆผ ์์ฑ
naiteStore - Naite ๋ก๊น
์คํ ์ด
์ปค์คํ
Context
export default defineConfig({
tasks: {
contextProvider: (defaultContext) => {
return {
...defaultContext,
ip: "worker",
session: {},
config: {
apiUrl: process.env.API_URL,
},
logger: console,
};
},
},
});
Worker Context์์๋ request์ reply๊ฐ null์
๋๋ค. HTTP ๊ด๋ จ ๊ธฐ๋ฅ์ ์ฌ์ฉํ ์ ์์ต๋๋ค.
๊ธฐ๋ณธ ์์
๋จ์ผ Worker (๊ฐ๋ฐ)
import { defineConfig } from "sonamu";
export default defineConfig({
tasks: {
enableWorker: process.env.NODE_ENV === "production",
workerOptions: {
concurrency: 1,
usePubSub: false, // ๋จ์ผ Worker
listenDelay: 500,
},
contextProvider: (defaultContext) => ({
...defaultContext,
ip: "127.0.0.1",
session: {},
}),
},
});
๋ค์ค Worker (ํ๋ก๋์
)
import { defineConfig } from "sonamu";
export default defineConfig({
tasks: {
enableWorker: true,
workerOptions: {
concurrency: 5,
usePubSub: true, // Redis Pub/Sub์ผ๋ก ๋ถ์ฐ
listenDelay: 500,
},
contextProvider: (defaultContext) => ({
...defaultContext,
ip: "worker",
session: {},
}),
},
});
์ค์ ์์
ํ๊ฒฝ๋ณ ์ค์
import { defineConfig } from "sonamu";
const isDev = process.env.NODE_ENV === "development";
const isProd = process.env.NODE_ENV === "production";
export default defineConfig({
tasks: {
// ๊ฐ๋ฐ: Worker ์์ (๋ฉ์ธ ํ๋ก์ธ์ค์์ ์ฒ๋ฆฌ)
// ํ๋ก๋์
: Worker ํ์ฑํ
enableWorker: isProd,
workerOptions: {
concurrency: isProd ? 5 : 1,
usePubSub: isProd,
listenDelay: isDev ? 100 : 500, // ๊ฐ๋ฐ: ๋น ๋ฅธ ์๋ต
},
contextProvider: (defaultContext) => ({
...defaultContext,
ip: isProd ? "worker" : "127.0.0.1",
session: {},
}),
},
});
๊ณ ๋ถํ ์ํฌํ๋ก์ฐ
export default defineConfig({
tasks: {
enableWorker: true,
workerOptions: {
concurrency: 10, // ๋์์ 10๊ฐ ์์
usePubSub: true, // ๋ค์ค Worker ์ง์
listenDelay: 1000, // 1์ด ๊ฐ๊ฒฉ
},
contextProvider: (defaultContext) => ({
...defaultContext,
ip: "worker",
session: {},
logger: {
info: (msg) => console.log(`[Worker] ${msg}`),
error: (msg) => console.error(`[Worker Error] ${msg}`),
},
}),
},
});
์ปค์คํ
Context
export default defineConfig({
tasks: {
enableWorker: true,
workerOptions: {
concurrency: 3,
usePubSub: true,
listenDelay: 500,
},
contextProvider: async (defaultContext) => {
// ๋น๋๊ธฐ ์ด๊ธฐํ
const config = await loadConfig();
return {
...defaultContext,
ip: "worker",
session: {},
config,
services: {
email: new EmailService(),
notification: new NotificationService(),
},
};
},
},
});
Workflow ์ฌ์ฉ
tasks ์ค์ ํ @workflow ๋ฐ์ฝ๋ ์ดํฐ๋ก ๋ฐฑ๊ทธ๋ผ์ด๋ ์์
์ ์ ์ํฉ๋๋ค.
import { workflow, step } from "sonamu";
export class EmailModel {
@workflow()
static async sendWelcomeEmail(userId: number) {
const user = await step("fetch-user", async () => {
return UserModel.findById(userId);
});
await step("send-email", async () => {
return sendEmail(user.email, "Welcome!");
});
}
}
ํธ์ถ:
// ๋ฐฑ๊ทธ๋ผ์ด๋์์ ์คํ
await EmailModel.sendWelcomeEmail(123);
โ Workflow ์ฌ์ฉ๋ฒ
Worker ์คํ
๊ฐ๋ฐ ํ๊ฒฝ
๊ฐ๋ฐ ํ๊ฒฝ์์๋ enableWorker: false๋ก ๋๊ณ ๋ฉ์ธ ํ๋ก์ธ์ค์์ ์ฒ๋ฆฌ:
ํ๋ก๋์
ํ๊ฒฝ
Worker ์ ์ฉ ํ๋ก์ธ์ค๋ฅผ ๋ณ๋๋ก ์คํ:
# API ์๋ฒ
pnpm start
# Worker ํ๋ก์ธ์ค (๋ณ๋ ํฐ๋ฏธ๋)
pnpm start:worker
package.json:
{
"scripts": {
"start": "node dist/server.js",
"start:worker": "ENABLE_WORKER=true node dist/worker.js"
}
}
๋ค์ค Worker
์ฌ๋ฌ Worker๋ฅผ ์คํํ์ฌ ์ฒ๋ฆฌ๋ ์ฆ๊ฐ:
# Worker 1
ENABLE_WORKER=true node dist/worker.js
# Worker 2
ENABLE_WORKER=true node dist/worker.js
# Worker 3
ENABLE_WORKER=true node dist/worker.js
PM2๋ Docker๋ฅผ ์ฌ์ฉํ๋ฉด ์ฌ๋ฌ Worker๋ฅผ ์ฝ๊ฒ ๊ด๋ฆฌํ ์ ์์ต๋๋ค.
Redis ์ค์ (usePubSub: true)
usePubSub: true๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด Redis๊ฐ ํ์ํฉ๋๋ค.
Redis ์ฐ๊ฒฐ
Redis๋ cache ์ค์ ์์ ๊ตฌ์ฑ๋ฉ๋๋ค:
import { createClient } from "redis";
const redisConnection = createClient({
url: process.env.REDIS_URL ?? "redis://localhost:6379",
});
await redisConnection.connect();
export default defineConfig({
server: {
cache: {
default: "main",
stores: {
main: store()
.useL2Layer(drivers.redis({ connection: redisConnection }))
.useBus(drivers.redisBus({ connection: redisConnection })),
},
},
},
tasks: {
enableWorker: true,
workerOptions: {
usePubSub: true, // Redis Pub/Sub ์ฌ์ฉ
},
},
});
์ฃผ์์ฌํญ
1. Worker Context ์ ํ
// โ ๋์ ์: Worker์์ HTTP ์๋ต ์๋
@workflow()
static async processData() {
const ctx = getContext();
ctx.reply.send({ done: true }); // Worker์์๋ null!
}
// โ
์ข์ ์: ๊ฒฐ๊ณผ๋ฅผ DB์ ์ ์ฅ
@workflow()
static async processData() {
const result = await heavyProcessing();
await ResultModel.save({ data: result });
}
2. enableWorker ์ค์
// โ ๋์ ์: ํ๋ก๋์
์์ Worker ์์
enableWorker: false // ๋ชจ๋ ์์
์ด ๋ฉ์ธ ํ๋ก์ธ์ค์์!
// โ
์ข์ ์: ํ๊ฒฝ๋ณ๋ก ๋ค๋ฅด๊ฒ
enableWorker: process.env.NODE_ENV === "production"
3. concurrency ๊ณผ๋ค ์ค์
// โ ๋์ ์: ๋๋ฌด ๋์ ๋์์ฑ
workerOptions: {
concurrency: 100, // CPU์ ๋ฉ๋ชจ๋ฆฌ ๋ถ์กฑ ์ํ
}
// โ
์ข์ ์: ์ ์ ํ ์์ค
workerOptions: {
concurrency: 5, // ์๋ฒ ๋ฆฌ์์ค์ ๋ง๊ฒ
}
๋ค์ ๋จ๊ณ
Task ์ค์ ์ ์๋ฃํ๋ค๋ฉด: