@stream 데코레이터는 Server-Sent Events(SSE)를 사용한 실시간 스트리밍 API를 생성합니다.
기본 사용법
복사
import { BaseModelClass, stream } from "sonamu";
import { z } from "zod";
const NotificationEventsSchema = z.object({
message: z.object({
id: z.number(),
text: z.string(),
timestamp: z.date()
}),
status: z.object({
type: z.enum(["connected", "disconnected"]),
userId: z.number()
})
});
class NotificationModelClass extends BaseModelClass {
@stream({
type: "sse",
events: NotificationEventsSchema
})
async subscribe(userId: number) {
const sse = Sonamu.getContext().createSSE(NotificationEventsSchema);
// 연결 시작
sse.publish("status", {
type: "connected",
userId
});
// 주기적으로 이벤트 전송
const interval = setInterval(() => {
sse.publish("message", {
id: Date.now(),
text: "New notification",
timestamp: new Date()
});
}, 5000);
// 5분 후 연결 종료
setTimeout(async () => {
clearInterval(interval);
sse.publish("status", {
type: "disconnected",
userId
});
await sse.end();
}, 300000);
return sse;
}
}
옵션
type
스트리밍 타입을 지정합니다. 필수 옵션복사
type: "sse" // Server-Sent Events (현재 유일한 옵션)
향후 WebSocket 지원이 추가될 수 있습니다.
events
전송할 이벤트의 타입을 Zod 스키마로 정의합니다. 필수 옵션복사
import { z } from "zod";
const EventsSchema = z.object({
// 이벤트 이름: 이벤트 데이터 스키마
progress: z.object({
percent: z.number(),
message: z.string()
}),
complete: z.object({
result: z.string()
}),
error: z.object({
message: z.string(),
code: z.string()
})
});
@stream({
type: "sse",
events: EventsSchema
})
async process() {
const sse = Sonamu.getContext().createSSE(EventsSchema);
// 이벤트 전송
sse.publish("progress", { percent: 50, message: "Processing..." });
sse.publish("complete", { result: "Success" });
// 스트림 종료
await sse.end();
return sse;
}
path
스트리밍 엔드포인트 경로를 지정합니다. 기본값:/{modelName}/{methodName} (camelCase)
복사
@stream({
type: "sse",
events: EventsSchema,
path: "/stream/notifications"
})
async subscribe() {
// /stream/notifications로 접근
}
guards
접근 권한을 지정합니다.복사
type GuardKey = "query" | "admin" | "user";
복사
@stream({
type: "sse",
events: EventsSchema,
guards: ["user"]
})
async subscribe() {
// 로그인한 사용자만 접근 가능
}
description
스트리밍 API 설명을 추가합니다.복사
@stream({
type: "sse",
events: EventsSchema,
description: "실시간 알림을 스트리밍합니다."
})
async notifications() {
// ...
}
resourceName
생성되는 서비스 파일의 리소스 이름을 지정합니다.복사
@stream({
type: "sse",
events: EventsSchema,
resourceName: "Notifications"
})
async subscribe() {
// NotificationsService.ts 파일에 포함
}
SSE 객체 사용법
createSSE
Context에서 SSE 객체를 생성합니다.복사
const sse = Sonamu.getContext().createSSE(EventsSchema);
publish
이벤트를 클라이언트로 전송합니다.복사
sse.publish("eventName", data);
- 이벤트 이름은 스키마에 정의된 것만 사용 가능
- 데이터는 스키마에 맞춰 자동 검증
복사
const EventsSchema = z.object({
message: z.object({
text: z.string()
})
});
const sse = context.createSSE(EventsSchema);
// ✅ 올바른 사용
sse.publish("message", { text: "Hello" });
// ❌ 타입 에러
sse.publish("unknown", { text: "Hello" }); // 'unknown'은 스키마에 없음
sse.publish("message", { text: 123 }); // text는 string이어야 함
end
스트림 연결을 종료합니다.복사
await sse.end();
end()는 클라이언트에 “end” 이벤트를 전송하고 200ms 대기 후 연결을 닫습니다.전체 예시
복사
import { BaseModelClass, stream } from "sonamu";
import { z } from "zod";
const LogEventsSchema = z.object({
log: z.object({
level: z.enum(["info", "warn", "error"]),
message: z.string(),
timestamp: z.string()
}),
progress: z.object({
current: z.number(),
total: z.number()
}),
complete: z.object({
success: z.boolean(),
duration: z.number()
})
});
class TaskModelClass extends BaseModelClass {
@stream({
type: "sse",
events: LogEventsSchema,
path: "/tasks/logs",
guards: ["admin"],
description: "작업 실행 로그를 실시간으로 스트리밍합니다."
})
async streamLogs(taskId: number) {
const sse = Sonamu.getContext().createSSE(LogEventsSchema);
const startTime = Date.now();
// 작업 시작
sse.publish("log", {
level: "info",
message: "Task started",
timestamp: new Date().toISOString()
});
// 작업 실행
const total = 100;
for (let i = 0; i <= total; i++) {
await this.processItem(i);
// 진행률 전송
sse.publish("progress", {
current: i,
total
});
// 로그 전송
if (i % 10 === 0) {
sse.publish("log", {
level: "info",
message: `Processed ${i}/${total} items`,
timestamp: new Date().toISOString()
});
}
}
// 작업 완료
sse.publish("complete", {
success: true,
duration: Date.now() - startTime
});
// 연결 종료
await sse.end();
return sse;
}
private async processItem(index: number) {
// 작업 처리
await new Promise(resolve => setTimeout(resolve, 50));
}
}
다른 데코레이터와 함께 사용
@transactional
복사
@stream({ type: "sse", events: EventsSchema })
@transactional()
async process() {
const sse = Sonamu.getContext().createSSE(EventsSchema);
const wdb = this.getDB("w");
// 트랜잭션 내에서 실행
// SSE 전송과 DB 작업 함께 수행
await sse.end();
return sse;
}
@stream을 먼저, @transactional을 나중에 작성하세요.제약사항
1. @api와 중복 사용 불가
복사
// ❌ 에러 발생
@api()
@stream({ type: "sse", events: EventsSchema })
async subscribe() {}
복사
@stream decorator can only be used once on NotificationModel.subscribe.
You can use only one of @api or @stream decorator on the same method.
2. events는 필수
복사
// ❌ 에러 발생
@stream({ type: "sse" })
async subscribe() {}
// events 옵션이 필요합니다
3. httpMethod는 항상 GET
@stream은 자동으로 httpMethod: "GET"을 설정합니다.
복사
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
// GET /notification/subscribe로 노출됨
}
클라이언트 사용 (Web)
Sonamu는 자동으로 SSE 클라이언트 코드를 생성합니다.복사
// 자동 생성된 코드
import { NotificationService } from "@/services/NotificationService";
// SSE 연결
const eventSource = NotificationService.subscribe(userId);
// 이벤트 리스너 등록
eventSource.addEventListener("message", (event) => {
const data = JSON.parse(event.data);
console.log("New message:", data);
});
eventSource.addEventListener("status", (event) => {
const data = JSON.parse(event.data);
console.log("Status:", data);
});
// 연결 종료 이벤트
eventSource.addEventListener("end", () => {
console.log("Stream ended");
eventSource.close();
});
// 수동 연결 종료
eventSource.close();
React 예시
복사
import { useEffect, useState } from "react";
function NotificationList({ userId }: { userId: number }) {
const [messages, setMessages] = useState<Message[]>([]);
const [status, setStatus] = useState<string>("disconnected");
useEffect(() => {
const eventSource = NotificationService.subscribe(userId);
eventSource.addEventListener("message", (event) => {
const message = JSON.parse(event.data);
setMessages(prev => [...prev, message]);
});
eventSource.addEventListener("status", (event) => {
const { type } = JSON.parse(event.data);
setStatus(type);
});
eventSource.addEventListener("end", () => {
console.log("Stream ended by server");
setStatus("disconnected");
});
return () => {
eventSource.close();
};
}, [userId]);
return (
<div>
<div>Status: {status}</div>
<ul>
{messages.map(msg => (
<li key={msg.id}>{msg.text}</li>
))}
</ul>
</div>
);
}
로깅
@stream 데코레이터는 자동으로 로그를 남깁니다:
복사
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
// 자동 로그:
// [DEBUG] stream: NotificationModel.subscribe
}
예시 모음
- 채팅
- 진행률 모니터링
- 실시간 데이터
복사
const ChatEventsSchema = z.object({
message: z.object({
id: z.string(),
userId: z.number(),
text: z.string(),
timestamp: z.date()
}),
typing: z.object({
userId: z.number(),
isTyping: z.boolean()
}),
joined: z.object({
userId: z.number(),
username: z.string()
}),
left: z.object({
userId: z.number()
})
});
class ChatModelClass extends BaseModelClass {
@stream({
type: "sse",
events: ChatEventsSchema,
guards: ["user"]
})
async subscribe(roomId: number) {
const sse = Sonamu.getContext().createSSE(ChatEventsSchema);
const { user } = Sonamu.getContext();
// 입장 알림
sse.publish("joined", {
userId: user.id,
username: user.name
});
// 메시지 수신 리스너 등록
const unsubscribe = this.onMessage(roomId, (message) => {
sse.publish("message", message);
});
// 1시간 후 자동 종료
setTimeout(async () => {
unsubscribe();
sse.publish("left", { userId: user.id });
await sse.end();
}, 3600000);
return sse;
}
}
복사
const ProcessEventsSchema = z.object({
started: z.object({
taskId: z.string(),
totalSteps: z.number()
}),
progress: z.object({
step: z.number(),
message: z.string(),
percent: z.number()
}),
completed: z.object({
success: z.boolean(),
result: z.any(),
duration: z.number()
}),
error: z.object({
step: z.number(),
message: z.string()
})
});
class ProcessModelClass extends BaseModelClass {
@stream({
type: "sse",
events: ProcessEventsSchema,
guards: ["admin"]
})
async monitor(processId: string) {
const sse = Sonamu.getContext().createSSE(ProcessEventsSchema);
const startTime = Date.now();
try {
const steps = await this.getProcessSteps(processId);
sse.publish("started", {
taskId: processId,
totalSteps: steps.length
});
for (let i = 0; i < steps.length; i++) {
const step = steps[i];
sse.publish("progress", {
step: i + 1,
message: step.description,
percent: ((i + 1) / steps.length) * 100
});
await this.executeStep(step);
}
sse.publish("completed", {
success: true,
result: await this.getProcessResult(processId),
duration: Date.now() - startTime
});
} catch (error) {
sse.publish("error", {
step: -1,
message: error.message
});
}
await sse.end();
return sse;
}
}
복사
const StockEventsSchema = z.object({
price: z.object({
symbol: z.string(),
price: z.number(),
change: z.number(),
timestamp: z.date()
}),
volume: z.object({
symbol: z.string(),
volume: z.number()
}),
alert: z.object({
type: z.enum(["high", "low"]),
symbol: z.string(),
message: z.string()
})
});
class StockModelClass extends BaseModelClass {
@stream({
type: "sse",
events: StockEventsSchema,
guards: ["user"]
})
async watchSymbol(symbol: string) {
const sse = Sonamu.getContext().createSSE(StockEventsSchema);
// 1초마다 가격 업데이트 (1분간)
for (let i = 0; i < 60; i++) {
const quote = await this.getQuote(symbol);
sse.publish("price", {
symbol,
price: quote.price,
change: quote.change,
timestamp: new Date()
});
// 가격 알림
if (quote.price > quote.highThreshold) {
sse.publish("alert", {
type: "high",
symbol,
message: `Price exceeded ${quote.highThreshold}`
});
}
await new Promise(resolve => setTimeout(resolve, 1000));
}
await sse.end();
return sse;
}
}
