๋ฉ”์ธ ์ฝ˜ํ…์ธ ๋กœ ๊ฑด๋„ˆ๋›ฐ๊ธฐ
@stream ๋ฐ์ฝ”๋ ˆ์ดํ„ฐ๋Š” Server-Sent Events(SSE)๋ฅผ ์‚ฌ์šฉํ•œ ์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ API๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

๊ธฐ๋ณธ ์‚ฌ์šฉ๋ฒ•

import { BaseModelClass, stream } from "sonamu";
import { z } from "zod";

const NotificationEventsSchema = z.object({
  message: z.object({
    id: z.number(),
    text: z.string(),
    timestamp: z.date()
  }),
  status: z.object({
    type: z.enum(["connected", "disconnected"]),
    userId: z.number()
  })
});

class NotificationModelClass extends BaseModelClass {
  @stream({
    type: "sse",
    events: NotificationEventsSchema
  })
  async subscribe(userId: number) {
    const sse = Sonamu.getContext().createSSE(NotificationEventsSchema);

    // ์—ฐ๊ฒฐ ์‹œ์ž‘
    sse.publish("status", {
      type: "connected",
      userId
    });

    // ์ฃผ๊ธฐ์ ์œผ๋กœ ์ด๋ฒคํŠธ ์ „์†ก
    const interval = setInterval(() => {
      sse.publish("message", {
        id: Date.now(),
        text: "New notification",
        timestamp: new Date()
      });
    }, 5000);

    // 5๋ถ„ ํ›„ ์—ฐ๊ฒฐ ์ข…๋ฃŒ
    setTimeout(async () => {
      clearInterval(interval);
      sse.publish("status", {
        type: "disconnected",
        userId
      });
      await sse.end();
    }, 300000);

    return sse;
  }
}

์˜ต์…˜

type

์ŠคํŠธ๋ฆฌ๋ฐ ํƒ€์ž…์„ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค. ํ•„์ˆ˜ ์˜ต์…˜
type: "sse"  // Server-Sent Events (ํ˜„์žฌ ์œ ์ผํ•œ ์˜ต์…˜)
ํ–ฅํ›„ WebSocket ์ง€์›์ด ์ถ”๊ฐ€๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

events

์ „์†กํ•  ์ด๋ฒคํŠธ์˜ ํƒ€์ž…์„ Zod ์Šคํ‚ค๋งˆ๋กœ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค. ํ•„์ˆ˜ ์˜ต์…˜
import { z } from "zod";

const EventsSchema = z.object({
  // ์ด๋ฒคํŠธ ์ด๋ฆ„: ์ด๋ฒคํŠธ ๋ฐ์ดํ„ฐ ์Šคํ‚ค๋งˆ
  progress: z.object({
    percent: z.number(),
    message: z.string()
  }),
  complete: z.object({
    result: z.string()
  }),
  error: z.object({
    message: z.string(),
    code: z.string()
  })
});

@stream({
  type: "sse",
  events: EventsSchema
})
async process() {
  const sse = Sonamu.getContext().createSSE(EventsSchema);
  
  // ์ด๋ฒคํŠธ ์ „์†ก
  sse.publish("progress", { percent: 50, message: "Processing..." });
  sse.publish("complete", { result: "Success" });
  
  // ์ŠคํŠธ๋ฆผ ์ข…๋ฃŒ
  await sse.end();
  
  return sse;
}

path

์ŠคํŠธ๋ฆฌ๋ฐ ์—”๋“œํฌ์ธํŠธ ๊ฒฝ๋กœ๋ฅผ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค. ๊ธฐ๋ณธ๊ฐ’: /{modelName}/{methodName} (camelCase)
@stream({
  type: "sse",
  events: EventsSchema,
  path: "/stream/notifications"
})
async subscribe() {
  // /stream/notifications๋กœ ์ ‘๊ทผ
}

guards

์ ‘๊ทผ ๊ถŒํ•œ์„ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค.
type GuardKey = "query" | "admin" | "user";
@stream({
  type: "sse",
  events: EventsSchema,
  guards: ["user"]
})
async subscribe() {
  // ๋กœ๊ทธ์ธํ•œ ์‚ฌ์šฉ์ž๋งŒ ์ ‘๊ทผ ๊ฐ€๋Šฅ
}

description

์ŠคํŠธ๋ฆฌ๋ฐ API ์„ค๋ช…์„ ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค.
@stream({
  type: "sse",
  events: EventsSchema,
  description: "์‹ค์‹œ๊ฐ„ ์•Œ๋ฆผ์„ ์ŠคํŠธ๋ฆฌ๋ฐํ•ฉ๋‹ˆ๋‹ค."
})
async notifications() {
  // ...
}

resourceName

์ƒ์„ฑ๋˜๋Š” ์„œ๋น„์Šค ํŒŒ์ผ์˜ ๋ฆฌ์†Œ์Šค ์ด๋ฆ„์„ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค.
@stream({
  type: "sse",
  events: EventsSchema,
  resourceName: "Notifications"
})
async subscribe() {
  // NotificationsService.ts ํŒŒ์ผ์— ํฌํ•จ
}

SSE ๊ฐ์ฒด ์‚ฌ์šฉ๋ฒ•

createSSE

Context์—์„œ SSE ๊ฐ์ฒด๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.
const sse = Sonamu.getContext().createSSE(EventsSchema);

publish

์ด๋ฒคํŠธ๋ฅผ ํด๋ผ์ด์–ธํŠธ๋กœ ์ „์†กํ•ฉ๋‹ˆ๋‹ค.
sse.publish("eventName", data);
ํƒ€์ž… ์•ˆ์ •์„ฑ:
  • ์ด๋ฒคํŠธ ์ด๋ฆ„์€ ์Šคํ‚ค๋งˆ์— ์ •์˜๋œ ๊ฒƒ๋งŒ ์‚ฌ์šฉ ๊ฐ€๋Šฅ
  • ๋ฐ์ดํ„ฐ๋Š” ์Šคํ‚ค๋งˆ์— ๋งž์ถฐ ์ž๋™ ๊ฒ€์ฆ
const EventsSchema = z.object({
  message: z.object({
    text: z.string()
  })
});

const sse = context.createSSE(EventsSchema);

// โœ… ์˜ฌ๋ฐ”๋ฅธ ์‚ฌ์šฉ
sse.publish("message", { text: "Hello" });

// โŒ ํƒ€์ž… ์—๋Ÿฌ
sse.publish("unknown", { text: "Hello" });  // 'unknown'์€ ์Šคํ‚ค๋งˆ์— ์—†์Œ
sse.publish("message", { text: 123 });      // text๋Š” string์ด์–ด์•ผ ํ•จ

end

์ŠคํŠธ๋ฆผ ์—ฐ๊ฒฐ์„ ์ข…๋ฃŒํ•ฉ๋‹ˆ๋‹ค.
await sse.end();
end()๋Š” ํด๋ผ์ด์–ธํŠธ์— โ€œendโ€ ์ด๋ฒคํŠธ๋ฅผ ์ „์†กํ•˜๊ณ  200ms ๋Œ€๊ธฐ ํ›„ ์—ฐ๊ฒฐ์„ ๋‹ซ์Šต๋‹ˆ๋‹ค.

์ „์ฒด ์˜ˆ์‹œ

import { BaseModelClass, stream } from "sonamu";
import { z } from "zod";

const LogEventsSchema = z.object({
  log: z.object({
    level: z.enum(["info", "warn", "error"]),
    message: z.string(),
    timestamp: z.string()
  }),
  progress: z.object({
    current: z.number(),
    total: z.number()
  }),
  complete: z.object({
    success: z.boolean(),
    duration: z.number()
  })
});

class TaskModelClass extends BaseModelClass {
  @stream({
    type: "sse",
    events: LogEventsSchema,
    path: "/tasks/logs",
    guards: ["admin"],
    description: "์ž‘์—… ์‹คํ–‰ ๋กœ๊ทธ๋ฅผ ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ŠคํŠธ๋ฆฌ๋ฐํ•ฉ๋‹ˆ๋‹ค."
  })
  async streamLogs(taskId: number) {
    const sse = Sonamu.getContext().createSSE(LogEventsSchema);
    const startTime = Date.now();

    // ์ž‘์—… ์‹œ์ž‘
    sse.publish("log", {
      level: "info",
      message: "Task started",
      timestamp: new Date().toISOString()
    });

    // ์ž‘์—… ์‹คํ–‰
    const total = 100;
    for (let i = 0; i <= total; i++) {
      await this.processItem(i);
      
      // ์ง„ํ–‰๋ฅ  ์ „์†ก
      sse.publish("progress", {
        current: i,
        total
      });

      // ๋กœ๊ทธ ์ „์†ก
      if (i % 10 === 0) {
        sse.publish("log", {
          level: "info",
          message: `Processed ${i}/${total} items`,
          timestamp: new Date().toISOString()
        });
      }
    }

    // ์ž‘์—… ์™„๋ฃŒ
    sse.publish("complete", {
      success: true,
      duration: Date.now() - startTime
    });

    // ์—ฐ๊ฒฐ ์ข…๋ฃŒ
    await sse.end();

    return sse;
  }

  private async processItem(index: number) {
    // ์ž‘์—… ์ฒ˜๋ฆฌ
    await new Promise(resolve => setTimeout(resolve, 50));
  }
}

๋‹ค๋ฅธ ๋ฐ์ฝ”๋ ˆ์ดํ„ฐ์™€ ํ•จ๊ป˜ ์‚ฌ์šฉ

@transactional

@stream({ type: "sse", events: EventsSchema })
@transactional()
async process() {
  const sse = Sonamu.getContext().createSSE(EventsSchema);
  const wdb = this.getDB("w");
  
  // ํŠธ๋žœ์žญ์…˜ ๋‚ด์—์„œ ์‹คํ–‰
  // SSE ์ „์†ก๊ณผ DB ์ž‘์—… ํ•จ๊ป˜ ์ˆ˜ํ–‰
  
  await sse.end();
  return sse;
}
@stream์„ ๋จผ์ €, @transactional์„ ๋‚˜์ค‘์— ์ž‘์„ฑํ•˜์„ธ์š”.

์ œ์•ฝ์‚ฌํ•ญ

1. @api์™€ ์ค‘๋ณต ์‚ฌ์šฉ ๋ถˆ๊ฐ€

// โŒ ์—๋Ÿฌ ๋ฐœ์ƒ
@api()
@stream({ type: "sse", events: EventsSchema })
async subscribe() {}
์—๋Ÿฌ ๋ฉ”์‹œ์ง€:
@stream decorator can only be used once on NotificationModel.subscribe. 
You can use only one of @api or @stream decorator on the same method.

2. events๋Š” ํ•„์ˆ˜

// โŒ ์—๋Ÿฌ ๋ฐœ์ƒ
@stream({ type: "sse" })
async subscribe() {}
// events ์˜ต์…˜์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค

3. httpMethod๋Š” ํ•ญ์ƒ GET

@stream์€ ์ž๋™์œผ๋กœ httpMethod: "GET"์„ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค.
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
  // GET /notification/subscribe๋กœ ๋…ธ์ถœ๋จ
}

ํด๋ผ์ด์–ธํŠธ ์‚ฌ์šฉ (Web)

Sonamu๋Š” ์ž๋™์œผ๋กœ SSE ํด๋ผ์ด์–ธํŠธ ์ฝ”๋“œ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.
// ์ž๋™ ์ƒ์„ฑ๋œ ์ฝ”๋“œ
import { NotificationService } from "@/services/NotificationService";

// SSE ์—ฐ๊ฒฐ
const eventSource = NotificationService.subscribe(userId);

// ์ด๋ฒคํŠธ ๋ฆฌ์Šค๋„ˆ ๋“ฑ๋ก
eventSource.addEventListener("message", (event) => {
  const data = JSON.parse(event.data);
  console.log("New message:", data);
});

eventSource.addEventListener("status", (event) => {
  const data = JSON.parse(event.data);
  console.log("Status:", data);
});

// ์—ฐ๊ฒฐ ์ข…๋ฃŒ ์ด๋ฒคํŠธ
eventSource.addEventListener("end", () => {
  console.log("Stream ended");
  eventSource.close();
});

// ์ˆ˜๋™ ์—ฐ๊ฒฐ ์ข…๋ฃŒ
eventSource.close();

React ์˜ˆ์‹œ

import { useEffect, useState } from "react";

function NotificationList({ userId }: { userId: number }) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [status, setStatus] = useState<string>("disconnected");

  useEffect(() => {
    const eventSource = NotificationService.subscribe(userId);

    eventSource.addEventListener("message", (event) => {
      const message = JSON.parse(event.data);
      setMessages(prev => [...prev, message]);
    });

    eventSource.addEventListener("status", (event) => {
      const { type } = JSON.parse(event.data);
      setStatus(type);
    });

    eventSource.addEventListener("end", () => {
      console.log("Stream ended by server");
      setStatus("disconnected");
    });

    return () => {
      eventSource.close();
    };
  }, [userId]);

  return (
    <div>
      <div>Status: {status}</div>
      <ul>
        {messages.map(msg => (
          <li key={msg.id}>{msg.text}</li>
        ))}
      </ul>
    </div>
  );
}

๋กœ๊น…

@stream ๋ฐ์ฝ”๋ ˆ์ดํ„ฐ๋Š” ์ž๋™์œผ๋กœ ๋กœ๊ทธ๋ฅผ ๋‚จ๊น๋‹ˆ๋‹ค:
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
  // ์ž๋™ ๋กœ๊ทธ:
  // [DEBUG] stream: NotificationModel.subscribe
}

์˜ˆ์‹œ ๋ชจ์Œ

const ChatEventsSchema = z.object({
  message: z.object({
    id: z.string(),
    userId: z.number(),
    text: z.string(),
    timestamp: z.date()
  }),
  typing: z.object({
    userId: z.number(),
    isTyping: z.boolean()
  }),
  joined: z.object({
    userId: z.number(),
    username: z.string()
  }),
  left: z.object({
    userId: z.number()
  })
});

class ChatModelClass extends BaseModelClass {
  @stream({
    type: "sse",
    events: ChatEventsSchema,
    guards: ["user"]
  })
  async subscribe(roomId: number) {
    const sse = Sonamu.getContext().createSSE(ChatEventsSchema);
    const { user } = Sonamu.getContext();

    // ์ž…์žฅ ์•Œ๋ฆผ
    sse.publish("joined", {
      userId: user.id,
      username: user.name
    });

    // ๋ฉ”์‹œ์ง€ ์ˆ˜์‹  ๋ฆฌ์Šค๋„ˆ ๋“ฑ๋ก
    const unsubscribe = this.onMessage(roomId, (message) => {
      sse.publish("message", message);
    });

    // 1์‹œ๊ฐ„ ํ›„ ์ž๋™ ์ข…๋ฃŒ
    setTimeout(async () => {
      unsubscribe();
      sse.publish("left", { userId: user.id });
      await sse.end();
    }, 3600000);

    return sse;
  }
}

๋‹ค์Œ ๋‹จ๊ณ„