From 2da268f7282693f476252f8d15fc35ad43bdd72e Mon Sep 17 00:00:00 2001 From: LiuBo Date: Sat, 14 Sep 2024 09:12:27 +0800 Subject: [PATCH] [bug] proxy: session cannot be transferred after load data local infile (#18787) In the case of "load data local infile" statement, client sends the first packet, then server sends response, which is "0xFB + filename", after that, client sends content of filename and an empty packet, at last, server sends OK packet. The sequence ID of this OK packet is not 1, and will cause the session cannot be transferred after this stmt finished. So, the solution is: when server sends 0xFB and the sequence ID of next packet is 3 bigger than last one, the next packet MUST be an OK packet, and the transfer is allowed. Approved by: @aylei, @sukki37, @zhangxu19830126 --- pkg/proxy/tunnel.go | 37 +++++++++++++++---- pkg/proxy/tunnel_test.go | 80 ++++++++++++++++++++++++++++++---------- pkg/proxy/util.go | 9 +++++ pkg/proxy/util_test.go | 14 +++++++ 4 files changed, 112 insertions(+), 28 deletions(-) diff --git a/pkg/proxy/tunnel.go b/pkg/proxy/tunnel.go index c138056d2ed04..8a312f63c79c3 100644 --- a/pkg/proxy/tunnel.go +++ b/pkg/proxy/tunnel.go @@ -520,6 +520,9 @@ func (p *pipe) kickoff(ctx context.Context, peer *pipe) (e error) { p.mu.started = false p.mu.cond.Broadcast() } + + var firstCond bool + var currSeq int16 var lastSeq int16 = -1 var rotated bool prepareNextMessage := func() (terminate bool, err error) { @@ -557,8 +560,6 @@ func (p *pipe) kickoff(ctx context.Context, peer *pipe) (e error) { // set txn status and cmd time within the mutex together. // only server->client pipe need to set the txn status. if p.name == pipeServerToClient { - var currSeq int16 - // issue#16042 if len(tempBuf) > 3 { currSeq = int16(tempBuf[3]) @@ -576,7 +577,27 @@ func (p *pipe) kickoff(ctx context.Context, peer *pipe) (e error) { rotated = false } - inTxn, ok := checkTxnStatus(tempBuf) + // seqID is mainly used for server side. It records the sequence ID of + // each packet. + // In the case of "load data local infile" statement, client sends the + // first packet, then server sends response, which is "0xFB + filename", + // after that, client sends content of filename and an empty packet, at + // last, server sends OK packet. The sequence ID of this OK packet is not + // 1, and will cause the session cannot be transferred after this stmt + // finished. + // So, the solution is: when server sends 0xFB and the sequence ID of + // next packet is 3 bigger than last one, the next packet MUST be an + // OK packet, and the transfer is allowed. + // Related issue: https://github.com/matrixorigin/mo-cloud/issues/4088 + var mustOK bool + if !firstCond { + firstCond = isLoadDataLocalInfileRespPacket(tempBuf) + } else { + mustOK = currSeq-lastSeq == 3 + firstCond = false + } + + inTxn, ok := checkTxnStatus(tempBuf, mustOK) if ok { p.mu.inTxn = inTxn } @@ -721,10 +742,10 @@ func txnStatus(status uint16) bool { } // handleOKPacket handles the OK packet from server to update the txn state. -func handleOKPacket(msg []byte) bool { +func handleOKPacket(msg []byte, mustOK bool) bool { var mp *frontend.MysqlProtocolImpl - // the sequence ID should be 1 for OK packet. - if msg[3] != 1 { + // if the mustOK is false, then the sequence ID should be 1 for OK packet. + if !mustOK && msg[3] != 1 { return txnStatus(0) } pos := 5 @@ -754,14 +775,14 @@ func handleEOFPacket(msg []byte) bool { // the first return value is the txn status, and the second return value // indicates if we can get the txn status from the packet. If it is a ERROR // packet, the second return value is false. -func checkTxnStatus(msg []byte) (bool, bool) { +func checkTxnStatus(msg []byte, mustOK bool) (bool, bool) { ok := true inTxn := true // For the server->client pipe, we get the transaction status from the // OK and EOF packet, which is used in connection transfer. If the session // is in a transaction, a transfer should not start. if isOKPacket(msg) { - inTxn = handleOKPacket(msg) + inTxn = handleOKPacket(msg, mustOK) } else if isEOFPacket(msg) { inTxn = handleEOFPacket(msg) } else if isErrPacket(msg) { diff --git a/pkg/proxy/tunnel_test.go b/pkg/proxy/tunnel_test.go index 323b859746134..61ff6846d4194 100644 --- a/pkg/proxy/tunnel_test.go +++ b/pkg/proxy/tunnel_test.go @@ -691,24 +691,64 @@ func TestReplaceServerConn(t *testing.T) { } func TestCheckTxnStatus(t *testing.T) { - inTxn, ok := checkTxnStatus(nil) - require.True(t, ok) - require.True(t, inTxn) - - inTxn, ok = checkTxnStatus(makeErrPacket(8)) - require.False(t, ok) - require.True(t, inTxn) - - p1 := makeOKPacket(5) - value := frontend.SERVER_QUERY_WAS_SLOW | frontend.SERVER_STATUS_NO_GOOD_INDEX_USED - binary.LittleEndian.PutUint16(p1[7:], value) - inTxn, ok = checkTxnStatus(p1) - require.True(t, ok) - require.False(t, inTxn) - - value |= frontend.SERVER_STATUS_IN_TRANS - binary.LittleEndian.PutUint16(p1[7:], value) - inTxn, ok = checkTxnStatus(p1) - require.True(t, ok) - require.True(t, inTxn) + t.Run("mustOK false", func(t *testing.T) { + inTxn, ok := checkTxnStatus(nil, false) + require.True(t, ok) + require.True(t, inTxn) + + inTxn, ok = checkTxnStatus(makeErrPacket(8), false) + require.False(t, ok) + require.True(t, inTxn) + + p1 := makeOKPacket(5) + value := frontend.SERVER_QUERY_WAS_SLOW | frontend.SERVER_STATUS_NO_GOOD_INDEX_USED + binary.LittleEndian.PutUint16(p1[7:], value) + inTxn, ok = checkTxnStatus(p1, false) + require.True(t, ok) + require.False(t, inTxn) + + value |= frontend.SERVER_STATUS_IN_TRANS + binary.LittleEndian.PutUint16(p1[7:], value) + inTxn, ok = checkTxnStatus(p1, false) + require.True(t, ok) + require.True(t, inTxn) + }) + + t.Run("mustOK true", func(t *testing.T) { + inTxn, ok := checkTxnStatus(nil, true) + require.True(t, ok) + require.True(t, inTxn) + + inTxn, ok = checkTxnStatus(makeErrPacket(8), true) + require.False(t, ok) + require.True(t, inTxn) + + p1 := makeOKPacket(5) + value := frontend.SERVER_QUERY_WAS_SLOW | frontend.SERVER_STATUS_NO_GOOD_INDEX_USED + binary.LittleEndian.PutUint16(p1[7:], value) + inTxn, ok = checkTxnStatus(p1, true) + require.True(t, ok) + require.False(t, inTxn) + + value |= frontend.SERVER_STATUS_IN_TRANS + binary.LittleEndian.PutUint16(p1[7:], value) + inTxn, ok = checkTxnStatus(p1, true) + require.True(t, ok) + require.True(t, inTxn) + + value ^= frontend.SERVER_STATUS_IN_TRANS + binary.LittleEndian.PutUint16(p1[7:], value) + inTxn, ok = checkTxnStatus(p1, true) + require.True(t, ok) + require.False(t, inTxn) + + p1[3] = 4 + inTxn, ok = checkTxnStatus(p1, false) + require.True(t, ok) + require.True(t, inTxn) + + inTxn, ok = checkTxnStatus(p1, true) + require.True(t, ok) + require.False(t, inTxn) + }) } diff --git a/pkg/proxy/util.go b/pkg/proxy/util.go index ecc73506a05c3..20dfd4b6ff417 100644 --- a/pkg/proxy/util.go +++ b/pkg/proxy/util.go @@ -98,6 +98,15 @@ func isErrPacket(p []byte) bool { return false } +// isLoadDataLocalInfileRespPacket returns true if []byte is a packet +// of load data local infile response. +func isLoadDataLocalInfileRespPacket(p []byte) bool { + if len(p) > 4 && p[4] == 0xFB { + return true + } + return false +} + // isEmptyPacket returns true if []byte is an empty packet. func isEmptyPacket(p []byte) bool { return len(p) == 0 diff --git a/pkg/proxy/util_test.go b/pkg/proxy/util_test.go index 8667f35c42106..902d31be03f3c 100644 --- a/pkg/proxy/util_test.go +++ b/pkg/proxy/util_test.go @@ -200,6 +200,20 @@ func TestIsErrPacket(t *testing.T) { require.True(t, ret) } +func TestIsLoadDataLocalInfileRespPacket(t *testing.T) { + var data []byte + ret := isLoadDataLocalInfileRespPacket(data) + require.False(t, ret) + + data = []byte{0, 0, 0, 0, 2, 0} + ret = isLoadDataLocalInfileRespPacket(data) + require.False(t, ret) + + data = []byte{0, 0, 0, 0, 0xFB, 0} + ret = isLoadDataLocalInfileRespPacket(data) + require.True(t, ret) +} + func TestIsDeallocatePacket(t *testing.T) { var data []byte ret := isDeallocatePacket(data)