메인 콘텐츠로 건너뛰기
@stream 데코레이터는 Server-Sent Events(SSE)를 사용한 실시간 스트리밍 API를 생성합니다.

기본 사용법

import { BaseModelClass, stream } from "sonamu";
import { z } from "zod";

const NotificationEventsSchema = z.object({
  message: z.object({
    id: z.number(),
    text: z.string(),
    timestamp: z.date()
  }),
  status: z.object({
    type: z.enum(["connected", "disconnected"]),
    userId: z.number()
  })
});

class NotificationModelClass extends BaseModelClass {
  @stream({
    type: "sse",
    events: NotificationEventsSchema
  })
  async subscribe(userId: number) {
    const sse = Sonamu.getContext().createSSE(NotificationEventsSchema);

    // 연결 시작
    sse.publish("status", {
      type: "connected",
      userId
    });

    // 주기적으로 이벤트 전송
    const interval = setInterval(() => {
      sse.publish("message", {
        id: Date.now(),
        text: "New notification",
        timestamp: new Date()
      });
    }, 5000);

    // 5분 후 연결 종료
    setTimeout(async () => {
      clearInterval(interval);
      sse.publish("status", {
        type: "disconnected",
        userId
      });
      await sse.end();
    }, 300000);

    return sse;
  }
}

옵션

type

스트리밍 타입을 지정합니다. 필수 옵션
type: "sse"  // Server-Sent Events (현재 유일한 옵션)
향후 WebSocket 지원이 추가될 수 있습니다.

events

전송할 이벤트의 타입을 Zod 스키마로 정의합니다. 필수 옵션
import { z } from "zod";

const EventsSchema = z.object({
  // 이벤트 이름: 이벤트 데이터 스키마
  progress: z.object({
    percent: z.number(),
    message: z.string()
  }),
  complete: z.object({
    result: z.string()
  }),
  error: z.object({
    message: z.string(),
    code: z.string()
  })
});

@stream({
  type: "sse",
  events: EventsSchema
})
async process() {
  const sse = Sonamu.getContext().createSSE(EventsSchema);
  
  // 이벤트 전송
  sse.publish("progress", { percent: 50, message: "Processing..." });
  sse.publish("complete", { result: "Success" });
  
  // 스트림 종료
  await sse.end();
  
  return sse;
}

path

스트리밍 엔드포인트 경로를 지정합니다. 기본값: /{modelName}/{methodName} (camelCase)
@stream({
  type: "sse",
  events: EventsSchema,
  path: "/stream/notifications"
})
async subscribe() {
  // /stream/notifications로 접근
}

guards

접근 권한을 지정합니다.
type GuardKey = "query" | "admin" | "user";
@stream({
  type: "sse",
  events: EventsSchema,
  guards: ["user"]
})
async subscribe() {
  // 로그인한 사용자만 접근 가능
}

description

스트리밍 API 설명을 추가합니다.
@stream({
  type: "sse",
  events: EventsSchema,
  description: "실시간 알림을 스트리밍합니다."
})
async notifications() {
  // ...
}

resourceName

생성되는 서비스 파일의 리소스 이름을 지정합니다.
@stream({
  type: "sse",
  events: EventsSchema,
  resourceName: "Notifications"
})
async subscribe() {
  // NotificationsService.ts 파일에 포함
}

SSE 객체 사용법

createSSE

Context에서 SSE 객체를 생성합니다.
const sse = Sonamu.getContext().createSSE(EventsSchema);

publish

이벤트를 클라이언트로 전송합니다.
sse.publish("eventName", data);
타입 안정성:
  • 이벤트 이름은 스키마에 정의된 것만 사용 가능
  • 데이터는 스키마에 맞춰 자동 검증
const EventsSchema = z.object({
  message: z.object({
    text: z.string()
  })
});

const sse = context.createSSE(EventsSchema);

// ✅ 올바른 사용
sse.publish("message", { text: "Hello" });

// ❌ 타입 에러
sse.publish("unknown", { text: "Hello" });  // 'unknown'은 스키마에 없음
sse.publish("message", { text: 123 });      // text는 string이어야 함

end

스트림 연결을 종료합니다.
await sse.end();
end()는 클라이언트에 “end” 이벤트를 전송하고 200ms 대기 후 연결을 닫습니다.

전체 예시

import { BaseModelClass, stream } from "sonamu";
import { z } from "zod";

const LogEventsSchema = z.object({
  log: z.object({
    level: z.enum(["info", "warn", "error"]),
    message: z.string(),
    timestamp: z.string()
  }),
  progress: z.object({
    current: z.number(),
    total: z.number()
  }),
  complete: z.object({
    success: z.boolean(),
    duration: z.number()
  })
});

class TaskModelClass extends BaseModelClass {
  @stream({
    type: "sse",
    events: LogEventsSchema,
    path: "/tasks/logs",
    guards: ["admin"],
    description: "작업 실행 로그를 실시간으로 스트리밍합니다."
  })
  async streamLogs(taskId: number) {
    const sse = Sonamu.getContext().createSSE(LogEventsSchema);
    const startTime = Date.now();

    // 작업 시작
    sse.publish("log", {
      level: "info",
      message: "Task started",
      timestamp: new Date().toISOString()
    });

    // 작업 실행
    const total = 100;
    for (let i = 0; i <= total; i++) {
      await this.processItem(i);
      
      // 진행률 전송
      sse.publish("progress", {
        current: i,
        total
      });

      // 로그 전송
      if (i % 10 === 0) {
        sse.publish("log", {
          level: "info",
          message: `Processed ${i}/${total} items`,
          timestamp: new Date().toISOString()
        });
      }
    }

    // 작업 완료
    sse.publish("complete", {
      success: true,
      duration: Date.now() - startTime
    });

    // 연결 종료
    await sse.end();

    return sse;
  }

  private async processItem(index: number) {
    // 작업 처리
    await new Promise(resolve => setTimeout(resolve, 50));
  }
}

다른 데코레이터와 함께 사용

@transactional

@stream({ type: "sse", events: EventsSchema })
@transactional()
async process() {
  const sse = Sonamu.getContext().createSSE(EventsSchema);
  const wdb = this.getDB("w");
  
  // 트랜잭션 내에서 실행
  // SSE 전송과 DB 작업 함께 수행
  
  await sse.end();
  return sse;
}
@stream을 먼저, @transactional을 나중에 작성하세요.

제약사항

1. @api와 중복 사용 불가

// ❌ 에러 발생
@api()
@stream({ type: "sse", events: EventsSchema })
async subscribe() {}
에러 메시지:
@stream decorator can only be used once on NotificationModel.subscribe. 
You can use only one of @api or @stream decorator on the same method.

2. events는 필수

// ❌ 에러 발생
@stream({ type: "sse" })
async subscribe() {}
// events 옵션이 필요합니다

3. httpMethod는 항상 GET

@stream은 자동으로 httpMethod: "GET"을 설정합니다.
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
  // GET /notification/subscribe로 노출됨
}

클라이언트 사용 (Web)

Sonamu는 자동으로 SSE 클라이언트 코드를 생성합니다.
// 자동 생성된 코드
import { NotificationService } from "@/services/NotificationService";

// SSE 연결
const eventSource = NotificationService.subscribe(userId);

// 이벤트 리스너 등록
eventSource.addEventListener("message", (event) => {
  const data = JSON.parse(event.data);
  console.log("New message:", data);
});

eventSource.addEventListener("status", (event) => {
  const data = JSON.parse(event.data);
  console.log("Status:", data);
});

// 연결 종료 이벤트
eventSource.addEventListener("end", () => {
  console.log("Stream ended");
  eventSource.close();
});

// 수동 연결 종료
eventSource.close();

React 예시

import { useEffect, useState } from "react";

function NotificationList({ userId }: { userId: number }) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [status, setStatus] = useState<string>("disconnected");

  useEffect(() => {
    const eventSource = NotificationService.subscribe(userId);

    eventSource.addEventListener("message", (event) => {
      const message = JSON.parse(event.data);
      setMessages(prev => [...prev, message]);
    });

    eventSource.addEventListener("status", (event) => {
      const { type } = JSON.parse(event.data);
      setStatus(type);
    });

    eventSource.addEventListener("end", () => {
      console.log("Stream ended by server");
      setStatus("disconnected");
    });

    return () => {
      eventSource.close();
    };
  }, [userId]);

  return (
    <div>
      <div>Status: {status}</div>
      <ul>
        {messages.map(msg => (
          <li key={msg.id}>{msg.text}</li>
        ))}
      </ul>
    </div>
  );
}

로깅

@stream 데코레이터는 자동으로 로그를 남깁니다:
@stream({ type: "sse", events: EventsSchema })
async subscribe() {
  // 자동 로그:
  // [DEBUG] stream: NotificationModel.subscribe
}

예시 모음

const ChatEventsSchema = z.object({
  message: z.object({
    id: z.string(),
    userId: z.number(),
    text: z.string(),
    timestamp: z.date()
  }),
  typing: z.object({
    userId: z.number(),
    isTyping: z.boolean()
  }),
  joined: z.object({
    userId: z.number(),
    username: z.string()
  }),
  left: z.object({
    userId: z.number()
  })
});

class ChatModelClass extends BaseModelClass {
  @stream({
    type: "sse",
    events: ChatEventsSchema,
    guards: ["user"]
  })
  async subscribe(roomId: number) {
    const sse = Sonamu.getContext().createSSE(ChatEventsSchema);
    const { user } = Sonamu.getContext();

    // 입장 알림
    sse.publish("joined", {
      userId: user.id,
      username: user.name
    });

    // 메시지 수신 리스너 등록
    const unsubscribe = this.onMessage(roomId, (message) => {
      sse.publish("message", message);
    });

    // 1시간 후 자동 종료
    setTimeout(async () => {
      unsubscribe();
      sse.publish("left", { userId: user.id });
      await sse.end();
    }, 3600000);

    return sse;
  }
}

다음 단계