0. 시작하며
프로젝트에서 SSE(Server Sent Event) 방식의 알림을 구현을 담당했습니다. 이때발생한 에러와 트러블 슈팅을 기록하려합니다.
1. 알림 구현 방식 후보군
1-1 Short Polling
- 클라이언트가 서버에 요청을 보내어 새로운 정보가 있는지 확인하는 방식입니다.
- 서버는 즉시 응답을 보내며 사용가능한 데이터가 있다면 보내줍니다.
- 일정 시간 혹은 동작후, 클라이언트는 다시 서버에 요청을 보내게됩니다.

이 방법은 쉽고 빠르게 구현될 수 있지만 필요치 않은 클라이언트의 요청이 많이 생길 수 있습니다. 이는 트래픽과 서버의 부담이될 수 있는 요인입니다.
1-2 Long Polling
- Short Polling에서 조금 더 개선된 방식입니다.
- 마찬가지로 클라이언트가 서버에 요청을 보내어 사용가능 정보를 확인합니다.
- 서버는 사용가능한 데이터가 있을때까지 혹은 일정 시간(Time out)동안 요청을 보류하고 응답을 보냅니다.
- 이후 일정 시간 혹은 동작후, 클라이언트는 다시 서버에 요청을 보내게됩니다.

Short Polling 방식에비해 조금 개선이 되긴했습니다.. 하지만 여전히 요청과 응답이 빈번하게 발생될 수 있습니다. 또한 peak시간에 다수의 클라이언트와 연결을 유지하고 확인해야하므로 서버 부하가 급증할 수 있습니다.
1-3 Socket
- 서버와 클라이언트간 양방향 통신 방법입니다. 처음 핸드셰이크를 통해 연결이 이루어지면 지속적인 연결을 유지합니다.
- 핸드 셰이크 이후에는 HTTP헤더가 필요하지 않습니다 또한 TCP/IP 프로토콜을 사용하여 순서 보장, 중복 검사, 오류 검사 및 복구 등을 제공하여 신뢰성있는 데이터 전송을 보장합니다.

socket의 경우 양방향 통신을 위한 방식입니다. 하지만 저희 알림 서비스의 경우 특정 event가 발생하면 알림을 보내도록 구현하길 원했기때문에 socket을 사용해도 일방적인 data전송이 생길것입니다. 따라서 SSE방식을 통한 구현을 진행하기로 결정했습니다.
1-4 SSE(Server Sent Evnet)
- 서버가 클라이언트로 text message를 보내는 브라우저 기반 웹 애플리케이션 기술이며 HTTP의 persistent connections을 기반으로하는 HTML5 표준 기술입니다.
- Socket처럼 최초 Hand shake를 통한 연결을 진행합니다. 이후 Event가 발생하면 서버측에서 data를 전송합니다.
- 연결이 끊어지는 경우 클라이언트는 자동으로 서버에 재연결을 시도합니다.
- 단방향 연결이므로 Socket에 비해 클라이언트단의 구현 부담이 줄어들고, 좀 더 가벼운 연결 방식입니다.

2. SSE알림 구현
각 레이어 별 구현 예제 코드를 보고 특징에대해 설명하겠습니다
AlarmController
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1/alarms")
public class AlarmController {
private final AlarmService alarmService;
@GetMapping(value = "/subscribe/{memberId}", produces = "text/event-stream")
public ResponseEntity<CompletableFuture> subscribe(
@PathVariable("memberId") Long memberId,
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
// X-Accel-Buffering 헤더 추가
HttpHeaders headers = new HttpHeaders();
headers.set("X-Accel-Buffering", "no");
return ResponseEntity.ok()
.headers(headers)
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(alarmService.subscribe(memberId, lastEventId));
}
//...
}
- Handshake를 위한 구독 요청 controller입니다.
- Spring의 SseEmitter는 "text/event-stream"을 사용해야합니다.
- CompletableFuture를 통해 비동기 반환타입을 지정했습니다.
- X-Accel-Buffering: no 설정을 추가해서 해당 응답에 대하여 Nginx의 proxy buffering기능을 제한하였습니다.
🔽비동기 처리 이유
SSE 알림 구현 완료 후 10번의 구독 요청을 받으면 HikariCP Exception이 발생하여 더 이상 DB작업을 할 수 없는 상태가 되었습니다.
그 이유는 JPA와 관련이 있었는데요. SSE Connection을 유지하는 동안 JPA의 영속성 컨텍스트가 열려있고, 그로인해 DB Connection또한 유지되게됩니다.
따라서 HikariCP의 기본값인 10번의 연결을 넘어가는순간 에러가 발생했습니다. 열심히 리서치를 진행했고 다음과 같은 해결책이 있었습니다.
해결 방법 1. open session in view ( OSIV , JPA 설정)을 false로 설정합니다.
위 방법을 사용하면 JPA 영속성 컨텍스트가 service 종료 시점에 강제로 종료되며db connection 자원을 반납할 수 있습니다. 하지만 service이후 영역(controller)에서 Lazy Loading을 사용할 수 없었고 이를 많이 사용하고 있는 저희 서비스는 후행되어야할 코드수정이 많아지는 악영향이 생깁니다. 또한 지연 로딩의 전체적인 성능상 이점을 포기해야하므로 단점이 더 많다고 생각되었습니다.
해결 방법 2. db connection을 늘린다.
기본 10개인 db connection 자원을 임의로 증가시켜 해결할 수 있습니다.
하지만 HijariCP github wiki에 따르면 사용자 10000명에 100개의 커넥션도 끔찍히 많은 정도이며 디스크와 네트워크의 변수가 없다면 CPU의 코어수에 1:1로 설정하는것이 최적이됩니다.
하지만 보통의 상황에서 디스크와 네트워크는 변수로 작용하며 Connection Pool의 사이즈는 CPU 수 보다 많게 유지하는것이 좋습니다. Postgre SQL에 적합하지만 대체로 사용가능한 공식도 추천해주고 있습니다. 다음과 같은 공식을 사용해 구하고 환경에 맞게 계속해서 테스트하여 최적의 수를 구하는것이 중요하다고 합니다.
connections = ((core_count * 2) + effective_spindle_count)
또한 connection pool에 많은 connection을 생성하면, 그만큼 메모리를 추가로 사용하게 되므로 성능 저하를 유발 할 수 있고 thread Pool을 늘려도 context switiching으로 인한 한계가 존재합니다.
따라서 위 두가지 방법 모두 해당 프로젝트와는 맞지 않는 방법이었다고 생각이 들었습니다.
이에 색다른 방법을 모색했습니다.그 방법은 비동기 처리였고 다음과 같은 이유가 있었습니다.
1. OSIV설정이 true인 경우 api response 및 화면 렌더링 시점까지 JPA 영속성 컨텍스트가 유지된다고 합니다. 그렇다면 위 원인을 보았을때 비동기 처리를통해 해당 요청에서 SSE Connection과 관계없이 response를 보내고, 화면에 랜더링되는 시점이된다면 DB Connection 자원을 반납할 수 있을것이라고 생각했습니다.
2. 또한 구독 요청은 동기성 중요하진 않고, 클라이언트에 전송하는 Blocking I/O작업을 기다리지않고 return하기때문에 응답시간에서 이점이 생길것이라고 생각했습니다.
AlarmService
@Service
@Transactional
@RequiredArgsConstructor
public class AlarmServiceImpl implements AlarmService {
private final AlarmRepository alarmRepository;
private final EmitterRepository emitterRepository;
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
@Override
@Async
public CompletableFuture subscribe(Long memberId, String lastEventId) {
// 고유 식별자 부여
String id = String.valueOf(memberId);
SseEmitter emitter = emitterRepository.save(id, new SseEmitter(DEFAULT_TIMEOUT));
//예외 상황에 emitter 삭제
emitter.onCompletion(() -> emitterRepository.deleteAllStartWithId(id));
emitter.onTimeout(() -> emitterRepository.deleteAllStartWithId(id));
emitter.onError((e) -> emitterRepository.deleteAllStartWithId(id));
// 503 에러를 방지하기 위한 더미 이벤트 전송
sendToClient(emitter, id, "EventStream Created. [userId=" + memberId + "]");
return CompletableFuture.completedFuture(emitter);
}
//....
private void send(Member member, String content, AlarmType alarmType) {
Alarm alarm = Alarm.create(member, content, alarmType);
alarmRepository.save(alarm);
String sendId = String.valueOf(member.getId());
// 로그인 한 유저의 SseEmitter 모두 가져오기
Map<String, SseEmitter> sseEmitterMap = emitterRepository.findAllStartById(sendId);
sseEmitterMap.forEach(
(key, emitter) -> {
// 데이터 캐시 저장(유실된 데이터 처리하기 위함)
emitterRepository.saveEventCache(key, alarm);
sendToClient(emitter, sendId, AlarmResponse.from(alarm));
}
);
}
private void sendToClient(SseEmitter emitter, String id, Object data) {
try {
emitter.send(SseEmitter.event()
.id(id)
.name("sseEvent")
.data(data));
} catch (IOException exception) {
emitterRepository.deleteAllStartWithId(id);
log.info("알림 전송 실패");
exception.printStackTrace();
}
}
}
- SseEmitter 생성자를 통해 TimeOut시간을 설정할 수 있습니다.
- 구독 요청에대한 응답을 하지 않는 경우 503Error가 발생할 수 있으므로 dummy text를 보내줍니다.
- 저는 event가 발생하여 알림을 보내는 경우에대한 서비스로직을 따로 작성하였습니다. 따라서 evnet 발생시 alarmService를 호출하여 해당 메서드를 호출하고 이렇게 호출된 모든 메서드는 send메서드를 호출하게됩니다.
EmitterRepository
@Repository
@RequiredArgsConstructor
public class EmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();
public SseEmitter save(String id, SseEmitter emitter) {
emitters.put(id, emitter);
return emitter;
}
// 이벤트를 캐시하는 메서드
public void saveEventCache(String id, Object event) {
eventCache.put(id, event);
}
// id로 시작하는 모든 SseEmitter 객체들을 가져오는 메서드
public Map<String, SseEmitter> findAllStartById(String startId) {
Map<String, SseEmitter> result = new ConcurrentHashMap<>();
for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
String id = entry.getKey();
SseEmitter emitter = entry.getValue();
if (id.startsWith(startId)) {
result.put(id, emitter);
}
}
return result;
}
public void deleteAllStartWithId(String id) {
emitters.forEach((key, emitter) -> {
if (key.startsWith(id)) emitters.remove(key);
});
}
//Id로 시작하는 이벤트를 가져오는 메서드
public Map<String, Object> findAllEventCacheStartWithId(String startId) {
Map<String, Object> result = new ConcurrentHashMap<>();
for (Map.Entry<String, Object> entry : eventCache.entrySet()) {
String id = entry.getKey();
Object event = entry.getValue();
if (id.startsWith(startId)) {
result.put(id, event);
}
}
return result;
}
}
- 비동기 처리를 진행하므로 Thread-saft한 ConcurrentHashMap을 사용해 emitter를 저장, 삭제 등을 수행합니다.
- 이와같은 캐싱을 통해 재연결 시도중 누락되는 알림을 저장하고 해당 알림부터 전송하려고 했으나.. 프로젝트 리소스상 미완성으로 남겨져있어 리팩토링 예정 코드입니다.
AsyncConfig
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // 원하는 스레드 풀 크기 설정
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("Async-");
executor.initialize();
return executor;
}
}
- Spring boot에서 Thread Pool을 위와같이 간단하게 구현할 수 있습니다.
- Thread Pool없이 비동기처리를 하게된다면... thread가 계속해서 생성하고 종료되는 작업이 수행되며 해당작업은 굉장히 무거운 작업이고 메모리와 CPU 사용을 많이 하게됩니다. 이는 곧 성능 저하로 이어질 수 있습니다.
- 따라서 Thread를 효율적으로 관리할 수 있는 Thread Pool사용은 권장됩니다.
- 위와같이 설정한다면 다음과 같이 동작합니다.
- 최초 10개의 스레드에서 작업을 수행합니다. (CorePoolSize)
- 해당 작업이 밀릴경우 100 size의 Queue를 생성하여 저장합니다. (MaxPoolSize)
- 그래도 밀릴경우 최대 50개의 스레드까지 만들어 작업을 처리하게됩니다. (QueueCapacity)
Client
import './App.css';
import React, { useState, useEffect } from 'react';
interface NotificationData {
data:string;
}
function App() {
const [notificationData, setNotificationData] = useState<NotificationData | null>(null);
useEffect(() => {
const eventSource = new EventSource('your_endpoint');
eventSource.addEventListener('sseEvent', (event) => {
const data: NotificationData = event;
console.log(data);
setNotificationData(data);
});
eventSource.onerror = (error) => {
console.error('SSE 연결 오류:', error);
};
return () => {
eventSource.close();
};
}, []);
return (
<div className="App">
<h1>Notification Data:</h1>
{notificationData ? (
<div>
<p>Data: {notificationData.data}</p>
</div>
) : (
<p>No data received yet.</p>
)}
</div>
);
}
export default App;
- React를 사용하여 간단한 예제 코드를 생성해봤습니다.
- EventSource 생성자를 사용하여 구독요청 url을 인자로 넣어줍니다.
- eventSource.addEventListener()에 SseEmitter name을 동일하게 해주시면 연결되고 알림을 받을 수 있습니다.
이제 SSE알림을 정상적으로 받을 수 있습니다. 여기까지 오시느라 수고하셨습니다!
3. 결론
비동기 처리를통해 SSE Connection과 무관하게 DB Connection자원을 반납할 수 있었습니다. 또한 message broker (redis, kafka 등..)를 이용한 방식을 사용하는 방식도 적용해보려합니다. 비동기처리에 대한 포스팅은 따로 추후 정리해오겠습니다.
<참고 레퍼런스>
https://jsonobject.tistory.com/558
Spring Boot, SSE(Server-Sent Events)로 단방향 스트리밍 통신 구현하기
개요 Server-Sent Events(이하 SSE)는 HTTP 스트리밍을 통해 서버에서 클라이언트로 단방향의 Push Notification을 전송할 수 있는 HTML5 표준 기술이다. 이번 글에서는 Spring Boot에서 SSE를 이용한 단방향 스트
jsonobject.tistory.com
[Spring + SSE] Server-Sent Events를 이용한 실시간 알림
코드리뷰 매칭 플랫폼 개발 중 알림 기능이 필요했다. 리뷰어 입장에서는 새로운 리뷰 요청이 생겼을 때 모든 리뷰가 끝나고 리뷰이의 피드백이 도착했을 때 리뷰이 입장에서는 리뷰 요청이 거
velog.io
https://tecoble.techcourse.co.kr/post/2022-10-11-server-sent-events/
Spring에서 Server-Sent-Events 구현하기
…
tecoble.techcourse.co.kr
https://dodeon.gitbook.io/study/kimyounghan-spring-boot-and-jpa-optimization/04-osiv
OSIV와 성능 최적화 - dodeon
보통 서비스 계층에서 트랜잭션을 유지하기 때문에, 두 서비스 모두 트랜잭션을 유지하면서 지연 로딩을 사용할 수 있다.
dodeon.gitbook.io
https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing
About Pool Sizing
光 HikariCP・A solid, high-performance, JDBC connection pool at last. - brettwooldridge/HikariCP
github.com
'Java > SpringBoot' 카테고리의 다른 글
[SpringBoot] @Transactional을 알아보자 (0) | 2024.02.01 |
---|---|
ProcessBuilder를 통해 Python파일 실행하기 (0) | 2023.09.28 |
스프링 부트 환경변수 설정하기 (0) | 2023.09.19 |