Skip to content

Commit

Permalink
Merge pull request #317 from lesismal/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
lesismal authored Jun 14, 2023
2 parents 46de564 + 68b52b0 commit 9d2ae1d
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 402 deletions.
2 changes: 1 addition & 1 deletion conn_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Conn struct {
conn net.Conn
connUDP *udpConn

rTimer *timer.Item
rTimer *time.Timer

typ ConnType
closed bool
Expand Down
24 changes: 11 additions & 13 deletions conn_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/lesismal/nbio/mempool"
"github.com/lesismal/nbio/timer"
)

// Conn implements net.Conn.
Expand All @@ -30,8 +29,8 @@ type Conn struct {

connUDP *udpConn

rTimer *timer.Item
wTimer *timer.Item
rTimer *time.Timer
wTimer *time.Timer

writeBuffer []byte

Expand Down Expand Up @@ -253,16 +252,15 @@ func (c *Conn) SetDeadline(t time.Time) error {
if !c.closed {
if !t.IsZero() {
g := c.p.g
now := time.Now()
if c.rTimer == nil {
c.rTimer = g.AfterFunc(t.Sub(now), func() { c.closeWithError(errReadTimeout) })
c.rTimer = g.AfterFunc(time.Until(t), func() { c.closeWithError(errReadTimeout) })
} else {
c.rTimer.Reset(t.Sub(now))
c.rTimer.Reset(time.Until(t))
}
if c.wTimer == nil {
c.wTimer = g.AfterFunc(t.Sub(now), func() { c.closeWithError(errWriteTimeout) })
c.wTimer = g.AfterFunc(time.Until(t), func() { c.closeWithError(errWriteTimeout) })
} else {
c.wTimer.Reset(t.Sub(now))
c.wTimer.Reset(time.Until(t))
}
} else {
if c.rTimer != nil {
Expand All @@ -279,17 +277,17 @@ func (c *Conn) SetDeadline(t time.Time) error {
return nil
}

func (c *Conn) setDeadline(timer **timer.Item, returnErr error, t time.Time) error {
func (c *Conn) setDeadline(timer **time.Timer, returnErr error, t time.Time) error {
c.mux.Lock()
defer c.mux.Unlock()
if c.closed {
return nil
}
if !t.IsZero() {
if *timer == nil {
*timer = c.p.g.UntilFunc(t, func() { c.closeWithError(returnErr) })
*timer = c.p.g.AfterFunc(time.Until(t), func() { c.closeWithError(returnErr) })
} else {
(*timer).ResetUntil(t)
(*timer).Reset(time.Until(t))
}
} else if *timer != nil {
(*timer).Stop()
Expand Down Expand Up @@ -387,7 +385,7 @@ func (c *Conn) write(b []byte) (int, error) {
}

if c.overflow(len(b)) {
return -1, syscall.EINVAL
return -1, errOverflow
}

if len(c.writeBuffer) == 0 {
Expand Down Expand Up @@ -469,7 +467,7 @@ func (c *Conn) writev(in [][]byte) (int, error) {
size += len(v)
}
if c.overflow(size) {
return -1, syscall.EINVAL
return -1, errOverflow
}
if len(c.writeBuffer) > 0 {
for _, v := range in {
Expand Down
10 changes: 5 additions & 5 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Config struct {
// NPoller represents poller goroutine num, it's set to runtime.NumCPU() by default.
NPoller int

// ReadBufferSize represents buffer size for reading, it's set to 16k by default.
// ReadBufferSize represents buffer size for reading, it's set to 64k by default.
ReadBufferSize int

// MaxWriteBufferSize represents max write buffer size for Conn, it's set to 1m by default.
Expand Down Expand Up @@ -233,10 +233,10 @@ func (g *Engine) OnClose(h func(c *Conn, err error)) {
panic("invalid nil handler")
}
g.onClose = func(c *Conn, err error) {
// g.Async(func() {
defer g.wgConn.Done()
h(c, err)
// })
g.Async(func() {
defer g.wgConn.Done()
h(c, err)
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion engine_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (g *Engine) Start() error {
}
}

g.Timer.Start()
// g.Timer.Start()

if len(g.addrs) == 0 {
logging.Info("NBIO[%v] start", g.Name)
Expand Down
1 change: 1 addition & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ import (
var (
errReadTimeout = errors.New("read timeout")
errWriteTimeout = errors.New("write timeout")
errOverflow = errors.New("write overflow")
)
3 changes: 2 additions & 1 deletion logging/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package logging

import (
"fmt"
"io"
"os"
"time"
)
Expand All @@ -15,7 +16,7 @@ var (
TimeFormat = "2006/01/02 15:04:05.000"

// Output is used to receive log output.
Output = os.Stdout
Output io.Writer = os.Stdout

// DefaultLogger is the default logger and is used by arpc.
DefaultLogger Logger = &logger{level: LevelInfo}
Expand Down
31 changes: 21 additions & 10 deletions nbhttp/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package nbhttp
import (
"context"
"errors"
"io"
"net"
"net/http"
"runtime"
Expand Down Expand Up @@ -53,7 +54,7 @@ const (
// DefaultKeepaliveTime .
DefaultKeepaliveTime = time.Second * 120

// DefaultBlockingReadBufferSize sets to 4k(<= goroutine stack size).
// DefaultBlockingReadBufferSize sets to 4k.
DefaultBlockingReadBufferSize = 1024 * 4
)

Expand Down Expand Up @@ -116,7 +117,7 @@ type Config struct {
// ReadLimit represents the max size for parser reading, it's set to 64M by default.
ReadLimit int

// ReadBufferSize represents buffer size for reading, it's set to 32k by default.
// ReadBufferSize represents buffer size for reading, it's set to 64k by default.
ReadBufferSize int

// MaxWriteBufferSize represents max write buffer size for Conn, it's set to 1m by default.
Expand Down Expand Up @@ -200,15 +201,10 @@ type Config struct {
ReadBufferPool mempool.Allocator

// WebsocketCompressor .
WebsocketCompressor func() interface {
Compress([]byte) []byte
Close()
}
WebsocketCompressor func(w io.WriteCloser, level int) io.WriteCloser

// WebsocketDecompressor .
WebsocketDecompressor func() interface {
Decompress([]byte) ([]byte, error)
Close()
}
WebsocketDecompressor func(r io.Reader) io.ReadCloser
}

// Engine .
Expand Down Expand Up @@ -546,13 +542,15 @@ func (engine *Engine) AddTransferredConn(nbc *nbio.Conn) error {
key, err := conn2Array(nbc)
if err != nil {
nbc.Close()
logging.Error("AddTransferredConn failed: %v", err)
return err
}

engine.mux.Lock()
if len(engine.conns) >= engine.MaxLoad {
engine.mux.Unlock()
nbc.Close()
logging.Error("AddTransferredConn failed: overload, already has %v online", engine.MaxLoad)
return ErrServiceOverload
}
engine.conns[key] = struct{}{}
Expand All @@ -567,6 +565,7 @@ func (engine *Engine) AddConnNonTLSNonBlocking(c net.Conn, tlsConfig *tls.Config
nbc, err := nbio.NBConn(c)
if err != nil {
c.Close()
logging.Error("AddConnNonTLSNonBlocking failed: %v", err)
return
}
if nbc.Session() != nil {
Expand All @@ -576,13 +575,15 @@ func (engine *Engine) AddConnNonTLSNonBlocking(c net.Conn, tlsConfig *tls.Config
key, err := conn2Array(nbc)
if err != nil {
nbc.Close()
logging.Error("AddConnNonTLSNonBlocking failed: %v", err)
return
}

engine.mux.Lock()
if len(engine.conns) >= engine.MaxLoad {
engine.mux.Unlock()
nbc.Close()
logging.Error("AddConnNonTLSNonBlocking failed: overload, already has %v online", engine.MaxLoad)
return
}
engine.conns[key] = struct{}{}
Expand All @@ -606,6 +607,7 @@ func (engine *Engine) AddConnNonTLSBlocking(conn net.Conn, tlsConfig *tls.Config
engine.mux.Lock()
if len(engine.conns) >= engine.MaxLoad {
engine.mux.Unlock()
logging.Error("AddConnNonTLSBlocking failed: overload, already has %v online", engine.MaxLoad)
conn.Close()
decrease()
return
Expand All @@ -617,13 +619,15 @@ func (engine *Engine) AddConnNonTLSBlocking(conn net.Conn, tlsConfig *tls.Config
engine.mux.Unlock()
conn.Close()
decrease()
logging.Error("AddConnNonTLSBlocking failed: %v", err)
return
}
engine.conns[key] = struct{}{}
default:
engine.mux.Unlock()
conn.Close()
decrease()
logging.Error("AddConnNonTLSBlocking failed: unknown conn type: %v", vt)
return
}
engine.mux.Unlock()
Expand All @@ -641,22 +645,26 @@ func (engine *Engine) AddConnTLSNonBlocking(conn net.Conn, tlsConfig *tls.Config
nbc, err := nbio.NBConn(conn)
if err != nil {
conn.Close()
logging.Error("AddConnTLSNonBlocking failed: %v", err)
return
}
if nbc.Session() != nil {
nbc.Close()
logging.Error("AddConnTLSNonBlocking failed: session should not be nil")
return
}
key, err := conn2Array(nbc)
if err != nil {
nbc.Close()
logging.Error("AddConnTLSNonBlocking failed: %v", err)
return
}

engine.mux.Lock()
if len(engine.conns) >= engine.MaxLoad {
engine.mux.Unlock()
nbc.Close()
logging.Error("AddConnTLSNonBlocking failed: overload, already has %v online", engine.MaxLoad)
return
}

Expand Down Expand Up @@ -689,6 +697,7 @@ func (engine *Engine) AddConnTLSBlocking(conn net.Conn, tlsConfig *tls.Config, d
engine.mux.Unlock()
conn.Close()
decrease()
logging.Error("AddConnTLSBlocking failed: overload, already has %v online", engine.MaxLoad)
return
}

Expand All @@ -699,13 +708,15 @@ func (engine *Engine) AddConnTLSBlocking(conn net.Conn, tlsConfig *tls.Config, d
engine.mux.Unlock()
conn.Close()
decrease()
logging.Error("AddConnTLSBlocking failed: %v", err)
return
}
engine.conns[key] = struct{}{}
default:
engine.mux.Unlock()
conn.Close()
decrease()
logging.Error("AddConnTLSBlocking unknown conn type: %v", vt)
return
}
engine.mux.Unlock()
Expand Down
Loading

0 comments on commit 9d2ae1d

Please sign in to comment.