The @stream decorator creates real-time streaming APIs using Server-Sent Events (SSE).
Basic Usage
Copy
import { BaseModelClass, Sonamu, stream } from "sonamu";
import { z } from "zod";

/**
 * Events emitted by the notification stream.
 * Each top-level key is an event name; its value is the payload schema.
 */
const NotificationEventsSchema = z.object({
  message: z.object({
    id: z.number(),
    text: z.string(),
    timestamp: z.date()
  }),
  status: z.object({
    type: z.enum(["connected", "disconnected"]),
    userId: z.number()
  })
});

class NotificationModelClass extends BaseModelClass {
  /**
   * Streams notifications over SSE.
   *
   * Publishes a "connected" status right away, a "message" event every
   * 5 seconds, and after 5 minutes publishes a "disconnected" status and
   * ends the stream.
   *
   * NOTE(review): both timers keep running even if the client goes away
   * earlier — confirm whether the SSE object exposes a close hook that
   * should cancel them.
   *
   * @param userId - Echoed back in the "status" events.
   * @returns The SSE object created from the current context.
   */
  @stream({
    type: "sse",
    events: NotificationEventsSchema
  })
  async subscribe(userId: number) {
    const sse = Sonamu.getContext().createSSE(NotificationEventsSchema);

    // Connection started
    sse.publish("status", {
      type: "connected",
      userId
    });

    // Send events periodically
    const interval = setInterval(() => {
      sse.publish("message", {
        id: Date.now(),
        text: "New notification",
        timestamp: new Date()
      });
    }, 5000);

    // Close connection after 5 minutes
    setTimeout(async () => {
      // Stop the periodic publisher first so nothing is sent after end().
      clearInterval(interval);
      sse.publish("status", {
        type: "disconnected",
        userId
      });
      await sse.end();
    }, 300000);

    return sse;
  }
}
Options
type
Specifies the streaming type. Required option.
Copy
type: "sse" // Server-Sent Events (currently the only option)
WebSocket support may be added in the future.
events
Defines event types to send using a Zod schema. Required option.
Copy
import { z } from "zod";

// Payload schemas, one per event.
const ProgressPayload = z.object({
  percent: z.number(),
  message: z.string()
});
const CompletePayload = z.object({
  result: z.string()
});
const ErrorPayload = z.object({
  message: z.string(),
  code: z.string()
});

// Event name: Event data schema
const EventsSchema = z.object({
  progress: ProgressPayload,
  complete: CompletePayload,
  error: ErrorPayload
});

/**
 * Publishes one "progress" event and one "complete" event, then ends the
 * stream and returns the SSE object.
 */
@stream({ type: "sse", events: EventsSchema })
async process() {
  const channel = Sonamu.getContext().createSSE(EventsSchema);

  // Send events
  const halfway = { percent: 50, message: "Processing..." };
  const finished = { result: "Success" };
  channel.publish("progress", halfway);
  channel.publish("complete", finished);

  // End stream
  await channel.end();
  return channel;
}
path
Specifies the streaming endpoint path. Default: /{modelName}/{methodName} (camelCase)
Copy
@stream({
type: "sse",
events: EventsSchema,
path: "/stream/notifications"
})
async subscribe() {
// Accessible at /stream/notifications
}
guards
Specifies access permissions.
Copy
type GuardKey = "query" | "admin" | "user";
Copy
@stream({
type: "sse",
events: EventsSchema,
guards: ["user"]
})
async subscribe() {
// Only logged-in users can access
}
description
Adds a description for the streaming API.
Copy
@stream({
type: "sse",
events: EventsSchema,
description: "Streams real-time notifications."
})
async notifications() {
// ...
}
resourceName
Specifies the resource name for the generated service file.
Copy
@stream({
type: "sse",
events: EventsSchema,
resourceName: "Notifications"
})
async subscribe() {
// Included in NotificationsService.ts file
}
SSE Object Usage
createSSE
Creates an SSE object from Context.
Copy
const sse = Sonamu.getContext().createSSE(EventsSchema);
publish
Sends an event to the client.
Copy
sse.publish("eventName", data);
- Event names must be defined in the schema
- Data is automatically validated against the schema
Copy
// A single "message" event whose payload carries a string `text`.
const EventsSchema = z.object({
  message: z.object({
    text: z.string()
  })
});

// Create the SSE object from the current Sonamu context.
const sse = Sonamu.getContext().createSSE(EventsSchema);

// ✅ Correct usage
sse.publish("message", { text: "Hello" });

// ❌ Type error
sse.publish("unknown", { text: "Hello" }); // 'unknown' not in schema
sse.publish("message", { text: 123 }); // text must be string
end
Closes the stream connection.
Copy
await sse.end();
end() sends an “end” event to the client and closes the connection after waiting 200ms.
Complete Example
Copy
import { BaseModelClass, Sonamu, stream } from "sonamu";
import { z } from "zod";

/**
 * Events emitted while a task runs: log lines, progress counters, and a
 * final completion summary.
 */
const LogEventsSchema = z.object({
  log: z.object({
    level: z.enum(["info", "warn", "error"]),
    message: z.string(),
    timestamp: z.string()
  }),
  progress: z.object({
    current: z.number(),
    total: z.number()
  }),
  complete: z.object({
    success: z.boolean(),
    duration: z.number()
  })
});

class TaskModelClass extends BaseModelClass {
  /**
   * Runs a 100-item task and streams its logs and progress over SSE.
   *
   * Publishes a "log" event at start, a "progress" event after every item,
   * a "log" event after every 10th item, and finally a "complete" event
   * carrying the elapsed milliseconds before ending the stream.
   *
   * @param taskId - Identifier of the task (not read by this example body).
   * @returns The SSE object, after the stream has been ended.
   */
  @stream({
    type: "sse",
    events: LogEventsSchema,
    path: "/tasks/logs",
    guards: ["admin"],
    description: "Streams task execution logs in real-time."
  })
  async streamLogs(taskId: number) {
    const sse = Sonamu.getContext().createSSE(LogEventsSchema);
    const startTime = Date.now();

    // Start task
    sse.publish("log", {
      level: "info",
      message: "Task started",
      timestamp: new Date().toISOString()
    });

    // Execute task: exactly `total` items, indexed 0..total-1.
    const total = 100;
    for (let index = 0; index < total; index++) {
      await this.processItem(index);

      // Number of items finished so far (1-based), so the last report is total/total.
      const processed = index + 1;

      // Send progress
      sse.publish("progress", {
        current: processed,
        total
      });

      // Send logs
      if (processed % 10 === 0) {
        sse.publish("log", {
          level: "info",
          message: `Processed ${processed}/${total} items`,
          timestamp: new Date().toISOString()
        });
      }
    }

    // Task complete
    sse.publish("complete", {
      success: true,
      duration: Date.now() - startTime
    });

    // Close connection
    await sse.end();
    return sse;
  }

  /**
   * Placeholder for the per-item work; here it only waits 50 ms.
   *
   * @param index - Zero-based index of the item being processed.
   */
  private async processItem(index: number) {
    // Process task
    await new Promise(resolve => setTimeout(resolve, 50));
  }
}
Using with Other Decorators
@transactional
Copy
@stream({ type: "sse", events: EventsSchema })
@transactional()
async process() {
const sse = Sonamu.getContext().createSSE(EventsSchema);
const wdb = this.getDB("w");
// Execute within transaction
// Perform SSE sending and DB operations together
await sse.end();
return sse;
}
Write @stream first, then @transactional.
Constraints
1. Cannot be used with @api
Copy
// ❌ Error
@api()
@stream({ type: "sse", events: EventsSchema })
async subscribe() {}
Copy
@stream decorator can only be used once on NotificationModel.subscribe.
You can use only one of @api or @stream decorator on the same method.
2. events is required
Copy
// ❌ Error
@stream({ type: "sse" })
async subscribe() {}
// events option is required
3. httpMethod is always GET
@stream automatically sets httpMethod: "GET".
Copy
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
// Exposed as GET /notification/subscribe
}
Client Usage (Web)
Sonamu automatically generates SSE client code.
Copy
// Auto-generated code
import { NotificationService } from "@/services/NotificationService";

// SSE connection
const source = NotificationService.subscribe(userId);

// Builds a listener that JSON-decodes the event payload and logs it under `label`.
const logPayload = (label: string) => (event: { data: string }) => {
  const payload = JSON.parse(event.data);
  console.log(label, payload);
};

// Register event listeners
source.addEventListener("message", logPayload("New message:"));
source.addEventListener("status", logPayload("Status:"));

// Connection end event
function handleEnd() {
  console.log("Stream ended");
  source.close();
}
source.addEventListener("end", handleEnd);

// Manual connection close
source.close();
React Example
Copy
import { useEffect, useState } from "react";
import { NotificationService } from "@/services/NotificationService";

/**
 * Fields of a "message" event payload that this component reads.
 * NOTE(review): the payload may carry more fields (e.g. a timestamp) —
 * confirm against the server-side events schema.
 */
type Message = { id: number; text: string };

/**
 * Subscribes to the notification stream for `userId` and renders the
 * connection status plus every message received so far.
 *
 * The subscription is re-created whenever `userId` changes and closed on
 * unmount.
 */
function NotificationList({ userId }: { userId: number }) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [status, setStatus] = useState<string>("disconnected");

  useEffect(() => {
    const eventSource = NotificationService.subscribe(userId);

    eventSource.addEventListener("message", (event) => {
      const message: Message = JSON.parse(event.data);
      setMessages(prev => [...prev, message]);
    });

    eventSource.addEventListener("status", (event) => {
      const { type } = JSON.parse(event.data);
      setStatus(type);
    });

    eventSource.addEventListener("end", () => {
      console.log("Stream ended by server");
      setStatus("disconnected");
      // Close explicitly: an EventSource that is left open reconnects
      // automatically once the server drops the connection.
      eventSource.close();
    });

    // Cleanup on unmount or userId change; close() on an already-closed
    // source is a no-op.
    return () => {
      eventSource.close();
    };
  }, [userId]);

  return (
    <div>
      <div>Status: {status}</div>
      <ul>
        {messages.map(msg => (
          <li key={msg.id}>{msg.text}</li>
        ))}
      </ul>
    </div>
  );
}
Logging
The @stream decorator automatically logs:
Copy
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
// Auto log:
// [DEBUG] stream: NotificationModel.subscribe
}
Example Collection
- Chat
- Progress Monitoring
- Real-time Data
Copy
// Payload schemas for each chat event.
const ChatMessagePayload = z.object({
  id: z.string(),
  userId: z.number(),
  text: z.string(),
  timestamp: z.date()
});
const TypingPayload = z.object({
  userId: z.number(),
  isTyping: z.boolean()
});
const JoinedPayload = z.object({
  userId: z.number(),
  username: z.string()
});
const LeftPayload = z.object({
  userId: z.number()
});

// Event name -> payload schema.
const ChatEventsSchema = z.object({
  message: ChatMessagePayload,
  typing: TypingPayload,
  joined: JoinedPayload,
  left: LeftPayload
});

class ChatModelClass extends BaseModelClass {
  /**
   * Streams chat events for a room over SSE.
   *
   * Publishes "joined" for the context user, forwards every message
   * delivered by `onMessage` for the room, and after one hour unsubscribes,
   * publishes "left", and ends the stream.
   *
   * @param roomId - Room whose messages are forwarded.
   * @returns The SSE object created from the current context.
   */
  @stream({ type: "sse", events: ChatEventsSchema, guards: ["user"] })
  async subscribe(roomId: number) {
    const sse = Sonamu.getContext().createSSE(ChatEventsSchema);
    const user = Sonamu.getContext().user;

    // Join notification
    const joinedPayload = { userId: user.id, username: user.name };
    sse.publish("joined", joinedPayload);

    // Register message listener
    const stopListening = this.onMessage(roomId, (incoming) => {
      sse.publish("message", incoming);
    });

    // Auto-close after 1 hour
    const closeStream = async () => {
      stopListening();
      sse.publish("left", { userId: user.id });
      await sse.end();
    };
    setTimeout(closeStream, 3600000);

    return sse;
  }
}
Copy
/**
 * Events emitted while monitoring a process: a start marker, per-step
 * progress, a completion summary, and an error report.
 */
const ProcessEventsSchema = z.object({
  started: z.object({
    taskId: z.string(),
    totalSteps: z.number()
  }),
  progress: z.object({
    step: z.number(),
    message: z.string(),
    percent: z.number()
  }),
  completed: z.object({
    success: z.boolean(),
    result: z.any(),
    duration: z.number()
  }),
  error: z.object({
    step: z.number(),
    message: z.string()
  })
});

class ProcessModelClass extends BaseModelClass {
  /**
   * Executes the steps of a process in order and streams their progress.
   *
   * Publishes "started" with the step count, a "progress" event before each
   * step runs, and "completed" with the result and elapsed milliseconds.
   * Any thrown error is reported as an "error" event instead. The stream is
   * ended in both cases.
   *
   * @param processId - Identifier used to look up the steps and the result.
   * @returns The SSE object, after the stream has been ended.
   */
  @stream({
    type: "sse",
    events: ProcessEventsSchema,
    guards: ["admin"]
  })
  async monitor(processId: string) {
    const sse = Sonamu.getContext().createSSE(ProcessEventsSchema);
    const startTime = Date.now();

    try {
      const steps = await this.getProcessSteps(processId);

      sse.publish("started", {
        taskId: processId,
        totalSteps: steps.length
      });

      for (let i = 0; i < steps.length; i++) {
        const step = steps[i];

        // Reported before the step executes, with a 1-based step number.
        sse.publish("progress", {
          step: i + 1,
          message: step.description,
          percent: ((i + 1) / steps.length) * 100
        });

        await this.executeStep(step);
      }

      sse.publish("completed", {
        success: true,
        result: await this.getProcessResult(processId),
        duration: Date.now() - startTime
      });
    } catch (error: unknown) {
      // Catch variables are `unknown` under `strict`; narrow before reading
      // `.message` so non-Error throwables are still reported.
      const message = error instanceof Error ? error.message : String(error);
      sse.publish("error", {
        step: -1,
        message
      });
    }

    await sse.end();
    return sse;
  }
}
Copy
// Payload schemas for each stock event.
const PricePayload = z.object({
  symbol: z.string(),
  price: z.number(),
  change: z.number(),
  timestamp: z.date()
});
const VolumePayload = z.object({
  symbol: z.string(),
  volume: z.number()
});
const AlertPayload = z.object({
  type: z.enum(["high", "low"]),
  symbol: z.string(),
  message: z.string()
});

// Event name -> payload schema.
const StockEventsSchema = z.object({
  price: PricePayload,
  volume: VolumePayload,
  alert: AlertPayload
});

class StockModelClass extends BaseModelClass {
  /**
   * Streams quotes for one symbol: 60 rounds, each fetching a quote,
   * publishing a "price" event (plus a "high" alert when the price is above
   * the quote's high threshold), then waiting one second. Ends the stream
   * afterwards.
   *
   * @param symbol - Symbol passed to `getQuote` and echoed in every event.
   * @returns The SSE object, after the stream has been ended.
   */
  @stream({ type: "sse", events: StockEventsSchema, guards: ["user"] })
  async watchSymbol(symbol: string) {
    const sse = Sonamu.getContext().createSSE(StockEventsSchema);
    const pause = (ms: number) =>
      new Promise(resolve => setTimeout(resolve, ms));

    // Update price every second (for 1 minute)
    let tick = 0;
    while (tick < 60) {
      const quote = await this.getQuote(symbol);
      const { price, change, highThreshold } = quote;

      sse.publish("price", { symbol, price, change, timestamp: new Date() });

      // Price alert
      if (price > highThreshold) {
        sse.publish("alert", {
          type: "high",
          symbol,
          message: `Price exceeded ${highThreshold}`
        });
      }

      await pause(1000);
      tick += 1;
    }

    await sse.end();
    return sse;
  }
}