Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize writev method #76

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@ to be processed in a non-blocking way.

- <font size=3>Simplicity and Ease of Use</font>

- **User-Friendly**: Simple and clear `WebSocket` Event API design makes server-client interaction easy.
- **Code Efficiency**: Minimizes the amount of code needed to implement complex WebSocket solutions.
- **User-Friendly**: Simple and clear `WebSocket` Event API design makes server-client interaction easy.
- **Code Efficiency**: Minimizes the amount of code needed to implement complex WebSocket solutions.

- <font size=3>High-Performance</font>

- **High IOPS Low Latency**: Designed for rapid data transmission and reception, ideal for time-sensitive
applications.
- **Low Memory Usage**: Highly optimized memory multiplexing system to minimize memory usage and reduce your cost of ownership.
- **High IOPS Low Latency**: Designed for rapid data transmission and reception, ideal for time-sensitive
applications.
- **Low Memory Usage**: Highly optimized memory multiplexing system to minimize memory usage and reduce your cost of
ownership.

- <font size=3>Reliability and Stability</font>
- **Robust Error Handling**: Advanced mechanisms to manage and mitigate errors, ensuring continuous operation.
- **Well-Developed Test Cases**: Passed all `Autobahn` test cases, fully compliant with `RFC 7692`. 99% unit test coverage, covering almost all conditional branches.
- **Robust Error Handling**: Advanced mechanisms to manage and mitigate errors, ensuring continuous operation.
- **Well-Developed Test Cases**: Passed all `Autobahn` test cases, fully compliant with `RFC 7692`. Unit test
coverage is almost 100%, covering all conditional branches.

### Benchmark

Expand Down Expand Up @@ -94,7 +96,6 @@ PASS
- [x] Broadcast
- [x] Dial via Proxy
- [x] Context-Takeover
- [x] Zero Allocs Read / Write
- [x] Passed Autobahn Test Cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)
- [x] Concurrent & Asynchronous Non-Blocking Write

Expand Down Expand Up @@ -317,9 +318,12 @@ func WriteWithTimeout(socket *gws.Conn, p []byte, timeout time.Duration) error {

#### Pub / Sub

Use the event_emitter package to implement the publish-subscribe model. Wrap `gws.Conn` in a structure and implement the GetSubscriberID method to get the subscription ID, which must be unique. The subscription ID is used to identify the subscriber, who can only receive messages on the subject of his subscription.
Use the event_emitter package to implement the publish-subscribe model. Wrap `gws.Conn` in a structure and implement the
GetSubscriberID method to get the subscription ID, which must be unique. The subscription ID is used to identify the
subscriber, who can only receive messages on the subject of his subscription.

This example is useful for building chat rooms or push messages using gws. This means that a user can subscribe to one or more topics via websocket, and when a message is posted to that topic, all subscribers will receive the message.
This example is useful for building chat rooms or push messages using gws. This means that a user can subscribe to one
or more topics via websocket, and when a message is posted to that topic, all subscribers will receive the message.

```go
package main
Expand Down
3 changes: 1 addition & 2 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ GWS(Go WebSocket)是一个用 Go 编写的非常简单、快速、可靠且

- <font size=3>稳定可靠</font>
- **健壮的错误处理**: 管理和减少错误的先进机制,确保持续运行.
- **完善的测试用例**: 通过了所有 `Autobahn` 测试用例, 符合 `RFC 7692` 标准. 单元测试覆盖率达到 99%, 几乎覆盖所有条件分支.
- **完善的测试用例**: 通过了所有 `Autobahn` 测试用例, 符合 `RFC 7692` 标准. 单元测试覆盖率几乎达到 100%, 覆盖所有条件分支.

### 基准测试

Expand Down Expand Up @@ -86,7 +86,6 @@ PASS
- [x] 广播
- [x] 代理拨号
- [x] 上下文接管
- [x] 读写过程零动态内存分配
- [x] 支持并发和异步非阻塞写入
- [x] 通过所有 Autobahn 测试用例 [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)

Expand Down
4 changes: 2 additions & 2 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
conn: &benchConn{},
config: upgrader.option.getConfig(),
}
var buf, _ = conn1.genFrame(OpcodeText, githubData, false)
var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), false)

var reader = bytes.NewBuffer(buf.Bytes())
var conn2 = &Conn{
Expand Down Expand Up @@ -98,7 +98,7 @@ func BenchmarkConn_ReadMessage(b *testing.B) {
deflater: new(deflater),
}
conn1.deflater.initialize(false, conn1.pd)
var buf, _ = conn1.genFrame(OpcodeText, githubData, false)
var buf, _ = conn1.genFrame(OpcodeText, internal.Bytes(githubData), false)

var reader = bytes.NewBuffer(buf.Bytes())
var conn2 = &Conn{
Expand Down
19 changes: 12 additions & 7 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,15 @@ func (c *deflater) Decompress(src *bytes.Buffer, dict []byte) (*bytes.Buffer, er
}

// Compress 压缩
func (c *deflater) Compress(src []byte, dst *bytes.Buffer, dict []byte) error {
func (c *deflater) Compress(src internal.Payload, dst *bytes.Buffer, dict []byte) error {
c.cpsLocker.Lock()
defer c.cpsLocker.Unlock()

c.cpsWriter.ResetDict(dst, dict)
if err := internal.CheckErrors(internal.WriteN(c.cpsWriter, src), c.cpsWriter.Flush()); err != nil {
if _, err := src.WriteTo(c.cpsWriter); err != nil {
return err
}
if err := c.cpsWriter.Flush(); err != nil {
return err
}
if n := dst.Len(); n >= 4 {
Expand Down Expand Up @@ -116,16 +119,17 @@ func (c *slideWindow) initialize(pool *internal.Pool[[]byte], windowBits int) *s
return c
}

func (c *slideWindow) Write(p []byte) {
func (c *slideWindow) Write(p []byte) (int, error) {
if !c.enabled {
return
return 0, nil
}

var n = len(p)
var total = len(p)
var n = total
var length = len(c.dict)
if n+length <= c.size {
c.dict = append(c.dict, p...)
return
return total, nil
}

if m := c.size - length; m > 0 {
Expand All @@ -136,11 +140,12 @@ func (c *slideWindow) Write(p []byte) {

if n >= c.size {
copy(c.dict, p[n-c.size:])
return
return total, nil
}

copy(c.dict, c.dict[n:])
copy(c.dict[c.size-n:], p)
return total, nil
}

func (c *PermessageDeflate) genRequestHeader() string {
Expand Down
77 changes: 77 additions & 0 deletions compress_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gws

import (
"errors"
"io"
"testing"
"time"

Expand Down Expand Up @@ -181,4 +183,79 @@ func TestPermessageNegotiation(t *testing.T) {
assert.NoError(t, err)
client.WriteMessage(OpcodeText, internal.AlphabetNumeric.Generate(1024))
})

t.Run("ok 5", func(t *testing.T) {
var addr = ":" + nextPort()
var serverHandler = &webSocketMocker{}
serverHandler.onMessage = func(socket *Conn, message *Message) {
println(message.Data.String())
}
var server = NewServer(serverHandler, &ServerOption{PermessageDeflate: PermessageDeflate{
Enabled: true,
ServerContextTakeover: true,
ClientContextTakeover: true,
ServerMaxWindowBits: 10,
ClientMaxWindowBits: 10,
}})
go server.Run(addr)

time.Sleep(100 * time.Millisecond)
client, _, err := NewClient(new(BuiltinEventHandler), &ClientOption{
Addr: "ws://localhost" + addr,
PermessageDeflate: PermessageDeflate{
Enabled: true,
ServerContextTakeover: true,
ClientContextTakeover: true,
Threshold: 1,
},
})
assert.NoError(t, err)
_ = client.WriteString("he")
assert.Equal(t, string(client.cpsWindow.dict), "he")
_ = client.WriteString("llo")
assert.Equal(t, string(client.cpsWindow.dict), "hello")
_ = client.WriteV(OpcodeText, []byte(", "), []byte("world!"))
assert.Equal(t, string(client.cpsWindow.dict), "hello, world!")
})

t.Run("fail", func(t *testing.T) {
var addr = ":" + nextPort()
var serverHandler = &webSocketMocker{}
var server = NewServer(serverHandler, &ServerOption{PermessageDeflate: PermessageDeflate{
Enabled: true,
ServerContextTakeover: true,
ClientContextTakeover: true,
ServerMaxWindowBits: 10,
ClientMaxWindowBits: 10,
}})
go server.Run(addr)

time.Sleep(100 * time.Millisecond)
client, _, err := NewClient(new(BuiltinEventHandler), &ClientOption{
Addr: "ws://localhost" + addr,
PermessageDeflate: PermessageDeflate{
Enabled: true,
ServerContextTakeover: true,
ClientContextTakeover: true,
Threshold: 1,
},
})
assert.NoError(t, err)
err = client.doWrite(OpcodeText, new(writerTo))
assert.Equal(t, err.Error(), "1")
})
}

type writerTo struct{}

func (c *writerTo) CheckEncoding(enabled bool, opcode uint8) bool {
return true
}

func (c *writerTo) Len() int {
return 10
}

func (c *writerTo) WriteTo(w io.Writer) (n int64, err error) {
return 0, errors.New("1")
}
17 changes: 5 additions & 12 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import (
"bytes"
"crypto/tls"
"encoding/binary"
"github.com/lxzan/gws/internal"
"net"
"sync"
"sync/atomic"
"time"
"unicode/utf8"

"github.com/lxzan/gws/internal"
)

type Conn struct {
Expand Down Expand Up @@ -91,22 +89,17 @@ func (c *Conn) getDpsDict() []byte {
}

func (c *Conn) isTextValid(opcode Opcode, payload []byte) bool {
if !c.config.CheckUtf8Enabled {
return true
}
switch opcode {
case OpcodeText, OpcodeCloseConnection:
return utf8.Valid(payload)
default:
return true
if c.config.CheckUtf8Enabled {
return internal.CheckEncoding(uint8(opcode), payload)
}
return true
}

func (c *Conn) isClosed() bool { return atomic.LoadUint32(&c.closed) == 1 }

func (c *Conn) close(reason []byte, err error) {
c.err.Store(err)
_ = c.doWrite(OpcodeCloseConnection, reason)
_ = c.doWrite(OpcodeCloseConnection, internal.Bytes(reason))
_ = c.conn.Close()
}

Expand Down
85 changes: 85 additions & 0 deletions internal/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package internal

import (
"io"
"unicode/utf8"
)

// ReadN 精准地读取len(data)个字节, 否则返回错误
func ReadN(reader io.Reader, data []byte) error {
_, err := io.ReadFull(reader, data)
return err
}

func WriteN(writer io.Writer, content []byte) error {
_, err := writer.Write(content)
return err
}

func CheckEncoding(opcode uint8, payload []byte) bool {
switch opcode {
case 1, 8:
return utf8.Valid(payload)
default:
return true
}
}

type Payload interface {
io.WriterTo
Len() int
CheckEncoding(enabled bool, opcode uint8) bool
}

type Buffers [][]byte

func (b Buffers) CheckEncoding(enabled bool, opcode uint8) bool {
if enabled {
for i, _ := range b {
if !CheckEncoding(opcode, b[i]) {
return false
}
}
}
return true
}

func (b Buffers) Len() int {
var sum = 0
for i, _ := range b {
sum += len(b[i])
}
return sum
}

// WriteTo 可重复写
func (b Buffers) WriteTo(w io.Writer) (int64, error) {
var n = 0
for i, _ := range b {
x, err := w.Write(b[i])
n += x
if err != nil {
return int64(n), err
}
}
return int64(n), nil
}

type Bytes []byte

func (b Bytes) CheckEncoding(enabled bool, opcode uint8) bool {
if enabled {
return CheckEncoding(opcode, b)
}
return true
}

func (b Bytes) Len() int {
return len(b)
}

// WriteTo 可重复写
func (b Bytes) WriteTo(w io.Writer) (int64, error) {
n, err := w.Write(b)
return int64(n), err
}
Loading