From 8f44837318def9e80414f307b9c8186db568fde5 Mon Sep 17 00:00:00 2001 From: lxzan Date: Fri, 17 Nov 2023 11:40:27 +0800 Subject: [PATCH 1/2] format imports --- README.md | 52 +++++++++++++++++++++++++++-------- README_CN.md | 58 ++++++++++++++++++++++++++++++--------- benchmark_test.go | 5 ++-- client_test.go | 5 ++-- compress.go | 5 ++-- conn.go | 35 +++++++++++------------ examples/chatroom/main.go | 3 +- examples/echo/main.go | 3 +- examples/push/main.go | 8 +++--- init.go | 2 +- internal/pool_test.go | 3 +- internal/utils_test.go | 3 +- option.go | 5 ++-- reader.go | 3 +- reader_test.go | 5 ++-- session_storage.go | 9 +++--- session_storage_test.go | 3 +- task.go | 5 ++-- task_test.go | 12 +++++--- types.go | 3 +- writer.go | 28 +++++++++++-------- writer_test.go | 24 +++++++++++++++- 22 files changed, 193 insertions(+), 86 deletions(-) diff --git a/README.md b/README.md index 96b640c8..dcec8bd1 100755 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ GWS (Go WebSocket) is a very simple, fast, reliable and feature-rich WebSocket implementation written in Go. It is designed to be used in highly-concurrent environments, and it is suitable for -building `API`, `PROXY`, `GAME`, `Live Video`, `MESSAGE`, etc. It supports both server and client side with a simple API +building `API`, `Proxy`, `Game`, `Live Video`, `Message`, etc. It supports both server and client side with a simple API which mean you can easily write a server or client by yourself. GWS developed base on Event-Driven model. every connection has a goroutine to handle the event, and the event is able @@ -26,19 +26,17 @@ to be processed in a non-blocking way. ### Why GWS - Simplicity and Ease of Use - - **User-Friendly API**: Straightforward and easy-to-understand API, making server and client setup hassle-free. + - **User-Friendly**: Simple and clear `WebSocket` Event API design makes server-client interaction easy. - **Code Efficiency**: Minimizes the amount of code needed to implement complex WebSocket solutions. - High-Performance - - **Zero Allocs IO**: Built-in multi-level memory pool to minimize dynamic memory allocation during reads and - writes. - - **Optimized for Speed**: Designed for rapid data transmission and reception, ideal for time-sensitive + - **High IOPS Low Latency**: Designed for rapid data transmission and reception, ideal for time-sensitive applications. + - **Low Memory Usage**: Highly optimized memory multiplexing system to minimize memory usage and reduce your cost of ownership. - Reliability and Stability - - **Event-Driven Architecture**: Ensures stable performance even in highly concurrent environments. - **Robust Error Handling**: Advanced mechanisms to manage and mitigate errors, ensuring continuous operation. - + - **Well-Developed Test Cases**: Passed all `Autobahn` test cases, fully compliant with `RFC 6455`. 99% unit test coverage, covering almost all conditional branches. ### Benchmark #### IOPS (Echo Server) @@ -81,6 +79,7 @@ PASS - [KCP](#kcp) - [Proxy](#proxy) - [Broadcast](#broadcast) + - [Pub / Sub](#pub--sub) - [Autobahn Test](#autobahn-test) - [Communication](#communication) - [Acknowledgments](#acknowledgments) @@ -110,11 +109,11 @@ go get -v github.com/lxzan/gws@latest ```go type Event interface { - OnOpen(socket *Conn) // the connection is established + OnOpen(socket *Conn) // connection is established OnClose(socket *Conn, err error) // received a close frame or I/O error occurs - OnPing(socket *Conn, payload []byte) // receive a ping frame - OnPong(socket *Conn, payload []byte) // receive a pong frame - OnMessage(socket *Conn, message *Message) // receive a text/binary frame + OnPing(socket *Conn, payload []byte) // received a ping frame + OnPong(socket *Conn, payload []byte) // received a pong frame + OnMessage(socket *Conn, message *Message) // received a text/binary frame } ``` @@ -287,6 +286,37 @@ func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) { } ``` +#### Pub / Sub + +```go +package main + +import ( + "github.com/lxzan/event_emitter" + "github.com/lxzan/gws" +) + +type Socket struct{ *gws.Conn } + +// GetSubscriberID gets the subscription ID, which needs to be unique. +func (c *Socket) GetSubscriberID() int64 { + userId, _ := c.Session().Load("userId") + return userId.(int64) +} + +func Sub(em *event_emitter.EventEmitter[*Socket], topic string, socket *Socket) { + em.Subscribe(socket, topic, func(subscriber *Socket, msg any) { + _ = msg.(*gws.Broadcaster).Broadcast(subscriber.Conn) + }) +} + +func Pub(em *event_emitter.EventEmitter[*Socket], topic string, op gws.Opcode, msg []byte) { + var broadcaster = gws.NewBroadcaster(op, msg) + defer broadcaster.Close() + em.Publish(topic, broadcaster) +} +``` + ### Autobahn Test ```bash diff --git a/README_CN.md b/README_CN.md index 8a334be7..10bc88aa 100755 --- a/README_CN.md +++ b/README_CN.md @@ -19,16 +19,16 @@ GWS(Go WebSocket)是一个用 Go 编写的非常简单、快速、可靠且 ### 为什么选择 GWS - 简单易用 - - **用户友好的 API 设计**: 简单易懂的应用程序接口,让服务器和客户端的设置变得轻松简单。 - - **编码效率**: 最大限度地减少实施复杂的 WebSocket 解决方案所需的代码量。 + - **用户友好**: 简洁明了的 `WebSocket` 事件接口设计,让服务器和客户端的交互变得轻松简单. + - **编码效率**: 最大限度地减少实施复杂的解决方案所需的代码量. -- 性能良好 - - **零动态内存分配 I/O**: 内置多级内存池,可最大限度地减少读写过程中的动态内存分配。 - - **性能优化**: 专为快速传输和接收数据而设计,是时间敏感型应用的理想之选。 +- 性能出众 + - **高吞吐低延迟**: 专为快速传输和接收数据而设计,是时间敏感型应用的理想之选. + - **低内存占用**: 高度优化的内存复用系统, 最大限度降低内存使用量,降低您的成本. - 稳定可靠 - - **事件驱动式架构**: 即使在高度并发的环境中,也能确保稳定的性能。 - - **健壮的错误处理**: 管理和减少错误的先进机制,确保持续运行。 + - **健壮的错误处理**: 管理和减少错误的先进机制,确保持续运行. + - **完善的测试用例**: 通过了所有 `Autobahn` 测试用例, 完全符合 `RFC 6455` 标准. 单元测试覆盖率达到99%, 几乎覆盖所有条件分支. ### 基准测试 @@ -72,6 +72,7 @@ PASS - [KCP](#kcp) - [代理](#代理) - [广播](#广播) + - [发布订阅](#发布订阅) - [Autobahn 测试](#autobahn-测试) - [交流](#交流) - [致谢](#致谢) @@ -88,9 +89,9 @@ PASS ### 注意 -- 所有 gws.Conn 导出的方法错误都是可忽略的, 它们在内部已经被妥善处理了 +- 所有 gws.Conn 导出方法返回的错误都是可忽略的, 它们在内部已经被妥善处理了 - 传输大文件有阻塞连接的风险 -- 如果复用HTTP服务器, 建议调用ReadLoop时开启新的goroutine, 以避免请求上下文内存不能被回收. +- 如果复用HTTP服务器, 建议开启新的Goroutine来调用ReadLoop, 以避免请求上下文内存不能及时回收. ### 安装 @@ -102,11 +103,11 @@ go get -v github.com/lxzan/gws@latest ```go type Event interface { - OnOpen(socket *Conn) // the connection is established + OnOpen(socket *Conn) // connection is established OnClose(socket *Conn, err error) // received a close frame or I/O error occurs - OnPing(socket *Conn, payload []byte) // receive a ping frame - OnPong(socket *Conn, payload []byte) // receive a pong frame - OnMessage(socket *Conn, message *Message) // receive a text/binary frame + OnPing(socket *Conn, payload []byte) // received a ping frame + OnPong(socket *Conn, payload []byte) // received a pong frame + OnMessage(socket *Conn, message *Message) // received a text/binary frame } ``` @@ -279,6 +280,37 @@ func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) { } ``` +#### 发布订阅 + +```go +package main + +import ( + "github.com/lxzan/event_emitter" + "github.com/lxzan/gws" +) + +type Socket struct{ *gws.Conn } + +// GetSubscriberID 获取订阅ID, 需要保证唯一 +func (c *Socket) GetSubscriberID() int64 { + userId, _ := c.Session().Load("userId") + return userId.(int64) +} + +func Sub(em *event_emitter.EventEmitter[*Socket], topic string, socket *Socket) { + em.Subscribe(socket, topic, func(subscriber *Socket, msg any) { + _ = msg.(*gws.Broadcaster).Broadcast(subscriber.Conn) + }) +} + +func Pub(em *event_emitter.EventEmitter[*Socket], topic string, op gws.Opcode, msg []byte) { + var broadcaster = gws.NewBroadcaster(op, msg) + defer broadcaster.Close() + em.Publish(topic, broadcaster) +} +``` + ### Autobahn 测试 ```bash diff --git a/benchmark_test.go b/benchmark_test.go index f1b08b20..98162def 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -6,11 +6,12 @@ import ( "compress/flate" _ "embed" "encoding/binary" - klauspost "github.com/klauspost/compress/flate" - "github.com/lxzan/gws/internal" "io" "net" "testing" + + klauspost "github.com/klauspost/compress/flate" + "github.com/lxzan/gws/internal" ) //go:embed assets/github.json diff --git a/client_test.go b/client_test.go index 4c477045..bfb4c264 100644 --- a/client_test.go +++ b/client_test.go @@ -3,13 +3,14 @@ package gws import ( "crypto/tls" "errors" - "github.com/lxzan/gws/internal" - "github.com/stretchr/testify/assert" "io" "net" "net/http" "testing" "time" + + "github.com/lxzan/gws/internal" + "github.com/stretchr/testify/assert" ) func TestNewClient(t *testing.T) { diff --git a/compress.go b/compress.go index d6043dfa..1632dae3 100644 --- a/compress.go +++ b/compress.go @@ -3,12 +3,13 @@ package gws import ( "bytes" "encoding/binary" - "github.com/klauspost/compress/flate" - "github.com/lxzan/gws/internal" "io" "math" "sync" "sync/atomic" + + "github.com/klauspost/compress/flate" + "github.com/lxzan/gws/internal" ) // FlateTail Add four bytes as specified in RFC diff --git a/conn.go b/conn.go index fa489f27..53d4031c 100644 --- a/conn.go +++ b/conn.go @@ -91,24 +91,25 @@ func (c *Conn) emitError(err error) { return } - var responseCode = internal.CloseNormalClosure - var responseErr error = internal.CloseNormalClosure - switch v := err.(type) { - case internal.StatusCode: - responseCode = v - case *internal.Error: - responseCode = v.Code - responseErr = v.Err - default: - responseErr = err - } - - var content = responseCode.Bytes() - content = append(content, err.Error()...) - if len(content) > internal.ThresholdV1 { - content = content[:internal.ThresholdV1] - } if atomic.CompareAndSwapUint32(&c.closed, 0, 1) { + var responseCode = internal.CloseNormalClosure + var responseErr error = internal.CloseNormalClosure + switch v := err.(type) { + case internal.StatusCode: + responseCode = v + case *internal.Error: + responseCode = v.Code + responseErr = v.Err + default: + responseErr = err + } + + var content = responseCode.Bytes() + content = append(content, err.Error()...) + if len(content) > internal.ThresholdV1 { + content = content[:internal.ThresholdV1] + } + c.close(content, responseErr) } } diff --git a/examples/chatroom/main.go b/examples/chatroom/main.go index 4f80fbcf..91ae8284 100644 --- a/examples/chatroom/main.go +++ b/examples/chatroom/main.go @@ -3,10 +3,11 @@ package main import ( _ "embed" "encoding/json" - "github.com/lxzan/gws" "log" "net/http" "time" + + "github.com/lxzan/gws" ) const ( diff --git a/examples/echo/main.go b/examples/echo/main.go index a63baab6..c74da910 100644 --- a/examples/echo/main.go +++ b/examples/echo/main.go @@ -1,9 +1,10 @@ package main import ( - "github.com/lxzan/gws" "log" "net/http" + + "github.com/lxzan/gws" ) func main() { diff --git a/examples/push/main.go b/examples/push/main.go index 1d42b7ca..740eb776 100644 --- a/examples/push/main.go +++ b/examples/push/main.go @@ -35,13 +35,13 @@ type Handler struct { gws.BuiltinEventHandler } -func (c *Handler) MustGet(ss gws.SessionStorage, key string) any { - v, _ := ss.Load(key) +func (c *Handler) getSession(socket *gws.Conn, key string) any { + v, _ := socket.Session().Load(key) return v } func (c *Handler) Send(socket *gws.Conn, payload []byte) { - var channel = c.MustGet(socket.Session(), "channel").(chan []byte) + var channel = c.getSession(socket, "channel").(chan []byte) select { case channel <- payload: default: @@ -50,7 +50,7 @@ func (c *Handler) Send(socket *gws.Conn, payload []byte) { } func (c *Handler) OnClose(socket *gws.Conn, err error) { - var closer = c.MustGet(socket.Session(), "closer").(chan struct{}) + var closer = c.getSession(socket, "closer").(chan struct{}) closer <- struct{}{} } diff --git a/init.go b/init.go index 3367a97a..48f127f7 100644 --- a/init.go +++ b/init.go @@ -3,7 +3,7 @@ package gws import "github.com/lxzan/gws/internal" var ( - myPadding = frameHeader{} // 帧头填充物 + framePadding = frameHeader{} // 帧头填充物 binaryPool = internal.NewBufferPool() // 缓冲池 defaultLogger = new(stdLogger) // 默认日志工具 ) diff --git a/internal/pool_test.go b/internal/pool_test.go index a17a8733..10ec32ae 100644 --- a/internal/pool_test.go +++ b/internal/pool_test.go @@ -2,8 +2,9 @@ package internal import ( "bytes" - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestBufferPool(t *testing.T) { diff --git a/internal/utils_test.go b/internal/utils_test.go index 52a2e925..87707226 100644 --- a/internal/utils_test.go +++ b/internal/utils_test.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/binary" "encoding/hex" - "github.com/stretchr/testify/assert" "hash/fnv" "io" "net/http" @@ -12,6 +11,8 @@ import ( "strings" "testing" "unsafe" + + "github.com/stretchr/testify/assert" ) func TestStringToBytes(t *testing.T) { diff --git a/option.go b/option.go index 2f9fb87b..02b399c9 100644 --- a/option.go +++ b/option.go @@ -2,12 +2,13 @@ package gws import ( "bufio" - "compress/flate" "crypto/tls" - "github.com/lxzan/gws/internal" "net" "net/http" "time" + + "github.com/klauspost/compress/flate" + "github.com/lxzan/gws/internal" ) const ( diff --git a/reader.go b/reader.go index 08ceae84..e60b8b6a 100644 --- a/reader.go +++ b/reader.go @@ -3,8 +3,9 @@ package gws import ( "bytes" "fmt" - "github.com/lxzan/gws/internal" "unsafe" + + "github.com/lxzan/gws/internal" ) func (c *Conn) checkMask(enabled bool) error { diff --git a/reader_test.go b/reader_test.go index b58db3c2..6ab3a8ae 100644 --- a/reader_test.go +++ b/reader_test.go @@ -6,12 +6,13 @@ import ( _ "embed" "encoding/hex" "encoding/json" - "github.com/lxzan/gws/internal" - "github.com/stretchr/testify/assert" "net" "sync" "testing" "time" + + "github.com/lxzan/gws/internal" + "github.com/stretchr/testify/assert" ) // 测试同步读 diff --git a/session_storage.go b/session_storage.go index d15a8948..a2dbf601 100644 --- a/session_storage.go +++ b/session_storage.go @@ -1,8 +1,9 @@ package gws import ( - "github.com/lxzan/gws/internal" "sync" + + "github.com/lxzan/gws/internal" ) type SessionStorage interface { @@ -45,8 +46,8 @@ func (c *smap) Store(key string, value any) { } func (c *smap) Range(f func(key string, value any) bool) { - c.Lock() - defer c.Unlock() + c.RLock() + defer c.RUnlock() for k, v := range c.data { if !f(k, v) { @@ -74,7 +75,7 @@ type ( func NewConcurrentMap[K Comparable, V any](segments uint64) *ConcurrentMap[K, V] { segments = internal.SelectValue(segments == 0, 16, segments) segments = internal.ToBinaryNumber(segments) - var cm = &ConcurrentMap[K, V]{segments: segments, buckets: make([]*bucket[K, V], segments, segments)} + var cm = &ConcurrentMap[K, V]{segments: segments, buckets: make([]*bucket[K, V], segments)} for i, _ := range cm.buckets { cm.buckets[i] = &bucket[K, V]{m: make(map[K]V)} } diff --git a/session_storage_test.go b/session_storage_test.go index fa1fae02..5eac95e9 100644 --- a/session_storage_test.go +++ b/session_storage_test.go @@ -1,9 +1,10 @@ package gws import ( + "testing" + "github.com/lxzan/gws/internal" "github.com/stretchr/testify/assert" - "testing" ) func TestMap(t *testing.T) { diff --git a/task.go b/task.go index 30c7008f..adcd802d 100644 --- a/task.go +++ b/task.go @@ -17,7 +17,7 @@ type ( serial int socket *Conn frame *bytes.Buffer - execute func(conn *Conn, buffer *bytes.Buffer) + execute func(conn *Conn, buffer *bytes.Buffer) error } ) @@ -54,7 +54,8 @@ func (c *workerQueue) getJob(newJob *asyncJob, delta int32) *asyncJob { // 循环执行任务 func (c *workerQueue) do(job *asyncJob) { for job != nil { - job.execute(job.socket, job.frame) + err := job.execute(job.socket, job.frame) + job.socket.emitError(err) job = c.getJob(nil, -1) } } diff --git a/task_test.go b/task_test.go index 52fbbb8f..7d72499f 100644 --- a/task_test.go +++ b/task_test.go @@ -235,13 +235,14 @@ func TestTaskQueue(t *testing.T) { listA = append(listA, i) v := i - q.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) { + q.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) error { defer wg.Done() var latency = time.Duration(internal.AlphabetNumeric.Intn(100)) * time.Microsecond time.Sleep(latency) mu.Lock() listB = append(listB, v) mu.Unlock() + return nil }}) } wg.Wait() @@ -255,10 +256,11 @@ func TestTaskQueue(t *testing.T) { wg.Add(1000) for i := int64(1); i <= 1000; i++ { var tmp = i - w.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) { + w.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) error { time.Sleep(time.Millisecond) atomic.AddInt64(&sum, tmp) wg.Done() + return nil }}) } wg.Wait() @@ -272,10 +274,11 @@ func TestTaskQueue(t *testing.T) { wg.Add(1000) for i := int64(1); i <= 1000; i++ { var tmp = i - w.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) { + w.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) error { time.Sleep(time.Millisecond) atomic.AddInt64(&sum, tmp) wg.Done() + return nil }}) } wg.Wait() @@ -350,7 +353,7 @@ func TestRQueue(t *testing.T) { var serial = int64(0) var done = make(chan struct{}) for i := 0; i < total; i++ { - q.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) { + q.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) error { x := atomic.AddInt64(&concurrency, 1) assert.LessOrEqual(t, x, int64(limit)) time.Sleep(10 * time.Millisecond) @@ -358,6 +361,7 @@ func TestRQueue(t *testing.T) { if atomic.AddInt64(&serial, 1) == total { done <- struct{}{} } + return nil }}) } <-done diff --git a/types.go b/types.go index 720e99e2..1dde35e3 100644 --- a/types.go +++ b/types.go @@ -5,12 +5,13 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/lxzan/gws/internal" "io" "log" "net" "runtime" "unsafe" + + "github.com/lxzan/gws/internal" ) const frameHeaderSize = 14 diff --git a/writer.go b/writer.go index abab8e3e..2dda35f7 100644 --- a/writer.go +++ b/writer.go @@ -3,10 +3,11 @@ package gws import ( "bytes" "errors" - "github.com/lxzan/gws/internal" "math" "sync" "sync/atomic" + + "github.com/lxzan/gws/internal" ) // WriteClose 发送关闭帧, 主动断开连接 @@ -40,13 +41,13 @@ func (c *Conn) WriteString(s string) error { return c.WriteMessage(OpcodeText, internal.StringToBytes(s)) } -func writeAsync(socket *Conn, buffer *bytes.Buffer) { +func writeAsyncFunc(socket *Conn, frame *bytes.Buffer) error { if socket.isClosed() { - return + return ErrConnClosed } - err := internal.WriteN(socket.conn, buffer.Bytes()) - binaryPool.Put(buffer) - socket.emitError(err) + err := internal.WriteN(socket.conn, frame.Bytes()) + binaryPool.Put(frame) + return err } // WriteAsync 异步非阻塞地写入消息 @@ -57,7 +58,7 @@ func (c *Conn) WriteAsync(opcode Opcode, payload []byte) error { c.emitError(err) return err } - job := &asyncJob{socket: c, frame: frame, execute: writeAsync} + job := &asyncJob{socket: c, frame: frame, execute: writeAsyncFunc} c.writeQueue.Push(job) return nil } @@ -116,7 +117,7 @@ func (c *Conn) genFrame(opcode Opcode, payload []byte) (*bytes.Buffer, error) { func (c *Conn) compressData(opcode Opcode, payload []byte) (*bytes.Buffer, error) { var buf = binaryPool.Get(len(payload) + frameHeaderSize) - buf.Write(myPadding[0:]) + buf.Write(framePadding[0:]) err := c.compressor.Compress(payload, buf) if err != nil { return nil, err @@ -164,13 +165,15 @@ func NewBroadcaster(opcode Opcode, payload []byte) *Broadcaster { return c } -func (c *Broadcaster) writeAsync(socket *Conn, buffer *bytes.Buffer) { - if !socket.isClosed() { - socket.emitError(internal.WriteN(socket.conn, buffer.Bytes())) +func (c *Broadcaster) writeAsyncFunc(socket *Conn, frame *bytes.Buffer) error { + if socket.isClosed() { + return ErrConnClosed } + err := internal.WriteN(socket.conn, frame.Bytes()) if atomic.AddInt64(&c.state, -1) == 0 { c.doClose() } + return err } // Broadcast 广播 @@ -179,13 +182,14 @@ func (c *Broadcaster) writeAsync(socket *Conn, buffer *bytes.Buffer) { func (c *Broadcaster) Broadcast(socket *Conn) error { var idx = internal.SelectValue(socket.compressEnabled, 1, 0) var msg = c.msgs[idx] + msg.once.Do(func() { msg.frame, msg.err = socket.genFrame(c.opcode, c.payload) }) if msg.err != nil { return msg.err } atomic.AddInt64(&c.state, 1) - var job = &asyncJob{socket: socket, frame: msg.frame, execute: c.writeAsync} + var job = &asyncJob{socket: socket, frame: msg.frame, execute: c.writeAsyncFunc} socket.writeQueue.Push(job) return nil } diff --git a/writer_test.go b/writer_test.go index 70434995..4f4dd392 100644 --- a/writer_test.go +++ b/writer_test.go @@ -102,7 +102,7 @@ func TestWriteClose(t *testing.T) { } func TestConn_WriteAsyncError(t *testing.T) { - t.Run("", func(t *testing.T) { + t.Run("write async", func(t *testing.T) { var serverHandler = new(webSocketMocker) var clientHandler = new(webSocketMocker) var serverOption = &ServerOption{} @@ -257,6 +257,28 @@ func TestNewBroadcaster(t *testing.T) { b.Close() handler.wg.Wait() }) + + t.Run("conn closed", func(t *testing.T) { + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{} + var clientOption = &ClientOption{} + var wg = &sync.WaitGroup{} + wg.Add(1) + + serverHandler.onClose = func(socket *Conn, err error) { + as.Error(err) + wg.Done() + } + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + _ = server.conn.Close() + var broadcaster = NewBroadcaster(OpcodeText, internal.AlphabetNumeric.Generate(16)) + _ = broadcaster.Broadcast(server) + wg.Wait() + }) } type broadcastHandler struct { From ddcfce83865ac33dc528f08744f2fe871c8263be Mon Sep 17 00:00:00 2001 From: kuma Date: Tue, 21 Nov 2023 11:37:28 +0800 Subject: [PATCH 2/2] Update README --- README.md | 33 ++++++++++++++++++++------------- README_CN.md | 39 ++++++++++++++++++++++----------------- 2 files changed, 42 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index dcec8bd1..66aa6d14 100755 --- a/README.md +++ b/README.md @@ -26,17 +26,20 @@ to be processed in a non-blocking way. ### Why GWS - Simplicity and Ease of Use - - **User-Friendly**: Simple and clear `WebSocket` Event API design makes server-client interaction easy. - - **Code Efficiency**: Minimizes the amount of code needed to implement complex WebSocket solutions. + + - **User-Friendly**: Simple and clear `WebSocket` Event API design makes server-client interaction easy. + - **Code Efficiency**: Minimizes the amount of code needed to implement complex WebSocket solutions. - High-Performance - - **High IOPS Low Latency**: Designed for rapid data transmission and reception, ideal for time-sensitive - applications. - - **Low Memory Usage**: Highly optimized memory multiplexing system to minimize memory usage and reduce your cost of ownership. + + - **High IOPS Low Latency**: Designed for rapid data transmission and reception, ideal for time-sensitive + applications. + - **Low Memory Usage**: Highly optimized memory multiplexing system to minimize memory usage and reduce your cost of ownership. - Reliability and Stability - - **Robust Error Handling**: Advanced mechanisms to manage and mitigate errors, ensuring continuous operation. - - **Well-Developed Test Cases**: Passed all `Autobahn` test cases, fully compliant with `RFC 6455`. 99% unit test coverage, covering almost all conditional branches. + - **Robust Error Handling**: Advanced mechanisms to manage and mitigate errors, ensuring continuous operation. + - **Well-Developed Test Cases**: Passed all `Autobahn` test cases, fully compliant with `RFC 6455`. 99% unit test coverage, covering almost all conditional branches. + ### Benchmark #### IOPS (Echo Server) @@ -66,8 +69,8 @@ PASS - [Introduction](#introduction) - [Why GWS](#why-gws) - [Benchmark](#benchmark) - - [IOPS (Echo Server)](#iops-echo-server) - - [GoBench](#gobench) + - [IOPS (Echo Server)](#iops-echo-server) + - [GoBench](#gobench) - [Index](#index) - [Feature](#feature) - [Attention](#attention) @@ -76,10 +79,10 @@ PASS - [Quick Start](#quick-start) - [Best Practice](#best-practice) - [More Examples](#more-examples) - - [KCP](#kcp) - - [Proxy](#proxy) - - [Broadcast](#broadcast) - - [Pub / Sub](#pub--sub) + - [KCP](#kcp) + - [Proxy](#proxy) + - [Broadcast](#broadcast) + - [Pub / Sub](#pub--sub) - [Autobahn Test](#autobahn-test) - [Communication](#communication) - [Acknowledgments](#acknowledgments) @@ -288,6 +291,10 @@ func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) { #### Pub / Sub +Use the event_emitter package to implement the publish-subscribe model. Wrap `gws.Conn` in a structure and implement the GetSubscriberID method to get the subscription ID, which must be unique. The subscription ID is used to identify the subscriber, who can only receive messages on the subject of his subscription. + +This example is useful for building chat rooms or push messages using gws. This means that a user can subscribe to one or more topics via websocket, and when a message is posted to that topic, all subscribers will receive the message. + ```go package main diff --git a/README_CN.md b/README_CN.md index 10bc88aa..ee513089 100755 --- a/README_CN.md +++ b/README_CN.md @@ -19,17 +19,19 @@ GWS(Go WebSocket)是一个用 Go 编写的非常简单、快速、可靠且 ### 为什么选择 GWS - 简单易用 - - **用户友好**: 简洁明了的 `WebSocket` 事件接口设计,让服务器和客户端的交互变得轻松简单. - - **编码效率**: 最大限度地减少实施复杂的解决方案所需的代码量. + + - **用户友好**: 简洁明了的 `WebSocket` 事件接口设计,让服务器和客户端的交互变得轻松简单. + - **编码效率**: 最大限度地减少实施复杂的解决方案所需的代码量. - 性能出众 - - **高吞吐低延迟**: 专为快速传输和接收数据而设计,是时间敏感型应用的理想之选. - - **低内存占用**: 高度优化的内存复用系统, 最大限度降低内存使用量,降低您的成本. + + - **高吞吐低延迟**: 专为快速传输和接收数据而设计,是时间敏感型应用的理想之选. + - **低内存占用**: 高度优化的内存复用系统, 最大限度降低内存使用量,降低您的成本. - 稳定可靠 - - **健壮的错误处理**: 管理和减少错误的先进机制,确保持续运行. - - **完善的测试用例**: 通过了所有 `Autobahn` 测试用例, 完全符合 `RFC 6455` 标准. 单元测试覆盖率达到99%, 几乎覆盖所有条件分支. - + - **健壮的错误处理**: 管理和减少错误的先进机制,确保持续运行. + - **完善的测试用例**: 通过了所有 `Autobahn` 测试用例, 完全符合 `RFC 6455` 标准. 单元测试覆盖率达到 99%, 几乎覆盖所有条件分支. + ### 基准测试 #### IOPS (Echo Server) @@ -59,8 +61,8 @@ PASS - [介绍](#介绍) - [为什么选择 GWS](#为什么选择-gws) - [基准测试](#基准测试) - - [IOPS (Echo Server)](#iops-echo-server) - - [GoBench](#gobench) + - [IOPS (Echo Server)](#iops-echo-server) + - [GoBench](#gobench) - [Index](#index) - [特性](#特性) - [注意](#注意) @@ -69,15 +71,14 @@ PASS - [快速上手](#快速上手) - [最佳实践](#最佳实践) - [更多用例](#更多用例) - - [KCP](#kcp) - - [代理](#代理) - - [广播](#广播) - - [发布订阅](#发布订阅) + - [KCP](#kcp) + - [代理](#代理) + - [广播](#广播) + - [发布/订阅](#发布订阅) - [Autobahn 测试](#autobahn-测试) - [交流](#交流) - [致谢](#致谢) - ### 特性 - [x] 事件驱动式 API @@ -91,7 +92,7 @@ PASS - 所有 gws.Conn 导出方法返回的错误都是可忽略的, 它们在内部已经被妥善处理了 - 传输大文件有阻塞连接的风险 -- 如果复用HTTP服务器, 建议开启新的Goroutine来调用ReadLoop, 以避免请求上下文内存不能及时回收. +- 如果复用 HTTP 服务器, 建议开启新的 Goroutine 来调用 ReadLoop, 以避免请求上下文内存不能及时回收. ### 安装 @@ -237,7 +238,7 @@ func main() { #### 代理 -通过代理拨号, 使用socks5协议. +通过代理拨号, 使用 socks5 协议. ```go package main @@ -280,7 +281,11 @@ func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) { } ``` -#### 发布订阅 +#### 发布/订阅 + +使用 event_emitter 包实现发布订阅模式。用结构体包装 `gws.Conn`,并实现 GetSubscriberID 方法以获取订阅 ID,该 ID 必须是唯一的。订阅 ID 用于识别订阅者,订阅者只能接收其订阅主题的消息。 + +此示例对于使用 gws 构建聊天室或消息推送非常有用。这意味着用户可以通过 websocket 订阅一个或多个主题,当向该主题发布消息时,所有订阅用户都会收到消息。 ```go package main