From 837c3e1bbb7cd0a7ad678bf5343bbbff9e042c9f Mon Sep 17 00:00:00 2001 From: Vipul Bansal Date: Thu, 9 Jan 2025 13:17:03 +0000 Subject: [PATCH] [PLAT-16420]: Fix List CDC replication slot API Summary: Fixes list CDC replication slot api. There could be some CDC replication slot in deleting state which has their namespace in deleting state and if a namespace is in deleting state, it is not listed in namespace list api which shows no record. Made the changes to only populate the namespace in slot details only if it exists. Test Plan: added unit test Reviewers: asharma Reviewed By: asharma Subscribers: yugaware Differential Revision: https://phorge.dev.yugabyte.com/D41130 --- .../yw/common/cdc/CdcStreamManager.java | 12 ++++--- .../yw/common/cdc/CdcStreamManagerTest.java | 32 ++++++++++++++++--- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/managed/src/main/java/com/yugabyte/yw/common/cdc/CdcStreamManager.java b/managed/src/main/java/com/yugabyte/yw/common/cdc/CdcStreamManager.java index af4a2f038f73..fe87a5d35bb7 100644 --- a/managed/src/main/java/com/yugabyte/yw/common/cdc/CdcStreamManager.java +++ b/managed/src/main/java/com/yugabyte/yw/common/cdc/CdcStreamManager.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Optional; import javax.inject.Inject; import javax.inject.Singleton; import org.apache.commons.lang3.StringUtils; @@ -190,14 +191,17 @@ public CDCReplicationSlotResponse listReplicationSlot(Universe universe) throws if (namespaceList == null) { namespaceList = client.getNamespacesList().getNamespacesList(); } - NamespaceIdentifierPB namespaceIdentifier = + Optional namespaceIdentifierOptional = namespaceList.stream() .filter( namespace -> namespace.getId().toStringUtf8().equals(streamInfo.getNamespaceId())) - .findFirst() - .get(); - details.databaseName = namespaceIdentifier.getName(); + .findFirst(); + if (!namespaceIdentifierOptional.isEmpty()) { + details.databaseName = namespaceIdentifierOptional.get().getName(); + } else { + LOG.warn("Namespace not found for namespaceId='{}'", streamInfo.getNamespaceId()); + } result.replicationSlots.add(details); } return result; diff --git a/managed/src/test/java/com/yugabyte/yw/common/cdc/CdcStreamManagerTest.java b/managed/src/test/java/com/yugabyte/yw/common/cdc/CdcStreamManagerTest.java index 49e644fd0499..a3d8e8a96139 100644 --- a/managed/src/test/java/com/yugabyte/yw/common/cdc/CdcStreamManagerTest.java +++ b/managed/src/test/java/com/yugabyte/yw/common/cdc/CdcStreamManagerTest.java @@ -1,6 +1,7 @@ package com.yugabyte.yw.common.cdc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -125,6 +126,12 @@ public void testListReplicationSlot() throws Exception { when(mockStream2.getOptions()).thenReturn(options); when(mockStream2.getCdcsdkYsqlReplicationSlotName()).thenReturn(""); + CDCStreamInfo mockStream3 = mock(CDCStreamInfo.class); + when(mockStream3.getStreamId()).thenReturn(streamId); + when(mockStream3.getOptions()).thenReturn(options); + when(mockStream3.getNamespaceId()).thenReturn(UUID.randomUUID().toString()); + when(mockStream3.getCdcsdkYsqlReplicationSlotName()).thenReturn("test_slot_3"); + ListNamespacesResponse namespacesResponse = mock(ListNamespacesResponse.class); NamespaceIdentifierPB mockNamespaceIdentifierPB = mock(NamespaceIdentifierPB.class); when(mockNamespaceIdentifierPB.getId()).thenReturn(ByteString.copyFrom(namespaceId.getBytes())); @@ -142,6 +149,7 @@ public void testListReplicationSlot() throws Exception { { add(mockStream); add(mockStream2); + add(mockStream3); } }); when(mockClient.listCDCStreams(null, null, MasterReplicationOuterClass.IdTypePB.NAMESPACE_ID)) @@ -149,10 +157,24 @@ public void testListReplicationSlot() throws Exception { CDCReplicationSlotResponse r = streamManager.listReplicationSlot(mock(Universe.class)); - assertEquals(1, r.replicationSlots.size()); - assertEquals("test_database", r.getReplicationSlots().get(0).databaseName); - assertEquals("test_slot", r.getReplicationSlots().get(0).slotName); - assertEquals(streamId, r.getReplicationSlots().get(0).streamID); - assertEquals("ACTIVE", r.getReplicationSlots().get(0).state); + assertEquals(2, r.replicationSlots.size()); + CDCReplicationSlotResponse.CDCReplicationSlotDetails testSlot = + r.replicationSlots.stream() + .filter(slot -> slot.slotName.equals("test_slot")) + .findFirst() + .get(); + assertEquals("test_database", testSlot.databaseName); + assertEquals("test_slot", testSlot.slotName); + assertEquals(streamId, testSlot.streamID); + assertEquals("ACTIVE", testSlot.state); + CDCReplicationSlotResponse.CDCReplicationSlotDetails testSlot3 = + r.replicationSlots.stream() + .filter(slot -> slot.slotName.equals("test_slot_3")) + .findFirst() + .get(); + assertNull(testSlot3.databaseName); + assertEquals("test_slot_3", testSlot3.slotName); + assertEquals(streamId, testSlot3.streamID); + assertEquals("ACTIVE", testSlot3.state); } }