메인 콘텐츠로 건너뛰기
Sonamu는 @stream 데코레이터를 통해 SSE 엔드포인트를 쉽게 만들 수 있습니다. Generator 함수를 사용하여 이벤트를 순차적으로 전송합니다.

@stream 데코레이터

SSE 엔드포인트를 정의하는 데코레이터입니다.

기본 사용법

import { BaseModel, stream, api, z } from "sonamu";

class NotificationModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      notification: z.object({
        id: z.number(),
        message: z.string(),
      }),
    })
  })
  @api({ compress: false })  // 압축 비활성화 필수
  async *streamNotifications(userId: number, ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        notification: z.object({
          id: z.number(),
          message: z.string(),
        }),
      })
    );
    
    // 이벤트 전송
    sse.publish('notification', {
      id: 1,
      message: 'Hello, World!'
    });
    
    // 연결 종료
    await sse.end();
  }
}

@stream 옵션

스트림 타입
@stream({
  type: 'sse',  // 현재는 SSE만 지원
})
지원 타입:
  • 'sse': Server-Sent Events
  • (향후 WebSocket 지원 예정)

SSEConnection API

ctx.createSSE()로 생성된 SSE 연결 객체입니다.

publish(event, data)

이벤트를 클라이언트로 전송합니다.
sse.publish('notification', {
  id: 1,
  message: 'New notification'
});
타입 안전성:
// ✅ 올바른 이벤트
sse.publish('notification', { id: 1, message: 'Hi' });

// ❌ 잘못된 이벤트 (컴파일 에러)
sse.publish('unknown', { ... });

// ❌ 잘못된 데이터 (컴파일 에러)
sse.publish('notification', { id: 'wrong' });

end()

연결을 종료합니다.
await sse.end();
자동 전송: event: end, data: END

실전 예제

1. 실시간 알림

class NotificationModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      notification: z.object({
        id: z.number(),
        type: z.enum(['like', 'comment', 'follow']),
        message: z.string(),
        createdAt: z.string(),
      }),
    })
  })
  @api({ compress: false })
  async *streamNotifications(userId: number, ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        notification: z.object({
          id: z.number(),
          type: z.enum(['like', 'comment', 'follow']),
          message: z.string(),
          createdAt: z.string(),
        }),
      })
    );
    
    try {
      // 실시간 알림 감지 (예: 폴링 또는 이벤트 리스너)
      while (true) {
        // DB에서 새 알림 조회
        const notifications = await this.findMany({
          wq: [
            ['user_id', userId],
            ['created_at', '>', new Date(Date.now() - 5000)],  // 최근 5초
          ],
          num: 10,
        });
        
        // 알림 전송
        for (const notification of notifications.data) {
          sse.publish('notification', {
            id: notification.id,
            type: notification.type,
            message: notification.message,
            createdAt: notification.created_at.toISOString(),
          });
        }
        
        // 5초 대기
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    } finally {
      await sse.end();
    }
  }
}

2. 작업 진행률

class TaskModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      progress: z.object({
        current: z.number(),
        total: z.number(),
        percentage: z.number(),
      }),
      complete: z.object({
        result: z.string(),
      }),
      error: z.object({
        message: z.string(),
      }),
    })
  })
  @api({ compress: false })
  async *streamTaskProgress(taskId: number, ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        progress: z.object({
          current: z.number(),
          total: z.number(),
          percentage: z.number(),
        }),
        complete: z.object({
          result: z.string(),
        }),
        error: z.object({
          message: z.string(),
        }),
      })
    );
    
    try {
      const total = 100;
      
      for (let current = 0; current <= total; current += 10) {
        // 진행률 전송
        sse.publish('progress', {
          current,
          total,
          percentage: Math.round((current / total) * 100),
        });
        
        // 작업 수행 (예: 파일 처리)
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
      
      // 완료 이벤트
      sse.publish('complete', {
        result: 'Task completed successfully'
      });
    } catch (error) {
      // 에러 이벤트
      sse.publish('error', {
        message: error.message
      });
    } finally {
      await sse.end();
    }
  }
}

3. 라이브 피드

class PostModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      newPost: z.object({
        id: z.number(),
        title: z.string(),
        author: z.string(),
        createdAt: z.string(),
      }),
    })
  })
  @api({ compress: false })
  async *streamNewPosts(ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        newPost: z.object({
          id: z.number(),
          title: z.string(),
          author: z.string(),
          createdAt: z.string(),
        }),
      })
    );
    
    try {
      let lastCheckTime = new Date();
      
      while (true) {
        // 새 게시물 조회
        const posts = await this.findMany({
          wq: [['created_at', '>', lastCheckTime]],
          num: 20,
          order: [['created_at', 'DESC']],
        });
        
        // 새 게시물 전송
        for (const post of posts.data) {
          sse.publish('newPost', {
            id: post.id,
            title: post.title,
            author: post.author_name,
            createdAt: post.created_at.toISOString(),
          });
        }
        
        // 마지막 체크 시간 업데이트
        if (posts.data.length > 0) {
          lastCheckTime = posts.data[0].created_at;
        }
        
        // 10초 대기
        await new Promise(resolve => setTimeout(resolve, 10000));
      }
    } finally {
      await sse.end();
    }
  }
}

4. 다중 이벤트

class ChatModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      message: z.object({
        id: z.number(),
        text: z.string(),
        sender: z.string(),
      }),
      typing: z.object({
        userId: z.number(),
        isTyping: z.boolean(),
      }),
      joined: z.object({
        userId: z.number(),
        username: z.string(),
      }),
      left: z.object({
        userId: z.number(),
      }),
    })
  })
  @api({ compress: false })
  async *streamChatRoom(roomId: number, ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        message: z.object({
          id: z.number(),
          text: z.string(),
          sender: z.string(),
        }),
        typing: z.object({
          userId: z.number(),
          isTyping: z.boolean(),
        }),
        joined: z.object({
          userId: z.number(),
          username: z.string(),
        }),
        left: z.object({
          userId: z.number(),
        }),
      })
    );
    
    try {
      // 입장 이벤트
      sse.publish('joined', {
        userId: ctx.user.id,
        username: ctx.user.name,
      });
      
      // 메시지 스트리밍
      while (true) {
        const messages = await this.getNewMessages(roomId);
        
        for (const msg of messages) {
          sse.publish('message', {
            id: msg.id,
            text: msg.text,
            sender: msg.sender_name,
          });
        }
        
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    } finally {
      // 퇴장 이벤트
      sse.publish('left', {
        userId: ctx.user.id,
      });
      
      await sse.end();
    }
  }
}

5. 조건부 전송

class MonitorModelClass extends BaseModel {
  @stream({
    type: 'sse',
    events: z.object({
      status: z.object({
        cpu: z.number(),
        memory: z.number(),
        disk: z.number(),
      }),
      alert: z.object({
        level: z.enum(['warning', 'critical']),
        message: z.string(),
      }),
    })
  })
  @api({ compress: false })
  async *streamServerStatus(ctx: Context) {
    const sse = ctx.createSSE(
      z.object({
        status: z.object({
          cpu: z.number(),
          memory: z.number(),
          disk: z.number(),
        }),
        alert: z.object({
          level: z.enum(['warning', 'critical']),
          message: z.string(),
        }),
      })
    );
    
    try {
      while (true) {
        const status = await this.getServerStatus();
        
        // 상태 전송
        sse.publish('status', {
          cpu: status.cpu,
          memory: status.memory,
          disk: status.disk,
        });
        
        // 임계값 초과 시 알림
        if (status.cpu > 90) {
          sse.publish('alert', {
            level: 'critical',
            message: `CPU usage is ${status.cpu}%`,
          });
        } else if (status.cpu > 70) {
          sse.publish('alert', {
            level: 'warning',
            message: `CPU usage is ${status.cpu}%`,
          });
        }
        
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    } finally {
      await sse.end();
    }
  }
}

연결 관리

클라이언트 연결 종료 감지

@stream({
  type: 'sse',
  events: z.object({
    message: z.string(),
  })
})
@api({ compress: false })
async *streamData(ctx: Context) {
  const sse = ctx.createSSE(
    z.object({
      message: z.string(),
    })
  );
  
  // 클라이언트 연결 종료 감지
  ctx.request.socket.on('close', () => {
    console.log('Client disconnected');
  });
  
  try {
    while (true) {
      sse.publish('message', 'Hello');
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
  } finally {
    await sse.end();
  }
}

Keep-Alive

SSE는 자동으로 Keep-Alive를 전송하지만, 커스텀 구현도 가능합니다:
@stream({
  type: 'sse',
  events: z.object({
    message: z.string(),
    keepalive: z.object({}),
  })
})
@api({ compress: false })
async *streamWithKeepAlive(ctx: Context) {
  const sse = ctx.createSSE(
    z.object({
      message: z.string(),
      keepalive: z.object({}),
    })
  );
  
  // Keep-alive 전송 (30초마다)
  const keepAliveInterval = setInterval(() => {
    sse.publish('keepalive', {});
  }, 30000);
  
  try {
    while (true) {
      sse.publish('message', 'Data');
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  } finally {
    clearInterval(keepAliveInterval);
    await sse.end();
  }
}

에러 처리

@stream({
  type: 'sse',
  events: z.object({
    data: z.object({
      value: z.number(),
    }),
    error: z.object({
      message: z.string(),
    }),
  })
})
@api({ compress: false })
async *streamWithErrorHandling(ctx: Context) {
  const sse = ctx.createSSE(
    z.object({
      data: z.object({
        value: z.number(),
      }),
      error: z.object({
        message: z.string(),
      }),
    })
  );
  
  try {
    const value = await this.getSomeData();
    sse.publish('data', { value });
  } catch (error) {
    // 에러를 이벤트로 전송
    sse.publish('error', {
      message: error.message
    });
  } finally {
    await sse.end();
  }
}

주의사항

SSE 엔드포인트 작성 시 주의사항:
  1. 압축 비활성화 필수: SSE는 스트리밍이므로 압축 금지
    @api({ compress: false })
    
  2. Generator 함수 사용: async * 키워드 필요
    // ✅ Generator 함수
    async *streamData() { ... }
    
    // ❌ 일반 함수
    async streamData() { ... }
    
  3. 반드시 end() 호출: 연결 종료 필수
    try {
      // 이벤트 전송
    } finally {
      await sse.end();  // 필수!
    }
    
  4. 무한 루프 주의: 적절한 대기 시간 필요
    while (true) {
      sse.publish('data', ...);
      await new Promise(resolve => setTimeout(resolve, 1000));  // 1초 대기
    }
    
  5. 이벤트 스키마 일치: events와 publish 이벤트 이름 일치 필수
    // ✅ 일치
    events: z.object({
      message: z.string(),
    })
    sse.publish('message', 'Hi');
    
    // ❌ 불일치 (컴파일 에러)
    sse.publish('msg', 'Hi');
    
  6. 메모리 누수 방지: 타이머/리스너 정리
    const interval = setInterval(() => { ... }, 1000);
    try {
      // ...
    } finally {
      clearInterval(interval);  // 정리
      await sse.end();
    }
    

다음 단계