Skip to content

Commit

Permalink
Merge branch 'master' into jord/bn2-epoch-config
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanschalm authored Jan 9, 2025
2 parents 3a5eb94 + c6a1f97 commit 2769e66
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 51 deletions.
54 changes: 4 additions & 50 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,51 +1,5 @@
# Collection Stream
/cmd/collection/** @jordanschalm
/engine/collection/** @jordanschalm
/protobuf/services/collection/** @jordanschalm
# Protocol owners are not generally differentiated by sub-area for simplicity
/** @jordanschalm @AlexHentschel @durkmurder @zhangchiqing @peterargue @Kay-Zee @janezpodhostnik @tarakby

# Consensus Stream
/cmd/consensus/** @AlexHentschel @durkmurder @jordanschalm
/engine/consensus/** @AlexHentschel @durkmurder @jordanschalm

# Execution Stream
/cmd/execution/** @zhangchiqing
/engine/execution/** @zhangchiqing

# Access Stream
/access/** @peterargue
/cmd/access/** @peterargue
/cmd/observer/** @peterargue
/engine/access/** @peterargue

# Verification Stream
/cmd/verification/** @zhangchiqing
/engine/verification/** @zhangchiqing
/integration/tests/verification @zhangchiqing

# Ledger Stream
/ledger/** @AlexHentschel

# FVM Stream
/fvm/** @janezpodhostnik

# Networking Stream
/network/** @Kay-Zee

# Cryptography Stream
/crypto/** @tarakby

# Bootstrap and transit scripts
/cmd/bootstrap/** @zhangchiqing

# Dev Tools Stream
.github/workflows/** @Kay-Zee
/insecure/** @Kay-Zee
/integration/benchnet2/** @Kay-Zee
/tools/test_monitor/** @Kay-Zee

# Performance Stream
/integration/benchmark/** @Kay-Zee

# Execution Sync
/module/executiondatasync/** @peterargue
/module/state_synchronization/** @peterargue
# FVM specific
/fvm/** @janezpodhostnik @zhangchiqing
8 changes: 8 additions & 0 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -130,6 +132,7 @@ type Controller struct {
dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
dataProviderFactory dp.DataProviderFactory
dataProvidersGroup *sync.WaitGroup
limiter *rate.Limiter
}

func NewWebSocketController(
Expand All @@ -146,6 +149,7 @@ func NewWebSocketController(
dataProviders: concurrentmap.New[uuid.UUID, dp.DataProvider](),
dataProviderFactory: dataProviderFactory,
dataProvidersGroup: &sync.WaitGroup{},
limiter: rate.NewLimiter(rate.Limit(config.MaxResponsesPerSecond), 1),
}
}

Expand Down Expand Up @@ -265,6 +269,10 @@ func (c *Controller) writeMessages(ctx context.Context) error {
return nil
}

if err := c.limiter.WaitN(ctx, 1); err != nil {
return fmt.Errorf("rate limiter wait failed: %w", err)
}

// Specifies a timeout for the write operation. If the write
// isn't completed within this duration, it fails with a timeout error.
// SetWriteDeadline ensures the write operation does not block indefinitely
Expand Down
66 changes: 66 additions & 0 deletions engine/access/rest/websockets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,72 @@ func (s *WsControllerSuite) TestSubscribeBlocks() {
})
}

// TestRateLimiter tests the rate-limiting functionality of the WebSocket controller.
//
// Test Steps:
// 1. Create a mock WebSocket connection with behavior for `SetWriteDeadline` and `WriteJSON`.
// 2. Configure the WebSocket controller with a rate limit of 2 responses per second.
// 3. Simulate sending messages to the `multiplexedStream` channel.
// 4. Collect timestamps of message writes to verify rate-limiting behavior.
// 5. Assert that all messages are processed and that the delay between messages respects the configured rate limit.
//
// The test ensures that:
// - The number of messages processed matches the total messages sent.
// - The delay between consecutive messages falls within the expected range based on the rate limit, with a tolerance of 5ms.
func (s *WsControllerSuite) TestRateLimiter() {
t := s.T()
totalMessages := 5 // Number of messages to simulate.

// Step 1: Create a mock WebSocket connection.
conn := connmock.NewWebsocketConnection(t)
conn.On("SetWriteDeadline", mock.Anything).Return(nil).Times(totalMessages)

// Step 2: Configure the WebSocket controller with a rate limit.
config := NewDefaultWebsocketConfig()
config.MaxResponsesPerSecond = 2 // 2 messages per second.
controller := NewWebSocketController(s.logger, config, conn, nil)

// Step 3: Simulate sending messages to the controller's `multiplexedStream`.
go func() {
for i := 0; i < totalMessages; i++ {
controller.multiplexedStream <- map[string]interface{}{
"message": i,
}
}
close(controller.multiplexedStream)
}()

// Step 4: Collect timestamps of message writes for verification.
var timestamps []time.Time
msgCounter := 0
conn.On("WriteJSON", mock.Anything).Run(func(args mock.Arguments) {
timestamps = append(timestamps, time.Now())

// Extract the actual written message
actualMessage := args.Get(0).(map[string]interface{})
expectedMessage := map[string]interface{}{"message": msgCounter}
msgCounter++

assert.Equal(t, expectedMessage, actualMessage, "Received message does not match the expected message")
}).Return(nil).Times(totalMessages)

// Invoke the `writeMessages` method to process the stream.
_ = controller.writeMessages(context.Background())

// Step 5: Verify that all messages are processed.
require.Len(t, timestamps, totalMessages, "All messages should be processed")

// Calculate the expected delay between messages based on the rate limit.
expectedDelay := time.Second / time.Duration(config.MaxResponsesPerSecond)
const tolerance = float64(5 * time.Millisecond) // Allow up to 5ms deviation.

// Step 6: Assert that the delays respect the rate limit with tolerance.
for i := 1; i < len(timestamps); i++ {
delay := timestamps[i].Sub(timestamps[i-1])
assert.InDelta(t, expectedDelay, delay, tolerance, "Messages should respect the rate limit")
}
}

// TestConfigureKeepaliveConnection ensures that the WebSocket connection is configured correctly.
func (s *WsControllerSuite) TestConfigureKeepaliveConnection() {
s.T().Run("Happy path", func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strconv"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -223,7 +224,7 @@ func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesPr
// Create a mock subscription and mock the channel
sub := ssmock.NewSubscription(s.T())
sub.On("Channel").Return((<-chan interface{})(txStatusesChan))
sub.On("Err").Return(nil)
sub.On("Err").Return(nil).Once()

s.api.On(
"SubscribeTransactionStatusesFromStartBlockID",
Expand Down Expand Up @@ -255,7 +256,9 @@ func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesPr
defer provider.Close()

// Run the provider in a separate goroutine to simulate subscription processing
done := make(chan struct{})
go func() {
defer close(done)
err = provider.Run()
s.Require().NoError(err)
}()
Expand All @@ -282,6 +285,9 @@ func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesPr
responses = append(responses, txStatusesRes)
}

// Wait for the provider goroutine to finish
unittest.RequireCloseBefore(s.T(), done, time.Second, "provider failed to stop")

// Verifying that indices are starting from 0
s.Require().Equal(uint64(0), responses[0].MessageIndex, "Expected MessageIndex to start with 0")

Expand Down

0 comments on commit 2769e66

Please sign in to comment.