@stream ๋ฐ์ฝ๋ ์ดํฐ๋ Server-Sent Events(SSE)๋ฅผ ์ฌ์ฉํ ์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ API๋ฅผ ์์ฑํฉ๋๋ค.
๊ธฐ๋ณธ ์ฌ์ฉ๋ฒ
๋ณต์ฌ
import { BaseModelClass, Sonamu, stream } from "sonamu";
import { z } from "zod";
/**
 * Event map for the notification stream: each key is an event name and each
 * value is the Zod schema of that event's payload.
 */
const NotificationEventsSchema = z.object({
  message: z.object({
    id: z.number(),
    text: z.string(),
    timestamp: z.date()
  }),
  status: z.object({
    type: z.enum(["connected", "disconnected"]),
    userId: z.number()
  })
});

class NotificationModelClass extends BaseModelClass {
  /**
   * Streams notification events over SSE.
   *
   * Publishes a "connected" status immediately, a "message" event every five
   * seconds, and after five minutes publishes "disconnected" and ends the
   * stream.
   *
   * @param userId - Echoed back in both "status" events.
   * @returns The SSE object created from the current context.
   */
  @stream({
    type: "sse",
    events: NotificationEventsSchema
  })
  async subscribe(userId: number) {
    const publishIntervalMs = 5000;
    const streamLifetimeMs = 5 * 60 * 1000;

    const sse = Sonamu.getContext().createSSE(NotificationEventsSchema);

    // Connection started
    sse.publish("status", {
      type: "connected",
      userId
    });

    // Send an event periodically
    const interval = setInterval(() => {
      sse.publish("message", {
        id: Date.now(),
        text: "New notification",
        timestamp: new Date()
      });
    }, publishIntervalMs);

    // Close the connection after five minutes.
    // NOTE(review): nothing here stops these timers if the client goes away
    // earlier — confirm whether the SSE object exposes a disconnect hook.
    setTimeout(async () => {
      clearInterval(interval);
      sse.publish("status", {
        type: "disconnected",
        userId
      });
      await sse.end();
    }, streamLifetimeMs);

    return sse;
  }
}
옵션
type
스트리밍 타입을 지정합니다. 필수 옵션 복사
type: "sse" // Server-Sent Events (currently the only option)
향후 WebSocket 지원이 추가될 수 있습니다.
events
전송할 이벤트의 타입을 Zod 스키마로 정의합니다. 필수 옵션 복사
import { z } from "zod";
const EventsSchema = z.object({
  // event name: schema of that event's data
  progress: z.object({
    percent: z.number(),
    message: z.string()
  }),
  complete: z.object({
    result: z.string()
  }),
  error: z.object({
    message: z.string(),
    code: z.string()
  })
});

/**
 * Publishes one "progress" and one "complete" event, then ends the stream.
 *
 * @returns The SSE object created from the current context.
 */
@stream({
  type: "sse",
  events: EventsSchema
})
async process() {
  const sse = Sonamu.getContext().createSSE(EventsSchema);

  // Send events
  sse.publish("progress", { percent: 50, message: "Processing..." });
  sse.publish("complete", { result: "Success" });

  // End the stream
  await sse.end();
  return sse;
}
path
스트리밍 엔드포인트 경로를 지정합니다. 기본값: /{modelName}/{methodName} (camelCase)
복사
// Sets an explicit endpoint path through the `path` option.
@stream({
type: "sse",
events: EventsSchema,
path: "/stream/notifications"
})
async subscribe() {
// Reachable at /stream/notifications
}
guards
접근 권한을 지정합니다. 복사
// Guard key names; presumably the values accepted by the `guards` option — confirm against the library.
type GuardKey = "query" | "admin" | "user";
복사
// Applies the "user" guard to this stream.
@stream({
type: "sse",
events: EventsSchema,
guards: ["user"]
})
async subscribe() {
// Only logged-in users can access this
}
description
스트리밍 API 설명을 추가합니다. 복사
// Attaches a human-readable description to the streaming API.
@stream({
  type: "sse",
  events: EventsSchema,
  description: "실시간 알림을 스트리밍합니다."
})
async notifications() {
  // ...
}
resourceName
생성되는 서비스 파일의 리소스 이름을 지정합니다. 복사
// Sets the resource name through the `resourceName` option.
@stream({
type: "sse",
events: EventsSchema,
resourceName: "Notifications"
})
async subscribe() {
// Included in the NotificationsService.ts file
}
SSE ๊ฐ์ฒด ์ฌ์ฉ๋ฒ
createSSE
Context์์ SSE ๊ฐ์ฒด๋ฅผ ์์ฑํฉ๋๋ค.๋ณต์ฌ
// Create the SSE object from the current context, passing the events schema.
const sse = Sonamu.getContext().createSSE(EventsSchema);
publish
이벤트를 클라이언트로 전송합니다. 복사
// Send one event: the event name first, then its payload.
sse.publish("eventName", data);
- 이벤트 이름은 스키마에 정의된 것만 사용 가능
- 데이터는 스키마에 맞춰 자동 검증
복사
// Schema with a single "message" event carrying a string `text`.
const EventsSchema = z.object({
  message: z.object({
    text: z.string()
  })
});
// Obtain the context the same way the other snippets do; a bare `context`
// identifier is not defined anywhere in this example.
const sse = Sonamu.getContext().createSSE(EventsSchema);

// ✅ Correct usage
sse.publish("message", { text: "Hello" });

// ❌ Type errors
sse.publish("unknown", { text: "Hello" }); // 'unknown' is not in the schema
sse.publish("message", { text: 123 }); // text must be a string
end
스트림 연결을 종료합니다. 복사
// Close the stream connection and wait for the call to finish.
await sse.end();
end()는 클라이언트에 “end” 이벤트를 전송하고 200ms 대기 후 연결을 닫습니다. 전체 예시
복사
import { BaseModelClass, stream } from "sonamu";
import { z } from "zod";
/**
 * Event map for the task log stream: log lines, progress counters and a final
 * completion summary.
 */
const LogEventsSchema = z.object({
  log: z.object({
    level: z.enum(["info", "warn", "error"]),
    message: z.string(),
    timestamp: z.string()
  }),
  progress: z.object({
    current: z.number(),
    total: z.number()
  }),
  complete: z.object({
    success: z.boolean(),
    duration: z.number()
  })
});

class TaskModelClass extends BaseModelClass {
  /**
   * Processes `total` items one by one and streams logs and progress over SSE.
   *
   * Emits a "progress" event after every item, a "log" event after every
   * tenth item, and a "complete" event with the elapsed milliseconds once all
   * items are done. The stream is ended even when processing throws; the
   * error itself still propagates to the caller.
   *
   * @param taskId - Identifier of the task (not used by this example body).
   * @returns The SSE object created from the current context.
   */
  @stream({
    type: "sse",
    events: LogEventsSchema,
    path: "/tasks/logs",
    guards: ["admin"],
    description: "작업 실행 로그를 실시간으로 스트리밍합니다."
  })
  async streamLogs(taskId: number) {
    const sse = Sonamu.getContext().createSSE(LogEventsSchema);
    const startTime = Date.now();
    const total = 100;

    try {
      // Task started
      sse.publish("log", {
        level: "info",
        message: "Task started",
        timestamp: new Date().toISOString()
      });

      // Run the task: exactly `total` items, indices 0..total-1
      for (let index = 0; index < total; index++) {
        await this.processItem(index);
        const processed = index + 1;

        // Report progress as the number of items finished so far
        sse.publish("progress", {
          current: processed,
          total
        });

        // Emit a log line after every tenth finished item
        if (processed % 10 === 0) {
          sse.publish("log", {
            level: "info",
            message: `Processed ${processed}/${total} items`,
            timestamp: new Date().toISOString()
          });
        }
      }

      // Task finished
      sse.publish("complete", {
        success: true,
        duration: Date.now() - startTime
      });
    } finally {
      // Close the connection on both the success and the failure path
      await sse.end();
    }

    return sse;
  }

  /** Simulates handling one item by waiting 50 ms. */
  private async processItem(index: number) {
    // Process the item
    await new Promise(resolve => setTimeout(resolve, 50));
  }
}
다른 데코레이터와 함께 사용
@transactional
복사
/**
 * Combines @stream with @transactional: the method body runs inside a
 * transaction while also owning an SSE stream.
 *
 * @returns The SSE object created from the current context.
 */
@stream({ type: "sse", events: EventsSchema })
@transactional()
async process() {
  const sse = Sonamu.getContext().createSSE(EventsSchema);
  const wdb = this.getDB("w");

  // Runs inside the transaction:
  // publish SSE events and perform the DB work together here.

  await sse.end();
  return sse;
}
@stream์ ๋จผ์ , @transactional์ ๋์ค์ ์์ฑํ์ธ์.์ ์ฝ์ฌํญ
1. @api์ ์ค๋ณต ์ฌ์ฉ ๋ถ๊ฐ
๋ณต์ฌ
// ❌ Throws an error
@api()
@stream({ type: "sse", events: EventsSchema })
async subscribe() {}
๋ณต์ฌ
@stream decorator can only be used once on NotificationModel.subscribe.
You can use only one of @api or @stream decorator on the same method.
2. events๋ ํ์
๋ณต์ฌ
// ❌ Throws an error
@stream({ type: "sse" })
async subscribe() {}
// The `events` option is required
3. httpMethod๋ ํญ์ GET
@stream์ ์๋์ผ๋ก httpMethod: "GET"์ ์ค์ ํฉ๋๋ค.
๋ณต์ฌ
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
  // Exposed as GET /notification/subscribe
}
ํด๋ผ์ด์ธํธ ์ฌ์ฉ (Web)
Sonamu๋ ์๋์ผ๋ก SSE ํด๋ผ์ด์ธํธ ์ฝ๋๋ฅผ ์์ฑํฉ๋๋ค.๋ณต์ฌ
// Auto-generated client code
import { NotificationService } from "@/services/NotificationService";

// Open the SSE connection
const eventSource = NotificationService.subscribe(userId);

// Register event listeners; each payload arrives as a JSON string
eventSource.addEventListener("message", (event) => {
  const data = JSON.parse(event.data);
  console.log("New message:", data);
});
eventSource.addEventListener("status", (event) => {
  const data = JSON.parse(event.data);
  console.log("Status:", data);
});

// End-of-connection event sent by the server.
// Closing here keeps the browser from automatically reconnecting.
eventSource.addEventListener("end", () => {
  console.log("Stream ended");
  eventSource.close();
});

// Close the connection manually
eventSource.close();
React 예제
복사
import { useEffect, useState } from "react";
/**
 * Renders the notifications received over the SSE stream for one user,
 * together with the current connection status.
 *
 * A new stream is opened whenever `userId` changes and closed on unmount.
 *
 * @param userId - Passed to `NotificationService.subscribe`.
 */
function NotificationList({ userId }: { userId: number }) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [status, setStatus] = useState<string>("disconnected");

  useEffect(() => {
    const eventSource = NotificationService.subscribe(userId);

    // Append every incoming message to the list
    eventSource.addEventListener("message", (event) => {
      const message = JSON.parse(event.data);
      setMessages(prev => [...prev, message]);
    });

    // Mirror the server-reported connection status
    eventSource.addEventListener("status", (event) => {
      const { type } = JSON.parse(event.data);
      setStatus(type);
    });

    eventSource.addEventListener("end", () => {
      console.log("Stream ended by server");
      setStatus("disconnected");
      // An EventSource reconnects automatically after the server closes the
      // connection; close it so a finished stream is not re-subscribed.
      eventSource.close();
    });

    // Cleanup: close the stream on unmount or before re-subscribing
    return () => {
      eventSource.close();
    };
  }, [userId]);

  return (
    <div>
      <div>Status: {status}</div>
      <ul>
        {messages.map(msg => (
          <li key={msg.id}>{msg.text}</li>
        ))}
      </ul>
    </div>
  );
}
로깅
@stream 데코레이터는 자동으로 로그를 남깁니다:
복사
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
// Automatic log:
// [DEBUG] stream: NotificationModel.subscribe
}
예제 모음
- 채팅
- 진행률 모니터링
- 실시간 데이터
복사
/**
 * Event map for the chat stream: chat messages, typing indicators and
 * join/leave notifications.
 */
const ChatEventsSchema = z.object({
  message: z.object({
    id: z.string(),
    userId: z.number(),
    text: z.string(),
    timestamp: z.date()
  }),
  typing: z.object({
    userId: z.number(),
    isTyping: z.boolean()
  }),
  joined: z.object({
    userId: z.number(),
    username: z.string()
  }),
  left: z.object({
    userId: z.number()
  })
});

class ChatModelClass extends BaseModelClass {
  /**
   * Streams chat events for one room to the current user.
   *
   * Publishes "joined" for the current user, forwards every message delivered
   * by `onMessage` for the room, and after one hour unsubscribes, publishes
   * "left" and ends the stream.
   *
   * @param roomId - Room whose messages are forwarded.
   * @returns The SSE object created from the current context.
   */
  @stream({
    type: "sse",
    events: ChatEventsSchema,
    guards: ["user"]
  })
  async subscribe(roomId: number) {
    const sse = Sonamu.getContext().createSSE(ChatEventsSchema);
    const { user } = Sonamu.getContext();

    // Announce that the user entered the room
    sse.publish("joined", {
      userId: user.id,
      username: user.name
    });

    // Register the incoming-message listener
    const unsubscribe = this.onMessage(roomId, (message) => {
      sse.publish("message", message);
    });

    // Automatically end the stream after one hour.
    // NOTE(review): the listener stays registered until this timer fires even
    // if the client leaves earlier — confirm whether a disconnect hook exists.
    setTimeout(async () => {
      unsubscribe();
      sse.publish("left", { userId: user.id });
      await sse.end();
    }, 3600000);

    return sse;
  }
}
복사
/**
 * Event map for process monitoring: start notice, per-step progress, final
 * result and error report.
 */
const ProcessEventsSchema = z.object({
  started: z.object({
    taskId: z.string(),
    totalSteps: z.number()
  }),
  progress: z.object({
    step: z.number(),
    message: z.string(),
    percent: z.number()
  }),
  completed: z.object({
    success: z.boolean(),
    result: z.any(),
    duration: z.number()
  }),
  error: z.object({
    step: z.number(),
    message: z.string()
  })
});

class ProcessModelClass extends BaseModelClass {
  /**
   * Executes the steps of a process in order and streams its progress.
   *
   * Publishes "started", one "progress" event before each step runs, and
   * "completed" with the result and elapsed milliseconds. Any thrown value is
   * reported as an "error" event instead of propagating. The stream is always
   * ended before returning.
   *
   * @param processId - Identifier used to load the steps and the result.
   * @returns The SSE object created from the current context.
   */
  @stream({
    type: "sse",
    events: ProcessEventsSchema,
    guards: ["admin"]
  })
  async monitor(processId: string) {
    const sse = Sonamu.getContext().createSSE(ProcessEventsSchema);
    const startTime = Date.now();

    try {
      const steps = await this.getProcessSteps(processId);
      sse.publish("started", {
        taskId: processId,
        totalSteps: steps.length
      });

      for (let i = 0; i < steps.length; i++) {
        const step = steps[i];
        sse.publish("progress", {
          step: i + 1,
          message: step.description,
          percent: ((i + 1) / steps.length) * 100
        });
        await this.executeStep(step);
      }

      sse.publish("completed", {
        success: true,
        result: await this.getProcessResult(processId),
        duration: Date.now() - startTime
      });
    } catch (error: unknown) {
      // Catch variables are `unknown` under strict mode, so narrow before
      // reading `.message`; non-Error throwables are stringified.
      sse.publish("error", {
        step: -1, // the failing step is not tracked in this example
        message: error instanceof Error ? error.message : String(error)
      });
    } finally {
      // Always close the stream, even if reporting the error itself fails
      await sse.end();
    }

    return sse;
  }
}
복사
/**
 * Event map for the stock stream: price ticks, volume updates and threshold
 * alerts.
 */
const StockEventsSchema = z.object({
  price: z.object({
    symbol: z.string(),
    price: z.number(),
    change: z.number(),
    timestamp: z.date()
  }),
  volume: z.object({
    symbol: z.string(),
    volume: z.number()
  }),
  alert: z.object({
    type: z.enum(["high", "low"]),
    symbol: z.string(),
    message: z.string()
  })
});

class StockModelClass extends BaseModelClass {
  /**
   * Streams price updates for one symbol: 60 quotes, one second apart.
   *
   * Each iteration publishes a "price" event and, when the quote's price is
   * above its `highThreshold`, an "alert" event of type "high". Only the high
   * alert is emitted here. The stream is ended after the last iteration.
   *
   * @param symbol - Symbol passed to `getQuote` and echoed in every event.
   * @returns The SSE object created from the current context.
   */
  @stream({
    type: "sse",
    events: StockEventsSchema,
    guards: ["user"]
  })
  async watchSymbol(symbol: string) {
    const sse = Sonamu.getContext().createSSE(StockEventsSchema);

    // Update the price every second (for one minute)
    for (let i = 0; i < 60; i++) {
      const quote = await this.getQuote(symbol);
      sse.publish("price", {
        symbol,
        price: quote.price,
        change: quote.change,
        timestamp: new Date()
      });

      // Price alert
      if (quote.price > quote.highThreshold) {
        sse.publish("alert", {
          type: "high",
          symbol,
          message: `Price exceeded ${quote.highThreshold}`
        });
      }

      await new Promise(resolve => setTimeout(resolve, 1000));
    }

    await sse.end();
    return sse;
  }
}