메인 콘텐츠로 건너뛰기
Sonamu는 @stream 데코레이터를 통해 SSE 엔드포인트를 쉽게 만들 수 있습니다. ctx.createSSE()로 생성한 SSE 연결 객체를 사용하여 이벤트를 전송합니다.

@stream 데코레이터

SSE 엔드포인트를 정의하는 데코레이터입니다.

기본 사용법

import { BaseModelClass, stream, z } from "sonamu";

class NotificationModelClass extends BaseModelClass {
  @stream({
    type: 'sse',
    events: z.object({
      notification: z.object({
        id: z.number(),
        message: z.string(),
      }),
    })
  })
  async streamNotifications(userId: number, ctx: Context): Promise<void> {
    const sse = ctx.createSSE(
      z.object({
        notification: z.object({
          id: z.number(),
          message: z.string(),
        }),
      })
    );

    // 이벤트 전송
    sse.publish('notification', {
      id: 1,
      message: 'Hello, World!'
    });

    // 연결 종료
    await sse.end();
  }
}

@stream 옵션

스트림 타입
@stream({
  type: 'sse',  // 현재는 SSE만 지원
})
지원 타입:
  • 'sse': Server-Sent Events
  • (향후 WebSocket 지원 예정)

SSEConnection API

ctx.createSSE()로 생성된 SSE 연결 객체입니다.

closed

연결이 종료되었는지 여부를 나타내는 읽기 전용 속성입니다. 클라이언트가 연결을 끊거나 end()를 호출한 후 true가 됩니다.
if (sse.closed) {
  return;
}
sse.publish('status', { message: 'still alive' });

onClose(callback)

연결이 종료될 때 호출될 콜백을 등록합니다. 클라이언트 측 연결 종료와 end() 호출 모두에서 실행됩니다. 타이머나 외부 리소스 정리에 활용할 수 있습니다.
const interval = setInterval(() => {
  sse.publish('heartbeat', { ts: Date.now() });
}, 5000);

sse.onClose(() => {
  clearInterval(interval);
});

publish(event, data)

이벤트를 클라이언트로 전송합니다.
sse.publish('notification', {
  id: 1,
  message: 'New notification'
});
타입 안전성:
// ✅ 올바른 이벤트
sse.publish('notification', { id: 1, message: 'Hi' });

// ❌ 잘못된 이벤트 (컴파일 에러)
sse.publish('unknown', { ... });

// ❌ 잘못된 데이터 (컴파일 에러)
sse.publish('notification', { id: 'wrong' });

end()

연결을 종료합니다.
await sse.end();
자동 전송: event: end, data: END

실전 예제

1. 실시간 알림

class NotificationModelClass extends BaseModelClass {
  @stream({
    type: 'sse',
    events: z.object({
      notification: z.object({
        id: z.number(),
        type: z.enum(['like', 'comment', 'follow']),
        message: z.string(),
        createdAt: z.string(),
      }),
    })
  })
  async streamNotifications(userId: number, ctx: Context): Promise<void> {
    const sse = ctx.createSSE(
      z.object({
        notification: z.object({
          id: z.number(),
          type: z.enum(['like', 'comment', 'follow']),
          message: z.string(),
          createdAt: z.string(),
        }),
      })
    );

    try {
      // 실시간 알림 감지 (예: 폴링 또는 이벤트 리스너)
      while (true) {
        // DB에서 새 알림 조회
        const notifications = await this.findMany({
          wq: [
            ['user_id', userId],
            ['created_at', '>', new Date(Date.now() - 5000)],  // 최근 5초
          ],
          num: 10,
        });

        // 알림 전송
        for (const notification of notifications.data) {
          sse.publish('notification', {
            id: notification.id,
            type: notification.type,
            message: notification.message,
            createdAt: notification.created_at.toISOString(),
          });
        }

        // 5초 대기
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    } finally {
      await sse.end();
    }
  }
}

2. 작업 진행률

class TaskModelClass extends BaseModelClass {
  @stream({
    type: 'sse',
    events: z.object({
      progress: z.object({
        current: z.number(),
        total: z.number(),
        percentage: z.number(),
      }),
      complete: z.object({
        result: z.string(),
      }),
      error: z.object({
        message: z.string(),
      }),
    })
  })
  async streamTaskProgress(taskId: number, ctx: Context): Promise<void> {
    const sse = ctx.createSSE(
      z.object({
        progress: z.object({
          current: z.number(),
          total: z.number(),
          percentage: z.number(),
        }),
        complete: z.object({
          result: z.string(),
        }),
        error: z.object({
          message: z.string(),
        }),
      })
    );

    try {
      const total = 100;

      for (let current = 0; current <= total; current += 10) {
        // 진행률 전송
        sse.publish('progress', {
          current,
          total,
          percentage: Math.round((current / total) * 100),
        });

        // 작업 수행 (예: 파일 처리)
        await new Promise(resolve => setTimeout(resolve, 1000));
      }

      // 완료 이벤트
      sse.publish('complete', {
        result: 'Task completed successfully'
      });
    } catch (error) {
      // 에러 이벤트
      sse.publish('error', {
        message: error.message
      });
    } finally {
      await sse.end();
    }
  }
}

3. 라이브 피드

class PostModelClass extends BaseModelClass {
  @stream({
    type: 'sse',
    events: z.object({
      newPost: z.object({
        id: z.number(),
        title: z.string(),
        author: z.string(),
        createdAt: z.string(),
      }),
    })
  })
  async streamNewPosts(ctx: Context): Promise<void> {
    const sse = ctx.createSSE(
      z.object({
        newPost: z.object({
          id: z.number(),
          title: z.string(),
          author: z.string(),
          createdAt: z.string(),
        }),
      })
    );

    try {
      let lastCheckTime = new Date();

      while (true) {
        // 새 게시물 조회
        const posts = await this.findMany({
          wq: [['created_at', '>', lastCheckTime]],
          num: 20,
          order: [['created_at', 'DESC']],
        });

        // 새 게시물 전송
        for (const post of posts.data) {
          sse.publish('newPost', {
            id: post.id,
            title: post.title,
            author: post.author_name,
            createdAt: post.created_at.toISOString(),
          });
        }

        // 마지막 체크 시간 업데이트
        if (posts.data.length > 0) {
          lastCheckTime = posts.data[0].created_at;
        }

        // 10초 대기
        await new Promise(resolve => setTimeout(resolve, 10000));
      }
    } finally {
      await sse.end();
    }
  }
}

4. 다중 이벤트

class ChatModelClass extends BaseModelClass {
  @stream({
    type: 'sse',
    events: z.object({
      message: z.object({
        id: z.number(),
        text: z.string(),
        sender: z.string(),
      }),
      typing: z.object({
        userId: z.number(),
        isTyping: z.boolean(),
      }),
      joined: z.object({
        userId: z.number(),
        username: z.string(),
      }),
      left: z.object({
        userId: z.number(),
      }),
    })
  })
  async streamChatRoom(roomId: number, ctx: Context): Promise<void> {
    const sse = ctx.createSSE(
      z.object({
        message: z.object({
          id: z.number(),
          text: z.string(),
          sender: z.string(),
        }),
        typing: z.object({
          userId: z.number(),
          isTyping: z.boolean(),
        }),
        joined: z.object({
          userId: z.number(),
          username: z.string(),
        }),
        left: z.object({
          userId: z.number(),
        }),
      })
    );

    try {
      // 입장 이벤트
      sse.publish('joined', {
        userId: ctx.user.id,
        username: ctx.user.name,
      });

      // 메시지 스트리밍
      while (true) {
        const messages = await this.getNewMessages(roomId);

        for (const msg of messages) {
          sse.publish('message', {
            id: msg.id,
            text: msg.text,
            sender: msg.sender_name,
          });
        }

        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    } finally {
      // 퇴장 이벤트
      sse.publish('left', {
        userId: ctx.user.id,
      });

      await sse.end();
    }
  }
}

5. 조건부 전송

class MonitorModelClass extends BaseModelClass {
  @stream({
    type: 'sse',
    events: z.object({
      status: z.object({
        cpu: z.number(),
        memory: z.number(),
        disk: z.number(),
      }),
      alert: z.object({
        level: z.enum(['warning', 'critical']),
        message: z.string(),
      }),
    })
  })
  async streamServerStatus(ctx: Context): Promise<void> {
    const sse = ctx.createSSE(
      z.object({
        status: z.object({
          cpu: z.number(),
          memory: z.number(),
          disk: z.number(),
        }),
        alert: z.object({
          level: z.enum(['warning', 'critical']),
          message: z.string(),
        }),
      })
    );

    try {
      while (true) {
        const status = await this.getServerStatus();

        // 상태 전송
        sse.publish('status', {
          cpu: status.cpu,
          memory: status.memory,
          disk: status.disk,
        });

        // 임계값 초과 시 알림
        if (status.cpu > 90) {
          sse.publish('alert', {
            level: 'critical',
            message: `CPU usage is ${status.cpu}%`,
          });
        } else if (status.cpu > 70) {
          sse.publish('alert', {
            level: 'warning',
            message: `CPU usage is ${status.cpu}%`,
          });
        }

        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    } finally {
      await sse.end();
    }
  }
}

연결 관리

클라이언트 연결 종료 감지

@stream({
  type: 'sse',
  events: z.object({
    message: z.string(),
  })
})
async streamData(ctx: Context): Promise<void> {
  const sse = ctx.createSSE(
    z.object({
      message: z.string(),
    })
  );

  // 클라이언트 연결 종료 감지
  ctx.request.socket.on('close', () => {
    console.log('Client disconnected');
  });

  try {
    while (true) {
      sse.publish('message', 'Hello');
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  } finally {
    await sse.end();
  }
}

Keep-Alive

SSE는 자동으로 Keep-Alive를 전송하지만, 커스텀 구현도 가능합니다:
@stream({
  type: 'sse',
  events: z.object({
    message: z.string(),
    keepalive: z.object({}),
  })
})
async streamWithKeepAlive(ctx: Context): Promise<void> {
  const sse = ctx.createSSE(
    z.object({
      message: z.string(),
      keepalive: z.object({}),
    })
  );

  // Keep-alive 전송 (30초마다)
  const keepAliveInterval = setInterval(() => {
    sse.publish('keepalive', {});
  }, 30000);

  try {
    while (true) {
      sse.publish('message', 'Data');
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  } finally {
    clearInterval(keepAliveInterval);
    await sse.end();
  }
}

에러 처리

@stream({
  type: 'sse',
  events: z.object({
    data: z.object({
      value: z.number(),
    }),
    error: z.object({
      message: z.string(),
    }),
  })
})
async streamWithErrorHandling(ctx: Context): Promise<void> {
  const sse = ctx.createSSE(
    z.object({
      data: z.object({
        value: z.number(),
      }),
      error: z.object({
        message: z.string(),
      }),
    })
  );

  try {
    const value = await this.getSomeData();
    sse.publish('data', { value });
  } catch (error) {
    // 에러를 이벤트로 전송
    sse.publish('error', {
      message: error.message
    });
  } finally {
    await sse.end();
  }
}

주의사항

SSE 엔드포인트 작성 시 주의사항:
  1. @stream@api는 동시에 사용할 수 없습니다: SSE 엔드포인트에는 @stream 데코레이터만 사용합니다
    // ✅ @stream만 사용
    @stream({ type: 'sse', events: ... })
    async streamData(ctx: Context): Promise<void> { ... }
    
    // ❌ @api와 @stream 동시 사용 (런타임 에러)
    @stream({ type: 'sse', events: ... })
    @api({ compress: false })
    async streamData(ctx: Context): Promise<void> { ... }
    
  2. 반드시 end() 호출: 연결 종료 필수
    try {
      // 이벤트 전송
    } finally {
      await sse.end();  // 필수!
    }
    
  3. 무한 루프 주의: 적절한 대기 시간 필요
    while (true) {
      sse.publish('data', ...);
      await new Promise(resolve => setTimeout(resolve, 1000));  // 1초 대기
    }
    
  4. 이벤트 스키마 일치: events와 publish 이벤트 이름 일치 필수
    // ✅ 일치
    events: z.object({
      message: z.string(),
    })
    sse.publish('message', 'Hi');
    
    // ❌ 불일치 (컴파일 에러)
    sse.publish('msg', 'Hi');
    
  5. 메모리 누수 방지: 타이머/리스너 정리
    const interval = setInterval(() => { ... }, 1000);
    try {
      // ...
    } finally {
      clearInterval(interval);  // 정리
      await sse.end();
    }
    

다음 단계

SSE 설정

SSE 플러그인 설정하기

클라이언트 통합

프론트엔드에서 SSE 사용하기