ctx.createSSE()로 생성한 SSE 연결 객체를 사용하여 이벤트를 전송합니다.
@stream 데코레이터
SSE 엔드포인트를 정의하는 데코레이터입니다.기본 사용법
import { BaseModelClass, stream, z } from "sonamu";
class NotificationModelClass extends BaseModelClass {
@stream({
type: 'sse',
events: z.object({
notification: z.object({
id: z.number(),
message: z.string(),
}),
})
})
async streamNotifications(userId: number, ctx: Context): Promise<void> {
const sse = ctx.createSSE(
z.object({
notification: z.object({
id: z.number(),
message: z.string(),
}),
})
);
// 이벤트 전송
sse.publish('notification', {
id: 1,
message: 'Hello, World!'
});
// 연결 종료
await sse.end();
}
}
@stream 옵션
- type
- events
- path
- guards
스트림 타입지원 타입:
@stream({
type: 'sse', // 현재는 SSE만 지원
})
'sse': Server-Sent Events- (향후 WebSocket 지원 예정)
이벤트 스키마역할: 타입 안전성 보장 + 자동 코드 생성
@stream({
type: 'sse',
events: z.object({
message: z.string(),
update: z.object({
count: z.number(),
}),
error: z.object({
code: z.string(),
message: z.string(),
}),
})
})
커스텀 경로 (선택)기본값:
@stream({
type: 'sse',
events: z.object({
message: z.string(),
}),
path: '/custom/stream'
})
/{modelName}/{methodName}인증/권한 (선택)
@stream({
type: 'sse',
events: z.object({
message: z.string(),
}),
guards: ['user'] // 로그인 사용자만
})
SSEConnection API
ctx.createSSE()로 생성된 SSE 연결 객체입니다.
closed
연결이 종료되었는지 여부를 나타내는 읽기 전용 속성입니다. 클라이언트가 연결을 끊거나end()를 호출한 후 true가 됩니다.
if (sse.closed) {
return;
}
sse.publish('status', { message: 'still alive' });
onClose(callback)
연결이 종료될 때 호출될 콜백을 등록합니다. 클라이언트 측 연결 종료와end() 호출 모두에서 실행됩니다. 타이머나 외부 리소스 정리에 활용할 수 있습니다.
const interval = setInterval(() => {
sse.publish('heartbeat', { ts: Date.now() });
}, 5000);
sse.onClose(() => {
clearInterval(interval);
});
publish(event, data)
이벤트를 클라이언트로 전송합니다.sse.publish('notification', {
id: 1,
message: 'New notification'
});
// ✅ 올바른 이벤트
sse.publish('notification', { id: 1, message: 'Hi' });
// ❌ 잘못된 이벤트 (컴파일 에러)
sse.publish('unknown', { ... });
// ❌ 잘못된 데이터 (컴파일 에러)
sse.publish('notification', { id: 'wrong' });
end()
연결을 종료합니다.await sse.end();
event: end, data: END
실전 예제
1. 실시간 알림
class NotificationModelClass extends BaseModelClass {
@stream({
type: 'sse',
events: z.object({
notification: z.object({
id: z.number(),
type: z.enum(['like', 'comment', 'follow']),
message: z.string(),
createdAt: z.string(),
}),
})
})
async streamNotifications(userId: number, ctx: Context): Promise<void> {
const sse = ctx.createSSE(
z.object({
notification: z.object({
id: z.number(),
type: z.enum(['like', 'comment', 'follow']),
message: z.string(),
createdAt: z.string(),
}),
})
);
try {
// 실시간 알림 감지 (예: 폴링 또는 이벤트 리스너)
while (true) {
// DB에서 새 알림 조회
const notifications = await this.findMany({
wq: [
['user_id', userId],
['created_at', '>', new Date(Date.now() - 5000)], // 최근 5초
],
num: 10,
});
// 알림 전송
for (const notification of notifications.data) {
sse.publish('notification', {
id: notification.id,
type: notification.type,
message: notification.message,
createdAt: notification.created_at.toISOString(),
});
}
// 5초 대기
await new Promise(resolve => setTimeout(resolve, 5000));
}
} finally {
await sse.end();
}
}
}
2. 작업 진행률
class TaskModelClass extends BaseModelClass {
@stream({
type: 'sse',
events: z.object({
progress: z.object({
current: z.number(),
total: z.number(),
percentage: z.number(),
}),
complete: z.object({
result: z.string(),
}),
error: z.object({
message: z.string(),
}),
})
})
async streamTaskProgress(taskId: number, ctx: Context): Promise<void> {
const sse = ctx.createSSE(
z.object({
progress: z.object({
current: z.number(),
total: z.number(),
percentage: z.number(),
}),
complete: z.object({
result: z.string(),
}),
error: z.object({
message: z.string(),
}),
})
);
try {
const total = 100;
for (let current = 0; current <= total; current += 10) {
// 진행률 전송
sse.publish('progress', {
current,
total,
percentage: Math.round((current / total) * 100),
});
// 작업 수행 (예: 파일 처리)
await new Promise(resolve => setTimeout(resolve, 1000));
}
// 완료 이벤트
sse.publish('complete', {
result: 'Task completed successfully'
});
} catch (error) {
// 에러 이벤트
sse.publish('error', {
message: error.message
});
} finally {
await sse.end();
}
}
}
3. 라이브 피드
class PostModelClass extends BaseModelClass {
@stream({
type: 'sse',
events: z.object({
newPost: z.object({
id: z.number(),
title: z.string(),
author: z.string(),
createdAt: z.string(),
}),
})
})
async streamNewPosts(ctx: Context): Promise<void> {
const sse = ctx.createSSE(
z.object({
newPost: z.object({
id: z.number(),
title: z.string(),
author: z.string(),
createdAt: z.string(),
}),
})
);
try {
let lastCheckTime = new Date();
while (true) {
// 새 게시물 조회
const posts = await this.findMany({
wq: [['created_at', '>', lastCheckTime]],
num: 20,
order: [['created_at', 'DESC']],
});
// 새 게시물 전송
for (const post of posts.data) {
sse.publish('newPost', {
id: post.id,
title: post.title,
author: post.author_name,
createdAt: post.created_at.toISOString(),
});
}
// 마지막 체크 시간 업데이트
if (posts.data.length > 0) {
lastCheckTime = posts.data[0].created_at;
}
// 10초 대기
await new Promise(resolve => setTimeout(resolve, 10000));
}
} finally {
await sse.end();
}
}
}
4. 다중 이벤트
class ChatModelClass extends BaseModelClass {
@stream({
type: 'sse',
events: z.object({
message: z.object({
id: z.number(),
text: z.string(),
sender: z.string(),
}),
typing: z.object({
userId: z.number(),
isTyping: z.boolean(),
}),
joined: z.object({
userId: z.number(),
username: z.string(),
}),
left: z.object({
userId: z.number(),
}),
})
})
async streamChatRoom(roomId: number, ctx: Context): Promise<void> {
const sse = ctx.createSSE(
z.object({
message: z.object({
id: z.number(),
text: z.string(),
sender: z.string(),
}),
typing: z.object({
userId: z.number(),
isTyping: z.boolean(),
}),
joined: z.object({
userId: z.number(),
username: z.string(),
}),
left: z.object({
userId: z.number(),
}),
})
);
try {
// 입장 이벤트
sse.publish('joined', {
userId: ctx.user.id,
username: ctx.user.name,
});
// 메시지 스트리밍
while (true) {
const messages = await this.getNewMessages(roomId);
for (const msg of messages) {
sse.publish('message', {
id: msg.id,
text: msg.text,
sender: msg.sender_name,
});
}
await new Promise(resolve => setTimeout(resolve, 1000));
}
} finally {
// 퇴장 이벤트
sse.publish('left', {
userId: ctx.user.id,
});
await sse.end();
}
}
}
5. 조건부 전송
class MonitorModelClass extends BaseModelClass {
@stream({
type: 'sse',
events: z.object({
status: z.object({
cpu: z.number(),
memory: z.number(),
disk: z.number(),
}),
alert: z.object({
level: z.enum(['warning', 'critical']),
message: z.string(),
}),
})
})
async streamServerStatus(ctx: Context): Promise<void> {
const sse = ctx.createSSE(
z.object({
status: z.object({
cpu: z.number(),
memory: z.number(),
disk: z.number(),
}),
alert: z.object({
level: z.enum(['warning', 'critical']),
message: z.string(),
}),
})
);
try {
while (true) {
const status = await this.getServerStatus();
// 상태 전송
sse.publish('status', {
cpu: status.cpu,
memory: status.memory,
disk: status.disk,
});
// 임계값 초과 시 알림
if (status.cpu > 90) {
sse.publish('alert', {
level: 'critical',
message: `CPU usage is ${status.cpu}%`,
});
} else if (status.cpu > 70) {
sse.publish('alert', {
level: 'warning',
message: `CPU usage is ${status.cpu}%`,
});
}
await new Promise(resolve => setTimeout(resolve, 5000));
}
} finally {
await sse.end();
}
}
}
연결 관리
클라이언트 연결 종료 감지
@stream({
type: 'sse',
events: z.object({
message: z.string(),
})
})
async streamData(ctx: Context): Promise<void> {
const sse = ctx.createSSE(
z.object({
message: z.string(),
})
);
// 클라이언트 연결 종료 감지
ctx.request.socket.on('close', () => {
console.log('Client disconnected');
});
try {
while (true) {
sse.publish('message', 'Hello');
await new Promise(resolve => setTimeout(resolve, 1000));
}
} finally {
await sse.end();
}
}
Keep-Alive
SSE는 자동으로 Keep-Alive를 전송하지만, 커스텀 구현도 가능합니다:@stream({
type: 'sse',
events: z.object({
message: z.string(),
keepalive: z.object({}),
})
})
async streamWithKeepAlive(ctx: Context): Promise<void> {
const sse = ctx.createSSE(
z.object({
message: z.string(),
keepalive: z.object({}),
})
);
// Keep-alive 전송 (30초마다)
const keepAliveInterval = setInterval(() => {
sse.publish('keepalive', {});
}, 30000);
try {
while (true) {
sse.publish('message', 'Data');
await new Promise(resolve => setTimeout(resolve, 5000));
}
} finally {
clearInterval(keepAliveInterval);
await sse.end();
}
}
에러 처리
@stream({
type: 'sse',
events: z.object({
data: z.object({
value: z.number(),
}),
error: z.object({
message: z.string(),
}),
})
})
async streamWithErrorHandling(ctx: Context): Promise<void> {
const sse = ctx.createSSE(
z.object({
data: z.object({
value: z.number(),
}),
error: z.object({
message: z.string(),
}),
})
);
try {
const value = await this.getSomeData();
sse.publish('data', { value });
} catch (error) {
// 에러를 이벤트로 전송
sse.publish('error', {
message: error.message
});
} finally {
await sse.end();
}
}
주의사항
SSE 엔드포인트 작성 시 주의사항:
-
@stream과@api는 동시에 사용할 수 없습니다: SSE 엔드포인트에는@stream데코레이터만 사용합니다// ✅ @stream만 사용 @stream({ type: 'sse', events: ... }) async streamData(ctx: Context): Promise<void> { ... } // ❌ @api와 @stream 동시 사용 (런타임 에러) @stream({ type: 'sse', events: ... }) @api({ compress: false }) async streamData(ctx: Context): Promise<void> { ... } -
반드시 end() 호출: 연결 종료 필수
try { // 이벤트 전송 } finally { await sse.end(); // 필수! } -
무한 루프 주의: 적절한 대기 시간 필요
while (true) { sse.publish('data', ...); await new Promise(resolve => setTimeout(resolve, 1000)); // 1초 대기 } -
이벤트 스키마 일치: events와 publish 이벤트 이름 일치 필수
// ✅ 일치 events: z.object({ message: z.string(), }) sse.publish('message', 'Hi'); // ❌ 불일치 (컴파일 에러) sse.publish('msg', 'Hi'); -
메모리 누수 방지: 타이머/리스너 정리
const interval = setInterval(() => { ... }, 1000); try { // ... } finally { clearInterval(interval); // 정리 await sse.end(); }
다음 단계
SSE 설정
SSE 플러그인 설정하기
클라이언트 통합
프론트엔드에서 SSE 사용하기