Skip to main content
Sonamu makes it easy to create SSE endpoints using the @stream decorator. Generator functions are used to send events sequentially.

@stream Decorator

A decorator for defining SSE endpoints.

Basic Usage

import { BaseModel, stream, api, z } from "sonamu";

class NotificationModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      notification: z.object({
        id: z.number(),
        message: z.string(),
      }),
    })
  })
  @api({ compress: false })  // Compression must be disabled
  async *streamNotifications(userId: number, ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        notification: z.object({
          id: z.number(),
          message: z.string(),
        }),
      })
    );

    // Send event
    sse.publish('notification', {
      id: 1,
      message: 'Hello, World!'
    });

    // Close connection
    await sse.end();
  }
}

@stream Options

Stream type
@stream({
  type: 'sse',  // Currently only SSE is supported
})
Supported types:
  • 'sse': Server-Sent Events
  • (WebSocket support planned for future)

SSEConnection API

The SSE connection object created by ctx.createSSE().

publish(event, data)

Sends an event to the client.
sse.publish('notification', {
  id: 1,
  message: 'New notification'
});
Type safety:
// ✅ Correct event
sse.publish('notification', { id: 1, message: 'Hi' });

// ❌ Wrong event (compile error)
sse.publish('unknown', { ... });

// ❌ Wrong data (compile error)
sse.publish('notification', { id: 'wrong' });

end()

Closes the connection.
await sse.end();
Auto-sends: event: end, data: END

Practical Examples

1. Real-time Notifications

class NotificationModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      notification: z.object({
        id: z.number(),
        type: z.enum(['like', 'comment', 'follow']),
        message: z.string(),
        createdAt: z.string(),
      }),
    })
  })
  @api({ compress: false })
  async *streamNotifications(userId: number, ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        notification: z.object({
          id: z.number(),
          type: z.enum(['like', 'comment', 'follow']),
          message: z.string(),
          createdAt: z.string(),
        }),
      })
    );

    try {
      // Real-time notification detection (e.g., polling or event listener)
      while (true) {
        // Query new notifications from DB
        const notifications = await this.findMany({
          wq: [
            ['user_id', userId],
            ['created_at', '>', new Date(Date.now() - 5000)],  // Last 5 seconds
          ],
          num: 10,
        });

        // Send notifications
        for (const notification of notifications.data) {
          sse.publish('notification', {
            id: notification.id,
            type: notification.type,
            message: notification.message,
            createdAt: notification.created_at.toISOString(),
          });
        }

        // Wait 5 seconds
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    } finally {
      await sse.end();
    }
  }
}

2. Task Progress

class TaskModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      progress: z.object({
        current: z.number(),
        total: z.number(),
        percentage: z.number(),
      }),
      complete: z.object({
        result: z.string(),
      }),
      error: z.object({
        message: z.string(),
      }),
    })
  })
  @api({ compress: false })
  async *streamTaskProgress(taskId: number, ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        progress: z.object({
          current: z.number(),
          total: z.number(),
          percentage: z.number(),
        }),
        complete: z.object({
          result: z.string(),
        }),
        error: z.object({
          message: z.string(),
        }),
      })
    );

    try {
      const total = 100;

      for (let current = 0; current <= total; current += 10) {
        // Send progress
        sse.publish('progress', {
          current,
          total,
          percentage: Math.round((current / total) * 100),
        });

        // Perform work (e.g., file processing)
        await new Promise(resolve => setTimeout(resolve, 1000));
      }

      // Completion event
      sse.publish('complete', {
        result: 'Task completed successfully'
      });
    } catch (error) {
      // Error event
      sse.publish('error', {
        message: error.message
      });
    } finally {
      await sse.end();
    }
  }
}

3. Live Feed

class PostModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      newPost: z.object({
        id: z.number(),
        title: z.string(),
        author: z.string(),
        createdAt: z.string(),
      }),
    })
  })
  @api({ compress: false })
  async *streamNewPosts(ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        newPost: z.object({
          id: z.number(),
          title: z.string(),
          author: z.string(),
          createdAt: z.string(),
        }),
      })
    );

    try {
      let lastCheckTime = new Date();

      while (true) {
        // Query new posts
        const posts = await this.findMany({
          wq: [['created_at', '>', lastCheckTime]],
          num: 20,
          order: [['created_at', 'DESC']],
        });

        // Send new posts
        for (const post of posts.data) {
          sse.publish('newPost', {
            id: post.id,
            title: post.title,
            author: post.author_name,
            createdAt: post.created_at.toISOString(),
          });
        }

        // Update last check time
        if (posts.data.length > 0) {
          lastCheckTime = posts.data[0].created_at;
        }

        // Wait 10 seconds
        await new Promise(resolve => setTimeout(resolve, 10000));
      }
    } finally {
      await sse.end();
    }
  }
}

4. Multiple Events

class ChatModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      message: z.object({
        id: z.number(),
        text: z.string(),
        sender: z.string(),
      }),
      typing: z.object({
        userId: z.number(),
        isTyping: z.boolean(),
      }),
      joined: z.object({
        userId: z.number(),
        username: z.string(),
      }),
      left: z.object({
        userId: z.number(),
      }),
    })
  })
  @api({ compress: false })
  async *streamChatRoom(roomId: number, ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        message: z.object({
          id: z.number(),
          text: z.string(),
          sender: z.string(),
        }),
        typing: z.object({
          userId: z.number(),
          isTyping: z.boolean(),
        }),
        joined: z.object({
          userId: z.number(),
          username: z.string(),
        }),
        left: z.object({
          userId: z.number(),
        }),
      })
    );

    try {
      // Join event
      sse.publish('joined', {
        userId: ctx.user.id,
        username: ctx.user.name,
      });

      // Message streaming
      while (true) {
        const messages = await this.getNewMessages(roomId);

        for (const msg of messages) {
          sse.publish('message', {
            id: msg.id,
            text: msg.text,
            sender: msg.sender_name,
          });
        }

        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    } finally {
      // Leave event
      sse.publish('left', {
        userId: ctx.user.id,
      });

      await sse.end();
    }
  }
}

5. Conditional Sending

class MonitorModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      status: z.object({
        cpu: z.number(),
        memory: z.number(),
        disk: z.number(),
      }),
      alert: z.object({
        level: z.enum(['warning', 'critical']),
        message: z.string(),
      }),
    })
  })
  @api({ compress: false })
  async *streamServerStatus(ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        status: z.object({
          cpu: z.number(),
          memory: z.number(),
          disk: z.number(),
        }),
        alert: z.object({
          level: z.enum(['warning', 'critical']),
          message: z.string(),
        }),
      })
    );

    try {
      while (true) {
        const status = await this.getServerStatus();

        // Send status
        sse.publish('status', {
          cpu: status.cpu,
          memory: status.memory,
          disk: status.disk,
        });

        // Alert on threshold exceeded
        if (status.cpu > 90) {
          sse.publish('alert', {
            level: 'critical',
            message: `CPU usage is ${status.cpu}%`,
          });
        } else if (status.cpu > 70) {
          sse.publish('alert', {
            level: 'warning',
            message: `CPU usage is ${status.cpu}%`,
          });
        }

        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    } finally {
      await sse.end();
    }
  }
}

Connection Management

Detecting Client Disconnection

@stream({
  type: 'sse',
  events: z.object({
    message: z.string(),
  })
})
@api({ compress: false })
async *streamData(ctx: Context) {
  const sse = ctx.createSSE(
    z.object({
      message: z.string(),
    })
  );

  // Detect client disconnection
  ctx.request.socket.on('close', () => {
    console.log('Client disconnected');
  });

  try {
    while (true) {
      sse.publish('message', 'Hello');
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  } finally {
    await sse.end();
  }
}

Keep-Alive

SSE automatically sends keep-alive signals, but custom implementation is also possible:
@stream({
  type: 'sse',
  events: z.object({
    message: z.string(),
    keepalive: z.object({}),
  })
})
@api({ compress: false })
async *streamWithKeepAlive(ctx: Context) {
  const sse = ctx.createSSE(
    z.object({
      message: z.string(),
      keepalive: z.object({}),
    })
  );

  // Send keep-alive (every 30 seconds)
  const keepAliveInterval = setInterval(() => {
    sse.publish('keepalive', {});
  }, 30000);

  try {
    while (true) {
      sse.publish('message', 'Data');
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  } finally {
    clearInterval(keepAliveInterval);
    await sse.end();
  }
}

Error Handling

@stream({
  type: 'sse',
  events: z.object({
    data: z.object({
      value: z.number(),
    }),
    error: z.object({
      message: z.string(),
    }),
  })
})
@api({ compress: false })
async *streamWithErrorHandling(ctx: Context) {
  const sse = ctx.createSSE(
    z.object({
      data: z.object({
        value: z.number(),
      }),
      error: z.object({
        message: z.string(),
      }),
    })
  );

  try {
    const value = await this.getSomeData();
    sse.publish('data', { value });
  } catch (error) {
    // Send error as event
    sse.publish('error', {
      message: error.message
    });
  } finally {
    await sse.end();
  }
}

Important Notes

Important considerations when writing SSE endpoints:
  1. Compression must be disabled: SSE is streaming, so compression is prohibited
    @api({ compress: false })
    
  2. Use generator function: async * keyword required
    // ✅ Generator function
    async *streamData() { ... }
    
    // ❌ Regular function
    async streamData() { ... }
    
  3. Always call end(): Connection closure is required
    try {
      // Send events
    } finally {
      await sse.end();  // Required!
    }
    
  4. Beware of infinite loops: Appropriate wait time needed
    while (true) {
      sse.publish('data', ...);
      await new Promise(resolve => setTimeout(resolve, 1000));  // Wait 1 second
    }
    
  5. Event schema matching: events and publish event names must match
    // ✅ Match
    events: z.object({
      message: z.string(),
    })
    sse.publish('message', 'Hi');
    
    // ❌ Mismatch (compile error)
    sse.publish('msg', 'Hi');
    
  6. Prevent memory leaks: Clean up timers/listeners
    const interval = setInterval(() => { ... }, 1000);
    try {
      // ...
    } finally {
      clearInterval(interval);  // Cleanup
      await sse.end();
    }
    

Next Steps