Skip to content

Commit

Permalink
Fixed remarks
Browse files Browse the repository at this point in the history
  • Loading branch information
Guitarheroua committed Jan 9, 2025
1 parent 439176d commit 78192e0
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 41 deletions.
2 changes: 1 addition & 1 deletion engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (c *Controller) writeMessages(ctx context.Context) error {
if !ok {
return nil
}

if err := c.limiter.WaitN(ctx, 1); err != nil {
return fmt.Errorf("rate limiter wait failed: %w", err)
}
Expand Down
88 changes: 48 additions & 40 deletions engine/access/rest/websockets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,49 +681,57 @@ func (s *WsControllerSuite) TestSubscribeBlocks() {
// - The number of messages processed matches the total messages sent.
// - The delay between consecutive messages falls within the expected range based on the rate limit, with a tolerance of 5ms.
func (s *WsControllerSuite) TestRateLimiter() {
totalMessages := 5 // Number of messages to simulate.
t := s.T()
totalMessages := 5 // Number of messages to simulate.

// Step 1: Create a mock WebSocket connection.
conn := connmock.NewWebsocketConnection(t)
conn.On("SetWriteDeadline", mock.Anything).Return(nil).Times(totalMessages)

// Step 2: Configure the WebSocket controller with a rate limit.
config := NewDefaultWebsocketConfig()
config.MaxResponsesPerSecond = 2 // 2 messages per second.
controller := NewWebSocketController(s.logger, config, conn, nil)

// Step 3: Simulate sending messages to the controller's `multiplexedStream`.
go func() {
for i := 0; i < totalMessages; i++ {
controller.multiplexedStream <- map[string]interface{}{
"message": i,
}
// Step 1: Create a mock WebSocket connection.
conn := connmock.NewWebsocketConnection(t)
conn.On("SetWriteDeadline", mock.Anything).Return(nil).Times(totalMessages)

// Step 2: Configure the WebSocket controller with a rate limit.
config := NewDefaultWebsocketConfig()
config.MaxResponsesPerSecond = 2 // 2 messages per second.
controller := NewWebSocketController(s.logger, config, conn, nil)

// Step 3: Simulate sending messages to the controller's `multiplexedStream`.
go func() {
for i := 0; i < totalMessages; i++ {
controller.multiplexedStream <- map[string]interface{}{
"message": i,
}
close(controller.multiplexedStream)
}()

// Step 4: Collect timestamps of message writes for verification.
var timestamps []time.Time
conn.On("WriteJSON", mock.Anything).Run(func(args mock.Arguments) {
timestamps = append(timestamps, time.Now())
}).Return(nil).Times(totalMessages)

// Invoke the `writeMessages` method to process the stream.
_ = controller.writeMessages(context.Background())

// Step 5: Verify that all messages are processed.
require.Len(t, timestamps, totalMessages, "All messages should be processed")

// Calculate the expected delay between messages based on the rate limit.
expectedDelay := time.Second / time.Duration(config.MaxResponsesPerSecond)
const tolerance = 5 * time.Millisecond // Allow up to 5ms deviation.

// Step 6: Assert that the delays respect the rate limit with tolerance.
for i := 1; i < len(timestamps); i++ {
delay := timestamps[i].Sub(timestamps[i-1])
assert.GreaterOrEqual(t, delay, expectedDelay-tolerance, "Messages should respect the minimum rate limit")
assert.LessOrEqual(t, delay, expectedDelay+tolerance, "Messages should respect the maximum rate limit")
}
close(controller.multiplexedStream)
}()

// Step 4: Collect timestamps of message writes for verification.
var timestamps []time.Time
msgCounter := 0
conn.On("WriteJSON", mock.Anything).Run(func(args mock.Arguments) {
timestamps = append(timestamps, time.Now())

// Extract the actual written message
actualMessage := args.Get(0).(map[string]interface{})
expectedMessage := map[string]interface{}{"message": msgCounter}
msgCounter++

assert.Equal(t, expectedMessage, actualMessage, "Received message does not match the expected message")
}).Return(nil).Times(totalMessages)

// Invoke the `writeMessages` method to process the stream.
_ = controller.writeMessages(context.Background())

// Step 5: Verify that all messages are processed.
require.Len(t, timestamps, totalMessages, "All messages should be processed")

// Calculate the expected delay between messages based on the rate limit.
expectedDelay := time.Second / time.Duration(config.MaxResponsesPerSecond)
const tolerance = float64(5 * time.Millisecond) // Allow up to 5ms deviation.

// Step 6: Assert that the delays respect the rate limit with tolerance.
for i := 1; i < len(timestamps); i++ {
delay := timestamps[i].Sub(timestamps[i-1])
assert.InDelta(t, expectedDelay, delay, tolerance, "Messages should respect the rate limit")
}
}

// TestConfigureKeepaliveConnection ensures that the WebSocket connection is configured correctly.
Expand Down

0 comments on commit 78192e0

Please sign in to comment.