Home SSE Spring에서 구현하기 A to Z
Post
Cancel

SSE Spring에서 구현하기 A to Z

SSE란?

SSE(Server-Sent Events)는 웹 애플리케이션에서 서버로부터 데이터를 비동기적으로 전송받을 수 있는 기술 중 하나이다. 클라이언트의 별도의 요청이 없이도 알림처럼 실시간으로 서버에서 데이터를 전달해야할때가 있다. 이럴때 단방향으로 통신을 지원하며 서버로 데이터를 보낼수없다는 단점이 있지만, 실시간 업데이트가 필요할때는 효율적으로 데이터를 전달할 수 있다.

물론, SSE 방식외에도 클라이언트가 주기적으로 서버로 요청을 보내서 데이터를 받는 Short Polling , 서버의 변경이 일어날때까지 대기하는 Long Polling 방식이 있지만, 해당 프로젝트는 실시간으로 반영되어야하고 빈번하게 발생 될 수있는 상황이기에 SSE를 선택하였다. SSE는 서버와의 한번 연결을 하고나면 HTTP 1.1의 keep alive와 비슷하게 서버에서 변경이 일어날떄마다 데이터를 전송하는 방법이다.

SSE 구조와 구현

Client 하나당 sseemitter하나

SSE의 기본적인 흐름은 클라이언트가 SSE요청을 보내면 서버에서는 클라이언트와 매핑되는 SSE 통신객체를 만든다(SseEmitter) 해당객체가 이벤트 발생시 eventsource를 client에게 전송하면서 데이터가 전달되는 방식이다. sseemitter는 SSE 통신을 지원하는 스프링에서 지원하는 API이다.

위 그림은 SSE 플로우이다. 위 그림에 맞춰서 하나하나 Spring에서 구현하고 설명하겠다. 맨처음 클라이언트에서 SSE 요청이 오면 서버는 위 그림과 같이 기본적인 응답해더값과 더불어 필요한 헤더들을 반환해야한다. (초록 화살표) 그러기 위해서는 컨트롤러, 서비스에 해당 로직을 작성해야한다. 아래의 controller를 보자.

1
2
3
4
5
6
7
//컨트롤러
	@GetMapping(value = "/connect", produces = "text/event-stream")
	public ResponseEntity<SseEmitter> sseConnection(@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId, HttpServletResponse response) {

		return new ResponseEntity<>(notificationService.connection(lastEventId, response), HttpStatus.OK);
	}

lastEventId를 파라미터로 받는 이유는 로그인 정보를 기반으로 미수신 event 유실을 예방하기 위해서이다. 자 이제 service를 구현해보겠다. 인터페이스는 따로 작성하지않겠다. 해당 서비스는 노란 화살표에 해당되는 내용이다. 클라이언트로부터 SSE 연결 요청을 받아서, user정보와 HttpServletResponse값을 토대로 connection 메서드를 작성한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
 //서비스
@Service
@RequiredArgsConstructor
public class NotificationServiceImpl implements NotificationService {
	private static final Logger log = LoggerFactory.getLogger(NotificationService.class);
	private static final Long DEFAULT_TIMEOUT = 120L * 1000 * 60; // SSE 유효시간
	private final EmitterRepository emitterRepository;


	@Override
	public SseEmitter connection(String lastEventId, HttpServletResponse response) {

		String userid = "user"; // 로그인 정보를 기반으로 만들어야하는 곳이다.(로그인을 구현하지않아서 user라고 고정함)
		String id = userid + "_" + System.currentTimeMillis(); // 데이터 유실 시점 파악 위함

		// 클라이언트의 sse 연결 요청에 응답하기 위한 SseEmitter 객체 생성
		// 유효시간 지정으로 시간이 지나면 클라이언트에서 자동으로 재연결 요청함
		SseEmitter emitter = emitterRepository.save(id, new SseEmitter(DEFAULT_TIMEOUT));
		response.setHeader("X-Accel-Buffering", "no"); // NGINX PROXY 에서의 필요설정 불필요한 버퍼링방지


		// SseEmitter 의 완료/시간초과/에러로 인한 전송 불가 시 sseEmitter 삭제
		emitter.onCompletion(() -> emitterRepository.deleteAllStartByWithId(id));
		emitter.onTimeout(() -> emitterRepository.deleteAllStartByWithId(id));
		emitter.onError((e) -> emitterRepository.deleteAllStartByWithId(id));

		// 연결 직후, 데이터 전송이 없을 시 503 에러 발생. 에러 방지 위한 더미데이터 전송
		sendToClient(emitter, id, "연결되었습니다. " + id + "님");

		if (!lastEventId.isEmpty()) { // 클라이언트가 미수신한 Event 유실 예방, 연결이 끊켰거나 미수신된 데이터를 다 찾아서 보내준다.
			Map<String, SseEmitter> events = emitterRepository.findAllStartById(userid);
			events.entrySet().stream()
					.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
					.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
		}

		return emitter;
	}
... 생략

emitterRepo에 save(id, ssemitter(timeout))을 저장함으로써 향후 이벤트가 발생됐을때 클라이언트에게 데이터를 전송할 수있다. 이때 repo에서는 sseEmitter를 관리하는 스레드들이 콜백할때 스레드가 다를수 있기에 ThreadSafe한 구조인 ConcurrentHashMap을 사용해서 해당 메시지를 저장해야한다.

또한, 초기에 클라이언트에게 메시지를 보내줘야하는데 이유는 데이터의 전송이 없으면 503에러를 발생시키기 떄문이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
	@Override
	public void sendToClient(SseEmitter emitter, String id, Object data) {

		try {
			emitter.send(SseEmitter.event()
					.id(id)
					.name("sse")
					.data(data));
		} catch (IOException e) {
			emitterRepository.deleteAllStartByWithId(id);
			log.error("SSE 연결 오류 발생", e);
		}
	}

이를 방지하기 위에 위와같이 간단하게 요청을 보내면된다. 필자는 “000님 연결되었습니다”“로 대체 했다.

마지막으로 클라이언트가 미수신한 Event에 대해서 전부 전송을 하기위한 findAllstartById(userId)를 구현해주면 SSE의 기본적인 셋팅이 마무리된다. 아래는 Repo를 구현한것이고 repo를 설명한 후에는 send라는 메서드를 만들어서 실시간으로 eventlisten해야하는 곳에 세팅을 하면된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Repository
@RequiredArgsConstructor
public class EmitterMyBatisRepository implements EmitterRepository {
	private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
	private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

	@Override
	public SseEmitter save(String id, SseEmitter sseEmitter) {
		emitters.put(id, sseEmitter);
		return sseEmitter;
	}

	@Override
	public void saveEventCache(String id, Object event) {
		eventCache.put(id, event);
	}

	@Override
	public Map<String, SseEmitter> findAllStartById(String id) {
		return emitters.entrySet().stream()
				.filter(entry -> entry.getKey().startsWith(id))
				.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
	}

	@Override
	public void deleteAllStartByWithId(String id) {
		emitters.forEach((key, emitter) -> {
			if (key.startsWith(id)) emitters.remove(key);
		});
	}
}

repo는 정말 단순하기 떄문에 별도의 설명은 하지않겠다. 앞서 말한듯이 thread safe하게 설계하면 문제될게 없다. 다만 EC2가 두대 이상으로 스케일 아웃을 한다면, concurrentHashMap 이 아닌 redis pub/sub방식을 이용해야 동작할 것이다.

다시 서비스단으로 돌아와서 제일 핵심 로직인 send를 구현할 것이다. userId와 title(알림에 사용될 타이틀), content (알림내용), EnumType(입고,출고 여부)를 담은 Notification 클래스를 만들고 위에 ID별로 저장해두었던 데이터를 가져온다. 그리고 초기에 설정해두었던 sendToClient에 데이터를 보내면 SSE통신이 완료된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//서비스
	@Override
	@Transactional
	// 알림 보낼 로직에 send 메서드 호출하면 됨
	public void send(String title, String content, NotificationType notificationType) {

		String id = "user";
		Notification notification = createNotification(id, title, content, notificationType);

		// 유저의 모든 SseEmitter 가져옴
		Map<String, SseEmitter> sseEmitters = emitterRepository.findAllStartById(id);
		sseEmitters.forEach(
				(key, emitter) -> {
					// 데이터 캐시 저장 (유실된 데이터 처리 위함)
					emitterRepository.saveEventCache(key, notification);
					// 데이터 전송
					sendToClient(emitter, key, notification);
				}
		);
	}

Nginx 도입시 주의점

Nginx가 was로 요청을 보낼때는 http/1.0으로 데이터를 보내는데, 1.0은 connection을 keep alive하지않는다. 그래서 connection 헤더는 close를 사용하게되면서 sse 통신이 close되는 문제점이 발생된다. 그래서 nginx cofig에는 꼭 아래와같이 설정을 해야한다.

1
2
proxy_set_header Connection '';
proxy_http_version 1.1;

또한, 위에서 말한것처럼 Nginx는 서버의 응답을 버퍼 형태로 모아두고 서버가 응답 데이터를 모두 보내면 클라이언트로 전송을 하는 구조인데 SSE는 이게 언제 크기가 종료될지 비동기적으로 작동하기에 실시간성이 떨어진다. 따라서 X-accel기능을 헤더에 심어서 보내주면 nginx의 proxy buffering기능을 SSE 통신에는 적용안할수가 있다.

This post is licensed under CC BY 4.0 by the author.

Redis Cache Spring boot에서 사용하기

cluster-index와 uncluster-index