diff --git a/build.gradle b/build.gradle index dae4dab..0525931 100644 --- a/build.gradle +++ b/build.gradle @@ -38,6 +38,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-validation' implementation group: 'io.github.dailyon-maven', name: 'daily-on-common', version: '0.0.9' + implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'io.jsonwebtoken:jjwt-api:0.11.5' implementation 'org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j' diff --git a/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventListener.java b/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventListener.java new file mode 100644 index 0000000..138374e --- /dev/null +++ b/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventListener.java @@ -0,0 +1,30 @@ +package com.dailyon.auctionservice.infra.kafka; + +import com.dailyon.auctionservice.infra.kafka.dto.BiddingDTO; +import com.dailyon.auctionservice.service.AuctionHistoryService; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class AuctionEventListener { + + private final ObjectMapper objectMapper; + private final AuctionHistoryService auctionHistoryService; + + @KafkaListener(topics = "cancel-bidding") + public void cancel(String message, Acknowledgment ack) { + BiddingDTO biddingDTO = null; + try { + biddingDTO = objectMapper.readValue(message, BiddingDTO.class); + auctionHistoryService.delete(biddingDTO); + ack.acknowledge(); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventProducer.java b/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventProducer.java new file mode 100644 index 0000000..9bff5e6 --- /dev/null +++ b/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventProducer.java @@ -0,0 +1,27 @@ +package com.dailyon.auctionservice.infra.kafka; + +import com.dailyon.auctionservice.infra.kafka.dto.BiddingDTO; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class AuctionEventProducer { + + private final KafkaTemplate kafkaTemplate; + private final ObjectMapper objectMapper; + + public void createAuctionHistory(BiddingDTO biddingDTO) { + log.info("createAuctionHistory -> memberId {}", biddingDTO.getMemberId()); + try { + kafkaTemplate.send("success-bidding", objectMapper.writeValueAsString(biddingDTO)); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/dailyon/auctionservice/infra/kafka/dto/BiddingDTO.java b/src/main/java/com/dailyon/auctionservice/infra/kafka/dto/BiddingDTO.java new file mode 100644 index 0000000..8edafa5 --- /dev/null +++ b/src/main/java/com/dailyon/auctionservice/infra/kafka/dto/BiddingDTO.java @@ -0,0 +1,14 @@ +package com.dailyon.auctionservice.infra.kafka.dto; + +import lombok.*; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class BiddingDTO { + private String auctionId; + private Long memberId; + private Long usePoints; +} diff --git a/src/main/java/com/dailyon/auctionservice/service/AuctionHistoryService.java b/src/main/java/com/dailyon/auctionservice/service/AuctionHistoryService.java index 4f95f0a..663a2e6 100644 --- a/src/main/java/com/dailyon/auctionservice/service/AuctionHistoryService.java +++ b/src/main/java/com/dailyon/auctionservice/service/AuctionHistoryService.java @@ -1,12 +1,14 @@ package com.dailyon.auctionservice.service; import com.dailyon.auctionservice.document.AuctionHistory; +import com.dailyon.auctionservice.infra.kafka.dto.BiddingDTO; import com.dailyon.auctionservice.repository.AuctionHistoryRepository; import lombok.RequiredArgsConstructor; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.List; @@ -46,4 +48,13 @@ public AuctionHistory getAuctionHistory(String memberId, String auctionId) { .orElseThrow(() -> new RuntimeException("해당 경매 내역 정보가 존재하지 않습니다.")); return auctionHistory; } + + public void delete(BiddingDTO biddingDTO) { + AuctionHistory auctionHistory = + auctionHistoryRepository + .findByAuctionIdAndMemberId( + biddingDTO.getAuctionId(), String.valueOf(biddingDTO.getMemberId())) + .get(); + auctionHistoryRepository.delete(auctionHistory); + } } diff --git a/src/main/java/com/dailyon/auctionservice/service/BidService.java b/src/main/java/com/dailyon/auctionservice/service/BidService.java index 0d638cc..ff51a61 100644 --- a/src/main/java/com/dailyon/auctionservice/service/BidService.java +++ b/src/main/java/com/dailyon/auctionservice/service/BidService.java @@ -8,6 +8,8 @@ import com.dailyon.auctionservice.dto.response.BidInfo; import com.dailyon.auctionservice.dto.response.ReadAuctionDetailResponse; import com.dailyon.auctionservice.dto.response.TopBidderResponse; +import com.dailyon.auctionservice.infra.kafka.AuctionEventProducer; +import com.dailyon.auctionservice.infra.kafka.dto.BiddingDTO; import com.dailyon.auctionservice.infra.sqs.AuctionSqsProducer; import com.dailyon.auctionservice.infra.sqs.dto.RawNotificationData; import com.dailyon.auctionservice.infra.sqs.dto.SQSNotificationDto; @@ -36,6 +38,7 @@ public class BidService { private final ProductClient productClient; private final AuctionSqsProducer auctionSqsProducer; private final AuctionHistoryRepository auctionHistoryRepository; + private final AuctionEventProducer eventProducer; public Mono create(CreateBidRequest request, String memberId) { BidHistory bidHistory = request.toEntity(memberId); @@ -77,10 +80,10 @@ private Mono processAuction(Auction auction, Long bid) { Mono productInfo = productClient.readProductDetail(auction.getAuctionProductId()); - return Mono.when( - saveSuccessfulBiddersHistory(productInfo, auction, bid), - saveRemainBiddersHistory(productInfo, auction, bid), - sendSqsNotification(auction)); + return Mono.zip( + saveSuccessfulBiddersHistory(productInfo, auction, bid), + saveRemainBiddersHistory(productInfo, auction, bid)) + .then(sendSqsNotification(auction)); } private Mono saveSuccessfulBiddersHistory( @@ -94,8 +97,16 @@ private Mono saveSuccessfulBiddersHistory( tuple -> createAuctionHistories(auction, tuple.getT1(), tuple.getT2(), true)); return saveAuctionHistories(auctionHistories) - .then(auctionHistories.collectList()) - // .doOnNext() kafka메세지 들어갈자리 + .doOnNext( + auctionHistoryList -> { + for (AuctionHistory auctionHistory : auctionHistoryList) { + BiddingDTO biddingDTO = new BiddingDTO(); + biddingDTO.setMemberId(Long.valueOf(auctionHistory.getMemberId())); + biddingDTO.setAuctionId(auctionHistory.getAuctionId()); + biddingDTO.setUsePoints((long) (auctionHistory.getAuctionWinnerBid() * 0.05)); + eventProducer.createAuctionHistory(biddingDTO); + } + }) .then(); } @@ -147,7 +158,9 @@ private Flux createAuctionHistories( }); } - private Mono saveAuctionHistories(Flux auctionHistories) { - return Mono.when(auctionHistories.map(auctionHistoryRepository::save).collectList()); + private Mono> saveAuctionHistories(Flux auctionHistories) { + return auctionHistories + .collectList() + .flatMap(list -> Flux.fromIterable(list).map(auctionHistoryRepository::save).collectList()); } } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 72f4267..3fce6bb 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -13,6 +13,23 @@ spring: host: redis-cluster port: 6379 password: 11111111 + kafka: + producer: + bootstrap-servers: kafka-service:9092 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + # Leader Broker가 메시지를 받았는지 확인 신호 요청 + acks: 1 + consumer: + bootstrap-servers: kafka-service:9092 + # 컨슈머 그룹 지정 - 컨슈머 그룹안의 하나의 컨슈머가 다운되더라도 컨슈머 그룹 안의 다른 컨슈머가 읽을 수 있도록 함 또한 Offset으로 재시작시 메시지 관리 가능 + group-id: auction-service + # Kafka consumer가 다운되었을 때 가장 빠른 컨슈머 오프셋을 가지는 것 부터 읽는다. + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + listener: + ack-mode: manual eureka: client: register-with-eureka: true diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 2918efc..51f3605 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -17,6 +17,24 @@ spring: nodes: redis-cluster-0.redis-cluster.prod.svc.cluster.local:6379,redis-cluster-1.redis-cluster.prod.svc.cluster.local:6379,redis-cluster-2.redis-cluster.prod.svc.cluster.local:6379,redis-cluster-3.redis-cluster.prod.svc.cluster.local:6379,redis-cluster-4.redis-cluster.prod.svc.cluster.local:6379,redis-cluster-5.redis-cluster.prod.svc.cluster.local:6379 max-redirects: 3 password: + + kafka: + producer: + bootstrap-servers: kafka-service:9092 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + # Leader Broker가 메시지를 받았는지 확인 신호 요청 + acks: 1 + consumer: + bootstrap-servers: kafka-service:9092 + # 컨슈머 그룹 지정 - 컨슈머 그룹안의 하나의 컨슈머가 다운되더라도 컨슈머 그룹 안의 다른 컨슈머가 읽을 수 있도록 함 또한 Offset으로 재시작시 메시지 관리 가능 + group-id: auction-service + # Kafka consumer가 다운되었을 때 가장 빠른 컨슈머 오프셋을 가지는 것 부터 읽는다. + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + listener: + ack-mode: manual eureka: client: register-with-eureka: true diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index 58bb1cb..85f7029 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -7,7 +7,9 @@ spring: enabled: false discovery: enabled: false - + kafka: + consumer: + group-id: sjy logging: cloud: aws: