Skip to content

Commit

Permalink
Merge pull request #36 from lotteon2/develop
Browse files Browse the repository at this point in the history
release
  • Loading branch information
CessnaJ authored Jan 16, 2024
2 parents 301abe1 + 7d48c75 commit 6fac632
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
package com.dailyon.notificationservice.config;


import java.util.UUID;

public class NotificationConfig {
public static final String NOTIFICATIONS_STREAM_KEY = "notifications:stream";
public static final Long NOTIFICATION_STREAM_TTL = 5 * 60 * 1000L; // ms단위 5분

public static final String MEMBER_NOTIFICATION_CONNECTION_CHANNEL = "memberNotificationConnection";

public static final String CONSUMER_GROUP_NAME = "notification-group";
public static final String UNIQUE_CONSUMER_IDENTIFIER = CONSUMER_GROUP_NAME + "-" + UUID.randomUUID();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.dailyon.notificationservice.domain.notification.dto.NotificationData;
import com.dailyon.notificationservice.domain.notification.dto.WelcomeServerSentEvent;
import com.dailyon.notificationservice.domain.notification.service.NotificationService;
import com.dailyon.notificationservice.domain.notification.service.RedisPubSubService;
import com.dailyon.notificationservice.domain.notification.service.SseNotificationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -33,7 +34,7 @@ public class NotificationApiController {

private final NotificationService notificationService;
private final SseNotificationService sseNotificationService;

private final RedisPubSubService redisPubSubService;
// SQS 발행 테스트용 임시 코드
// private final QueueMessagingTemplate queueMessagingTemplate;
// private final String notificationQueue = "order-complete-notification-queue";
Expand Down Expand Up @@ -103,12 +104,18 @@ public Flux<ServerSentEvent<NotificationData>> subscribeToNotifications(@Request

Flux<ServerSentEvent<NotificationData>> welcomeFlux = Flux.just(WelcomeServerSentEvent.getInstance());

Flux<ServerSentEvent<NotificationData>> notificationFlux = sseNotificationService.streamNotifications(memberId);
log.info("Notification stream Flux 생성됨 - memberId: {}", memberId);
Flux<ServerSentEvent<NotificationData>> notificationFlux = sseNotificationService.streamNotifications(memberId)
.doOnCancel(() -> log.info("Notification stream 취소 - memberId: {}", memberId))
.doFinally(signalType -> log.info("signal type {} 와 함께 Notification stream 종료 memberId: {}", signalType, memberId));

// log.info("Notification stream Flux 생성됨 - memberId: {}", memberId);

redisPubSubService.publishMemberConnection(memberId).subscribe();

Flux<ServerSentEvent<NotificationData>> heartbeatFlux = Flux.interval(Duration.ofSeconds(15))
.take(240) // 1시간 송신 후 객체 정리 - 45동안 연결없으면 client에서 자동으로 재연결
.map(tick -> HeartbeatServerSentEvent.getInstance())
.doOnNext(tick -> log.info("Hearbeat event sent - memberId: {}", memberId)); // 반복적 송신 객체 싱글톤으로 처리
.doOnNext(tick -> log.info("Heartbeat event sent - memberId: {}", memberId));

return Flux.concat(welcomeFlux, Flux.merge(notificationFlux, heartbeatFlux))
.doOnError(e -> log.error("Error in SSE stream for member {}: {}", memberId, e.getMessage(), e))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.dailyon.notificationservice.domain.notification.dto;


import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import static com.dailyon.notificationservice.config.NotificationConfig.UNIQUE_CONSUMER_IDENTIFIER;

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DisconnectInfoDto {
private Long memberId;
private String uniqueConsumerName;

public static DisconnectInfoDto create(Long memberId) {
return DisconnectInfoDto.builder()
.memberId(memberId)
.uniqueConsumerName(UNIQUE_CONSUMER_IDENTIFIER)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class ExtendedNotificationData {
private List<Long> whoToNotify;
private NotificationData notificationData;
private Map<String, String> parameters;

public static ExtendedNotificationData of(List<Long> whoToNotify, NotificationData notificationData, Map<String, String> parameters) {
return ExtendedNotificationData.builder()
.whoToNotify(whoToNotify)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.dailyon.notificationservice.domain.notification.service;

import com.dailyon.notificationservice.domain.notification.dto.DisconnectInfoDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import java.util.Objects;

import static com.dailyon.notificationservice.config.NotificationConfig.MEMBER_NOTIFICATION_CONNECTION_CHANNEL;
import static com.dailyon.notificationservice.config.NotificationConfig.UNIQUE_CONSUMER_IDENTIFIER;


@Slf4j
@Service
public class RedisPubSubService {

private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;
private final SseNotificationService sseNotificationService;
private final ObjectMapper objectMapper;

public RedisPubSubService(
ReactiveRedisTemplate<String, String> reactiveRedisTemplate,
SseNotificationService sseNotificationService,
ObjectMapper objectMapper) {
this.reactiveRedisTemplate = Objects.requireNonNull(reactiveRedisTemplate);
this.sseNotificationService = Objects.requireNonNull(sseNotificationService);
this.objectMapper = Objects.requireNonNull(objectMapper);

// 빈 생성하면서 구독 시작
log.info("RedisPubSubService 빈을 생성하면서 구독 시작합니다.");
subscribeToMemberConnectionChannel();
}

private void subscribeToMemberConnectionChannel() {
log.info("redis sub 시작합니다.");
reactiveRedisTemplate.listenTo(ChannelTopic.of(MEMBER_NOTIFICATION_CONNECTION_CHANNEL))
.map(ReactiveSubscription.Message::getMessage)
.doOnNext(this::handleMemberDisconnection)
.subscribe();
// .subscribe(this::handleMemberDisconnection);
}

private void handleMemberDisconnection(String disconnectInfoDtoJson) {
try {
DisconnectInfoDto disconnectInfoDto = objectMapper.readValue(disconnectInfoDtoJson, DisconnectInfoDto.class);
if (disconnectInfoDto.getUniqueConsumerName().equals(UNIQUE_CONSUMER_IDENTIFIER)) {
log.info("동일 인스턴스에서 보낸 연결해제 pub입니다.");
return;
}

Long memberIdToDisconnect = disconnectInfoDto.getMemberId();

if (sseNotificationService.isUserConnected(memberIdToDisconnect)) {
log.info(memberIdToDisconnect + "의 연결을 해제합니다.");
sseNotificationService.disconnectMember(memberIdToDisconnect);
}
} catch (JsonProcessingException e) {
log.error("handleMemberDisconnection의 readValue 도중 에러발생", e);
}
}

public Mono<Void> publishMemberConnection(Long memberId) {
log.info(memberId + "접속. disconnect 메세지 발행합니다.");
DisconnectInfoDto disconnectInfoDto = DisconnectInfoDto.create(memberId);

return Mono.fromCallable(() -> objectMapper.writeValueAsString(disconnectInfoDto))
.flatMap(jsonData -> reactiveRedisTemplate.convertAndSend(MEMBER_NOTIFICATION_CONNECTION_CHANNEL, jsonData))
.doOnError(e -> {
log.error("publishMemberConnection 도중 에러발생", e);
})
.then();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.lettuce.core.RedisBusyException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.*;
Expand All @@ -19,11 +18,12 @@
import reactor.core.publisher.Mono;


import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

import static com.dailyon.notificationservice.config.NotificationConfig.UNIQUE_CONSUMER_IDENTIFIER;

@Slf4j
@Service
public class RedisStreamNotificationService {
Expand All @@ -32,9 +32,6 @@ public class RedisStreamNotificationService {
private final ObjectMapper objectMapper;
private final SseNotificationService sseNotificationService;

private static final String CONSUMER_GROUP_NAME = "notification-group";
private static final String UNIQUE_CONSUMER_GROUP_NAME = CONSUMER_GROUP_NAME + "-" + UUID.randomUUID();

/*
init뒤 구독정보 초기화 로직
initializeConsumerGroup를 해보고, 결과와 관계없이(then) getLastStreamEntryId 통해 streams의 lastRecordId 구함.
Expand All @@ -51,7 +48,7 @@ public RedisStreamNotificationService(
Mono.just(NotificationConfig.NOTIFICATIONS_STREAM_KEY)
.flatMap(this::initializeConsumerGroup)
.flatMap(success -> getLastStreamEntryId(NotificationConfig.NOTIFICATIONS_STREAM_KEY))
.flatMap(lastEntryId -> consumeNotificationsFrom(UNIQUE_CONSUMER_GROUP_NAME, NotificationConfig.NOTIFICATIONS_STREAM_KEY))
.flatMap(lastEntryId -> consumeNotificationsFrom(UNIQUE_CONSUMER_IDENTIFIER, NotificationConfig.NOTIFICATIONS_STREAM_KEY))
.subscribe(result -> log.info("redis streams 구독 시작합니다."),
error -> log.error("redis streams 구독에 실패했습니다...", error));
}
Expand All @@ -60,7 +57,7 @@ public RedisStreamNotificationService(
private Mono<Boolean> initializeConsumerGroup(String streamKey) {
log.info("initializeConsumerGroup 진입");
return reactiveRedisTemplate.opsForStream()
.createGroup(streamKey, ReadOffset.latest(), UNIQUE_CONSUMER_GROUP_NAME)
.createGroup(streamKey, ReadOffset.latest(), UNIQUE_CONSUMER_IDENTIFIER)
.thenReturn(true) // 성공 시 true 반환
.onErrorResume(e -> {
return Mono.just(false);
Expand Down Expand Up @@ -153,7 +150,7 @@ private void processNotification(String jsonNotification) throws JsonProcessingE

private void sendAcknowledgment(MapRecord<String, Object, Object> record) {
reactiveRedisTemplate.opsForStream()
.acknowledge(NotificationConfig.NOTIFICATIONS_STREAM_KEY, UNIQUE_CONSUMER_GROUP_NAME, record.getId().getValue())
.acknowledge(NotificationConfig.NOTIFICATIONS_STREAM_KEY, UNIQUE_CONSUMER_IDENTIFIER, record.getId().getValue())
.subscribe();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;


@Slf4j
Expand All @@ -36,11 +37,23 @@ public class SseNotificationService {

// 구독하기. 구독 SSE 객체에는 client에게 줄 notificationData를 넣어준다.
public Flux<ServerSentEvent<NotificationData>> streamNotifications(Long memberId) {
// log.info(memberId + "새연결, 먼저 userSinks에서 제거");
userSinks.remove(memberId); // 새연결시 일단 제거

log.info("Creating new SSE Sink for memberId: {}", memberId);
Sinks.Many<ServerSentEvent<NotificationData>> sink = Sinks.many().multicast().onBackpressureBuffer();
userSinks.put(memberId, sink);

// log.info(userSinks.toString());
// log.info("새로 구독 후 확인");
// log.info(userSinks.entrySet() // Entry set 방문
// .stream() // 스트림으로 변환
// .map(entry -> {
// Long userId = entry.getKey(); // 키는 사용자 ID
// Sinks.Many<ServerSentEvent<NotificationData>> a = entry.getValue(); // 값은 Sinks.Many
// int subscriberCount = a.currentSubscriberCount(); // 현재 구독자 수 얻기
// return userId + " -> " + subscriberCount + " subscribers";
// })
// .collect(Collectors.joining(", ", "{", "}")));

Consumer<Throwable> removeSinkConsumer = e -> {
userSinks.remove(memberId);
Expand All @@ -53,7 +66,7 @@ public Flux<ServerSentEvent<NotificationData>> streamNotifications(Long memberId
}

private Mono<Void> sendSseNotificationToUser(NotificationData data, Long memberId) {
log.info("단일 유저에게 발송합니다.: {}", memberId);
// log.info("단일 유저에게 발송합니다.: {}", memberId);
return Mono.fromRunnable(() -> {
Optional.ofNullable(userSinks.get(memberId)).ifPresent(sink -> {
log.info("memberId: {}를 찾았습니다. 이제 메세지 발송합니다.", memberId);
Expand All @@ -66,7 +79,7 @@ private Mono<Void> sendSseNotificationToUser(NotificationData data, Long memberI
}

public Mono<Void> sendNotificationToConnectedUsers(List<Long> memberIds, NotificationData notificationData) {
log.info("연결된 유저들에게 발송합니다.: {}", memberIds.toString());
// log.info("연결된 유저들에게 발송합니다.: {}", memberIds.toString());
return Flux.fromIterable(memberIds)
.flatMap(memberId -> sendSseNotificationToUser(notificationData, memberId))
.then();
Expand All @@ -86,7 +99,28 @@ public Mono<Void> clearProductRestockNotifications(Long productId, Long sizeId)
}).then();
}


public boolean isUserConnected(Long memberId) {
return userSinks.containsKey(memberId);
}

public void disconnectMember(Long memberId) {
log.info("Disconnecting SSE sink for memberId: {}", memberId);
Sinks.Many<ServerSentEvent<NotificationData>> sink = userSinks.remove(memberId);

// log.info("제거 후 확인");
// log.info(userSinks.entrySet() // Entry set 방문
// .stream() // 스트림으로 변환
// .map(entry -> {
// Long userId = entry.getKey(); // 키는 사용자 ID
// Sinks.Many<ServerSentEvent<NotificationData>> a = entry.getValue(); // 값은 Sinks.Many
// int subscriberCount = a.currentSubscriberCount(); // 현재 구독자 수 얻기
// return userId + " -> " + subscriberCount + " subscribers";
// })
// .collect(Collectors.joining(", ", "{", "}")));

if (sink != null) {
sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
}
}
}

0 comments on commit 6fac632

Please sign in to comment.