Skip to content

Commit

Permalink
Debug falky TestStreamForServer test
Browse files Browse the repository at this point in the history
  • Loading branch information
emcfarlane committed Nov 6, 2023
1 parent 2be1555 commit 08555a9
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
22 changes: 12 additions & 10 deletions connect_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1510,7 +1510,7 @@ func TestBidiStreamServerSendsFirstMessage(t *testing.T) {

func TestStreamForServer(t *testing.T) {
t.Parallel()
newPingClient := func(pingServer pingv1connect.PingServiceHandler) pingv1connect.PingServiceClient {
newPingClient := func(t *testing.T, pingServer pingv1connect.PingServiceHandler) pingv1connect.PingServiceClient {

Check failure on line 1513 in connect_ext_test.go

View workflow job for this annotation

GitHub Actions / ci (1.21.x)

test helper function should start from t.Helper() (thelper)
mux := http.NewServeMux()
mux.Handle(pingv1connect.NewPingServiceHandler(pingServer))
server := memhttptest.NewServer(t, mux)
Expand All @@ -1522,7 +1522,7 @@ func TestStreamForServer(t *testing.T) {
}
t.Run("not-proto-message", func(t *testing.T) {
t.Parallel()
client := newPingClient(&pluggablePingServer{
client := newPingClient(t, &pluggablePingServer{
cumSum: func(ctx context.Context, stream *connect.BidiStream[pingv1.CumSumRequest, pingv1.CumSumResponse]) error {
return stream.Conn().Send("foobar")
},
Expand All @@ -1536,7 +1536,7 @@ func TestStreamForServer(t *testing.T) {
})
t.Run("nil-message", func(t *testing.T) {
t.Parallel()
client := newPingClient(&pluggablePingServer{
client := newPingClient(t, &pluggablePingServer{
cumSum: func(ctx context.Context, stream *connect.BidiStream[pingv1.CumSumRequest, pingv1.CumSumResponse]) error {
return stream.Send(nil)
},
Expand All @@ -1550,7 +1550,7 @@ func TestStreamForServer(t *testing.T) {
})
t.Run("get-spec", func(t *testing.T) {
t.Parallel()
client := newPingClient(&pluggablePingServer{
client := newPingClient(t, &pluggablePingServer{
cumSum: func(ctx context.Context, stream *connect.BidiStream[pingv1.CumSumRequest, pingv1.CumSumResponse]) error {
assert.Equal(t, stream.Spec().StreamType, connect.StreamTypeBidi)
assert.Equal(t, stream.Spec().Procedure, pingv1connect.PingServiceCumSumProcedure)
Expand All @@ -1564,7 +1564,7 @@ func TestStreamForServer(t *testing.T) {
})
t.Run("server-stream", func(t *testing.T) {
t.Parallel()
client := newPingClient(&pluggablePingServer{
client := newPingClient(t, &pluggablePingServer{
countUp: func(ctx context.Context, req *connect.Request[pingv1.CountUpRequest], stream *connect.ServerStream[pingv1.CountUpResponse]) error {
assert.Equal(t, stream.Conn().Spec().StreamType, connect.StreamTypeServer)
assert.Equal(t, stream.Conn().Spec().Procedure, pingv1connect.PingServiceCountUpProcedure)
Expand All @@ -1580,7 +1580,7 @@ func TestStreamForServer(t *testing.T) {
})
t.Run("server-stream-send", func(t *testing.T) {
t.Parallel()
client := newPingClient(&pluggablePingServer{
client := newPingClient(t, &pluggablePingServer{
countUp: func(ctx context.Context, req *connect.Request[pingv1.CountUpRequest], stream *connect.ServerStream[pingv1.CountUpResponse]) error {
assert.Nil(t, stream.Send(&pingv1.CountUpResponse{Number: 1}))
return nil
Expand All @@ -1596,7 +1596,7 @@ func TestStreamForServer(t *testing.T) {
})
t.Run("server-stream-send-nil", func(t *testing.T) {
t.Parallel()
client := newPingClient(&pluggablePingServer{
client := newPingClient(t, &pluggablePingServer{
countUp: func(ctx context.Context, req *connect.Request[pingv1.CountUpRequest], stream *connect.ServerStream[pingv1.CountUpResponse]) error {
stream.ResponseHeader().Set("foo", "bar")
stream.ResponseTrailer().Set("bas", "blah")
Expand All @@ -1617,7 +1617,7 @@ func TestStreamForServer(t *testing.T) {
})
t.Run("client-stream", func(t *testing.T) {
t.Parallel()
client := newPingClient(&pluggablePingServer{
client := newPingClient(t, &pluggablePingServer{
sum: func(ctx context.Context, stream *connect.ClientStream[pingv1.SumRequest]) (*connect.Response[pingv1.SumResponse], error) {
assert.Equal(t, stream.Spec().StreamType, connect.StreamTypeClient)
assert.Equal(t, stream.Spec().Procedure, pingv1connect.PingServiceSumProcedure)
Expand All @@ -1638,8 +1638,9 @@ func TestStreamForServer(t *testing.T) {
})
t.Run("client-stream-conn", func(t *testing.T) {
t.Parallel()
client := newPingClient(&pluggablePingServer{
client := newPingClient(t, &pluggablePingServer{
sum: func(ctx context.Context, stream *connect.ClientStream[pingv1.SumRequest]) (*connect.Response[pingv1.SumResponse], error) {
assert.True(t, stream.Receive())
assert.NotNil(t, stream.Conn().Send("not-proto"))
return connect.NewResponse(&pingv1.SumResponse{}), nil
},
Expand All @@ -1652,8 +1653,9 @@ func TestStreamForServer(t *testing.T) {
})
t.Run("client-stream-send-msg", func(t *testing.T) {
t.Parallel()
client := newPingClient(&pluggablePingServer{
client := newPingClient(t, &pluggablePingServer{
sum: func(ctx context.Context, stream *connect.ClientStream[pingv1.SumRequest]) (*connect.Response[pingv1.SumResponse], error) {
assert.True(t, stream.Receive())
assert.Nil(t, stream.Conn().Send(&pingv1.SumResponse{Sum: 2}))
return connect.NewResponse(&pingv1.SumResponse{}), nil
},
Expand Down
6 changes: 6 additions & 0 deletions duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ func (d *duplexHTTPCall) Write(data []byte) (int, error) {
// reads from the other side.
bytesWritten, err := d.requestBodyWriter.Write(data)
if err != nil && errors.Is(err, io.ErrClosedPipe) {
// On sending headers we write a zero-length message to the server.
// If the server closes the connection without reading the body,
// we'll get an io.ErrClosedPipe. We can ignore this error.
if len(data) == 0 {
return 0, nil
}
// Signal that the stream is closed with the more-typical io.EOF instead of
// io.ErrClosedPipe. This makes it easier for protocol-specific wrappers to
// match grpc-go's behavior.
Expand Down

0 comments on commit 08555a9

Please sign in to comment.