Skip to main content
The @stream decorator creates real-time streaming APIs using Server-Sent Events (SSE).

Basic Usage

import { BaseModelClass, Sonamu, stream } from "sonamu";
import { z } from "zod";

// Event map for the notification stream: each top-level key is an event name
// and its value is the Zod schema of that event's payload.
const NotificationEventsSchema = z.object({
  // "message": a single notification.
  message: z.object({
    id: z.number(),
    text: z.string(),
    timestamp: z.date()
  }),
  // "status": connection state reported for a given user.
  status: z.object({
    type: z.enum(["connected", "disconnected"]),
    userId: z.number()
  })
});

/**
 * Example model exposing a per-user notification stream over SSE.
 */
class NotificationModelClass extends BaseModelClass {
  /**
   * Opens an SSE stream for `userId`: announces "connected", emits a
   * "message" event every 5 seconds, and after 5 minutes announces
   * "disconnected" and ends the stream.
   *
   * @param userId - Id echoed back in the "status" events.
   * @returns The SSE object created for this call.
   */
  @stream({
    type: "sse",
    events: NotificationEventsSchema
  })
  async subscribe(userId: number) {
    const sse = Sonamu.getContext().createSSE(NotificationEventsSchema);

    // Tell the client the stream is live.
    sse.publish("status", { type: "connected", userId });

    // Periodic notification, fired every 5 seconds.
    const publishNotification = (): void => {
      sse.publish("message", {
        id: Date.now(),
        text: "New notification",
        timestamp: new Date()
      });
    };
    const ticker = setInterval(publishNotification, 5000);

    // After 5 minutes: stop the ticker, announce the disconnect, end the stream.
    const shutDown = async () => {
      clearInterval(ticker);
      sse.publish("status", { type: "disconnected", userId });
      await sse.end();
    };
    setTimeout(shutDown, 300000);

    return sse;
  }
}

Options

type

Specifies the streaming type. This option is required.
type: "sse"  // Server-Sent Events (currently the only option)
WebSocket support may be added in the future.

events

Defines the event types to send, using a Zod schema. This option is required.
import { z } from "zod";

// Event map passed to the `events` option: three events (progress, complete,
// error), each with its own payload schema.
const EventsSchema = z.object({
  // Event name: Event data schema
  progress: z.object({
    percent: z.number(),
    message: z.string()
  }),
  complete: z.object({
    result: z.string()
  }),
  error: z.object({
    message: z.string(),
    code: z.string()
  })
});

// Publishes one "progress" and one "complete" event, then ends the stream.
@stream({
  type: "sse",
  events: EventsSchema
})
async process() {
  const sse = Sonamu.getContext().createSSE(EventsSchema);

  // Payloads for the events declared in EventsSchema.
  const halfway = { percent: 50, message: "Processing..." };
  const finished = { result: "Success" };

  sse.publish("progress", halfway);
  sse.publish("complete", finished);

  // Close the stream before handing the SSE object back.
  await sse.end();
  return sse;
}

path

Specifies the streaming endpoint path. Default: /{modelName}/{methodName} (camelCase)
// Replaces the default /{modelName}/{methodName} route with an explicit path.
@stream({
  type: "sse",
  events: EventsSchema,
  path: "/stream/notifications"
})
async subscribe() {
  // Accessible at /stream/notifications
}

guards

Specifies access permissions.
// Keys that may be listed in the `guards` option.
type GuardKey = "query" | "admin" | "user";
// Restricts this stream to callers that pass the "user" guard.
@stream({
  type: "sse",
  events: EventsSchema,
  guards: ["user"]
})
async subscribe() {
  // Only logged-in users can access
}

description

Adds a description for the streaming API.
// Attaches a human-readable description to the streaming API.
@stream({
  type: "sse",
  events: EventsSchema,
  description: "Streams real-time notifications."
})
async notifications() {
  // ...
}

resourceName

Specifies the resource name for the generated service file.
// Sets the resource name that determines the generated service file.
@stream({
  type: "sse",
  events: EventsSchema,
  resourceName: "Notifications"
})
async subscribe() {
  // Included in NotificationsService.ts file
}

SSE Object Usage

createSSE

Creates an SSE object from Context.
const sse = Sonamu.getContext().createSSE(EventsSchema);

publish

Sends an event to the client.
sse.publish("eventName", data);
Type safety:
  • Event names must be defined in the schema
  • Data is automatically validated against the schema
// Event map with a single "message" event whose payload is { text: string }.
const EventsSchema = z.object({
  message: z.object({
    text: z.string()
  })
});

// Obtain the SSE object from the context, exactly as in the createSSE example.
const sse = Sonamu.getContext().createSSE(EventsSchema);

// ✅ Correct usage
sse.publish("message", { text: "Hello" });

// ❌ Type error
sse.publish("unknown", { text: "Hello" });  // 'unknown' not in schema
sse.publish("message", { text: 123 });      // text must be string

end

Closes the stream connection.
await sse.end();
end() sends an “end” event to the client and closes the connection after waiting 200ms.

Complete Example

import { BaseModelClass, stream } from "sonamu";
import { z } from "zod";

// Event map for the task log stream: each key is an event name and its value
// is the Zod schema of that event's payload.
const LogEventsSchema = z.object({
  // "log": one log line; the timestamp is carried as a string.
  log: z.object({
    level: z.enum(["info", "warn", "error"]),
    message: z.string(),
    timestamp: z.string()
  }),
  // "progress": how many items out of `total` are done.
  progress: z.object({
    current: z.number(),
    total: z.number()
  }),
  // "complete": final outcome and elapsed time.
  complete: z.object({
    success: z.boolean(),
    duration: z.number()
  })
});

/**
 * Example model that streams the logs and progress of a long-running task.
 */
class TaskModelClass extends BaseModelClass {
  /**
   * Runs a 100-item task and reports on it over SSE: a "log" event at the
   * start, a "progress" event after every item, a "log" event after every
   * tenth item, and a final "complete" event before the stream is ended.
   *
   * @param taskId - Identifier of the task; this example body does not read it.
   * @returns The SSE object, after the stream has been ended.
   */
  @stream({
    type: "sse",
    events: LogEventsSchema,
    path: "/tasks/logs",
    guards: ["admin"],
    description: "Streams task execution logs in real-time."
  })
  async streamLogs(taskId: number) {
    const sse = Sonamu.getContext().createSSE(LogEventsSchema);
    const startTime = Date.now();

    // Start task
    sse.publish("log", {
      level: "info",
      message: "Task started",
      timestamp: new Date().toISOString()
    });

    // Execute task: exactly `total` iterations (indices 0..total-1), so the
    // number of processed items matches the `total` reported to the client.
    const total = 100;
    for (let index = 0; index < total; index++) {
      await this.processItem(index);

      // Count of items finished so far (1-based), so progress ends at total/total.
      const processed = index + 1;

      // Send progress
      sse.publish("progress", {
        current: processed,
        total
      });

      // Send a log line after every tenth completed item
      if (processed % 10 === 0) {
        sse.publish("log", {
          level: "info",
          message: `Processed ${processed}/${total} items`,
          timestamp: new Date().toISOString()
        });
      }
    }

    // Task complete: `duration` is elapsed milliseconds since `startTime`
    sse.publish("complete", {
      success: true,
      duration: Date.now() - startTime
    });

    // Close connection
    await sse.end();

    return sse;
  }

  /**
   * Placeholder for the real per-item work; waits 50 ms.
   *
   * @param index - Zero-based index of the item; unused by this placeholder.
   */
  private async processItem(index: number) {
    // Process task
    await new Promise(resolve => setTimeout(resolve, 50));
  }
}

Using with Other Decorators

@transactional

// @stream is listed first, with @transactional directly below it.
@stream({ type: "sse", events: EventsSchema })
@transactional()
async process() {
  const sse = Sonamu.getContext().createSSE(EventsSchema);
  // DB handle for the "w" preset (presumably the write connection — confirm);
  // shown as a placeholder, the example body does not use it further.
  const wdb = this.getDB("w");

  // Execute within transaction
  // Perform SSE sending and DB operations together

  await sse.end();
  return sse;
}
Write @stream first, then @transactional.

Constraints

1. Cannot be used with @api

// ❌ Error
// @api and @stream are both applied to the same method, which is rejected.
@api()
@stream({ type: "sse", events: EventsSchema })
async subscribe() {}
Error message:
@stream decorator can only be used once on NotificationModel.subscribe.
You can use only one of @api or @stream decorator on the same method.

2. events is required

// ❌ Error
// The decorator options omit the `events` schema.
@stream({ type: "sse" })
async subscribe() {}
// events option is required

3. httpMethod is always GET

@stream automatically sets httpMethod: "GET".
// No httpMethod is given here; @stream sets it to "GET".
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
  // Exposed as GET /notification/subscribe
}

Client Usage (Web)

Sonamu automatically generates SSE client code.
// Auto-generated code
import { NotificationService } from "@/services/NotificationService";

// SSE connection: call the generated service method, passing `userId`
const eventSource = NotificationService.subscribe(userId);

// Register event listeners — one per event name; each payload arrives as a
// JSON string in `event.data`
eventSource.addEventListener("message", (event) => {
  const data = JSON.parse(event.data);
  console.log("New message:", data);
});

eventSource.addEventListener("status", (event) => {
  const data = JSON.parse(event.data);
  console.log("Status:", data);
});

// Connection end event: close the client side once the "end" event arrives
eventSource.addEventListener("end", () => {
  console.log("Stream ended");
  eventSource.close();
});

// Manual connection close (call this when the stream is no longer needed)
eventSource.close();

React Example

import { useEffect, useState } from "react";

/**
 * Renders the connection status and the list of notifications received over
 * the SSE stream for `userId`.
 *
 * The stream is opened when the component mounts (and re-opened whenever
 * `userId` changes) and closed on unmount or when the server ends it.
 */
function NotificationList({ userId }: { userId: number }) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [status, setStatus] = useState<string>("disconnected");

  useEffect(() => {
    const eventSource = NotificationService.subscribe(userId);

    // Append each incoming notification to the list.
    eventSource.addEventListener("message", (event) => {
      const message = JSON.parse(event.data);
      setMessages(prev => [...prev, message]);
    });

    // Mirror the server-reported connection state.
    eventSource.addEventListener("status", (event) => {
      const { type } = JSON.parse(event.data);
      setStatus(type);
    });

    eventSource.addEventListener("end", () => {
      console.log("Stream ended by server");
      setStatus("disconnected");
      // An EventSource reconnects automatically once the server closes the
      // connection; close it explicitly so the finished stream is not reopened.
      eventSource.close();
    });

    // Cleanup: close the connection on unmount or before re-subscribing.
    return () => {
      eventSource.close();
    };
  }, [userId]);

  return (
    <div>
      <div>Status: {status}</div>
      <ul>
        {messages.map(msg => (
          <li key={msg.id}>{msg.text}</li>
        ))}
      </ul>
    </div>
  );
}

Logging

The @stream decorator automatically logs:
// No logging code is written in the method; the decorator emits the log line
// shown in the body comment.
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
  // Auto log:
  // [DEBUG] stream: NotificationModel.subscribe
}

Example Collection

// Event map for the chat stream: each key is an event name and its value is
// the Zod schema of that event's payload.
const ChatEventsSchema = z.object({
  // "message": a chat message sent by `userId`.
  message: z.object({
    id: z.string(),
    userId: z.number(),
    text: z.string(),
    timestamp: z.date()
  }),
  // "typing": typing indicator for a user.
  typing: z.object({
    userId: z.number(),
    isTyping: z.boolean()
  }),
  // "joined": a user entered.
  joined: z.object({
    userId: z.number(),
    username: z.string()
  }),
  // "left": a user left.
  left: z.object({
    userId: z.number()
  })
});

/**
 * Example chat model that streams room activity to a logged-in user.
 */
class ChatModelClass extends BaseModelClass {
  /**
   * Opens an SSE stream for `roomId`: publishes "joined" for the current
   * user, forwards every room message as a "message" event, and after one
   * hour unsubscribes, publishes "left" and ends the stream.
   *
   * @param roomId - Room whose messages are forwarded to the stream.
   * @returns The SSE object created for this call.
   */
  @stream({
    type: "sse",
    events: ChatEventsSchema,
    guards: ["user"]
  })
  async subscribe(roomId: number) {
    const sse = Sonamu.getContext().createSSE(ChatEventsSchema);
    const user = Sonamu.getContext().user;

    // Announce that the current user has joined.
    sse.publish("joined", { userId: user.id, username: user.name });

    // Relay each message of the room to the stream.
    const stopListening = this.onMessage(roomId, (incoming) => {
      sse.publish("message", incoming);
    });

    // One hour later: detach the listener, announce the departure, end the stream.
    const closeStream = async () => {
      stopListening();
      sse.publish("left", { userId: user.id });
      await sse.end();
    };
    setTimeout(closeStream, 3600000);

    return sse;
  }
}

Next Steps