Skip to content

Commit

Permalink
Merge branch 'develop' into fix/async
Browse files Browse the repository at this point in the history
  • Loading branch information
wakkpu authored Jan 20, 2024
2 parents 24ee5b1 + 21bc9c5 commit eb768d7
Show file tree
Hide file tree
Showing 19 changed files with 384 additions and 23 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ dependencies {
implementation platform('org.testcontainers:testcontainers-bom:1.19.3') //import bom
compileOnly 'org.projectlombok:lombok'

implementation 'org.springframework.cloud:spring-cloud-starter-aws:2.2.4.RELEASE'
implementation 'org.springframework.cloud:spring-cloud-aws-messaging:2.2.4.RELEASE'

developmentOnly 'org.springframework.boot:spring-boot-devtools'

annotationProcessor 'org.projectlombok:lombok'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.dailyon.auctionservice.chat.response.ChatCommand;
import com.dailyon.auctionservice.chat.response.ChatPayload;
import com.dailyon.auctionservice.controller.ChatHandler;
import com.dailyon.auctionservice.service.AuctionService;
import com.dailyon.auctionservice.service.BidService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
Expand All @@ -22,24 +24,30 @@ public class ChatScheduler implements SchedulingConfigurer {

private final ThreadPoolTaskScheduler taskScheduler;
private final ChatHandler chatHandler;
private long countdown = 5 * 60 * 1000;
private final AuctionService auctionService;
private final BidService bidService;
private long countdown = 1 * 30 * 1000;
private Disposable jobDisposable;

public ChatScheduler(ChatHandler chatHandler) {
public ChatScheduler(
ChatHandler chatHandler, AuctionService auctionService, BidService bidService) {
taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.initialize();
this.chatHandler = chatHandler;
this.auctionService = auctionService;
this.bidService = bidService;
}

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setTaskScheduler(taskScheduler);
}

public void startJob() {
public void startJob(String auctionId) {
countdown = 1 * 60 * 1000;
if (this.jobDisposable == null || this.jobDisposable.isDisposed()) {
this.jobDisposable =
Flux.interval(Duration.ofSeconds(1)).flatMap(it -> executeJob()).subscribe();
Flux.interval(Duration.ofSeconds(1)).flatMap(it -> executeJob(auctionId)).subscribe();
}
}

Expand All @@ -49,12 +57,12 @@ public void stopJob() {
}
}

public Mono<Void> executeJob() {
public Mono<Void> executeJob(String auctionId) {
synchronized (this) { // 동기화 블록 시작
updateCountdown();
if (countdown <= 0) {
countdown = 0;
return sendCloseCommand()
return sendCloseCommand(auctionId)
.doFinally(signalType -> stopJob()); // sendCloseCommand()가 완전히 실행된 후에 stopJob()을 호출
} else {
return sendTimeSyncCommand();
Expand All @@ -69,13 +77,16 @@ private void updateCountdown() {
}
}

private Mono<Void> sendCloseCommand() {
private Mono<Void> sendCloseCommand(String auctionId) {
ChatPayload<Object> payload = ChatPayload.of(ChatCommand.AUCTION_CLOSE, null);
return chatHandler.broadCast(payload).then();
return auctionService
.endAuction(auctionId)
.flatMap(auction -> bidService.createAuctionHistories(auction))
.then(chatHandler.broadCast(payload));
}

private Mono<Void> sendTimeSyncCommand() {
ChatPayload<Long> payload = ChatPayload.of(ChatCommand.TIME_SYNC, countdown);
ChatPayload<Object> payload = ChatPayload.of(ChatCommand.TIME_SYNC, countdown);
return chatHandler.broadCast(payload).then();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.dailyon.auctionservice.config;

import com.dailyon.auctionservice.chat.messaging.RedisChatMessageListener;
import com.dailyon.auctionservice.document.BidHistory;
import com.dailyon.auctionservice.dto.response.BidInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
Expand Down Expand Up @@ -84,6 +83,18 @@ public ReactiveRedisTemplate<String, BidInfo> reactiveRedisTemplate(
return new ReactiveRedisTemplate<>(factory, serializationContext);
}

@Bean("reactiveRedisTemplateForAuction")
public ReactiveRedisTemplate<String, Long> reactiveRedisTemplateForAuction(
ReactiveRedisConnectionFactory factory) {
Jackson2JsonRedisSerializer<Long> serializer = new Jackson2JsonRedisSerializer<>(Long.class);
RedisSerializationContext<String, Long> serializationContext =
RedisSerializationContext.<String, Long>newSerializationContext(new StringRedisSerializer())
.value(serializer)
.build();

return new ReactiveRedisTemplate<>(factory, serializationContext);
}

@Bean
ApplicationRunner applicationRunner(RedisChatMessageListener redisChatMessageListener) {
return args -> {
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/dailyon/auctionservice/config/RedisConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ public ReactiveRedisTemplate<String, BidInfo> reactiveRedisTemplate(
return new ReactiveRedisTemplate<>(factory, serializationContext);
}

@Bean("reactiveRedisTemplateForAuction")
public ReactiveRedisTemplate<String, Long> reactiveRedisTemplateForAuction(
ReactiveRedisConnectionFactory factory) {
Jackson2JsonRedisSerializer<Long> serializer = new Jackson2JsonRedisSerializer<>(Long.class);
RedisSerializationContext<String, Long> serializationContext =
RedisSerializationContext.<String, Long>newSerializationContext(new StringRedisSerializer())
.value(serializer)
.build();

return new ReactiveRedisTemplate<>(factory, serializationContext);
}

@Bean
ApplicationRunner applicationRunner(RedisChatMessageListener redisChatMessageListener) {
return args -> {
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/com/dailyon/auctionservice/config/SqsConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.dailyon.auctionservice.config;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;

@Configuration
public class SqsConfig {
// bus-refresh 적용된 부분
private final Environment environment;

@Autowired
public SqsConfig(Environment environment) {
this.environment = environment;
}

@Bean
@Primary
@RefreshScope
public AmazonSQSAsync amazonSQSAsync() {

String accessKey = environment.getProperty("cloud.aws.credentials.ACCESS_KEY_ID");
String secretKey = environment.getProperty("cloud.aws.credentials.SECRET_ACCESS_KEY");
String sqsRegion = environment.getProperty("cloud.aws.sqs.region");

BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
return AmazonSQSAsyncClientBuilder.standard()
.withRegion(sqsRegion)
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();
}

@Bean
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync) {
return new QueueMessagingTemplate(amazonSQSAsync);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,11 @@ public Mono<Void> broadCast(ChatPayload chatPayload) {
.flatMap(redisChatMessagePublisher::publishChatMessage)
.then();
}

public Mono<Void> broadCastStart(ChatPayload payload) {
return objectStringConverter
.objectToString(payload)
.flatMap(redisChatMessagePublisher::publishChatMessage)
.then();
}
}
24 changes: 24 additions & 0 deletions src/main/java/com/dailyon/auctionservice/dto/response/BidInfo.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.dailyon.auctionservice.dto.response;

import com.dailyon.auctionservice.document.Auction;
import com.dailyon.auctionservice.document.AuctionHistory;
import com.dailyon.auctionservice.document.BidHistory;
import com.dailyon.auctionservice.dto.response.ReadAuctionDetailResponse.ReadProductDetailResponse;
import lombok.*;

@ToString
Expand All @@ -22,4 +25,25 @@ public static BidInfo from(BidHistory history) {
.round(history.getRound())
.build();
}

public AuctionHistory createAuctionHistory(
Auction auction,
ReadProductDetailResponse product,
Long bidAmount,
long auctionWinnerBid,
boolean isWinner) {
return AuctionHistory.builder()
.memberId(memberId)
.auctionId(auctionId)
.auctionName(auction.getAuctionName())
.auctionProductId(auction.getAuctionProductId())
.auctionProductImg(product.getImgUrl())
.auctionProductName(product.getName())
.auctionProductSizeId(product.getProductStocks().get(0).getProductSizeId())
.auctionProductSizeName(product.getProductStocks().get(0).getProductSizeName())
.auctionWinnerBid(auctionWinnerBid)
.isWinner(isWinner)
.memberHighestBid(bidAmount)
.build();
}
}
10 changes: 4 additions & 6 deletions src/main/java/com/dailyon/auctionservice/facade/BidFacade.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,19 @@ public Mono<Void> start(String auctionId) {
.startAuction(auctionId)
.flatMap(
auction -> {
chatHandler.broadCast(payload);
scheduler.startJob();
return Mono.empty();
scheduler.startJob(auctionId);
return chatHandler.broadCastStart(payload);
});
}

public Mono<Void> end(String auctionId) {

return auctionService
.endAuction(auctionId)
.flatMap(
auction -> {
ChatPayload<Object> payload = ChatPayload.of(ChatCommand.AUCTION_CLOSE, null);
chatHandler.broadCast(payload);
return Mono.empty();
return chatHandler.broadCast(payload).then();
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.dailyon.auctionservice.infra.sqs;


import com.dailyon.auctionservice.infra.sqs.dto.SQSNotificationDto;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RequiredArgsConstructor
public class AuctionSqsProducer {

private final QueueMessagingTemplate sqsTemplate;
private final ObjectMapper objectMapper;

public static final String AUCTION_END_NOTIFICATION_QUEUE = "auction-end-notification-queue";

public void produce(String queueName, SQSNotificationDto sqsNotificationDto) {
// 알림 생성 중 에러 때문에 전체 로직이 취소되는것을 막음.
try {
String jsonMessage = objectMapper.writeValueAsString(sqsNotificationDto);
Message<String> message = MessageBuilder.withPayload(jsonMessage).build();
sqsTemplate.send(queueName, message);
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.dailyon.auctionservice.infra.sqs.dto;

import com.dailyon.auctionservice.infra.sqs.dto.enums.NotificationType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

import java.util.HashMap;
import java.util.Map;

@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RawNotificationData {
private String message;
private Map<String, String> parameters;
private NotificationType notificationType; // 알림 유형


public static RawNotificationData forAuctionEnd(String auctionId) {
Map<String, String> parameters = new HashMap<>();
parameters.put("auctionId", auctionId);

return new RawNotificationData(
null,
parameters,
NotificationType.AUCTION_END
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.dailyon.auctionservice.infra.sqs.dto;

import lombok.*;

import java.util.Collections;
import java.util.List;

@Getter
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SQSNotificationDto {
List<Long> whoToNotify; // if null, 전체유저에게 발송

RawNotificationData rawNotificationData;

public static SQSNotificationDto create( RawNotificationData rawNotificationData) {
return SQSNotificationDto.builder()
.rawNotificationData(rawNotificationData)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.dailyon.auctionservice.infra.sqs.dto.enums;

import lombok.Getter;

@Getter
public enum NotificationType {
PRODUCT_RESTOCK("재입고", "상품 재입고 알림."),
ORDER_COMPLETE("주문완료", "주문이 완료되었습니다."),
ORDER_SHIPPED("선적", "주문하신 상품이 출발했습니다."),
ORDER_ARRIVED("배송 도착", "주문하신 상품이 도착했습니다."),
ORDER_CANCELED("주문 취소", "주문이 취소되었습니다."),
AUCTION_END("실시간 경매 종료", "실시간 경매가 종료되었습니다."),
GIFT_RECEIVED("선물", "선물을 받았습니다."),
POINTS_EARNED_SNS("SNS 구매유도 포인트 적립", "SNS를 통해 포인트가 적립되었습니다."),
HEARTBEAT("하트비트", "연결 유지용 주기적 송신.");
// 정의하면서 넣을 예정

private final String name;
private final String description;

NotificationType(String name, String description) {
this.name = name;
this.description = description;
}
}
Loading

0 comments on commit eb768d7

Please sign in to comment.