Skip to content

Commit

Permalink
Merge pull request #75 from lxzan/testing
Browse files Browse the repository at this point in the history
reuse cps/dps slide window
  • Loading branch information
lxzan authored Jan 21, 2024
2 parents 07a1363 + 2eee7f5 commit 0f857d4
Show file tree
Hide file tree
Showing 20 changed files with 248 additions and 89 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ to be processed in a non-blocking way.

- <font size=3>Reliability and Stability</font>
- **Robust Error Handling**: Advanced mechanisms to manage and mitigate errors, ensuring continuous operation.
- **Well-Developed Test Cases**: Passed all `Autobahn` test cases, fully compliant with `RFC 6455`. 99% unit test coverage, covering almost all conditional branches.
- **Well-Developed Test Cases**: Passed all `Autobahn` test cases, fully compliant with `RFC 7692`. 99% unit test coverage, covering almost all conditional branches.

### Benchmark

Expand Down Expand Up @@ -95,7 +95,7 @@ PASS
- [x] Dial via Proxy
- [x] Context-Takeover
- [x] Zero Allocs Read / Write
- [x] Passed `Autobahn` Test Cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)
- [x] Passed Autobahn Test Cases [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)
- [x] Concurrent & Asynchronous Non-Blocking Write

### Attention
Expand Down Expand Up @@ -152,7 +152,7 @@ const (

func main() {
upgrader := gws.NewUpgrader(&Handler{}, &gws.ServerOption{
ReadAsyncEnabled: true, // Parallel message processing
ParallelEnabled: true, // Parallel message processing
Recovery: gws.Recovery, // Exception recovery
PermessageDeflate: gws.PermessageDeflate{Enabled: true}, // Enable compression
})
Expand Down
6 changes: 3 additions & 3 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ GWS(Go WebSocket)是一个用 Go 编写的非常简单、快速、可靠且

- <font size=3>稳定可靠</font>
- **健壮的错误处理**: 管理和减少错误的先进机制,确保持续运行.
- **完善的测试用例**: 通过了所有 `Autobahn` 测试用例, 完全符合 `RFC 6455` 标准. 单元测试覆盖率达到 99%, 几乎覆盖所有条件分支.
- **完善的测试用例**: 通过了所有 `Autobahn` 测试用例, 符合 `RFC 7692` 标准. 单元测试覆盖率达到 99%, 几乎覆盖所有条件分支.

### 基准测试

Expand Down Expand Up @@ -88,7 +88,7 @@ PASS
- [x] 上下文接管
- [x] 读写过程零动态内存分配
- [x] 支持并发和异步非阻塞写入
- [x] 通过所有 `Autobahn` 测试用例 [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)
- [x] 通过所有 Autobahn 测试用例 [Server](https://lxzan.github.io/gws/reports/servers/) / [Client](https://lxzan.github.io/gws/reports/clients/)

### 注意

Expand Down Expand Up @@ -145,7 +145,7 @@ const (

func main() {
upgrader := gws.NewUpgrader(&Handler{}, &gws.ServerOption{
ReadAsyncEnabled: true, // 开启并行消息处理
ParallelEnabled: true, // 开启并行消息处理
Recovery: gws.Recovery, // 开启异常恢复
PermessageDeflate: gws.PermessageDeflate{Enabled: true}, // 开启压缩
})
Expand Down
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ func (c *connector) handshake() (*Conn, *http.Response, error) {
closed: 0,
deflater: new(deflater),
writeQueue: workerQueue{maxConcurrency: 1},
readQueue: make(channel, c.option.ReadAsyncGoLimit),
readQueue: make(channel, c.option.ParallelGolimit),
}
if pd.Enabled {
socket.deflater.initialize(false, pd)
if pd.ServerContextTakeover {
socket.dpsWindow.initialize(pd.ServerMaxWindowBits)
socket.dpsWindow.initialize(nil, pd.ServerMaxWindowBits)
}
if pd.ClientContextTakeover {
socket.cpsWindow.initialize(pd.ClientMaxWindowBits)
socket.cpsWindow.initialize(nil, pd.ClientMaxWindowBits)
}
}
return socket, resp, c.conn.SetDeadline(time.Time{})
Expand Down
22 changes: 12 additions & 10 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,7 @@ func (c *deflater) Compress(src []byte, dst *bytes.Buffer, dict []byte) error {
defer c.cpsLocker.Unlock()

c.cpsWriter.ResetDict(dst, dict)
if err := internal.WriteN(c.cpsWriter, src); err != nil {
return err
}
if err := c.cpsWriter.Flush(); err != nil {
if err := internal.CheckErrors(internal.WriteN(c.cpsWriter, src), c.cpsWriter.Flush()); err != nil {
return err
}
if n := dst.Len(); n >= 4 {
Expand All @@ -108,10 +105,14 @@ type slideWindow struct {
size int
}

func (c *slideWindow) initialize(windowBits int) *slideWindow {
func (c *slideWindow) initialize(pool *internal.Pool[[]byte], windowBits int) *slideWindow {
c.enabled = true
c.size = internal.BinaryPow(windowBits)
c.dict = make([]byte, 0, c.size)
if pool != nil {
c.dict = pool.Get()[:0]
} else {
c.dict = make([]byte, 0, c.size)
}
return c
}

Expand All @@ -127,10 +128,11 @@ func (c *slideWindow) Write(p []byte) {
return
}

var m = c.size - length
c.dict = append(c.dict, p[:m]...)
p = p[m:]
n = len(p)
if m := c.size - length; m > 0 {
c.dict = append(c.dict, p[:m]...)
p = p[m:]
n = len(p)
}

if n >= c.size {
copy(c.dict, p[n-c.size:])
Expand Down
26 changes: 24 additions & 2 deletions compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func TestSlideWindow(t *testing.T) {
t.Run("", func(t *testing.T) {
var sw = new(slideWindow).initialize(3)
var sw = new(slideWindow).initialize(nil, 3)
sw.Write([]byte("abc"))
assert.Equal(t, string(sw.dict), "abc")

Expand All @@ -23,13 +23,35 @@ func TestSlideWindow(t *testing.T) {
})

t.Run("", func(t *testing.T) {
var sw = new(slideWindow).initialize(3)
var sw = new(slideWindow).initialize(nil, 3)
sw.Write([]byte("abc"))
assert.Equal(t, string(sw.dict), "abc")

sw.Write([]byte("defgh123456789"))
assert.Equal(t, string(sw.dict), "23456789")
})

t.Run("", func(t *testing.T) {
const size = 4 * 1024
var sw = slideWindow{enabled: true, size: size}
for i := 0; i < 1000; i++ {
var n = internal.AlphabetNumeric.Intn(100)
sw.Write(internal.AlphabetNumeric.Generate(n))
}
assert.Equal(t, len(sw.dict), size)
})

t.Run("", func(t *testing.T) {
const size = 4 * 1024
for i := 0; i < 10; i++ {
var sw = slideWindow{enabled: true, size: size, dict: make([]byte, internal.AlphabetNumeric.Intn(size))}
for j := 0; j < 1000; j++ {
var n = internal.AlphabetNumeric.Intn(100)
sw.Write(internal.AlphabetNumeric.Generate(n))
}
assert.Equal(t, len(sw.dict), size)
}
})
}

func TestNegotiation(t *testing.T) {
Expand Down
11 changes: 10 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,17 @@ func (c *Conn) ReadLoop() {
// 回收资源
if c.isServer {
c.br.Reset(nil)
c.config.readerPool.Put(c.br)
c.config.brPool.Put(c.br)
c.br = nil

if c.cpsWindow.enabled {
c.config.cswPool.Put(c.cpsWindow.dict)
c.cpsWindow.dict = nil
}
if c.dpsWindow.enabled {
c.config.dswPool.Put(c.dpsWindow.dict)
c.dpsWindow.dict = nil
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions examples/autobahn/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func main() {
})

s2 := gws.NewServer(&Handler{Sync: false}, &gws.ServerOption{
ReadAsyncEnabled: true,
ParallelEnabled: true,
PermessageDeflate: gws.PermessageDeflate{
Enabled: true,
ServerContextTakeover: true,
Expand All @@ -39,7 +39,7 @@ func main() {
})

s4 := gws.NewServer(&Handler{Sync: false}, &gws.ServerOption{
ReadAsyncEnabled: true,
ParallelEnabled: true,
PermessageDeflate: gws.PermessageDeflate{
Enabled: true,
ServerContextTakeover: false,
Expand Down
1 change: 0 additions & 1 deletion init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ var (
framePadding = frameHeader{} // 帧头填充物
binaryPool = internal.NewBufferPool() // 缓冲池
defaultLogger = new(stdLogger) // 默认日志工具
callbackFunc = func(err error) {} // 回调函数
)
11 changes: 10 additions & 1 deletion internal/deque_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package internal

import (
"container/list"
"github.com/stretchr/testify/assert"
"math/rand"
"testing"
"unsafe"

"github.com/stretchr/testify/assert"
)

type Ordered interface {
Expand Down Expand Up @@ -463,3 +464,11 @@ func TestDeque_Clone(t *testing.T) {
assert.NotEqual(t, addr, addr1)
assert.Equal(t, addr, addr2)
}

func TestDeque_PushFront(t *testing.T) {
var q Deque[int]
q.PushFront(1)
q.PushFront(3)
q.PushFront(5)
assert.Equal(t, q.PopFront(), 5)
}
16 changes: 16 additions & 0 deletions internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ func Min[T int | int64](a, b T) T {
return b
}

func Max[T int | int64](a, b T) T {
if a > b {
return a
}
return b
}

func IsSameSlice[T comparable](a, b []T) bool {
if len(a) != len(b) {
return false
Expand All @@ -243,3 +250,12 @@ func IsSameSlice[T comparable](a, b []T) bool {
}
return true
}

func CheckErrors(errs ...error) error {
for _, item := range errs {
if item != nil {
return item
}
}
return nil
}
15 changes: 15 additions & 0 deletions internal/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"hash/fnv"
"io"
"net/http"
Expand Down Expand Up @@ -249,6 +250,11 @@ func TestMin(t *testing.T) {
assert.Equal(t, Min(4, 3), 3)
}

func TestMax(t *testing.T) {
assert.Equal(t, Max(1, 2), 2)
assert.Equal(t, Max(4, 3), 4)
}

func TestIsSameSlice(t *testing.T) {
assert.True(t, IsSameSlice(
[]int{1, 2, 3},
Expand All @@ -265,3 +271,12 @@ func TestIsSameSlice(t *testing.T) {
[]int{1, 2, 4},
))
}

func TestCheckErrors(t *testing.T) {
var err0 error
var err1 error
var err2 = errors.New("1")
assert.NoError(t, CheckErrors(err0, err1))
assert.Error(t, CheckErrors(err0, err1, err2))
assert.True(t, errors.Is(CheckErrors(err0, err1, err2), err2))
}
Loading

0 comments on commit 0f857d4

Please sign in to comment.