@stream 데코레이터
SSE 엔드포인트를 정의하는 데코레이터입니다.기본 사용법
복사
import { BaseModel, stream, api, z } from "sonamu";
class NotificationModelClass extends BaseModel {
@stream({
type: 'sse',
events: z.object({
notification: z.object({
id: z.number(),
message: z.string(),
}),
})
})
@api({ compress: false }) // 압축 비활성화 필수
async *streamNotifications(userId: number, ctx: Context) {
const sse = ctx.createSSE(
z.object({
notification: z.object({
id: z.number(),
message: z.string(),
}),
})
);
// 이벤트 전송
sse.publish('notification', {
id: 1,
message: 'Hello, World!'
});
// 연결 종료
await sse.end();
}
}
@stream 옵션
- type
- events
- path
- guards
스트림 타입지원 타입:
복사
@stream({
type: 'sse', // 현재는 SSE만 지원
})
'sse': Server-Sent Events- (향후 WebSocket 지원 예정)
이벤트 스키마역할: 타입 안전성 보장 + 자동 코드 생성
복사
@stream({
type: 'sse',
events: z.object({
message: z.string(),
update: z.object({
count: z.number(),
}),
error: z.object({
code: z.string(),
message: z.string(),
}),
})
})
커스텀 경로 (선택)기본값:
복사
@stream({
type: 'sse',
events: z.object({
message: z.string(),
}),
path: '/custom/stream'
})
/{modelName}/{methodName}인증/권한 (선택)
복사
@stream({
type: 'sse',
events: z.object({
message: z.string(),
}),
guards: ['user'] // 로그인 사용자만
})
SSEConnection API
ctx.createSSE()로 생성된 SSE 연결 객체입니다.
publish(event, data)
이벤트를 클라이언트로 전송합니다.복사
sse.publish('notification', {
id: 1,
message: 'New notification'
});
복사
// ✅ 올바른 이벤트
sse.publish('notification', { id: 1, message: 'Hi' });
// ❌ 잘못된 이벤트 (컴파일 에러)
sse.publish('unknown', { ... });
// ❌ 잘못된 데이터 (컴파일 에러)
sse.publish('notification', { id: 'wrong' });
end()
연결을 종료합니다.복사
await sse.end();
event: end, data: END
실전 예제
1. 실시간 알림
복사
class NotificationModelClass extends BaseModel {
@stream({
type: 'sse',
events: z.object({
notification: z.object({
id: z.number(),
type: z.enum(['like', 'comment', 'follow']),
message: z.string(),
createdAt: z.string(),
}),
})
})
@api({ compress: false })
async *streamNotifications(userId: number, ctx: Context) {
const sse = ctx.createSSE(
z.object({
notification: z.object({
id: z.number(),
type: z.enum(['like', 'comment', 'follow']),
message: z.string(),
createdAt: z.string(),
}),
})
);
try {
// 실시간 알림 감지 (예: 폴링 또는 이벤트 리스너)
while (true) {
// DB에서 새 알림 조회
const notifications = await this.findMany({
wq: [
['user_id', userId],
['created_at', '>', new Date(Date.now() - 5000)], // 최근 5초
],
num: 10,
});
// 알림 전송
for (const notification of notifications.data) {
sse.publish('notification', {
id: notification.id,
type: notification.type,
message: notification.message,
createdAt: notification.created_at.toISOString(),
});
}
// 5초 대기
await new Promise(resolve => setTimeout(resolve, 5000));
}
} finally {
await sse.end();
}
}
}
2. 작업 진행률
복사
class TaskModelClass extends BaseModel {
@stream({
type: 'sse',
events: z.object({
progress: z.object({
current: z.number(),
total: z.number(),
percentage: z.number(),
}),
complete: z.object({
result: z.string(),
}),
error: z.object({
message: z.string(),
}),
})
})
@api({ compress: false })
async *streamTaskProgress(taskId: number, ctx: Context) {
const sse = ctx.createSSE(
z.object({
progress: z.object({
current: z.number(),
total: z.number(),
percentage: z.number(),
}),
complete: z.object({
result: z.string(),
}),
error: z.object({
message: z.string(),
}),
})
);
try {
const total = 100;
for (let current = 0; current <= total; current += 10) {
// 진행률 전송
sse.publish('progress', {
current,
total,
percentage: Math.round((current / total) * 100),
});
// 작업 수행 (예: 파일 처리)
await new Promise(resolve => setTimeout(resolve, 1000));
}
// 완료 이벤트
sse.publish('complete', {
result: 'Task completed successfully'
});
} catch (error) {
// 에러 이벤트
sse.publish('error', {
message: error.message
});
} finally {
await sse.end();
}
}
}
3. 라이브 피드
복사
class PostModelClass extends BaseModel {
@stream({
type: 'sse',
events: z.object({
newPost: z.object({
id: z.number(),
title: z.string(),
author: z.string(),
createdAt: z.string(),
}),
})
})
@api({ compress: false })
async *streamNewPosts(ctx: Context) {
const sse = ctx.createSSE(
z.object({
newPost: z.object({
id: z.number(),
title: z.string(),
author: z.string(),
createdAt: z.string(),
}),
})
);
try {
let lastCheckTime = new Date();
while (true) {
// 새 게시물 조회
const posts = await this.findMany({
wq: [['created_at', '>', lastCheckTime]],
num: 20,
order: [['created_at', 'DESC']],
});
// 새 게시물 전송
for (const post of posts.data) {
sse.publish('newPost', {
id: post.id,
title: post.title,
author: post.author_name,
createdAt: post.created_at.toISOString(),
});
}
// 마지막 체크 시간 업데이트
if (posts.data.length > 0) {
lastCheckTime = posts.data[0].created_at;
}
// 10초 대기
await new Promise(resolve => setTimeout(resolve, 10000));
}
} finally {
await sse.end();
}
}
}
4. 다중 이벤트
복사
class ChatModelClass extends BaseModel {
@stream({
type: 'sse',
events: z.object({
message: z.object({
id: z.number(),
text: z.string(),
sender: z.string(),
}),
typing: z.object({
userId: z.number(),
isTyping: z.boolean(),
}),
joined: z.object({
userId: z.number(),
username: z.string(),
}),
left: z.object({
userId: z.number(),
}),
})
})
@api({ compress: false })
async *streamChatRoom(roomId: number, ctx: Context) {
const sse = ctx.createSSE(
z.object({
message: z.object({
id: z.number(),
text: z.string(),
sender: z.string(),
}),
typing: z.object({
userId: z.number(),
isTyping: z.boolean(),
}),
joined: z.object({
userId: z.number(),
username: z.string(),
}),
left: z.object({
userId: z.number(),
}),
})
);
try {
// 입장 이벤트
sse.publish('joined', {
userId: ctx.user.id,
username: ctx.user.name,
});
// 메시지 스트리밍
while (true) {
const messages = await this.getNewMessages(roomId);
for (const msg of messages) {
sse.publish('message', {
id: msg.id,
text: msg.text,
sender: msg.sender_name,
});
}
await new Promise(resolve => setTimeout(resolve, 1000));
}
} finally {
// 퇴장 이벤트
sse.publish('left', {
userId: ctx.user.id,
});
await sse.end();
}
}
}
5. 조건부 전송
복사
class MonitorModelClass extends BaseModel {
@stream({
type: 'sse',
events: z.object({
status: z.object({
cpu: z.number(),
memory: z.number(),
disk: z.number(),
}),
alert: z.object({
level: z.enum(['warning', 'critical']),
message: z.string(),
}),
})
})
@api({ compress: false })
async *streamServerStatus(ctx: Context) {
const sse = ctx.createSSE(
z.object({
status: z.object({
cpu: z.number(),
memory: z.number(),
disk: z.number(),
}),
alert: z.object({
level: z.enum(['warning', 'critical']),
message: z.string(),
}),
})
);
try {
while (true) {
const status = await this.getServerStatus();
// 상태 전송
sse.publish('status', {
cpu: status.cpu,
memory: status.memory,
disk: status.disk,
});
// 임계값 초과 시 알림
if (status.cpu > 90) {
sse.publish('alert', {
level: 'critical',
message: `CPU usage is ${status.cpu}%`,
});
} else if (status.cpu > 70) {
sse.publish('alert', {
level: 'warning',
message: `CPU usage is ${status.cpu}%`,
});
}
await new Promise(resolve => setTimeout(resolve, 5000));
}
} finally {
await sse.end();
}
}
}
연결 관리
클라이언트 연결 종료 감지
복사
@stream({
type: 'sse',
events: z.object({
message: z.string(),
})
})
@api({ compress: false })
async *streamData(ctx: Context) {
const sse = ctx.createSSE(
z.object({
message: z.string(),
})
);
// 클라이언트 연결 종료 감지
ctx.request.socket.on('close', () => {
console.log('Client disconnected');
});
try {
while (true) {
sse.publish('message', 'Hello');
await new Promise(resolve => setTimeout(resolve, 1000));
}
} finally {
await sse.end();
}
}
Keep-Alive
SSE는 자동으로 Keep-Alive를 전송하지만, 커스텀 구현도 가능합니다:복사
@stream({
type: 'sse',
events: z.object({
message: z.string(),
keepalive: z.object({}),
})
})
@api({ compress: false })
async *streamWithKeepAlive(ctx: Context) {
const sse = ctx.createSSE(
z.object({
message: z.string(),
keepalive: z.object({}),
})
);
// Keep-alive 전송 (30초마다)
const keepAliveInterval = setInterval(() => {
sse.publish('keepalive', {});
}, 30000);
try {
while (true) {
sse.publish('message', 'Data');
await new Promise(resolve => setTimeout(resolve, 5000));
}
} finally {
clearInterval(keepAliveInterval);
await sse.end();
}
}
에러 처리
복사
@stream({
type: 'sse',
events: z.object({
data: z.object({
value: z.number(),
}),
error: z.object({
message: z.string(),
}),
})
})
@api({ compress: false })
async *streamWithErrorHandling(ctx: Context) {
const sse = ctx.createSSE(
z.object({
data: z.object({
value: z.number(),
}),
error: z.object({
message: z.string(),
}),
})
);
try {
const value = await this.getSomeData();
sse.publish('data', { value });
} catch (error) {
// 에러를 이벤트로 전송
sse.publish('error', {
message: error.message
});
} finally {
await sse.end();
}
}
주의사항
SSE 엔드포인트 작성 시 주의사항:
-
압축 비활성화 필수: SSE는 스트리밍이므로 압축 금지
복사
@api({ compress: false }) -
Generator 함수 사용:
async *키워드 필요복사// ✅ Generator 함수 async *streamData() { ... } // ❌ 일반 함수 async streamData() { ... } -
반드시 end() 호출: 연결 종료 필수
복사
try { // 이벤트 전송 } finally { await sse.end(); // 필수! } -
무한 루프 주의: 적절한 대기 시간 필요
복사
while (true) { sse.publish('data', ...); await new Promise(resolve => setTimeout(resolve, 1000)); // 1초 대기 } -
이벤트 스키마 일치: events와 publish 이벤트 이름 일치 필수
복사
// ✅ 일치 events: z.object({ message: z.string(), }) sse.publish('message', 'Hi'); // ❌ 불일치 (컴파일 에러) sse.publish('msg', 'Hi'); -
메모리 누수 방지: 타이머/리스너 정리
복사
const interval = setInterval(() => { ... }, 1000); try { // ... } finally { clearInterval(interval); // 정리 await sse.end(); }
