From fc2092a3cf6665a3cf578fa0c504ba809c6c89a1 Mon Sep 17 00:00:00 2001 From: lxzan Date: Fri, 17 Nov 2023 11:40:27 +0800 Subject: [PATCH] format imports --- README.md | 52 +++++++++++++++++++++++++++++-------- README_CN.md | 54 +++++++++++++++++++++++++++++++-------- benchmark_test.go | 5 ++-- client_test.go | 5 ++-- compress.go | 5 ++-- conn.go | 35 +++++++++++++------------ examples/chatroom/main.go | 3 ++- examples/echo/main.go | 3 ++- examples/push/main.go | 8 +++--- init.go | 2 +- internal/pool_test.go | 3 ++- internal/utils_test.go | 3 ++- option.go | 5 ++-- reader.go | 3 ++- reader_test.go | 5 ++-- session_storage.go | 9 ++++--- session_storage_test.go | 3 ++- task.go | 5 ++-- task_test.go | 12 ++++++--- types.go | 3 ++- writer.go | 28 +++++++++++--------- writer_test.go | 24 ++++++++++++++++- 22 files changed, 191 insertions(+), 84 deletions(-) diff --git a/README.md b/README.md index 96b640c8..dcec8bd1 100755 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ GWS (Go WebSocket) is a very simple, fast, reliable and feature-rich WebSocket implementation written in Go. It is designed to be used in highly-concurrent environments, and it is suitable for -building `API`, `PROXY`, `GAME`, `Live Video`, `MESSAGE`, etc. It supports both server and client side with a simple API +building `API`, `Proxy`, `Game`, `Live Video`, `Message`, etc. It supports both server and client side with a simple API which mean you can easily write a server or client by yourself. GWS developed base on Event-Driven model. every connection has a goroutine to handle the event, and the event is able @@ -26,19 +26,17 @@ to be processed in a non-blocking way. ### Why GWS - Simplicity and Ease of Use - - **User-Friendly API**: Straightforward and easy-to-understand API, making server and client setup hassle-free. + - **User-Friendly**: Simple and clear `WebSocket` Event API design makes server-client interaction easy. - **Code Efficiency**: Minimizes the amount of code needed to implement complex WebSocket solutions. - High-Performance - - **Zero Allocs IO**: Built-in multi-level memory pool to minimize dynamic memory allocation during reads and - writes. - - **Optimized for Speed**: Designed for rapid data transmission and reception, ideal for time-sensitive + - **High IOPS Low Latency**: Designed for rapid data transmission and reception, ideal for time-sensitive applications. + - **Low Memory Usage**: Highly optimized memory multiplexing system to minimize memory usage and reduce your cost of ownership. - Reliability and Stability - - **Event-Driven Architecture**: Ensures stable performance even in highly concurrent environments. - **Robust Error Handling**: Advanced mechanisms to manage and mitigate errors, ensuring continuous operation. - + - **Well-Developed Test Cases**: Passed all `Autobahn` test cases, fully compliant with `RFC 6455`. 99% unit test coverage, covering almost all conditional branches. ### Benchmark #### IOPS (Echo Server) @@ -81,6 +79,7 @@ PASS - [KCP](#kcp) - [Proxy](#proxy) - [Broadcast](#broadcast) + - [Pub / Sub](#pub--sub) - [Autobahn Test](#autobahn-test) - [Communication](#communication) - [Acknowledgments](#acknowledgments) @@ -110,11 +109,11 @@ go get -v github.com/lxzan/gws@latest ```go type Event interface { - OnOpen(socket *Conn) // the connection is established + OnOpen(socket *Conn) // connection is established OnClose(socket *Conn, err error) // received a close frame or I/O error occurs - OnPing(socket *Conn, payload []byte) // receive a ping frame - OnPong(socket *Conn, payload []byte) // receive a pong frame - OnMessage(socket *Conn, message *Message) // receive a text/binary frame + OnPing(socket *Conn, payload []byte) // received a ping frame + OnPong(socket *Conn, payload []byte) // received a pong frame + OnMessage(socket *Conn, message *Message) // received a text/binary frame } ``` @@ -287,6 +286,37 @@ func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) { } ``` +#### Pub / Sub + +```go +package main + +import ( + "github.com/lxzan/event_emitter" + "github.com/lxzan/gws" +) + +type Socket struct{ *gws.Conn } + +// GetSubscriberID gets the subscription ID, which needs to be unique. +func (c *Socket) GetSubscriberID() int64 { + userId, _ := c.Session().Load("userId") + return userId.(int64) +} + +func Sub(em *event_emitter.EventEmitter[*Socket], topic string, socket *Socket) { + em.Subscribe(socket, topic, func(subscriber *Socket, msg any) { + _ = msg.(*gws.Broadcaster).Broadcast(subscriber.Conn) + }) +} + +func Pub(em *event_emitter.EventEmitter[*Socket], topic string, op gws.Opcode, msg []byte) { + var broadcaster = gws.NewBroadcaster(op, msg) + defer broadcaster.Close() + em.Publish(topic, broadcaster) +} +``` + ### Autobahn Test ```bash diff --git a/README_CN.md b/README_CN.md index 8a334be7..c5189973 100755 --- a/README_CN.md +++ b/README_CN.md @@ -19,16 +19,16 @@ GWS(Go WebSocket)是一个用 Go 编写的非常简单、快速、可靠且 ### 为什么选择 GWS - 简单易用 - - **用户友好的 API 设计**: 简单易懂的应用程序接口,让服务器和客户端的设置变得轻松简单。 - - **编码效率**: 最大限度地减少实施复杂的 WebSocket 解决方案所需的代码量。 + - **用户友好**: 简单明了的 `WebSocket` 事件接口设计,让服务器和客户端的交互变得轻松简单. + - **编码效率**: 最大限度地减少实施复杂的解决方案所需的代码量. -- 性能良好 - - **零动态内存分配 I/O**: 内置多级内存池,可最大限度地减少读写过程中的动态内存分配。 - - **性能优化**: 专为快速传输和接收数据而设计,是时间敏感型应用的理想之选。 +- 性能出众 + - **高吞吐低延迟**: 专为快速传输和接收数据而设计,是时间敏感型应用的理想之选. + - **低内存占用**: 高度优化的内存复用系统, 最大限度降低内存占用,降低您的使用成本. - 稳定可靠 - - **事件驱动式架构**: 即使在高度并发的环境中,也能确保稳定的性能。 - - **健壮的错误处理**: 管理和减少错误的先进机制,确保持续运行。 + - **健壮的错误处理**: 管理和减少错误的先进机制,确保持续运行. + - **完善的测试用例**: 通过了所有 `Autobahn` 测试用例, 完全符合 `RFC 6455` 标准. 单元测试覆盖率达到99%, 几乎覆盖所有条件分支. ### 基准测试 @@ -72,6 +72,7 @@ PASS - [KCP](#kcp) - [代理](#代理) - [广播](#广播) + - [发布订阅](#发布订阅) - [Autobahn 测试](#autobahn-测试) - [交流](#交流) - [致谢](#致谢) @@ -102,11 +103,11 @@ go get -v github.com/lxzan/gws@latest ```go type Event interface { - OnOpen(socket *Conn) // the connection is established + OnOpen(socket *Conn) // connection is established OnClose(socket *Conn, err error) // received a close frame or I/O error occurs - OnPing(socket *Conn, payload []byte) // receive a ping frame - OnPong(socket *Conn, payload []byte) // receive a pong frame - OnMessage(socket *Conn, message *Message) // receive a text/binary frame + OnPing(socket *Conn, payload []byte) // received a ping frame + OnPong(socket *Conn, payload []byte) // received a pong frame + OnMessage(socket *Conn, message *Message) // received a text/binary frame } ``` @@ -279,6 +280,37 @@ func Broadcast(conns []*gws.Conn, opcode gws.Opcode, payload []byte) { } ``` +#### 发布订阅 + +```go +package main + +import ( + "github.com/lxzan/event_emitter" + "github.com/lxzan/gws" +) + +type Socket struct{ *gws.Conn } + +// GetSubscriberID 获取订阅ID, 需要保证唯一 +func (c *Socket) GetSubscriberID() int64 { + userId, _ := c.Session().Load("userId") + return userId.(int64) +} + +func Sub(em *event_emitter.EventEmitter[*Socket], topic string, socket *Socket) { + em.Subscribe(socket, topic, func(subscriber *Socket, msg any) { + _ = msg.(*gws.Broadcaster).Broadcast(subscriber.Conn) + }) +} + +func Pub(em *event_emitter.EventEmitter[*Socket], topic string, op gws.Opcode, msg []byte) { + var broadcaster = gws.NewBroadcaster(op, msg) + defer broadcaster.Close() + em.Publish(topic, broadcaster) +} +``` + ### Autobahn 测试 ```bash diff --git a/benchmark_test.go b/benchmark_test.go index f1b08b20..98162def 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -6,11 +6,12 @@ import ( "compress/flate" _ "embed" "encoding/binary" - klauspost "github.com/klauspost/compress/flate" - "github.com/lxzan/gws/internal" "io" "net" "testing" + + klauspost "github.com/klauspost/compress/flate" + "github.com/lxzan/gws/internal" ) //go:embed assets/github.json diff --git a/client_test.go b/client_test.go index 4c477045..bfb4c264 100644 --- a/client_test.go +++ b/client_test.go @@ -3,13 +3,14 @@ package gws import ( "crypto/tls" "errors" - "github.com/lxzan/gws/internal" - "github.com/stretchr/testify/assert" "io" "net" "net/http" "testing" "time" + + "github.com/lxzan/gws/internal" + "github.com/stretchr/testify/assert" ) func TestNewClient(t *testing.T) { diff --git a/compress.go b/compress.go index d6043dfa..1632dae3 100644 --- a/compress.go +++ b/compress.go @@ -3,12 +3,13 @@ package gws import ( "bytes" "encoding/binary" - "github.com/klauspost/compress/flate" - "github.com/lxzan/gws/internal" "io" "math" "sync" "sync/atomic" + + "github.com/klauspost/compress/flate" + "github.com/lxzan/gws/internal" ) // FlateTail Add four bytes as specified in RFC diff --git a/conn.go b/conn.go index fa489f27..53d4031c 100644 --- a/conn.go +++ b/conn.go @@ -91,24 +91,25 @@ func (c *Conn) emitError(err error) { return } - var responseCode = internal.CloseNormalClosure - var responseErr error = internal.CloseNormalClosure - switch v := err.(type) { - case internal.StatusCode: - responseCode = v - case *internal.Error: - responseCode = v.Code - responseErr = v.Err - default: - responseErr = err - } - - var content = responseCode.Bytes() - content = append(content, err.Error()...) - if len(content) > internal.ThresholdV1 { - content = content[:internal.ThresholdV1] - } if atomic.CompareAndSwapUint32(&c.closed, 0, 1) { + var responseCode = internal.CloseNormalClosure + var responseErr error = internal.CloseNormalClosure + switch v := err.(type) { + case internal.StatusCode: + responseCode = v + case *internal.Error: + responseCode = v.Code + responseErr = v.Err + default: + responseErr = err + } + + var content = responseCode.Bytes() + content = append(content, err.Error()...) + if len(content) > internal.ThresholdV1 { + content = content[:internal.ThresholdV1] + } + c.close(content, responseErr) } } diff --git a/examples/chatroom/main.go b/examples/chatroom/main.go index 4f80fbcf..91ae8284 100644 --- a/examples/chatroom/main.go +++ b/examples/chatroom/main.go @@ -3,10 +3,11 @@ package main import ( _ "embed" "encoding/json" - "github.com/lxzan/gws" "log" "net/http" "time" + + "github.com/lxzan/gws" ) const ( diff --git a/examples/echo/main.go b/examples/echo/main.go index a63baab6..c74da910 100644 --- a/examples/echo/main.go +++ b/examples/echo/main.go @@ -1,9 +1,10 @@ package main import ( - "github.com/lxzan/gws" "log" "net/http" + + "github.com/lxzan/gws" ) func main() { diff --git a/examples/push/main.go b/examples/push/main.go index 1d42b7ca..740eb776 100644 --- a/examples/push/main.go +++ b/examples/push/main.go @@ -35,13 +35,13 @@ type Handler struct { gws.BuiltinEventHandler } -func (c *Handler) MustGet(ss gws.SessionStorage, key string) any { - v, _ := ss.Load(key) +func (c *Handler) getSession(socket *gws.Conn, key string) any { + v, _ := socket.Session().Load(key) return v } func (c *Handler) Send(socket *gws.Conn, payload []byte) { - var channel = c.MustGet(socket.Session(), "channel").(chan []byte) + var channel = c.getSession(socket, "channel").(chan []byte) select { case channel <- payload: default: @@ -50,7 +50,7 @@ func (c *Handler) Send(socket *gws.Conn, payload []byte) { } func (c *Handler) OnClose(socket *gws.Conn, err error) { - var closer = c.MustGet(socket.Session(), "closer").(chan struct{}) + var closer = c.getSession(socket, "closer").(chan struct{}) closer <- struct{}{} } diff --git a/init.go b/init.go index 3367a97a..48f127f7 100644 --- a/init.go +++ b/init.go @@ -3,7 +3,7 @@ package gws import "github.com/lxzan/gws/internal" var ( - myPadding = frameHeader{} // 帧头填充物 + framePadding = frameHeader{} // 帧头填充物 binaryPool = internal.NewBufferPool() // 缓冲池 defaultLogger = new(stdLogger) // 默认日志工具 ) diff --git a/internal/pool_test.go b/internal/pool_test.go index a17a8733..10ec32ae 100644 --- a/internal/pool_test.go +++ b/internal/pool_test.go @@ -2,8 +2,9 @@ package internal import ( "bytes" - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestBufferPool(t *testing.T) { diff --git a/internal/utils_test.go b/internal/utils_test.go index 52a2e925..87707226 100644 --- a/internal/utils_test.go +++ b/internal/utils_test.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/binary" "encoding/hex" - "github.com/stretchr/testify/assert" "hash/fnv" "io" "net/http" @@ -12,6 +11,8 @@ import ( "strings" "testing" "unsafe" + + "github.com/stretchr/testify/assert" ) func TestStringToBytes(t *testing.T) { diff --git a/option.go b/option.go index 2f9fb87b..02b399c9 100644 --- a/option.go +++ b/option.go @@ -2,12 +2,13 @@ package gws import ( "bufio" - "compress/flate" "crypto/tls" - "github.com/lxzan/gws/internal" "net" "net/http" "time" + + "github.com/klauspost/compress/flate" + "github.com/lxzan/gws/internal" ) const ( diff --git a/reader.go b/reader.go index 08ceae84..e60b8b6a 100644 --- a/reader.go +++ b/reader.go @@ -3,8 +3,9 @@ package gws import ( "bytes" "fmt" - "github.com/lxzan/gws/internal" "unsafe" + + "github.com/lxzan/gws/internal" ) func (c *Conn) checkMask(enabled bool) error { diff --git a/reader_test.go b/reader_test.go index b58db3c2..6ab3a8ae 100644 --- a/reader_test.go +++ b/reader_test.go @@ -6,12 +6,13 @@ import ( _ "embed" "encoding/hex" "encoding/json" - "github.com/lxzan/gws/internal" - "github.com/stretchr/testify/assert" "net" "sync" "testing" "time" + + "github.com/lxzan/gws/internal" + "github.com/stretchr/testify/assert" ) // 测试同步读 diff --git a/session_storage.go b/session_storage.go index d15a8948..a2dbf601 100644 --- a/session_storage.go +++ b/session_storage.go @@ -1,8 +1,9 @@ package gws import ( - "github.com/lxzan/gws/internal" "sync" + + "github.com/lxzan/gws/internal" ) type SessionStorage interface { @@ -45,8 +46,8 @@ func (c *smap) Store(key string, value any) { } func (c *smap) Range(f func(key string, value any) bool) { - c.Lock() - defer c.Unlock() + c.RLock() + defer c.RUnlock() for k, v := range c.data { if !f(k, v) { @@ -74,7 +75,7 @@ type ( func NewConcurrentMap[K Comparable, V any](segments uint64) *ConcurrentMap[K, V] { segments = internal.SelectValue(segments == 0, 16, segments) segments = internal.ToBinaryNumber(segments) - var cm = &ConcurrentMap[K, V]{segments: segments, buckets: make([]*bucket[K, V], segments, segments)} + var cm = &ConcurrentMap[K, V]{segments: segments, buckets: make([]*bucket[K, V], segments)} for i, _ := range cm.buckets { cm.buckets[i] = &bucket[K, V]{m: make(map[K]V)} } diff --git a/session_storage_test.go b/session_storage_test.go index fa1fae02..5eac95e9 100644 --- a/session_storage_test.go +++ b/session_storage_test.go @@ -1,9 +1,10 @@ package gws import ( + "testing" + "github.com/lxzan/gws/internal" "github.com/stretchr/testify/assert" - "testing" ) func TestMap(t *testing.T) { diff --git a/task.go b/task.go index 30c7008f..adcd802d 100644 --- a/task.go +++ b/task.go @@ -17,7 +17,7 @@ type ( serial int socket *Conn frame *bytes.Buffer - execute func(conn *Conn, buffer *bytes.Buffer) + execute func(conn *Conn, buffer *bytes.Buffer) error } ) @@ -54,7 +54,8 @@ func (c *workerQueue) getJob(newJob *asyncJob, delta int32) *asyncJob { // 循环执行任务 func (c *workerQueue) do(job *asyncJob) { for job != nil { - job.execute(job.socket, job.frame) + err := job.execute(job.socket, job.frame) + job.socket.emitError(err) job = c.getJob(nil, -1) } } diff --git a/task_test.go b/task_test.go index 52fbbb8f..7d72499f 100644 --- a/task_test.go +++ b/task_test.go @@ -235,13 +235,14 @@ func TestTaskQueue(t *testing.T) { listA = append(listA, i) v := i - q.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) { + q.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) error { defer wg.Done() var latency = time.Duration(internal.AlphabetNumeric.Intn(100)) * time.Microsecond time.Sleep(latency) mu.Lock() listB = append(listB, v) mu.Unlock() + return nil }}) } wg.Wait() @@ -255,10 +256,11 @@ func TestTaskQueue(t *testing.T) { wg.Add(1000) for i := int64(1); i <= 1000; i++ { var tmp = i - w.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) { + w.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) error { time.Sleep(time.Millisecond) atomic.AddInt64(&sum, tmp) wg.Done() + return nil }}) } wg.Wait() @@ -272,10 +274,11 @@ func TestTaskQueue(t *testing.T) { wg.Add(1000) for i := int64(1); i <= 1000; i++ { var tmp = i - w.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) { + w.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) error { time.Sleep(time.Millisecond) atomic.AddInt64(&sum, tmp) wg.Done() + return nil }}) } wg.Wait() @@ -350,7 +353,7 @@ func TestRQueue(t *testing.T) { var serial = int64(0) var done = make(chan struct{}) for i := 0; i < total; i++ { - q.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) { + q.Push(&asyncJob{execute: func(conn *Conn, buffer *bytes.Buffer) error { x := atomic.AddInt64(&concurrency, 1) assert.LessOrEqual(t, x, int64(limit)) time.Sleep(10 * time.Millisecond) @@ -358,6 +361,7 @@ func TestRQueue(t *testing.T) { if atomic.AddInt64(&serial, 1) == total { done <- struct{}{} } + return nil }}) } <-done diff --git a/types.go b/types.go index 720e99e2..1dde35e3 100644 --- a/types.go +++ b/types.go @@ -5,12 +5,13 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/lxzan/gws/internal" "io" "log" "net" "runtime" "unsafe" + + "github.com/lxzan/gws/internal" ) const frameHeaderSize = 14 diff --git a/writer.go b/writer.go index abab8e3e..2dda35f7 100644 --- a/writer.go +++ b/writer.go @@ -3,10 +3,11 @@ package gws import ( "bytes" "errors" - "github.com/lxzan/gws/internal" "math" "sync" "sync/atomic" + + "github.com/lxzan/gws/internal" ) // WriteClose 发送关闭帧, 主动断开连接 @@ -40,13 +41,13 @@ func (c *Conn) WriteString(s string) error { return c.WriteMessage(OpcodeText, internal.StringToBytes(s)) } -func writeAsync(socket *Conn, buffer *bytes.Buffer) { +func writeAsyncFunc(socket *Conn, frame *bytes.Buffer) error { if socket.isClosed() { - return + return ErrConnClosed } - err := internal.WriteN(socket.conn, buffer.Bytes()) - binaryPool.Put(buffer) - socket.emitError(err) + err := internal.WriteN(socket.conn, frame.Bytes()) + binaryPool.Put(frame) + return err } // WriteAsync 异步非阻塞地写入消息 @@ -57,7 +58,7 @@ func (c *Conn) WriteAsync(opcode Opcode, payload []byte) error { c.emitError(err) return err } - job := &asyncJob{socket: c, frame: frame, execute: writeAsync} + job := &asyncJob{socket: c, frame: frame, execute: writeAsyncFunc} c.writeQueue.Push(job) return nil } @@ -116,7 +117,7 @@ func (c *Conn) genFrame(opcode Opcode, payload []byte) (*bytes.Buffer, error) { func (c *Conn) compressData(opcode Opcode, payload []byte) (*bytes.Buffer, error) { var buf = binaryPool.Get(len(payload) + frameHeaderSize) - buf.Write(myPadding[0:]) + buf.Write(framePadding[0:]) err := c.compressor.Compress(payload, buf) if err != nil { return nil, err @@ -164,13 +165,15 @@ func NewBroadcaster(opcode Opcode, payload []byte) *Broadcaster { return c } -func (c *Broadcaster) writeAsync(socket *Conn, buffer *bytes.Buffer) { - if !socket.isClosed() { - socket.emitError(internal.WriteN(socket.conn, buffer.Bytes())) +func (c *Broadcaster) writeAsyncFunc(socket *Conn, frame *bytes.Buffer) error { + if socket.isClosed() { + return ErrConnClosed } + err := internal.WriteN(socket.conn, frame.Bytes()) if atomic.AddInt64(&c.state, -1) == 0 { c.doClose() } + return err } // Broadcast 广播 @@ -179,13 +182,14 @@ func (c *Broadcaster) writeAsync(socket *Conn, buffer *bytes.Buffer) { func (c *Broadcaster) Broadcast(socket *Conn) error { var idx = internal.SelectValue(socket.compressEnabled, 1, 0) var msg = c.msgs[idx] + msg.once.Do(func() { msg.frame, msg.err = socket.genFrame(c.opcode, c.payload) }) if msg.err != nil { return msg.err } atomic.AddInt64(&c.state, 1) - var job = &asyncJob{socket: socket, frame: msg.frame, execute: c.writeAsync} + var job = &asyncJob{socket: socket, frame: msg.frame, execute: c.writeAsyncFunc} socket.writeQueue.Push(job) return nil } diff --git a/writer_test.go b/writer_test.go index 70434995..4f4dd392 100644 --- a/writer_test.go +++ b/writer_test.go @@ -102,7 +102,7 @@ func TestWriteClose(t *testing.T) { } func TestConn_WriteAsyncError(t *testing.T) { - t.Run("", func(t *testing.T) { + t.Run("write async", func(t *testing.T) { var serverHandler = new(webSocketMocker) var clientHandler = new(webSocketMocker) var serverOption = &ServerOption{} @@ -257,6 +257,28 @@ func TestNewBroadcaster(t *testing.T) { b.Close() handler.wg.Wait() }) + + t.Run("conn closed", func(t *testing.T) { + var serverHandler = new(webSocketMocker) + var clientHandler = new(webSocketMocker) + var serverOption = &ServerOption{} + var clientOption = &ClientOption{} + var wg = &sync.WaitGroup{} + wg.Add(1) + + serverHandler.onClose = func(socket *Conn, err error) { + as.Error(err) + wg.Done() + } + server, client := newPeer(serverHandler, serverOption, clientHandler, clientOption) + go server.ReadLoop() + go client.ReadLoop() + + _ = server.conn.Close() + var broadcaster = NewBroadcaster(OpcodeText, internal.AlphabetNumeric.Generate(16)) + _ = broadcaster.Broadcast(server) + wg.Wait() + }) } type broadcastHandler struct {