From 7e98ebf88cf1aeea7759f077b9333ff9a784a6c3 Mon Sep 17 00:00:00 2001 From: Carlos Barcenilla Date: Sun, 8 Dec 2024 19:18:33 +0100 Subject: [PATCH] Fix: docker exec hangs indefinitely when reading from stdin This commit fixes the improper handling of half-closed connection behavior when attaching stdin to a container. The proper approach requires: - The docker client receives a half-close of the connection from client to daemon (stdin) - The connection from daemon to client (stdout, stderr) remains open until no more data is to be received The first part of the issue is resolved by enabling MessageMode on Windows named pipes. This ensures that when stdin ends, an EOF is received on the reader, allowing proper half-closing of the connection. The previous implementation used stdlib's httputil.DockerProxy to proxy from Windows named pipe to WSL hvsock. This proxy does not use CloseWrite on hijacked connections, preventing proper handling of this use case. By using Close() instead of CloseWrite(), both connection directions are closed simultaneously, causing the client to miss pending stdout/stderr content after stdin EOF. This commit introduces a simpler custom `util.ReverseProxy` to replace httputil.DockerProxy, implementing proper handling of half-closed connections with explicit support for: - HTTP protocol upgrades - Half-close TCP connection management - Precise stream handling for Docker API interactions Closes #2094 Signed-off-by: Carlos Barcenilla --- .../pkg/dockerproxy/platform/serve_windows.go | 7 +- src/go/wsl-helper/pkg/dockerproxy/serve.go | 15 +- src/go/wsl-helper/pkg/dockerproxy/start.go | 16 +- .../wsl-helper/pkg/dockerproxy/util/pipe.go | 44 ++-- .../pkg/dockerproxy/util/pipe_test.go | 131 +++++++--- .../pkg/dockerproxy/util/reverse_proxy.go | 247 ++++++++++++++++++ 6 files changed, 398 insertions(+), 62 deletions(-) create mode 100644 src/go/wsl-helper/pkg/dockerproxy/util/reverse_proxy.go diff --git a/src/go/wsl-helper/pkg/dockerproxy/platform/serve_windows.go b/src/go/wsl-helper/pkg/dockerproxy/platform/serve_windows.go index c3592ec5e8c..45ef00e29e6 100644 --- a/src/go/wsl-helper/pkg/dockerproxy/platform/serve_windows.go +++ b/src/go/wsl-helper/pkg/dockerproxy/platform/serve_windows.go @@ -81,7 +81,12 @@ func Listen(endpoint string) (net.Listener, error) { return nil, fmt.Errorf("endpoint %s does not start with protocol %s", endpoint, prefix) } - listener, err := winio.ListenPipe(endpoint[len(prefix):], nil) + // Configure pipe in MessageMode to support Docker's half-close semantics + // - Enables zero-byte writes as EOF signals (CloseWrite) + // - Crucial for stdin stream termination in interactive containers + pipeConfig := &winio.PipeConfig{MessageMode: true} + + listener, err := winio.ListenPipe(endpoint[len(prefix):], pipeConfig) if err != nil { return nil, fmt.Errorf("could not listen on %s: %w", endpoint, err) } diff --git a/src/go/wsl-helper/pkg/dockerproxy/serve.go b/src/go/wsl-helper/pkg/dockerproxy/serve.go index 287a9c950f8..79e1bdae885 100644 --- a/src/go/wsl-helper/pkg/dockerproxy/serve.go +++ b/src/go/wsl-helper/pkg/dockerproxy/serve.go @@ -23,10 +23,8 @@ import ( "encoding/json" "fmt" "io" - "log" "net" "net/http" - "net/http/httputil" "os" "os/signal" "regexp" @@ -38,6 +36,7 @@ import ( "github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/models" "github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/platform" + "github.com/rancher-sandbox/rancher-desktop/src/go/wsl-helper/pkg/dockerproxy/util" ) // RequestContextValue contains things we attach to incoming requests @@ -74,7 +73,10 @@ func Serve(endpoint string, dialer func() (net.Conn, error)) error { logWriter := logrus.StandardLogger().Writer() defer logWriter.Close() munger := newRequestMunger() - proxy := &httputil.ReverseProxy{ + proxy := &util.ReverseProxy{ + Dial: func(string, string) (net.Conn, error) { + return dialer() + }, Director: func(req *http.Request) { logrus.WithField("request", req). WithField("headers", req.Header). @@ -96,12 +98,6 @@ func Serve(endpoint string, dialer func() (net.Conn, error)) error { Error("could not munge request") } }, - Transport: &http.Transport{ - Dial: func(string, string) (net.Conn, error) { - return dialer() - }, - DisableCompression: true, // for debugging - }, ModifyResponse: func(resp *http.Response) error { logEntry := logrus.WithField("response", resp) defer func() { logEntry.Debug("got backend response") }() @@ -124,7 +120,6 @@ func Serve(endpoint string, dialer func() (net.Conn, error)) error { } return nil }, - ErrorLog: log.New(logWriter, "", 0), } server := &http.Server{ diff --git a/src/go/wsl-helper/pkg/dockerproxy/start.go b/src/go/wsl-helper/pkg/dockerproxy/start.go index bd084c3aaa7..68ac9a5eb34 100644 --- a/src/go/wsl-helper/pkg/dockerproxy/start.go +++ b/src/go/wsl-helper/pkg/dockerproxy/start.go @@ -177,7 +177,21 @@ func handleConnection(conn net.Conn, dockerPath string) { return } defer dockerConn.Close() - err = util.Pipe(conn, dockerConn) + + // Cast backend and client connections to HalfReadWriteCloser + var xConn util.HalfReadWriteCloser + var xDockerConn util.HalfReadWriteCloser + if x, ok := conn.(util.HalfReadWriteCloser); !ok { + panic("client connection does not implement HalfReadCloseWriter") + } else { + xConn = x + } + if x, ok := dockerConn.(util.HalfReadWriteCloser); !ok { + panic("daemon connection does not implement HalfReadCloseWriter") + } else { + xDockerConn = x + } + err = util.Pipe(xConn, xDockerConn) if err != nil { logrus.Errorf("error forwarding docker connection: %s", err) return diff --git a/src/go/wsl-helper/pkg/dockerproxy/util/pipe.go b/src/go/wsl-helper/pkg/dockerproxy/util/pipe.go index f05a261cdff..aee7ded749b 100644 --- a/src/go/wsl-helper/pkg/dockerproxy/util/pipe.go +++ b/src/go/wsl-helper/pkg/dockerproxy/util/pipe.go @@ -17,11 +17,11 @@ limitations under the License. package util import ( + "fmt" "io" ) -// Pipe bidirectionally between two streams. -func Pipe(c1, c2 io.ReadWriteCloser) error { +func Pipe(c1, c2 HalfReadWriteCloser) error { ioCopy := func(reader io.Reader, writer io.Writer) <-chan error { ch := make(chan error) go func() { @@ -33,22 +33,32 @@ func Pipe(c1, c2 io.ReadWriteCloser) error { ch1 := ioCopy(c1, c2) ch2 := ioCopy(c2, c1) - select { - case err := <-ch1: - c1.Close() - c2.Close() - <-ch2 - if err != io.EOF { - return err - } - case err := <-ch2: - c1.Close() - c2.Close() - <-ch1 - if err != io.EOF { - return err + for i := 0; i < 2; i++ { + select { + case err := <-ch1: + cwErr := c2.CloseWrite() + if cwErr != nil { + return fmt.Errorf("error closing write end of c2: %w", cwErr) + } + if err != nil && err != io.EOF { + return err + } + case err := <-ch2: + cwErr := c1.CloseWrite() + if cwErr != nil { + return fmt.Errorf("error closing write end of c1: %w", cwErr) + } + if err != nil && err != io.EOF { + return err + } } } - return nil } + +type HalfReadWriteCloser interface { + // CloseWrite closes the write-side of the connection. + CloseWrite() error + // Write is a passthrough to the underlying connection. + io.ReadWriteCloser +} diff --git a/src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go b/src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go index ab9bda1e8e3..a0e4c5b96c8 100644 --- a/src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go +++ b/src/go/wsl-helper/pkg/dockerproxy/util/pipe_test.go @@ -18,55 +18,120 @@ package util import ( "bytes" - "errors" "io" + "sync" "testing" "github.com/stretchr/testify/assert" ) -type nopReadWriteCloser struct { - io.ReadWriter +// bidirectionalHalfClosePipe is a testing utility that simulates a bidirectional pipe +// with the ability to half-close connections. It's designed to mimic scenarios +// like interactive command-line operations where a client can send data and +// then half-close the connection while waiting for a response. +type bidirectionalHalfClosePipe struct { + r io.ReadCloser + w io.WriteCloser } -func (nopReadWriteCloser) Close() error { - return nil +// newBidirectionalHalfClosePipe creates two interconnected bidirectional pipe endpoints. +// +// The function returns two bidirectionalHalfClosePipe instances that are connected +// such that what is written to one's write endpoint can be read from the other's +// read endpoint, and vice versa. +// +// Returns: +// - h1: First bidirectional pipe endpoint +// - h2: Second bidirectional pipe endpoint +func newBidirectionalHalfClosePipe() (h1, h2 *bidirectionalHalfClosePipe) { + pr1, pw1 := io.Pipe() + pr2, pw2 := io.Pipe() + + h1 = &bidirectionalHalfClosePipe{ + r: pr1, w: pw2, + } + + h2 = &bidirectionalHalfClosePipe{ + r: pr2, w: pw1, + } + return } -type passthroughReadWriteCloser struct { - io.ReadCloser - io.WriteCloser +func (h *bidirectionalHalfClosePipe) CloseWrite() error { + return h.w.Close() } -func newPipeReadWriter() io.ReadWriteCloser { - r, w := io.Pipe() - return &passthroughReadWriteCloser{ - ReadCloser: r, - WriteCloser: w, +func (h *bidirectionalHalfClosePipe) Close() error { + wErr := h.w.Close() + rErr := h.r.Close() + + if wErr != nil { + return wErr } + return rErr } -func (p *passthroughReadWriteCloser) Close() error { - err := p.ReadCloser.Close() - if err != nil && !errors.Is(err, io.ErrClosedPipe) { - return err - } - err = p.WriteCloser.Close() - if err != nil && !errors.Is(err, io.ErrClosedPipe) { - return err - } - return nil +func (h *bidirectionalHalfClosePipe) Read(p []byte) (n int, err error) { + return h.r.Read(p) +} + +func (h *bidirectionalHalfClosePipe) Write(p []byte) (n int, err error) { + return h.w.Write(p) } +// TestPipe verifies the functionality of the bidirectional pipe utility. +// +// The test simulates a scenario similar to interactive command execution, +// such as a docker run -i command, which requires bidirectional communication. +// This test case mimics scenarios like: +// - Sending input to a Docker container via stdin +// - Half-closing the input stream +// - Receiving output from the container +// +// The test steps are: +// 1. A client sends data +// 2. The client half-closes the connection +// 3. The server reads the data +// 4. The server sends a return response +// 5. The server half-closes the connection +// +// This approach is particularly relevant for interactive Docker runs where +// the client needs to send input and then wait for the container's response, +// while maintaining the ability to close streams independently. func TestPipe(t *testing.T) { - rw := newPipeReadWriter() - output := bytes.Buffer{} - data := &passthroughReadWriteCloser{ - ReadCloser: nopReadWriteCloser{bytes.NewBufferString("some data")}, - WriteCloser: nopReadWriteCloser{&output}, - } - err := Pipe(rw, data) - if assert.NoError(t, err) { - assert.Equal(t, "some data", output.String()) - } + h1a, h1b := newBidirectionalHalfClosePipe() + h2a, h2b := newBidirectionalHalfClosePipe() + var wg sync.WaitGroup + wg.Add(2) + + // Goroutine simulating the client-side operation + go func() { + defer wg.Done() + dataToSend := bytes.NewBufferString("some data") + _, err := h1a.Write(dataToSend.Bytes()) + assert.NoError(t, err) + h1a.CloseWrite() + + output, err := io.ReadAll(h1a) + assert.NoError(t, err) + assert.EqualValues(t, output, "return data") + }() + + // Goroutine simulating the server-side operation + go func() { + defer wg.Done() + output, err := io.ReadAll(h2b) + assert.NoError(t, err) + assert.EqualValues(t, output, "some data") + + dataToSend := bytes.NewBufferString("return data") + _, err = h2b.Write(dataToSend.Bytes()) + assert.NoError(t, err) + + h2b.CloseWrite() + }() + + err := Pipe(h1b, h2a) + assert.NoError(t, err) + wg.Wait() } diff --git a/src/go/wsl-helper/pkg/dockerproxy/util/reverse_proxy.go b/src/go/wsl-helper/pkg/dockerproxy/util/reverse_proxy.go new file mode 100644 index 00000000000..795d13d7b16 --- /dev/null +++ b/src/go/wsl-helper/pkg/dockerproxy/util/reverse_proxy.go @@ -0,0 +1,247 @@ +package util + +import ( + "bufio" + "context" + "io" + "log" + "net" + "net/http" + "time" +) + +const ( + hostHeaderValue = "api.moby.localhost" + targetProtocol = "http://" +) + +// ReverseProxy is a custom reverse proxy specifically designed for Rancher Desktop's +// Docker API communication. Unlike the standard library's ReverseProxy, this +// implementation provides explicit support for half-close connections and +// HTTP protocol upgrades required by the Docker API. +// +// Key design features: +// - Handles HTTP protocol upgrades (WebSocket-like connections) +// - Supports half-close TCP connections +// - Provides hooks for request/response modification +// - Designed for specific Docker API interaction requirements +type ReverseProxy struct { + // Dial provides a custom connection establishment method + Dial func(network, addr string) (net.Conn, error) + // Director allows modification of the outgoing request before forwarding + Director func(*http.Request) + // ModifyResponse enables post-processing of the backend response + ModifyResponse func(*http.Response) error +} + +// ServeHTTP implements the http.Handler interface, routing incoming +// HTTP requests through the custom reverse proxy +func (proxy ReverseProxy) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + proxy.forwardRequest(rw, r) +} + +// forwardRequest is the core method that handles request proxying, +// with special handling for Docker API-specific requirements. +// +// Primary responsibilities: +// - Establish backend connection +// - Forward request to backend +// - Handle response streaming +// - Support protocol upgrades +// - Ensure proper connection management +func (proxy *ReverseProxy) forwardRequest(w http.ResponseWriter, r *http.Request) { + // periodicHttpFlush is a critical component for supporting + // long-running, streaming connections like "docker log -f" + periodicHttpFlush := func(w http.ResponseWriter, ctx context.Context) { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + // Validate flushing capability of the ResponseWriter + flusher, ok := w.(http.Flusher) + if !ok { + log.Println("error: ResponseWriter does not support http.Flusher") + return + } + + // Continuous flushing loop with context-aware cancellation + for { + select { + case <-ctx.Done(): + // Context cancellation stops the flushing + return + case <-ticker.C: + select { + case <-ctx.Done(): + return + default: + flusher.Flush() + } + } + } + } + + // Leverage the original request's context as the base + ctx := r.Context() + + // Create a new context with cancellation to ensure we can stop the flush + // The context will be canceled when the request is done or if needed earlier + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Establish a connection to the backend using a custom Dial method + backendConn, err := proxy.Dial("", "") + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + defer backendConn.Close() + + // Create a new HTTP request with the same headers + url := targetProtocol + hostHeaderValue + r.RequestURI + newReq, err := http.NewRequestWithContext(ctx, r.Method, url, r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + newReq.Header = r.Header + + // Director function + // Allows complete customization of the outgoing request + if proxy.Director != nil { + proxy.Director(newReq) + } + // Prevent automatic connection closure + newReq.Close = false + + // Forward the modified request to the backend + err = newReq.Write(backendConn) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + + // Read the response from the backend + backendResponse, err := http.ReadResponse(bufio.NewReader(backendConn), newReq) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + defer backendResponse.Body.Close() + + // ModifyResponse function + // Allows post-processing of the backend response + if proxy.ModifyResponse != nil { + err := proxy.ModifyResponse(backendResponse) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + } + + // Propagate backend response headers to the client + for key, values := range backendResponse.Header { + for _, value := range values { + w.Header().Add(key, value) + } + } + + // Write the response status code and headers and flush it immediately + w.WriteHeader(backendResponse.StatusCode) + flusher, ok := w.(http.Flusher) + if !ok { + panic("expected http.ResponseWriter to be an http.Flusher") + } + flusher.Flush() + + // Check if the response has a status code of 101 (Switching Protocols) + if backendResponse.StatusCode == http.StatusSwitchingProtocols { + proxy.handleUpgradedConnection(w, backendConn) + return + } + + // Start periodic flushing in a background goroutine + // Supports long-running, streaming responses + go periodicHttpFlush(w, ctx) + + // Stream the response body back to the client + _, err = io.Copy(w, backendResponse.Body) + if err != nil { + return + } +} + +// handleUpgradedConnection manages HTTP protocol upgrades (e.g., WebSocket), +// specifically tailored for Docker API's hijacking mechanism. +// +// This method: +// - Hijacks the existing connection +// - Manages buffered data +// - Enables bidirectional communication after protocol upgrade +func (*ReverseProxy) handleUpgradedConnection(w http.ResponseWriter, backendConn net.Conn) { + // Cast writer to safely hijack the connection + hijacker, ok := w.(http.Hijacker) + if !ok { + http.Error(w, "client response writer does not support http.Hijacker", http.StatusInternalServerError) + return + } + + // Hijack attempts to take control of the underlying connection + // Returns: + // - clientConn: The raw network connection + // - bufferedClientConn: A buffered reader/writer for any pending data + clientConn, bufferedClientConn, err := hijacker.Hijack() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer clientConn.Close() + + // Flush any buffered data in the writer to ensure no data is lost + if bufferedClientConn.Writer.Buffered() > 0 { + if err := bufferedClientConn.Writer.Flush(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + // Process any data already buffered in the reader before full duplex communication + // This prevents losing any data that might have been read but not yet processed + if bufferedLen := bufferedClientConn.Reader.Buffered(); bufferedLen > 0 { + bufferedData := make([]byte, bufferedLen) + _, err := bufferedClientConn.Reader.Read(bufferedData) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _, err = backendConn.Write(bufferedData) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + // Cast backend and client connections to HalfReadWriteCloser + var xBackendConn HalfReadWriteCloser + var xClientConn HalfReadWriteCloser + if x, ok := backendConn.(HalfReadWriteCloser); !ok { + http.Error(w, "backend connection does not implement HalfReadCloseWriter", http.StatusInternalServerError) + return + } else { + xBackendConn = x + } + if x, ok := clientConn.(HalfReadWriteCloser); !ok { + http.Error(w, "client connection does not implement HalfReadCloseWriter", http.StatusInternalServerError) + return + } else { + xClientConn = x + } + + // Establish a bidirectional pipe between client and backend connections + // This allows full-duplex communication with support for half-closes + // Critical for Docker API's stream-based communication model + err = Pipe(xClientConn, xBackendConn) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +}