@stream Decorator
A decorator for defining SSE (Server-Sent Events) endpoints.

Basic Usage
Copy
import { BaseModel, stream, api, z } from "sonamu";
// Events this stream can emit. Declared once and shared by the @stream
// decorator and ctx.createSSE() so both always use the same schema.
const notificationEvents = z.object({
  notification: z.object({
    id: z.number(),
    message: z.string(),
  }),
});

/** Minimal SSE example: publishes a single notification, then closes. */
class NotificationModelClass extends BaseModel {
  @stream({ type: 'sse', events: notificationEvents })
  @api({ compress: false }) // Compression must be disabled
  async *streamNotifications(userId: number, ctx: Context) {
    const connection = ctx.createSSE(notificationEvents);

    // Send event
    connection.publish('notification', { id: 1, message: 'Hello, World!' });

    // Close connection
    await connection.end();
  }
}
@stream Options
- type
- events
- path
- guards
Stream type
Supported types:
Copy
@stream({
type: 'sse', // Currently only SSE is supported
})
- 'sse': Server-Sent Events
- (WebSocket support is planned for the future)
Event schema
Purpose: Ensures type safety and enables automatic code generation.
Copy
@stream({
type: 'sse',
events: z.object({
message: z.string(),
update: z.object({
count: z.number(),
}),
error: z.object({
code: z.string(),
message: z.string(),
}),
})
})
Custom path (optional)
Overrides the default route path, which is /{modelName}/{methodName}:
Copy
@stream({
type: 'sse',
events: z.object({
message: z.string(),
}),
path: '/custom/stream'
})
Default path when `path` is omitted: /{modelName}/{methodName}

Authentication/Authorization (optional)
Copy
@stream({
type: 'sse',
events: z.object({
message: z.string(),
}),
guards: ['user'] // Logged-in users only
})
SSEConnection API
The SSE connection object created by ctx.createSSE().
publish(event, data)
Sends an event to the client.
Copy
sse.publish('notification', {
id: 1,
message: 'New notification'
});
Copy
// ✅ Correct event
sse.publish('notification', { id: 1, message: 'Hi' });
// ❌ Wrong event (compile error)
sse.publish('unknown', { ... });
// ❌ Wrong data (compile error)
sse.publish('notification', { id: 'wrong' });
end()
Closes the connection.
Copy
await sse.end();
event: end, data: END
Practical Examples
1. Real-time Notifications
Copy
/** Delay between two database polls, in milliseconds. */
const NOTIFICATION_POLL_INTERVAL_MS = 5000;

// Events this stream can emit. Declared once and shared by the @stream
// decorator and ctx.createSSE() so both always use the same schema.
const liveNotificationEvents = z.object({
  notification: z.object({
    id: z.number(),
    type: z.enum(['like', 'comment', 'follow']),
    message: z.string(),
    createdAt: z.string(),
  }),
});

/** Streams a user's new notifications by polling the database. */
class NotificationModelClass extends BaseModel {
  /**
   * Polls for notifications newer than the last one sent and publishes each
   * as a `notification` event. Runs until the generator is torn down; the
   * connection is always closed in `finally`.
   *
   * @param userId - Owner of the notifications to stream.
   * @param ctx - Request context used to create the SSE connection.
   */
  @stream({ type: 'sse', events: liveNotificationEvents })
  @api({ compress: false })
  async *streamNotifications(userId: number, ctx: Context) {
    const sse = ctx.createSSE(liveNotificationEvents);

    // Cursor: creation time of the newest notification already sent.
    // A fixed "last 5 seconds" window recomputed on every poll would drift,
    // because each cycle lasts 5 s plus query time and rows created in the
    // gap would never be delivered. The first poll still looks back one
    // interval, as before.
    let lastSentAt = new Date(Date.now() - NOTIFICATION_POLL_INTERVAL_MS);

    try {
      // Real-time notification detection (e.g., polling or event listener)
      while (true) {
        // Query notifications created after the cursor, oldest first, so a
        // full page (num) leaves the remainder for the next poll.
        const notifications = await this.findMany({
          wq: [
            ['user_id', userId],
            ['created_at', '>', lastSentAt],
          ],
          num: 10,
          order: [['created_at', 'ASC']],
        });

        // Send notifications and advance the cursor
        for (const notification of notifications.data) {
          sse.publish('notification', {
            id: notification.id,
            type: notification.type,
            message: notification.message,
            createdAt: notification.created_at.toISOString(),
          });
          if (notification.created_at > lastSentAt) {
            lastSentAt = notification.created_at;
          }
        }

        // Wait before the next poll
        await new Promise(resolve =>
          setTimeout(resolve, NOTIFICATION_POLL_INTERVAL_MS),
        );
      }
    } finally {
      await sse.end();
    }
  }
}
2. Task Progress
Copy
// Events this stream can emit. Declared once and shared by the @stream
// decorator and ctx.createSSE() so both always use the same schema.
const taskProgressEvents = z.object({
  progress: z.object({
    current: z.number(),
    total: z.number(),
    percentage: z.number(),
  }),
  complete: z.object({
    result: z.string(),
  }),
  error: z.object({
    message: z.string(),
  }),
});

/** Streams the progress of a long-running task. */
class TaskModelClass extends BaseModel {
  /**
   * Publishes `progress` events in steps of 10 up to 100, then a `complete`
   * event. Any failure is reported to the client as an `error` event. The
   * connection is always closed in `finally`.
   *
   * @param taskId - Identifier of the task being tracked.
   * @param ctx - Request context used to create the SSE connection.
   */
  @stream({ type: 'sse', events: taskProgressEvents })
  @api({ compress: false })
  async *streamTaskProgress(taskId: number, ctx: Context) {
    const sse = ctx.createSSE(taskProgressEvents);

    try {
      const total = 100;
      for (let current = 0; current <= total; current += 10) {
        // Send progress
        sse.publish('progress', {
          current,
          total,
          percentage: Math.round((current / total) * 100),
        });

        // Perform work (e.g., file processing)
        await new Promise(resolve => setTimeout(resolve, 1000));
      }

      // Completion event
      sse.publish('complete', {
        result: 'Task completed successfully',
      });
    } catch (error: unknown) {
      // Error event. The caught value is not guaranteed to be an Error, so
      // narrow it first; otherwise `message` could be undefined and break
      // the z.string() contract of the `error` event.
      sse.publish('error', {
        message: error instanceof Error ? error.message : String(error),
      });
    } finally {
      await sse.end();
    }
  }
}
3. Live Feed
Copy
// Events this stream can emit. Declared once and shared by the @stream
// decorator and ctx.createSSE() so both always use the same schema.
const feedEvents = z.object({
  newPost: z.object({
    id: z.number(),
    title: z.string(),
    author: z.string(),
    createdAt: z.string(),
  }),
});

/** Streams newly created posts as a live feed. */
class PostModelClass extends BaseModel {
  /**
   * Every 10 seconds, queries posts created after the last check time
   * (newest first, at most 20) and publishes each as a `newPost` event.
   */
  @stream({ type: 'sse', events: feedEvents })
  @api({ compress: false })
  async *streamNewPosts(ctx: Context) {
    const sse = ctx.createSSE(feedEvents);
    const pause = (ms: number) =>
      new Promise(resolve => setTimeout(resolve, ms));

    try {
      let lastCheckTime = new Date();

      for (;;) {
        // Query new posts
        const { data: freshPosts } = await this.findMany({
          wq: [['created_at', '>', lastCheckTime]],
          num: 20,
          order: [['created_at', 'DESC']],
        });

        // Send new posts
        for (const post of freshPosts) {
          sse.publish('newPost', {
            id: post.id,
            title: post.title,
            author: post.author_name,
            createdAt: post.created_at.toISOString(),
          });
        }

        // Results are newest-first, so the first row carries the latest time
        if (freshPosts.length > 0) {
          lastCheckTime = freshPosts[0].created_at;
        }

        // Wait 10 seconds
        await pause(10000);
      }
    } finally {
      await sse.end();
    }
  }
}
4. Multiple Events
Copy
// Events this stream can emit. Declared once and shared by the @stream
// decorator and ctx.createSSE() so both always use the same schema.
const chatRoomEvents = z.object({
  message: z.object({
    id: z.number(),
    text: z.string(),
    sender: z.string(),
  }),
  typing: z.object({
    userId: z.number(),
    isTyping: z.boolean(),
  }),
  joined: z.object({
    userId: z.number(),
    username: z.string(),
  }),
  left: z.object({
    userId: z.number(),
  }),
});

/** Chat room stream that emits several distinct event types. */
class ChatModelClass extends BaseModel {
  /**
   * Publishes a `joined` event, then polls the room for new messages every
   * second and publishes each as a `message` event. On teardown a `left`
   * event is published before the connection is closed.
   */
  @stream({ type: 'sse', events: chatRoomEvents })
  @api({ compress: false })
  async *streamChatRoom(roomId: number, ctx: Context) {
    const sse = ctx.createSSE(chatRoomEvents);

    try {
      // Join event
      sse.publish('joined', {
        userId: ctx.user.id,
        username: ctx.user.name,
      });

      // Message streaming
      for (;;) {
        const incoming = await this.getNewMessages(roomId);
        for (const chatMessage of incoming) {
          sse.publish('message', {
            id: chatMessage.id,
            text: chatMessage.text,
            sender: chatMessage.sender_name,
          });
        }

        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    } finally {
      // Leave event
      sse.publish('left', { userId: ctx.user.id });
      await sse.end();
    }
  }
}
5. Conditional Sending
Copy
// Events this stream can emit. Declared once and shared by the @stream
// decorator and ctx.createSSE() so both always use the same schema.
const serverStatusEvents = z.object({
  status: z.object({
    cpu: z.number(),
    memory: z.number(),
    disk: z.number(),
  }),
  alert: z.object({
    level: z.enum(['warning', 'critical']),
    message: z.string(),
  }),
});

/** Streams server metrics and raises alerts when CPU usage is high. */
class MonitorModelClass extends BaseModel {
  /**
   * Every 5 seconds publishes a `status` event, plus an `alert` event when
   * CPU usage exceeds 70 (warning) or 90 (critical).
   */
  @stream({ type: 'sse', events: serverStatusEvents })
  @api({ compress: false })
  async *streamServerStatus(ctx: Context) {
    const sse = ctx.createSSE(serverStatusEvents);

    try {
      for (;;) {
        const { cpu, memory, disk } = await this.getServerStatus();

        // Send status
        sse.publish('status', { cpu, memory, disk });

        // Alert on threshold exceeded; the higher threshold wins
        const level =
          cpu > 90 ? 'critical' : cpu > 70 ? 'warning' : undefined;
        if (level !== undefined) {
          sse.publish('alert', {
            level,
            message: `CPU usage is ${cpu}%`,
          });
        }

        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    } finally {
      await sse.end();
    }
  }
}
Connection Management
Detecting Client Disconnection
Copy
/**
 * Publishes a `message` event every second until the client disconnects,
 * then closes the SSE connection.
 *
 * @param ctx - Request context; its socket is watched for the `close` event.
 */
@stream({
  type: 'sse',
  events: z.object({
    message: z.string(),
  }),
})
@api({ compress: false })
async *streamData(ctx: Context) {
  const sse = ctx.createSSE(
    z.object({
      message: z.string(),
    }),
  );

  // Detect client disconnection. The flag lets the publish loop stop;
  // logging alone would leave the loop running forever and `finally`
  // (and therefore sse.end()) would never be reached.
  let clientConnected = true;
  ctx.request.socket.on('close', () => {
    clientConnected = false;
    console.log('Client disconnected');
  });

  try {
    while (clientConnected) {
      sse.publish('message', 'Hello');
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  } finally {
    await sse.end();
  }
}
Keep-Alive
SSE automatically sends keep-alive signals, but a custom implementation is also possible:
Copy
/**
 * Publishes a `message` event every 5 seconds and, independently, a
 * `keepalive` event every 30 seconds. The keep-alive timer is cleared
 * before the connection is closed.
 */
@stream({
  type: 'sse',
  events: z.object({ message: z.string(), keepalive: z.object({}) }),
})
@api({ compress: false })
async *streamWithKeepAlive(ctx: Context) {
  const eventSchema = z.object({
    message: z.string(),
    keepalive: z.object({}),
  });
  const sse = ctx.createSSE(eventSchema);

  // Send keep-alive (every 30 seconds)
  const sendKeepAlive = () => {
    sse.publish('keepalive', {});
  };
  const heartbeat = setInterval(sendKeepAlive, 30000);

  try {
    for (;;) {
      sse.publish('message', 'Data');
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  } finally {
    clearInterval(heartbeat);
    await sse.end();
  }
}
Error Handling
Copy
/**
 * Publishes one `data` event with the fetched value. If fetching or
 * publishing fails, the failure is sent to the client as an `error` event
 * instead. The connection is always closed in `finally`.
 *
 * @param ctx - Request context used to create the SSE connection.
 */
@stream({
  type: 'sse',
  events: z.object({
    data: z.object({
      value: z.number(),
    }),
    error: z.object({
      message: z.string(),
    }),
  }),
})
@api({ compress: false })
async *streamWithErrorHandling(ctx: Context) {
  const sse = ctx.createSSE(
    z.object({
      data: z.object({
        value: z.number(),
      }),
      error: z.object({
        message: z.string(),
      }),
    }),
  );

  try {
    const value = await this.getSomeData();
    sse.publish('data', { value });
  } catch (error: unknown) {
    // Send error as event. The caught value is not guaranteed to be an
    // Error, so narrow it first; otherwise `message` could be undefined
    // and break the z.string() contract of the `error` event.
    sse.publish('error', {
      message: error instanceof Error ? error.message : String(error),
    });
  } finally {
    await sse.end();
  }
}
Important Notes
Important considerations when writing SSE endpoints:
- Compression must be disabled: SSE responses are streamed, so response compression must be turned off.
Copy
@api({ compress: false })

- Use a generator function: the async * keyword is required.
Copy
// ✅ Generator function
async *streamData() { ... }
// ❌ Regular function
async streamData() { ... }

- Always call end(): the connection must be closed explicitly.
Copy
try {
  // Send events
} finally {
  await sse.end(); // Required!
}

- Beware of infinite loops: an appropriate wait time is needed between iterations.
Copy
while (true) {
  sse.publish('data', ...);
  await new Promise(resolve => setTimeout(resolve, 1000)); // Wait 1 second
}

- Event schema matching: the event names in events and in publish() calls must match.
Copy
// ✅ Match
events: z.object({
  message: z.string(),
})
sse.publish('message', 'Hi');
// ❌ Mismatch (compile error)
sse.publish('msg', 'Hi');

- Prevent memory leaks: clean up timers and listeners.
Copy
const interval = setInterval(() => { ... }, 1000);
try {
  // ...
} finally {
  clearInterval(interval); // Cleanup
  await sse.end();
}