diff --git a/services/gossip/adapter/tcp/direct_transport_test.go b/services/gossip/adapter/tcp/direct_transport_test.go index 16c9c6f33..6a557c5d4 100644 --- a/services/gossip/adapter/tcp/direct_transport_test.go +++ b/services/gossip/adapter/tcp/direct_transport_test.go @@ -41,6 +41,8 @@ func TestDirectTransport_HandlesStartupWithEmptyPeerList(t *testing.T) { func TestDirectTransport_SupportsTopologyChangeInRuntime(t *testing.T) { with.Concurrency(t, func(ctx context.Context, harness *with.ConcurrencyHarness) { + harness.AllowErrorsMatching("failed sending gossip message") // because the test will send to node3 which is not in topology + node1 := aNode(ctx, harness.Logger) node2 := aNode(ctx, harness.Logger) node3 := aNode(ctx, harness.Logger) @@ -86,12 +88,16 @@ func TestDirectTransport_SupportsTopologyChangeInRuntime(t *testing.T) { node1.requireSendsSuccessfullyTo(t, ctx, node4) node1.requireSendsSuccessfullyTo(t, ctx, node2) - require.Error(t, node2.transport.Send(ctx, &adapter.TransportData{ + node2.listener.ExpectNotReceive() + + node2.transport.Send(ctx, &adapter.TransportData{ SenderNodeAddress: node2.address, RecipientMode: gossipmessages.RECIPIENT_LIST_MODE_LIST, RecipientNodeAddresses: []primitives.NodeAddress{node3.address}, Payloads: aMessage(), - }), "node 2 was able to send a message to node 3 which is no longer a part of its topology") + }) + + require.NoError(t, test.ConsistentlyVerify(test.EVENTUALLY_ADAPTER_TIMEOUT, node1.listener, node2.listener, node3.listener), "node 2 was able to send a message to node 3 which is no longer a part of its topology") }) } @@ -136,6 +142,45 @@ func TestDirectTransport_SupportsBroadcastTransmissions(t *testing.T) { }) } +func TestDirectTransport_FailsGracefullyIfMulticastFailedToSendToASingleRecipient(t *testing.T) { + with.Concurrency(t, func(ctx context.Context, harness *with.ConcurrencyHarness) { + harness.AllowErrorsMatching("failed sending gossip message") // because the test will send to an arbitrary recipient which is not in topology + + node1 := aNode(ctx, harness.Logger) + node2 := aNode(ctx, harness.Logger) + superviseAll(harness, node1, node2) + defer shutdownAll(ctx, node1, node2) + + waitForAllNodesToSatisfy(t, "server did not start", func(node *nodeHarness) bool { return node.transport.IsServerListening() }, node1, node2) + + firstTopology := aTopologyContaining(node1, node2) + node1.transport.UpdateTopology(ctx, firstTopology) + node2.transport.UpdateTopology(ctx, firstTopology) + + waitForAllNodesToSatisfy(t, + "expected all nodes to have peers added", + func(node *nodeHarness) bool { return len(node.transport.outgoingConnections.activeConnections) > 0 }, + node1, node2) + + waitForAllNodesToSatisfy(t, + "expected all outgoing queues to become enabled after topology change", + func(node *nodeHarness) bool { return node.transport.allOutgoingQueuesEnabled() }, + node1, node2) + + payloads := aMessage() + + node2.listener.ExpectReceive(payloads) + require.NoError(t, node1.transport.Send(ctx, &adapter.TransportData{ + SenderNodeAddress: node1.address, + RecipientMode: gossipmessages.RECIPIENT_LIST_MODE_LIST, + RecipientNodeAddresses: []primitives.NodeAddress{{0x1}, node2.address}, + Payloads: payloads, + })) + + require.NoError(t, test.EventuallyVerify(test.EVENTUALLY_ADAPTER_TIMEOUT, node2.listener), "message was not sent to target node") + }) +} + type nodeHarness struct { transport *DirectTransport address primitives.NodeAddress diff --git a/services/gossip/adapter/tcp/outgoing_connections.go b/services/gossip/adapter/tcp/outgoing_connections.go index b5d916472..0a40dad7b 100644 --- a/services/gossip/adapter/tcp/outgoing_connections.go +++ b/services/gossip/adapter/tcp/outgoing_connections.go @@ -152,7 +152,8 @@ func (c *outgoingConnections) send(ctx context.Context, data *adapter.TransportD client.addDataToOutgoingPeerQueue(ctx, data) c.metrics.messageSize.Record(int64(data.TotalSize())) } else { - return errors.Errorf("unknown recipient public key: %s", recipientPublicKey.String()) + err := errors.Errorf("unknown recipient public key: %s", recipientPublicKey.String()) + c.logger.Error("failed sending gossip message", log.Error(err), log.Stringable("recipient-public-key", recipientPublicKey)) } } return nil