Skip to content

Commit

Permalink
[PLAT-16420]: Fix List CDC replication slot API
Browse files Browse the repository at this point in the history
Summary:
Fixes list CDC replication slot api.
There could be some CDC replication slot in deleting state which has their namespace in deleting state
and if a namespace is in deleting state, it is not listed in namespace list api which shows no record.
Made the changes to only populate the namespace in slot details only if it exists.

Test Plan: added unit test

Reviewers: asharma

Reviewed By: asharma

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D41130
  • Loading branch information
vipul-yb committed Jan 13, 2025
1 parent 9d39b3c commit 837c3e1
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -190,14 +191,17 @@ public CDCReplicationSlotResponse listReplicationSlot(Universe universe) throws
if (namespaceList == null) {
namespaceList = client.getNamespacesList().getNamespacesList();
}
NamespaceIdentifierPB namespaceIdentifier =
Optional<NamespaceIdentifierPB> namespaceIdentifierOptional =
namespaceList.stream()
.filter(
namespace ->
namespace.getId().toStringUtf8().equals(streamInfo.getNamespaceId()))
.findFirst()
.get();
details.databaseName = namespaceIdentifier.getName();
.findFirst();
if (!namespaceIdentifierOptional.isEmpty()) {
details.databaseName = namespaceIdentifierOptional.get().getName();
} else {
LOG.warn("Namespace not found for namespaceId='{}'", streamInfo.getNamespaceId());
}
result.replicationSlots.add(details);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.yugabyte.yw.common.cdc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -125,6 +126,12 @@ public void testListReplicationSlot() throws Exception {
when(mockStream2.getOptions()).thenReturn(options);
when(mockStream2.getCdcsdkYsqlReplicationSlotName()).thenReturn("");

CDCStreamInfo mockStream3 = mock(CDCStreamInfo.class);
when(mockStream3.getStreamId()).thenReturn(streamId);
when(mockStream3.getOptions()).thenReturn(options);
when(mockStream3.getNamespaceId()).thenReturn(UUID.randomUUID().toString());
when(mockStream3.getCdcsdkYsqlReplicationSlotName()).thenReturn("test_slot_3");

ListNamespacesResponse namespacesResponse = mock(ListNamespacesResponse.class);
NamespaceIdentifierPB mockNamespaceIdentifierPB = mock(NamespaceIdentifierPB.class);
when(mockNamespaceIdentifierPB.getId()).thenReturn(ByteString.copyFrom(namespaceId.getBytes()));
Expand All @@ -142,17 +149,32 @@ public void testListReplicationSlot() throws Exception {
{
add(mockStream);
add(mockStream2);
add(mockStream3);
}
});
when(mockClient.listCDCStreams(null, null, MasterReplicationOuterClass.IdTypePB.NAMESPACE_ID))
.thenReturn(response);

CDCReplicationSlotResponse r = streamManager.listReplicationSlot(mock(Universe.class));

assertEquals(1, r.replicationSlots.size());
assertEquals("test_database", r.getReplicationSlots().get(0).databaseName);
assertEquals("test_slot", r.getReplicationSlots().get(0).slotName);
assertEquals(streamId, r.getReplicationSlots().get(0).streamID);
assertEquals("ACTIVE", r.getReplicationSlots().get(0).state);
assertEquals(2, r.replicationSlots.size());
CDCReplicationSlotResponse.CDCReplicationSlotDetails testSlot =
r.replicationSlots.stream()
.filter(slot -> slot.slotName.equals("test_slot"))
.findFirst()
.get();
assertEquals("test_database", testSlot.databaseName);
assertEquals("test_slot", testSlot.slotName);
assertEquals(streamId, testSlot.streamID);
assertEquals("ACTIVE", testSlot.state);
CDCReplicationSlotResponse.CDCReplicationSlotDetails testSlot3 =
r.replicationSlots.stream()
.filter(slot -> slot.slotName.equals("test_slot_3"))
.findFirst()
.get();
assertNull(testSlot3.databaseName);
assertEquals("test_slot_3", testSlot3.slotName);
assertEquals(streamId, testSlot3.streamID);
assertEquals("ACTIVE", testSlot3.state);
}
}

0 comments on commit 837c3e1

Please sign in to comment.