Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lf1 525 qna register notification 기능 추가 #10

Merged
merged 5 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ dependencies {
implementation 'org.springframework.cloud:spring-cloud-aws-messaging:2.2.4.RELEASE'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'software.amazon.awssdk:sns:2.21.37'
implementation 'io.github.lotteon-maven:blooming-blooms-utils:25057818'
implementation 'io.github.lotteon-maven:blooming-blooms-utils:202312101522'

testImplementation 'org.mockito:mockito-core:4.8.0'

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package kr.bb.notification.domain.emitter.api;

import kr.bb.notification.domain.emitter.application.SseService;
import kr.bb.notification.domain.notification.entity.NotificationCommand.SSENotification;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
Expand All @@ -12,8 +14,16 @@
public class SSEClientController {
private final SseService sseService;

@PostMapping("/send-data/{userId}")
public void sendData(@PathVariable Long userId, @RequestBody Object data) {
sseService.notify(userId, data);
// TODO: 삭제될 예정, sse 전송 테스트 용
@PostMapping(value = "/send-data/manager/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public void sendData(@PathVariable Long userId, @RequestBody String data) {
SSENotification build =
SSENotification.builder()
.role("manager")
.redirectUrl("/question")
.content(data)
.userId(3L)
.build();
sseService.notify(build);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

Expand All @@ -13,8 +13,9 @@
public class SSERestController {
private final SseService sseService;

@GetMapping(value = "subscribe/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(@RequestHeader Long userId) {
return sseService.subscribe(userId);
// TODO: 화면 테스트용으로 pathvariable 사용
@GetMapping(value = "subscribe/manager/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribeManager(@PathVariable Long userId) {
return sseService.subscribe(userId, "manager");
}
}
Original file line number Diff line number Diff line change
@@ -1,47 +1,70 @@
package kr.bb.notification.domain.emitter.application;

import bloomingblooms.domain.notification.NotificationKind;
import java.io.IOException;
import kr.bb.notification.domain.emitter.repository.SSERepository;
import kr.bb.notification.domain.notification.entity.NotificationCommand.SSENotification;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Slf4j
@Service
@RequiredArgsConstructor
public class SseService {
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final SSERepository emitterRepository;

public SseEmitter subscribe(Long userId) {
SseEmitter emitter = createEmitter(userId);
sendToClient(userId, "EventStream Created. [userId=" + userId + "]");
public SseEmitter subscribe(Long userId, String role) {
SseEmitter emitter = createEmitter(userId, role);
sendToClient(userId, role, "EventStream Created. [userId=" + userId + "]");
return emitter;
}

public void notify(Long userId, Object event) {
sendToClient(userId, event);
public void notify(SSENotification event) {
sendToClient(event);
}

private void sendToClient(Long userId, Object data) {
SseEmitter emitter = emitterRepository.get(userId);
private void sendToClient(SSENotification event) {
String id = event.getRole() + event.getUserId();
String data = event.getContent() + "\n" + event.getRedirectUrl();
SseEmitter emitter = emitterRepository.get(event.getUserId(), event.getRole());
if (emitter != null) {
try {
emitter.send(
SseEmitter.event()
.id(id)
.name(NotificationKind.QUESTION.getKind())
.data(data, MediaType.TEXT_EVENT_STREAM));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

private void sendToClient(Long userId, String role, Object data) {
SseEmitter emitter = emitterRepository.get(userId, role);

if (emitter != null) {
try {
emitter.send(SseEmitter.event().id(String.valueOf(userId)).name("sse").data(data));
} catch (IOException exception) {
emitterRepository.deleteById(userId);
emitterRepository.deleteById(userId, role);
throw new RuntimeException("연결 오류!");
}
}
}

private SseEmitter createEmitter(Long userId) {
private SseEmitter createEmitter(Long userId, String role) {
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
emitterRepository.save(userId, emitter);
emitterRepository.save(userId, role, emitter);

// Emitter가 완료될 때(모든 데이터가 성공적으로 전송된 상태) Emitter를 삭제한다.
emitter.onCompletion(() -> emitterRepository.deleteById(userId));
emitter.onCompletion(() -> emitterRepository.deleteById(userId, role));
// Emitter가 타임아웃 되었을 때(지정된 시간동안 어떠한 이벤트도 전송되지 않았을 때) Emitter를 삭제한다.
emitter.onTimeout(() -> emitterRepository.deleteById(userId));
emitter.onTimeout(() -> emitterRepository.deleteById(userId, role));

return emitter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
@Repository
@RequiredArgsConstructor
public class SSERepository {
private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

public void save(Long userId, SseEmitter emitter) {
emitters.put(userId, emitter);
public void save(Long userId, String role, SseEmitter emitter) {
emitters.put(role + userId, emitter);
}

public void deleteById(Long userId) {
emitters.remove(userId);
public void deleteById(Long userId, String role) {
emitters.remove(role + userId);
}

public SseEmitter get(Long userId) {
return emitters.get(userId);
public SseEmitter get(Long userId, String role) {
return emitters.get(role + userId);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kr.bb.notification.domain.notification.application;

import bloomingblooms.domain.notification.NotificationData;
import bloomingblooms.domain.notification.QuestionRegisterNotification;
import bloomingblooms.domain.resale.ResaleNotificationList;
import java.util.List;
import kr.bb.notification.domain.notification.entity.MemberNotification;
Expand All @@ -19,10 +20,21 @@ public class NotificationCommandService {

@Transactional
public void saveResaleNotification(NotificationData<ResaleNotificationList> restoreNotification) {
Notification notification = NotificationCommand.toEntity(restoreNotification.getMessage());
Notification notification = NotificationCommand.toEntity(restoreNotification.getPublishInformation());
List<MemberNotification> memberNotifications =
MemberNotificationCommand.toEntityList(restoreNotification.getWhoToNotify());
notification.setMemberNotifications(memberNotifications);
notificationJpaRepository.save(notification);
}

@Transactional
public void saveQuestionRegister(
NotificationData<QuestionRegisterNotification> questionRegisterNotification) {
Notification notification =
NotificationCommand.toEntity(questionRegisterNotification.getPublishInformation());
MemberNotification memberNotification =
MemberNotificationCommand.toEntity(questionRegisterNotification.getWhoToNotify());
notification.setMemberNotifications(List.of(memberNotification));
notificationJpaRepository.save(notification);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package kr.bb.notification.domain.notification.entity;

import bloomingblooms.domain.notification.QuestionRegisterNotification;
import bloomingblooms.domain.resale.ResaleNotificationList;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -11,4 +12,8 @@ public static List<MemberNotification> toEntityList(ResaleNotificationList whoTo
.map(item -> MemberNotification.builder().userId(item.getUserId()).build())
.collect(Collectors.toList());
}

public static MemberNotification toEntity(QuestionRegisterNotification whoToNotify) {
return MemberNotification.builder().userId(whoToNotify.getUserId()).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import bloomingblooms.domain.notification.NotificationData;
import bloomingblooms.domain.notification.PublishNotificationInformation;
import bloomingblooms.domain.notification.QuestionRegisterNotification;
import bloomingblooms.domain.resale.ResaleNotificationList;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -58,18 +59,18 @@ public SMSNotification(Long userId, String content, String redirectUrl, String p
this.phoneNumber = phoneNumber;
}

public static List<SMSNotification> getData(
public static List<SMSNotification> getResaleNotificationSMSData(
NotificationData<ResaleNotificationList> restoreNotification) {
return restoreNotification.getWhoToNotify().getResaleNotificationData().stream()
.map(
item ->
SMSNotification.builder()
.phoneNumber(item.getPhoneNumber())
.content(
restoreNotification.getMessage().getNotificationKind().getKind()
restoreNotification.getPublishInformation().getNotificationKind().getKind()
+ "\n"
+ restoreNotification.getWhoToNotify().getProductName()
+ restoreNotification.getMessage().getMessage())
+ restoreNotification.getPublishInformation().getMessage())
.redirectUrl(
RedirectUrl.PRODUCT_DETAIL.getRedirectUrl()
+ restoreNotification.getWhoToNotify().getProductId())
Expand All @@ -96,4 +97,38 @@ public SMSNotification build() {
}
}
}

@Getter
public static class SSENotification extends NotificationInformation {
private final String role;

@Builder
public SSENotification(Long userId, String content, String redirectUrl, String role) {
super(userId, content, redirectUrl);
this.role = role;
}

public static SSENotificationBuilder builder() {
return new SSENotificationBuilder();
}

public static SSENotification getQuestionRegisterSSEData(
NotificationData<QuestionRegisterNotification> questionRegisterNotification) {
return SSENotification.builder()
.content(
questionRegisterNotification.getPublishInformation().getNotificationKind().getKind()
+ "\n"
+ questionRegisterNotification.getPublishInformation().getMessage())
.redirectUrl(questionRegisterNotification.getPublishInformation().getNotificationUrl())
.role(questionRegisterNotification.getWhoToNotify().getRole().getRole())
.build();
}

public static class SSENotificationBuilder extends NotificationInformationBuilder {
@Override
public SSENotification build() {
return new SSENotification(userId, content, redirectUrl, role);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,26 +1,40 @@
package kr.bb.notification.domain.notification.facade;

import bloomingblooms.domain.notification.NotificationData;
import bloomingblooms.domain.notification.QuestionRegisterNotification;
import bloomingblooms.domain.resale.ResaleNotificationList;
import java.util.List;
import kr.bb.notification.domain.notification.application.NotificationCommandService;
import kr.bb.notification.domain.notification.entity.NotificationCommand.SMSNotification;
import kr.bb.notification.domain.notification.entity.NotificationCommand.SSENotification;
import kr.bb.notification.domain.notification.infrastructure.sms.SendSMS;
import kr.bb.notification.domain.notification.infrastructure.sse.SendSSE;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ResaleNotificationFacadeHandler {
public class NotificationFacadeHandler {
private final SendSMS sms;
private final SendSSE sse;
private final NotificationCommandService notificationCommandService;

public void publishResaleNotification(
NotificationData<ResaleNotificationList> restoreNotification) {
List<SMSNotification> data = SMSNotification.getData(restoreNotification);
List<SMSNotification> data = SMSNotification.getResaleNotificationSMSData(restoreNotification);
data.forEach(sms::publishCustomer);

// save notification
notificationCommandService.saveResaleNotification(restoreNotification);
}

public void publishQuestionRegisterNotification(
NotificationData<QuestionRegisterNotification> questionRegisterNotification) {
SSENotification sseNotification =
SSENotification.getQuestionRegisterSSEData(questionRegisterNotification);
sse.publishCustomer(sseNotification);

// save notification
notificationCommandService.saveQuestionRegister(questionRegisterNotification);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@

public interface InfrastructureActionHandler<
T extends NotificationCommand.NotificationInformation> {
void publishCustomer(T NotifyData);
void publishCustomer(T notifyData);
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package kr.bb.notification.domain.notification.infrastructure.message;

import bloomingblooms.domain.notification.NotificationData;
import bloomingblooms.domain.notification.QuestionRegisterNotification;
import bloomingblooms.domain.resale.ResaleNotificationList;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import kr.bb.notification.domain.notification.facade.ResaleNotificationFacadeHandler;
import kr.bb.notification.domain.notification.facade.NotificationFacadeHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.aws.messaging.listener.Acknowledgment;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
Expand All @@ -18,7 +19,7 @@
@RequiredArgsConstructor
public class NotificationSQSListener {
private final ObjectMapper objectMapper;
private final ResaleNotificationFacadeHandler resaleNotificationFacadeHandler;
private final NotificationFacadeHandler notificationFacadeHandler;

@SqsListener(
value = "${cloud.aws.sqs.product-resale-notification-queue.name}",
Expand All @@ -33,7 +34,25 @@ public void consumeProductResaleNotificationCheckQueue(
.getTypeFactory()
.constructParametricType(NotificationData.class, ResaleNotificationList.class));
// call facade
resaleNotificationFacadeHandler.publishResaleNotification(restoreNotification);
notificationFacadeHandler.publishResaleNotification(restoreNotification);
ack.acknowledge();
}

@SqsListener(
value = "${cloud.aws.sqs.question-register-notification-queue.name}",
deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void consumeQuestionRegisterNotificationQueue(
@Payload String message, @Headers Map<String, String> headers, Acknowledgment ack)
throws JsonProcessingException {
NotificationData<QuestionRegisterNotification> questionRegisterNotification =
objectMapper.readValue(
message,
objectMapper
.getTypeFactory()
.constructParametricType(
NotificationData.class, QuestionRegisterNotification.class));
// call facade
notificationFacadeHandler.publishQuestionRegisterNotification(questionRegisterNotification);
ack.acknowledge();
}
}
Loading
Loading