From f99cbe0837eb5198596ba77f9437fe39ce07bdde Mon Sep 17 00:00:00 2001 From: JunYong Sun Date: Mon, 22 Jan 2024 01:01:23 +0900 Subject: [PATCH 1/3] =?UTF-8?q?[FIX]=20=ED=9E=88=EC=8A=A4=ED=86=A0?= =?UTF-8?q?=EB=A6=AC=20=EB=82=B4=EC=97=AD=20=EC=A0=80=EC=9E=A5=20=ED=9B=84?= =?UTF-8?q?=20sqs=EB=B0=9C=EC=86=A1=20=EB=A1=9C=EC=A7=81=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/dailyon/auctionservice/service/BidService.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/dailyon/auctionservice/service/BidService.java b/src/main/java/com/dailyon/auctionservice/service/BidService.java index 0d638cc..51dc81a 100644 --- a/src/main/java/com/dailyon/auctionservice/service/BidService.java +++ b/src/main/java/com/dailyon/auctionservice/service/BidService.java @@ -77,10 +77,10 @@ private Mono processAuction(Auction auction, Long bid) { Mono productInfo = productClient.readProductDetail(auction.getAuctionProductId()); - return Mono.when( - saveSuccessfulBiddersHistory(productInfo, auction, bid), - saveRemainBiddersHistory(productInfo, auction, bid), - sendSqsNotification(auction)); + return Mono.zip( + saveSuccessfulBiddersHistory(productInfo, auction, bid), + saveRemainBiddersHistory(productInfo, auction, bid)) + .then(sendSqsNotification(auction)); } private Mono saveSuccessfulBiddersHistory( From 004e6ed04718c83d5d3a0356e740244c5d4e771d Mon Sep 17 00:00:00 2001 From: JunYong Sun Date: Mon, 22 Jan 2024 01:55:10 +0900 Subject: [PATCH 2/3] =?UTF-8?q?[ADD]=20=ED=8F=AC=EC=9D=B8=ED=8A=B8=20?= =?UTF-8?q?=EC=82=AC=EC=9A=A9=20=EB=A1=9C=EC=A7=81=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 1 + .../infra/kafka/AuctionEventListener.java | 30 +++++++++++++++++++ .../infra/kafka/AuctionEventProducer.java | 27 +++++++++++++++++ .../infra/kafka/dto/BiddingDTO.java | 14 +++++++++ .../service/AuctionHistoryService.java | 11 +++++++ .../auctionservice/service/BidService.java | 21 ++++++++++--- src/main/resources/application-dev.yml | 17 +++++++++++ src/main/resources/application-prod.yml | 18 +++++++++++ 8 files changed, 135 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventListener.java create mode 100644 src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventProducer.java create mode 100644 src/main/java/com/dailyon/auctionservice/infra/kafka/dto/BiddingDTO.java diff --git a/build.gradle b/build.gradle index dae4dab..0525931 100644 --- a/build.gradle +++ b/build.gradle @@ -38,6 +38,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-validation' implementation group: 'io.github.dailyon-maven', name: 'daily-on-common', version: '0.0.9' + implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'io.jsonwebtoken:jjwt-api:0.11.5' implementation 'org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j' diff --git a/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventListener.java b/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventListener.java new file mode 100644 index 0000000..138374e --- /dev/null +++ b/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventListener.java @@ -0,0 +1,30 @@ +package com.dailyon.auctionservice.infra.kafka; + +import com.dailyon.auctionservice.infra.kafka.dto.BiddingDTO; +import com.dailyon.auctionservice.service.AuctionHistoryService; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class AuctionEventListener { + + private final ObjectMapper objectMapper; + private final AuctionHistoryService auctionHistoryService; + + @KafkaListener(topics = "cancel-bidding") + public void cancel(String message, Acknowledgment ack) { + BiddingDTO biddingDTO = null; + try { + biddingDTO = objectMapper.readValue(message, BiddingDTO.class); + auctionHistoryService.delete(biddingDTO); + ack.acknowledge(); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventProducer.java b/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventProducer.java new file mode 100644 index 0000000..9bff5e6 --- /dev/null +++ b/src/main/java/com/dailyon/auctionservice/infra/kafka/AuctionEventProducer.java @@ -0,0 +1,27 @@ +package com.dailyon.auctionservice.infra.kafka; + +import com.dailyon.auctionservice.infra.kafka.dto.BiddingDTO; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class AuctionEventProducer { + + private final KafkaTemplate kafkaTemplate; + private final ObjectMapper objectMapper; + + public void createAuctionHistory(BiddingDTO biddingDTO) { + log.info("createAuctionHistory -> memberId {}", biddingDTO.getMemberId()); + try { + kafkaTemplate.send("success-bidding", objectMapper.writeValueAsString(biddingDTO)); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/dailyon/auctionservice/infra/kafka/dto/BiddingDTO.java b/src/main/java/com/dailyon/auctionservice/infra/kafka/dto/BiddingDTO.java new file mode 100644 index 0000000..8edafa5 --- /dev/null +++ b/src/main/java/com/dailyon/auctionservice/infra/kafka/dto/BiddingDTO.java @@ -0,0 +1,14 @@ +package com.dailyon.auctionservice.infra.kafka.dto; + +import lombok.*; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class BiddingDTO { + private String auctionId; + private Long memberId; + private Long usePoints; +} diff --git a/src/main/java/com/dailyon/auctionservice/service/AuctionHistoryService.java b/src/main/java/com/dailyon/auctionservice/service/AuctionHistoryService.java index 4f95f0a..663a2e6 100644 --- a/src/main/java/com/dailyon/auctionservice/service/AuctionHistoryService.java +++ b/src/main/java/com/dailyon/auctionservice/service/AuctionHistoryService.java @@ -1,12 +1,14 @@ package com.dailyon.auctionservice.service; import com.dailyon.auctionservice.document.AuctionHistory; +import com.dailyon.auctionservice.infra.kafka.dto.BiddingDTO; import com.dailyon.auctionservice.repository.AuctionHistoryRepository; import lombok.RequiredArgsConstructor; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.List; @@ -46,4 +48,13 @@ public AuctionHistory getAuctionHistory(String memberId, String auctionId) { .orElseThrow(() -> new RuntimeException("해당 경매 내역 정보가 존재하지 않습니다.")); return auctionHistory; } + + public void delete(BiddingDTO biddingDTO) { + AuctionHistory auctionHistory = + auctionHistoryRepository + .findByAuctionIdAndMemberId( + biddingDTO.getAuctionId(), String.valueOf(biddingDTO.getMemberId())) + .get(); + auctionHistoryRepository.delete(auctionHistory); + } } diff --git a/src/main/java/com/dailyon/auctionservice/service/BidService.java b/src/main/java/com/dailyon/auctionservice/service/BidService.java index 51dc81a..ff51a61 100644 --- a/src/main/java/com/dailyon/auctionservice/service/BidService.java +++ b/src/main/java/com/dailyon/auctionservice/service/BidService.java @@ -8,6 +8,8 @@ import com.dailyon.auctionservice.dto.response.BidInfo; import com.dailyon.auctionservice.dto.response.ReadAuctionDetailResponse; import com.dailyon.auctionservice.dto.response.TopBidderResponse; +import com.dailyon.auctionservice.infra.kafka.AuctionEventProducer; +import com.dailyon.auctionservice.infra.kafka.dto.BiddingDTO; import com.dailyon.auctionservice.infra.sqs.AuctionSqsProducer; import com.dailyon.auctionservice.infra.sqs.dto.RawNotificationData; import com.dailyon.auctionservice.infra.sqs.dto.SQSNotificationDto; @@ -36,6 +38,7 @@ public class BidService { private final ProductClient productClient; private final AuctionSqsProducer auctionSqsProducer; private final AuctionHistoryRepository auctionHistoryRepository; + private final AuctionEventProducer eventProducer; public Mono create(CreateBidRequest request, String memberId) { BidHistory bidHistory = request.toEntity(memberId); @@ -94,8 +97,16 @@ private Mono saveSuccessfulBiddersHistory( tuple -> createAuctionHistories(auction, tuple.getT1(), tuple.getT2(), true)); return saveAuctionHistories(auctionHistories) - .then(auctionHistories.collectList()) - // .doOnNext() kafka메세지 들어갈자리 + .doOnNext( + auctionHistoryList -> { + for (AuctionHistory auctionHistory : auctionHistoryList) { + BiddingDTO biddingDTO = new BiddingDTO(); + biddingDTO.setMemberId(Long.valueOf(auctionHistory.getMemberId())); + biddingDTO.setAuctionId(auctionHistory.getAuctionId()); + biddingDTO.setUsePoints((long) (auctionHistory.getAuctionWinnerBid() * 0.05)); + eventProducer.createAuctionHistory(biddingDTO); + } + }) .then(); } @@ -147,7 +158,9 @@ private Flux createAuctionHistories( }); } - private Mono saveAuctionHistories(Flux auctionHistories) { - return Mono.when(auctionHistories.map(auctionHistoryRepository::save).collectList()); + private Mono> saveAuctionHistories(Flux auctionHistories) { + return auctionHistories + .collectList() + .flatMap(list -> Flux.fromIterable(list).map(auctionHistoryRepository::save).collectList()); } } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 72f4267..3fce6bb 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -13,6 +13,23 @@ spring: host: redis-cluster port: 6379 password: 11111111 + kafka: + producer: + bootstrap-servers: kafka-service:9092 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + # Leader Broker가 메시지를 받았는지 확인 신호 요청 + acks: 1 + consumer: + bootstrap-servers: kafka-service:9092 + # 컨슈머 그룹 지정 - 컨슈머 그룹안의 하나의 컨슈머가 다운되더라도 컨슈머 그룹 안의 다른 컨슈머가 읽을 수 있도록 함 또한 Offset으로 재시작시 메시지 관리 가능 + group-id: auction-service + # Kafka consumer가 다운되었을 때 가장 빠른 컨슈머 오프셋을 가지는 것 부터 읽는다. + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + listener: + ack-mode: manual eureka: client: register-with-eureka: true diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 2918efc..51f3605 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -17,6 +17,24 @@ spring: nodes: redis-cluster-0.redis-cluster.prod.svc.cluster.local:6379,redis-cluster-1.redis-cluster.prod.svc.cluster.local:6379,redis-cluster-2.redis-cluster.prod.svc.cluster.local:6379,redis-cluster-3.redis-cluster.prod.svc.cluster.local:6379,redis-cluster-4.redis-cluster.prod.svc.cluster.local:6379,redis-cluster-5.redis-cluster.prod.svc.cluster.local:6379 max-redirects: 3 password: + + kafka: + producer: + bootstrap-servers: kafka-service:9092 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + # Leader Broker가 메시지를 받았는지 확인 신호 요청 + acks: 1 + consumer: + bootstrap-servers: kafka-service:9092 + # 컨슈머 그룹 지정 - 컨슈머 그룹안의 하나의 컨슈머가 다운되더라도 컨슈머 그룹 안의 다른 컨슈머가 읽을 수 있도록 함 또한 Offset으로 재시작시 메시지 관리 가능 + group-id: auction-service + # Kafka consumer가 다운되었을 때 가장 빠른 컨슈머 오프셋을 가지는 것 부터 읽는다. + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + listener: + ack-mode: manual eureka: client: register-with-eureka: true From 90aeec36e188ec8bc6783119b27fd7982c7796a9 Mon Sep 17 00:00:00 2001 From: JunYong Sun Date: Mon, 22 Jan 2024 02:01:23 +0900 Subject: [PATCH 3/3] =?UTF-8?q?[TEST]=20yml=20kafka=20consumer=20group=20i?= =?UTF-8?q?d=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/test/resources/application.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index 58bb1cb..85f7029 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -7,7 +7,9 @@ spring: enabled: false discovery: enabled: false - + kafka: + consumer: + group-id: sjy logging: cloud: aws: