diff --git a/README.md b/README.md index a87e817d..827568f2 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,14 @@ # gws ### minimal websocket server +#### Highlight +- No dependency +- Fully passes the WebSocket [autobahn-testsuite](https://github.com/crossbario/autobahn-testsuite) + #### Attention -- it's designed for api server, do not write big message -- it's recommended not to enable data compression in the intranet -- need to manage your own message handling coroutine +- It's designed for api server, do not write big message +- It's recommended not to enable data compression in the intranet +- You need to manage your own message handling coroutine #### Quick Start ```go diff --git a/example/tests/handler.go b/example/tests/handler.go deleted file mode 100644 index 0123735d..00000000 --- a/example/tests/handler.go +++ /dev/null @@ -1,88 +0,0 @@ -package main - -import ( - "github.com/lxzan/gws" - "github.com/lxzan/gws/internal" - "math/rand" -) - -func NewWebSocketHandler() *WebSocketHandler { - return &WebSocketHandler{} -} - -type WebSocketHandler struct{} - -func (c *WebSocketHandler) OnOpen(socket *gws.Conn) { - -} - -func (c *WebSocketHandler) OnMessage(socket *gws.Conn, m *gws.Message) { - body := m.Bytes() - defer m.Close() - - var key string - if len(key) <= 10 { - key = string(body) - } - switch key { - case "test": - c.OnTest(socket) - case "bench": - c.OnBench(socket) - case "verify": - c.OnVerify(socket) - case "ok": - case "ping": - socket.WriteMessage(gws.OpcodePing, nil) - case "pong": - socket.WriteMessage(gws.OpcodePong, nil) - case "close": - socket.WriteClose(gws.CloseGoingAway, []byte("goodbye")) - socket.Close() - default: - socket.Storage.Delete(key) - } -} - -func (c *WebSocketHandler) OnClose(socket *gws.Conn, code gws.CloseCode, reason []byte) { -} - -func (c *WebSocketHandler) OnError(socket *gws.Conn, err error) { - println(err.Error()) -} - -func (c *WebSocketHandler) OnPing(socket *gws.Conn, m []byte) { - println("onping") -} - -func (c *WebSocketHandler) OnPong(socket *gws.Conn, m []byte) { - println("onpong") -} - -func (c *WebSocketHandler) OnTest(socket *gws.Conn) { - const count = 1000 - for i := 0; i < count; i++ { - var size = internal.AlphabetNumeric.Intn(8 * 1024) - var k = internal.AlphabetNumeric.Generate(size) - socket.Storage.Put(string(k), 1) - socket.WriteMessage(gws.OpcodeText, k) - } -} - -func (c *WebSocketHandler) OnVerify(socket *gws.Conn) { - if socket.Storage.Len() != 0 { - socket.WriteMessage(gws.OpcodeText, []byte("failed")) - } - - socket.WriteMessage(gws.OpcodeText, []byte("ok")) -} - -func (c *WebSocketHandler) OnBench(socket *gws.Conn) { - const count = 1000000 - for i := 0; i < count; i++ { - var size = 10 + rand.Intn(1024) - var k = internal.AlphabetNumeric.Generate(size) - socket.WriteMessage(gws.OpcodeText, k) - //socket.WriteMessage(gws.OpcodeText, []byte("Hello")) - } -} diff --git a/example/tests/index.html b/example/tests/index.html deleted file mode 100644 index 18486ada..00000000 --- a/example/tests/index.html +++ /dev/null @@ -1,91 +0,0 @@ - - - - - Title - - - - -
-
- - - -
- -
- - - -
-
- - - - - diff --git a/example/tests/server.go b/example/tests/server.go deleted file mode 100644 index 674bc975..00000000 --- a/example/tests/server.go +++ /dev/null @@ -1,78 +0,0 @@ -package main - -import ( - "context" - "errors" - "flag" - "github.com/lxzan/gws" - "net/http" - _ "net/http/pprof" - "os" - "os/signal" - "path/filepath" - "strconv" - "syscall" - "time" -) - -var directory string - -func main() { - flag.StringVar(&directory, "d", "./", "directory") - flag.Parse() - - var upgrader = gws.Upgrader{} - - var handler = NewWebSocketHandler() - ctx, cancel := context.WithCancel(context.Background()) - - http.HandleFunc("/connect", func(writer http.ResponseWriter, request *http.Request) { - socket, err := upgrader.Upgrade(ctx, writer, request) - if err != nil { - return - } - defer socket.Close() - - handler.OnOpen(socket) - for { - select { - case <-ctx.Done(): - handler.OnError(socket, gws.CloseServiceRestart) - return - case msg := <-socket.ReadMessage(): - if err := msg.Err(); err != nil { - handler.OnError(socket, err) - return - } - - switch msg.Typ() { - case gws.OpcodeText, gws.OpcodeBinary: - handler.OnMessage(socket, msg) - case gws.OpcodePing: - handler.OnPing(socket, msg.Bytes()) - case gws.OpcodePong: - handler.OnPong(socket, msg.Bytes()) - default: - handler.OnError(socket, errors.New("unexpected opcode: "+strconv.Itoa(int(msg.Typ())))) - return - } - } - } - }) - - http.HandleFunc("/index.html", func(writer http.ResponseWriter, request *http.Request) { - writer.Header().Set("Content-Type", "text/html; charset=utf-8") - writer.WriteHeader(http.StatusOK) - d, _ := filepath.Abs(directory) - content, _ := os.ReadFile(d + "/index.html") - writer.Write(content) - }) - - go http.ListenAndServe(":3000", nil) - - quit := make(chan os.Signal) - signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - <-quit - cancel() - time.Sleep(100 * time.Millisecond) -} diff --git a/example/testsuite/main.go b/example/testsuite/main.go index 13271273..6b8ff338 100644 --- a/example/testsuite/main.go +++ b/example/testsuite/main.go @@ -13,7 +13,7 @@ import ( ) func main() { - var upgrader = gws.Upgrader{} + var upgrader = gws.Upgrader{CompressEnabled: true, MaxContentLength: 32 * 1024 * 1024} var handler = new(WebSocketHandler) ctx, cancel := context.WithCancel(context.Background()) diff --git a/init.go b/init.go deleted file mode 100644 index 3c1eddb9..00000000 --- a/init.go +++ /dev/null @@ -1,9 +0,0 @@ -package gws - -import "github.com/lxzan/gws/internal" - -var _pool *internal.BufferPool - -func init() { - _pool = internal.NewBufferPool() -} diff --git a/message.go b/message.go index a80824fe..1c44d949 100644 --- a/message.go +++ b/message.go @@ -5,10 +5,11 @@ import ( ) type Message struct { - err error - opcode Opcode - dbuf *internal.Buffer // 数据缓冲 - cbuf *internal.Buffer // 解码器缓冲 + err error + opcode Opcode + compressed bool + dbuf *internal.Buffer // 数据缓冲 + cbuf *internal.Buffer // 解码器缓冲 } func (c *Message) Read(p []byte) (n int, err error) { diff --git a/protocol.go b/protocol.go index d1f5b976..d24aec0d 100644 --- a/protocol.go +++ b/protocol.go @@ -22,6 +22,7 @@ type EventHandler interface { } var closeErrorMap = map[CloseCode]string{ + 0: "empty code", CloseNormalClosure: "close normal", CloseGoingAway: "client going away", CloseProtocolError: "protocol error", diff --git a/reader.go b/reader.go index ad35687f..53341bc2 100644 --- a/reader.go +++ b/reader.go @@ -10,6 +10,12 @@ import ( "unicode/utf8" ) +var _pool *internal.BufferPool + +func init() { + _pool = internal.NewBufferPool() +} + func (c *Conn) ReadMessage() <-chan *Message { return c.messageChan } @@ -93,7 +99,7 @@ func (c *Conn) readMessage() error { // the negotiated extensions defines the meaning of such a nonzero // value, the receiving endpoint MUST _Fail the WebSocket // Connection_. - if c.fh.GetRSV1() != c.compressEnabled || c.fh.GetRSV2() || c.fh.GetRSV3() { + if !c.compressEnabled && (c.fh.GetRSV1() || c.fh.GetRSV2() || c.fh.GetRSV3()) { return CloseProtocolError } @@ -103,6 +109,7 @@ func (c *Conn) readMessage() error { // read control frame var opcode = c.fh.GetOpcode() + var compressed = c.compressEnabled && c.fh.GetRSV1() if !isDataFrame(opcode) { return c.readControl() } @@ -161,7 +168,7 @@ func (c *Conn) readMessage() error { return CloseMessageTooLarge } if fin { - msg := &Message{opcode: c.continuationOpcode, dbuf: c.continuationBuffer} + msg := &Message{opcode: c.continuationOpcode, dbuf: c.continuationBuffer, compressed: compressed} c.continuationOpcode = 0 c.continuationBuffer = nil return c.emitMessage(msg) @@ -176,7 +183,7 @@ func (c *Conn) readMessage() error { switch opcode { case OpcodeText, OpcodeBinary: - return c.emitMessage(&Message{opcode: opcode, dbuf: buf}) + return c.emitMessage(&Message{opcode: opcode, dbuf: buf, compressed: compressed}) default: return errors.New("unexpected opcode: " + strconv.Itoa(int(opcode))) } diff --git a/writer.go b/writer.go index 4b08c237..0402ec6f 100644 --- a/writer.go +++ b/writer.go @@ -59,6 +59,14 @@ func (c *Conn) WriteClose(code CloseCode, reason []byte) { c.emitError(c.writeFrame(OpcodeCloseConnection, content, false, true)) } +func (c *Conn) WritePing(payload []byte) { + c.emitError(c.writeFrame(OpcodePing, payload, false, true)) +} + +func (c *Conn) WritePong(payload []byte) { + c.emitError(c.writeFrame(OpcodePong, payload, false, true)) +} + // WriteMessage send message // 发送消息 func (c *Conn) WriteMessage(messageType Opcode, content []byte) { @@ -66,15 +74,15 @@ func (c *Conn) WriteMessage(messageType Opcode, content []byte) { } // WriteBatch -// 批量写入消息,最后一次写入后需要调用Flush +// 批量写入消息,最后一次写入后需要调用FlushWriter func (c *Conn) WriteBatch(messageType Opcode, content []byte) { c.emitError(c.writeMessage(messageType, content, false)) } -// Flush +// FlushWriter // 刷新写入缓冲区 // flush write buffer -func (c *Conn) Flush() { +func (c *Conn) FlushWriter() { c.emitError(c.wbuf.Flush()) }