Skip to content

Commit

Permalink
Cleanup docs
Browse files Browse the repository at this point in the history
  • Loading branch information
emcfarlane committed Oct 31, 2023
1 parent 7ac9679 commit 74ccbc6
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 24 deletions.
13 changes: 6 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,19 +135,20 @@ func (c *Client[Req, Res]) CallClientStream(ctx context.Context) *ClientStreamFo
if c.err != nil {
return &ClientStreamForClient[Req, Res]{err: c.err}
}
return &ClientStreamForClient[Req, Res]{conn: c.newConn(ctx, StreamTypeClient, nil /* header */, nil /* onRequestSend */)}
return &ClientStreamForClient[Req, Res]{conn: c.newConn(ctx, StreamTypeClient, nil /* onRequestSend */)}
}

// CallServerStream calls a server streaming procedure.
func (c *Client[Req, Res]) CallServerStream(ctx context.Context, request *Request[Req]) (*ServerStreamForClient[Res], error) {
if c.err != nil {
return nil, c.err
}
conn := c.newConn(ctx, StreamTypeServer, request.header, func(r *http.Request) {
conn := c.newConn(ctx, StreamTypeServer, func(r *http.Request) {
request.method = r.Method
})
request.spec = conn.Spec()
request.peer = conn.Peer()
mergeHeaders(conn.RequestHeader(), request.header)
// Send always returns an io.EOF unless the error is from the client-side.
// We want the user to continue to call Receive in those cases to get the
// full error from the server-side.
Expand All @@ -167,14 +168,12 @@ func (c *Client[Req, Res]) CallBidiStream(ctx context.Context) *BidiStreamForCli
if c.err != nil {
return &BidiStreamForClient[Req, Res]{err: c.err}
}
return &BidiStreamForClient[Req, Res]{conn: c.newConn(ctx, StreamTypeBidi, nil /* header */, nil /* onRequestSend */)}
return &BidiStreamForClient[Req, Res]{conn: c.newConn(ctx, StreamTypeBidi, nil /* onRequestSend */)}
}

func (c *Client[Req, Res]) newConn(ctx context.Context, streamType StreamType, header http.Header, onRequestSend func(r *http.Request)) StreamingClientConn {
if header == nil {
header = make(http.Header, 8) // arbitrary power of two, prevent immediate resizing
}
func (c *Client[Req, Res]) newConn(ctx context.Context, streamType StreamType, onRequestSend func(r *http.Request)) StreamingClientConn {
newConn := func(ctx context.Context, spec Spec) StreamingClientConn {
header := make(http.Header, 8) // arbitrary power of two, prevent immediate resizing
c.protocolClient.WriteRequestHeader(streamType, header)
conn := c.protocolClient.NewConn(ctx, spec, header)
conn.onRequestSend(onRequestSend)
Expand Down
2 changes: 1 addition & 1 deletion client_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestClientPeer(t *testing.T) {
err = clientStream.Send(&pingv1.SumRequest{})
assert.Nil(t, err)
// server streaming
serverStream, err := client.CountUp(ctx, connect.NewRequest(&pingv1.CountUpRequest{Number: 1}))
serverStream, err := client.CountUp(ctx, connect.NewRequest(&pingv1.CountUpRequest{}))
t.Cleanup(func() {
assert.Nil(t, serverStream.Close())
})
Expand Down
25 changes: 10 additions & 15 deletions http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,17 @@ import (
"sync/atomic"
)

// httpCall is a full-duplex stream between the client and server. The
// request body is the stream from client to server, and the response body is
// the reverse.
//
// Be warned: we need to use some lesser-known APIs to do this with net/http.
// httpCall builds a HTTP request and sends it to a servcer. Unary calls block
// until the response is received. Streaming calls return immediately.
type httpCall struct {
ctx context.Context
client HTTPClient
streamType StreamType
onRequestSend func(*http.Request)
validateResponse func(*http.Response) *Error

// We'll use a pipe as the request body. We hand the read side of the pipe to
// net/http, and we write to the write side (naturally). The two ends are
// safe to use concurrently.
// io.Pipe is used to implement the request body for client streaming calls.
// If the request is unary, this is nil.
requestBodyWriter *io.PipeWriter

requestSent atomic.Bool
Expand Down Expand Up @@ -229,7 +225,7 @@ func (c *httpCall) sendUnary(buffer *bytes.Buffer) error {
// from Send to ensure the buffer can safely be reused.
defer payload.Wait()
}
c.makeRequest() // blocks until the response is ready
c.makeRequest() // Blocks until the response is ready
return nil // Only report response errors on Read
}

Expand Down Expand Up @@ -325,8 +321,9 @@ func cloneURL(oldURL *url.URL) *url.URL {
}

// payloadCloser is a ReadCloser that wraps a bytes.Buffer. It's used to
// implement the request body for unary calls. On full reads or Close, it
// signals that the payload has been fully read.
// implement the request body for unary calls. To safely reuse the buffer
// call Wait after the response is received to ensure the payload has been
// fully read.
type payloadCloser struct {
wait sync.WaitGroup

Expand All @@ -340,7 +337,7 @@ func newPayloadCloser(buf *bytes.Buffer) *payloadCloser {
payload := &payloadCloser{
buf: buf,
}
payload.wait.Add(1)
payload.wait.Add(1) // Wait until complete is called
return payload
}

Expand Down Expand Up @@ -370,9 +367,7 @@ func (p *payloadCloser) Close() error {
}

// Rewind resets the payload to the beginning. It returns false if the buffer
// has been discarded.
// Note: it should not be possible for GetBody to be called after the response
// is received.
// has been discarded from a previous call to Wait.
func (p *payloadCloser) Rewind() bool {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion protocol_connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestConnectEndOfResponseCanonicalTrailers(t *testing.T) {
},
}
src := &buffer
err = unmarshaler.Unmarshal(nil, src) // parameter won't be used
err = unmarshaler.Unmarshal(nil /* message unused */, src)
assert.ErrorIs(t, err, errSpecialEnvelope)
assert.Equal(t, unmarshaler.Trailer().Values("Not-Canonical-Header"), []string{"a"})
assert.Equal(t, unmarshaler.Trailer().Values("Mixed-Canonical"), []string{"b", "b"})
Expand Down

0 comments on commit 74ccbc6

Please sign in to comment.