Skip to main content
useSSEStream is a hook for managing real-time data streaming via Server-Sent Events (SSE). It provides auto-reconnection, type-safe event handlers, and connection state tracking.

Core Features

Real-time Streaming

Server to clientUnidirectional real-time data

Auto-reconnection

Automatic retry on failureConfigurable count/interval

Type Safety

Per-event type definitionGeneric-based

Connection State

isConnected, error trackingUI feedback ready

What is SSE?

Server-Sent Events is a technology for unidirectional real-time data transmission from server to client.

WebSocket vs SSE

FeatureWebSocketSSE
DirectionBidirectional (Full-duplex)Unidirectional (Server → Client)
ProtocolWS/WSSHTTP/HTTPS
ReconnectionManual implementationBrowser auto-reconnect
ComplexityHighLow
Use CasesChat, gamesNotifications, logs, progress
When to use SSE
  • Server-to-client only data push
  • AI response streaming (ChatGPT style)
  • Real-time log monitoring
  • File upload/processing progress
  • Server event notifications

Basic Usage

Import

import { useSSEStream } from "./sonamu.shared";
useSSEStream is defined in sonamu.shared.ts, which is generated during project scaffolding. services.generated.ts auto-generates wrapper functions for each @stream API, using useSSEStream internally.

Type Definition

// Event type definition
type AIStreamEvents = {
  token: { text: string };     // token event: text chunks
  usage: { tokens: number };   // usage event: token count
  end: string;                  // end event: completion signal
};

Basic Usage

import { useState } from "react";
import { useSSEStream } from "./sonamu.shared";

export function AIChat() {
  const [response, setResponse] = useState("");
  const [totalTokens, setTotalTokens] = useState(0);

  const { isConnected, error, isEnded } = useSSEStream<AIStreamEvents>(
    "/api/ai/stream",  // SSE endpoint
    { prompt: "Hello, AI!" },  // Query parameters
    {
      // Event handlers
      token: (data) => {
        // Append each text chunk
        setResponse((prev) => prev + data.text);
      },
      usage: (data) => {
        // Update token usage
        setTotalTokens(data.tokens);
      },
      end: () => {
        // Streaming complete
        console.log("Stream ended");
      },
    },
    {
      enabled: true,      // Auto-connect
      retry: 3,           // Max 3 retries
      retryInterval: 3000, // 3 second interval
    }
  );

  return (
    <div>
      {isConnected && <div className="text-green-500">Connected</div>}
      {error && <div className="text-red-500">Error: {error}</div>}
      <div className="whitespace-pre-wrap">{response}</div>
      {isEnded && <div>Tokens used: {totalTokens}</div>}
    </div>
  );
}

API Reference

useSSEStream

function useSSEStream<T extends Record<string, any>>(
  url: string,
  params: Record<string, any>,
  handlers: {
    [K in keyof T]?: (data: T[K]) => void;
  },
  options?: SSEStreamOptions
): SSEStreamState

Parameters

url

The SSE endpoint URL.
useSSEStream("/api/ai/stream", params, handlers);

params

Object to be sent as URL query parameters.
useSSEStream(
  "/api/ai/stream",
  {
    prompt: "Hello",
    model: "gpt-4",
    temperature: 0.7,
  },
  handlers
);

// Actual request URL:
// /api/ai/stream?prompt=Hello&model=gpt-4&temperature=0.7

handlers

Handler functions for each event type.
{
  token: (data: { text: string }) => {
    // Handle token event
  },
  usage: (data: { tokens: number }) => {
    // Handle usage event
  },
  end: () => {
    // Handle end event
  },
}
Special events:
  • end: When the server explicitly signals stream termination
  • message: Default message when no event type is specified

options

OptionTypeDefaultDescription
enabledbooleantrueAuto-connect toggle
retrynumber3Max retry count
retryIntervalnumber3000Retry interval (ms)

Return Value (SSEStreamState)

type SSEStreamState = {
  isConnected: boolean;  // Current connection state
  error: string | null;  // Error message
  retryCount: number;    // Current retry count
  isEnded: boolean;      // Normal termination flag
};

Practical Examples

AI Response Streaming

import { useState } from "react";
import { useSSEStream } from "./sonamu.shared";
import { Button, Textarea } from "@sonamu-kit/react-components/components";

type AIStreamEvents = {
  token: { text: string };
  end: string;
};

export function AIChat() {
  const [prompt, setPrompt] = useState("");
  const [response, setResponse] = useState("");
  const [enabled, setEnabled] = useState(false);

  const { isConnected, error, isEnded } = useSSEStream<AIStreamEvents>(
    "/api/ai/chat",
    { prompt },
    {
      token: (data) => {
        setResponse((prev) => prev + data.text);
      },
      end: () => {
        console.log("AI response complete");
        setEnabled(false);  // Close connection
      },
    },
    {
      enabled,
      retry: 1,  // Only 1 retry for AI requests
      retryInterval: 1000,
    }
  );

  const handleSubmit = () => {
    setResponse("");  // Clear previous response
    setEnabled(true);  // Start stream
  };

  return (
    <div className="space-y-4">
      <Textarea
        value={prompt}
        onValueChange={setPrompt}
        placeholder="Enter your question"
        rows={3}
      />

      <Button
        onClick={handleSubmit}
        disabled={isConnected || !prompt}
      >
        {isConnected ? "Responding..." : "Ask"}
      </Button>

      {error && (
        <div className="p-4 bg-red-50 text-red-700 rounded">
          {error}
        </div>
      )}

      {response && (
        <div className="p-4 bg-gray-50 rounded">
          <div className="whitespace-pre-wrap">{response}</div>
          {isConnected && <span className="animate-pulse"></span>}
        </div>
      )}

      {isEnded && (
        <div className="text-sm text-gray-500">
          Response complete
        </div>
      )}
    </div>
  );
}

File Upload Progress

type UploadEvents = {
  progress: { percent: number; loaded: number; total: number };
  complete: { fileUrl: string };
  error: { message: string };
};

export function FileUploader() {
  const [file, setFile] = useState<File | null>(null);
  const [uploadId, setUploadId] = useState<string | null>(null);
  const [progress, setProgress] = useState(0);
  const [fileUrl, setFileUrl] = useState<string | null>(null);

  const { isConnected, error } = useSSEStream<UploadEvents>(
    "/api/file/upload/progress",
    { uploadId: uploadId ?? "" },
    {
      progress: (data) => {
        setProgress(data.percent);
      },
      complete: (data) => {
        setFileUrl(data.fileUrl);
      },
      error: (data) => {
        alert(data.message);
      },
    },
    {
      enabled: uploadId !== null,
      retry: 5,
      retryInterval: 2000,
    }
  );

  const handleUpload = async () => {
    if (!file) return;

    // 1. Start upload (get uploadId)
    const formData = new FormData();
    formData.append("file", file);

    const response = await fetch("/api/file/upload", {
      method: "POST",
      body: formData,
    });

    const { uploadId } = await response.json();

    // 2. Track progress via SSE
    setUploadId(uploadId);
  };

  return (
    <div>
      <input
        type="file"
        onChange={(e) => setFile(e.target.files?.[0] ?? null)}
      />

      <Button onClick={handleUpload} disabled={!file || isConnected}>
        Upload
      </Button>

      {isConnected && (
        <div>
          <div className="w-full bg-gray-200 rounded-full h-2">
            <div
              className="bg-blue-600 h-2 rounded-full transition-all"
              style={{ width: `${progress}%` }}
            />
          </div>
          <p>{progress}%</p>
        </div>
      )}

      {fileUrl && (
        <div>
          Upload complete: <a href={fileUrl}>{fileUrl}</a>
        </div>
      )}

      {error && <div className="text-red-500">{error}</div>}
    </div>
  );
}

Real-time Log Monitoring

type LogEvents = {
  log: { level: string; message: string; timestamp: string };
};

export function LogViewer() {
  const [logs, setLogs] = useState<Array<{ level: string; message: string; timestamp: string }>>([]);
  const [enabled, setEnabled] = useState(true);

  const { isConnected, error } = useSSEStream<LogEvents>(
    "/api/logs/stream",
    { level: "info" },
    {
      log: (data) => {
        setLogs((prev) => [...prev, data].slice(-100));  // Keep only last 100
      },
    },
    {
      enabled,
      retry: 10,  // Keep reconnecting for logs
      retryInterval: 5000,
    }
  );

  return (
    <div>
      <div className="flex items-center gap-2 mb-4">
        <Button
          variant={enabled ? "red" : "default"}
          onClick={() => setEnabled(!enabled)}
        >
          {enabled ? "Stop" : "Start"}
        </Button>

        {isConnected && <span className="text-green-500">Connected</span>}
        {error && <span className="text-red-500">● {error}</span>}
      </div>

      <div className="bg-black text-green-400 p-4 rounded font-mono text-sm h-96 overflow-y-auto">
        {logs.map((log, i) => (
          <div key={i} className="mb-1">
            <span className="text-gray-500">[{log.timestamp}]</span>{" "}
            <span className={log.level === "error" ? "text-red-400" : ""}>
              {log.level.toUpperCase()}
            </span>{" "}
            {log.message}
          </div>
        ))}
      </div>
    </div>
  );
}

Backend Implementation

SSE endpoints are created using the @stream decorator and ctx.createSSE().

Basic SSE API

// ai.model.ts (backend)
import { BaseModelClass, stream, Sonamu, z } from "sonamu";

class AIModelClass extends BaseModelClass {
  @stream({
    type: 'sse',
    events: z.object({
      token: z.object({ text: z.string() }),
      usage: z.object({ tokens: z.number() }),
    }),
    guards: ["login"],
  })
  async chat(prompt: string): Promise<void> {
    const ctx = Sonamu.getContext();
    const sse = ctx.createSSE(
      z.object({
        token: z.object({ text: z.string() }),
        usage: z.object({ tokens: z.number() }),
      })
    );

    const stream = await openai.chat.completions.create({
      model: "gpt-4",
      messages: [{ role: "user", content: prompt }],
      stream: true,
    });

    for await (const chunk of stream) {
      const text = chunk.choices[0]?.delta?.content ?? "";
      if (text) {
        sse.publish('token', { text });
      }
    }

    sse.publish('usage', { tokens: 42 });
    await sse.end();
  }
}

publish Format

sse.publish('eventName', { /* data */ });
Examples:
sse.publish('token', { text: "Hello" });
sse.publish('usage', { tokens: 42 });
await sse.end();  // Sends end event and closes the stream
For detailed SSE endpoint implementation, see Creating SSE Endpoints.

Advanced Features

Conditional Connection

const [shouldConnect, setShouldConnect] = useState(false);

const { isConnected } = useSSEStream(
  "/api/notifications",
  {},
  handlers,
  {
    enabled: shouldConnect,  // Won't connect when false
  }
);

// Start connection via user action
<Button onClick={() => setShouldConnect(true)}>
  Subscribe to notifications
</Button>

Dynamic Parameters

const [filter, setFilter] = useState("all");

useSSEStream(
  "/api/logs/stream",
  { level: filter },  // Reconnects when filter changes
  handlers
);

// When filter changes, the connection is automatically dropped and reconnected with new params

Manual Reconnection

const [reconnectTrigger, setReconnectTrigger] = useState(0);

useSSEStream(
  "/api/stream",
  { trigger: reconnectTrigger },  // Reconnects when trigger changes
  handlers
);

// Manual reconnection
<Button onClick={() => setReconnectTrigger(prev => prev + 1)}>
  Reconnect
</Button>

Multiple Streams

// Stream 1: AI response
const stream1 = useSSEStream<AIEvents>(
  "/api/ai/chat",
  { prompt: "Hello" },
  {
    token: (data) => setAiResponse(prev => prev + data.text),
  }
);

// Stream 2: Notifications
const stream2 = useSSEStream<NotificationEvents>(
  "/api/notifications",
  {},
  {
    notification: (data) => showNotification(data),
  }
);

Cautions

1. EventSource Limit

  • HTTP/1.1 limits to 6 concurrent connections per domain
  • Avoid opening too many SSE connections simultaneously

2. Memory Management

Don’t accumulate streaming data indefinitely.
// Bad: Unbounded memory growth
const [logs, setLogs] = useState<string[]>([]);

useSSEStream("/api/logs", {}, {
  log: (data) => {
    setLogs(prev => [...prev, data.message]);  // Grows forever
  },
});

// Good: Keep only last N entries
useSSEStream("/api/logs", {}, {
  log: (data) => {
    setLogs(prev => [...prev, data.message].slice(-100));  // Last 100 only
  },
});

3. Error Handling

const { error, isEnded } = useSSEStream(url, params, {
  error: (data) => {
    // Error event from server
    console.error("Server error:", data);
  },
});

// Connection failure error
if (error) {
  console.error("Connection error:", error);
}

4. Component Unmount

Connections are automatically closed when the component unmounts.
useEffect(() => {
  // useSSEStream handles cleanup automatically
  return () => {
    // No manual cleanup needed
  };
}, []);

Troubleshooting

SSE Not Connecting

Cause: CORS configuration or endpoint error Solution:
// Backend: sonamu.config.ts
export default {
  api: {
    cors: {
      origin: "http://localhost:5173",
      credentials: true,
    },
  },
};

Reconnection Keeps Failing

Cause: Backend server is down or network issue Solution: Adjust retry and retryInterval, or display an error UI.

Data Not Parsing

Cause: Server sending non-JSON format Solution: Use sse.publish('eventName', jsonObject) format on the backend.