Skip to content

Commit

Permalink
Use new atomic types from Go 1.19
Browse files Browse the repository at this point in the history
This is a cleaner solution for the fix in #438 thanks to the fact that Go 1.19 now is the default and the atomic.Int64 types are automatically aligned correctly on 32 bit systems.

Using this also means that xsync.Int64 can be removed. The new atomic.Int64 type solves the issue and should be quite a lot faster as it avoids the interface conversion.
  • Loading branch information
Jacalz committed Apr 9, 2024
1 parent e87d61a commit afe94af
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 51 deletions.
4 changes: 2 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Conn struct {
closeMu sync.Mutex
closing bool

pingCounter int32
pingCounter atomic.Int32
activePingsMu sync.Mutex
activePings map[string]chan<- struct{}
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func (c *Conn) flate() bool {
//
// TCP Keepalives should suffice for most use cases.
func (c *Conn) Ping(ctx context.Context) error {
p := atomic.AddInt32(&c.pingCounter, 1)
p := c.pingCounter.Add(1)

err := c.ping(ctx, strconv.Itoa(int(p)))
if err != nil {
Expand Down
23 changes: 0 additions & 23 deletions internal/xsync/int64.go

This file was deleted.

41 changes: 19 additions & 22 deletions netconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NetConn(ctx context.Context, c *Conn, msgType MessageType) net.Conn {
defer nc.writeMu.unlock()

// Prevents future writes from writing until the deadline is reset.
atomic.StoreInt64(&nc.writeExpired, 1)
nc.writeExpired.Store(1)
})
if !nc.writeTimer.Stop() {
<-nc.writeTimer.C
Expand All @@ -84,7 +84,7 @@ func NetConn(ctx context.Context, c *Conn, msgType MessageType) net.Conn {
defer nc.readMu.unlock()

// Prevents future reads from reading until the deadline is reset.
atomic.StoreInt64(&nc.readExpired, 1)
nc.readExpired.Store(1)
})
if !nc.readTimer.Stop() {
<-nc.readTimer.C
Expand All @@ -94,25 +94,22 @@ func NetConn(ctx context.Context, c *Conn, msgType MessageType) net.Conn {
}

type netConn struct {
// These must be first to be aligned on 32 bit platforms.
// https://github.com/nhooyr/websocket/pull/438
readExpired int64
writeExpired int64

c *Conn
msgType MessageType

writeTimer *time.Timer
writeMu *mu
writeCtx context.Context
writeCancel context.CancelFunc

readTimer *time.Timer
readMu *mu
readCtx context.Context
readCancel context.CancelFunc
readEOFed bool
reader io.Reader
writeTimer *time.Timer
writeMu *mu
writeExpired atomic.Int64
writeCtx context.Context
writeCancel context.CancelFunc

readTimer *time.Timer
readMu *mu
readExpired atomic.Int64
readCtx context.Context
readCancel context.CancelFunc
readEOFed bool
reader io.Reader
}

var _ net.Conn = &netConn{}
Expand All @@ -129,7 +126,7 @@ func (nc *netConn) Write(p []byte) (int, error) {
nc.writeMu.forceLock()
defer nc.writeMu.unlock()

if atomic.LoadInt64(&nc.writeExpired) == 1 {
if nc.writeExpired.Load() == 1 {
return 0, fmt.Errorf("failed to write: %w", context.DeadlineExceeded)
}

Expand Down Expand Up @@ -157,7 +154,7 @@ func (nc *netConn) Read(p []byte) (int, error) {
}

func (nc *netConn) read(p []byte) (int, error) {
if atomic.LoadInt64(&nc.readExpired) == 1 {
if nc.readExpired.Load() == 1 {
return 0, fmt.Errorf("failed to read: %w", context.DeadlineExceeded)
}

Expand Down Expand Up @@ -209,7 +206,7 @@ func (nc *netConn) SetDeadline(t time.Time) error {
}

func (nc *netConn) SetWriteDeadline(t time.Time) error {
atomic.StoreInt64(&nc.writeExpired, 0)
nc.writeExpired.Store(0)
if t.IsZero() {
nc.writeTimer.Stop()
} else {
Expand All @@ -223,7 +220,7 @@ func (nc *netConn) SetWriteDeadline(t time.Time) error {
}

func (nc *netConn) SetReadDeadline(t time.Time) error {
atomic.StoreInt64(&nc.readExpired, 0)
nc.readExpired.Store(0)
if t.IsZero() {
nc.readTimer.Stop()
} else {
Expand Down
4 changes: 2 additions & 2 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"io"
"net"
"strings"
"sync/atomic"
"time"

"nhooyr.io/websocket/internal/errd"
"nhooyr.io/websocket/internal/util"
"nhooyr.io/websocket/internal/xsync"
)

// Reader reads from the connection until there is a WebSocket
Expand Down Expand Up @@ -465,7 +465,7 @@ func (mr *msgReader) read(p []byte) (int, error) {
type limitReader struct {
c *Conn
r io.Reader
limit xsync.Int64
limit atomic.Int64
n int64
}

Expand Down
4 changes: 2 additions & 2 deletions ws_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall/js"

"nhooyr.io/websocket/internal/bpool"
"nhooyr.io/websocket/internal/wsjs"
"nhooyr.io/websocket/internal/xsync"
)

// opcode represents a WebSocket opcode.
Expand Down Expand Up @@ -45,7 +45,7 @@ type Conn struct {
ws wsjs.WebSocket

// read limit for a message in bytes.
msgReadLimit xsync.Int64
msgReadLimit atomic.Int64

closeReadMu sync.Mutex
closeReadCtx context.Context
Expand Down

0 comments on commit afe94af

Please sign in to comment.