Skip to content

Commit

Permalink
1. seperate timer package
Browse files Browse the repository at this point in the history
2. optimize nbhttp conns holder
3. expose ReadAndGetConn(used for UDP OnRead)
  • Loading branch information
lesismal committed Sep 14, 2022
1 parent 7d356ba commit cb1a3bf
Show file tree
Hide file tree
Showing 14 changed files with 552 additions and 415 deletions.
10 changes: 6 additions & 4 deletions conn_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"net"
"sync"
"time"

"github.com/lesismal/nbio/timer"
)

// Conn wraps net.Conn
Expand All @@ -27,7 +29,7 @@ type Conn struct {
conn net.Conn
connUDP *udpConn

rTimer *htimer
rTimer *timer.Item

typ ConnType
closed bool
Expand Down Expand Up @@ -324,7 +326,7 @@ func (c *Conn) SetDeadline(t time.Time) error {
// SetReadDeadline wraps net.Conn.SetReadDeadline
func (c *Conn) SetReadDeadline(t time.Time) error {
if t.IsZero() {
t = time.Now().Add(timeForever)
t = time.Now().Add(timer.TimeForever)
}

if c.typ == ConnTypeTCP {
Expand All @@ -333,7 +335,7 @@ func (c *Conn) SetReadDeadline(t time.Time) error {

timeout := t.Sub(time.Now())
if c.rTimer == nil {
c.rTimer = c.p.g.afterFunc(timeout, func() {
c.rTimer = c.p.g.AfterFunc(timeout, func() {
c.CloseWithError(errReadTimeout)
})
} else {
Expand All @@ -350,7 +352,7 @@ func (c *Conn) SetWriteDeadline(t time.Time) error {
}

if t.IsZero() {
t = time.Now().Add(timeForever)
t = time.Now().Add(timer.TimeForever)
}

return c.conn.SetWriteDeadline(t)
Expand Down
37 changes: 20 additions & 17 deletions conn_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/lesismal/nbio/mempool"
"github.com/lesismal/nbio/timer"
)

// Conn implements net.Conn.
Expand All @@ -29,8 +30,8 @@ type Conn struct {

connUDP *udpConn

rTimer *htimer
wTimer *htimer
rTimer *timer.Item
wTimer *timer.Item

writeBuffer []byte

Expand Down Expand Up @@ -78,10 +79,11 @@ func (c *Conn) Read(b []byte) (int, error) {

// ReadUDP .
func (c *Conn) ReadUDP(b []byte) (*Conn, int, error) {
return c.readAndGetConn(b)
return c.ReadAndGetConn(b)
}

func (c *Conn) readAndGetConn(b []byte) (*Conn, int, error) {
// ReadAndGetConn .
func (c *Conn) ReadAndGetConn(b []byte) (*Conn, int, error) {
// use lock to prevent multiple conn data confusion when fd is reused on unix.
c.mux.Lock()
if c.closed {
Expand Down Expand Up @@ -255,12 +257,12 @@ func (c *Conn) SetDeadline(t time.Time) error {
g := c.p.g
now := time.Now()
if c.rTimer == nil {
c.rTimer = g.afterFunc(t.Sub(now), func() { c.closeWithError(errReadTimeout) })
c.rTimer = g.AfterFunc(t.Sub(now), func() { c.closeWithError(errReadTimeout) })
} else {
c.rTimer.Reset(t.Sub(now))
}
if c.wTimer == nil {
c.wTimer = g.afterFunc(t.Sub(now), func() { c.closeWithError(errWriteTimeout) })
c.wTimer = g.AfterFunc(t.Sub(now), func() { c.closeWithError(errWriteTimeout) })
} else {
c.wTimer.Reset(t.Sub(now))
}
Expand All @@ -279,7 +281,7 @@ func (c *Conn) SetDeadline(t time.Time) error {
return nil
}

func (c *Conn) setDeadline(timer **htimer, returnErr error, t time.Time) error {
func (c *Conn) setDeadline(timer **timer.Item, returnErr error, t time.Time) error {
c.mux.Lock()
defer c.mux.Unlock()
if c.closed {
Expand All @@ -288,7 +290,7 @@ func (c *Conn) setDeadline(timer **htimer, returnErr error, t time.Time) error {
if !t.IsZero() {
now := time.Now()
if *timer == nil {
*timer = c.p.g.afterFunc(t.Sub(now), func() { c.closeWithError(returnErr) })
*timer = c.p.g.AfterFunc(t.Sub(now), func() { c.closeWithError(returnErr) })
} else {
(*timer).Reset(t.Sub(now))
}
Expand Down Expand Up @@ -529,6 +531,16 @@ func (c *Conn) closeWithError(err error) error {
c.mux.Lock()
if !c.closed {
c.closed = true

if c.wTimer != nil {
c.wTimer.Stop()
c.wTimer = nil
}
if c.rTimer != nil {
c.rTimer.Stop()
c.rTimer = nil
}

c.mux.Unlock()
return c.closeWithErrorWithoutLock(err)
}
Expand All @@ -539,15 +551,6 @@ func (c *Conn) closeWithError(err error) error {
func (c *Conn) closeWithErrorWithoutLock(err error) error {
c.closeErr = err

if c.wTimer != nil {
c.wTimer.Stop()
c.wTimer = nil
}
if c.rTimer != nil {
c.rTimer.Stop()
c.rTimer = nil
}

if c.writeBuffer != nil {
mempool.Free(c.writeBuffer)
c.writeBuffer = nil
Expand Down
180 changes: 11 additions & 169 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
package nbio

import (
"container/heap"
"context"
"net"
"runtime"
"sync"
"time"
"unsafe"

"github.com/lesismal/nbio/logging"
"github.com/lesismal/nbio/timer"
)

const (
Expand Down Expand Up @@ -74,6 +72,9 @@ type Config struct {

// UDPReadTimeout sets the timeout for udp sessions.
UDPReadTimeout time.Duration

// TimerExecute sets the executor for timer callbacks.
TimerExecute func(f func())
}

// Gopher keeps old type to compatible with new name Engine.
Expand All @@ -85,11 +86,13 @@ func NewGopher(conf Config) *Gopher {

// Engine is a manager of poller.
type Engine struct {
*timer.Timer
sync.WaitGroup

Name string

Execute func(f func())
Execute func(f func())
TimerExecute func(f func())

mux sync.Mutex

Expand Down Expand Up @@ -123,12 +126,6 @@ type Engine struct {
afterRead func(c *Conn)
beforeWrite func(c *Conn)
onStop func()

callings []func()
chCalling chan struct{}
timers timerHeap
trigger *time.Timer
chTimer chan struct{}
}

// Stop closes listeners/pollers/conns/timer.
Expand All @@ -147,15 +144,15 @@ func (g *Engine) Stop() {
for c := range conns {
if c != nil {
cc := c
g.atOnce(func() {
g.Async(func() {
cc.Close()
})
}
}
for _, c := range connsUnix {
if c != nil {
cc := c
g.atOnce(func() {
g.Async(func() {
cc.Close()
})
}
Expand All @@ -166,8 +163,7 @@ func (g *Engine) Stop() {

g.onStop()

g.trigger.Stop()
close(g.chTimer)
g.Timer.Stop()

for i := 0; i < g.pollerNum; i++ {
g.pollers[i].stop()
Expand Down Expand Up @@ -221,7 +217,7 @@ func (g *Engine) OnClose(h func(c *Conn, err error)) {
panic("invalid nil handler")
}
g.onCloseOnNoRace(func(c *Conn, err error) {
// g.atOnce(func() {
// g.Async(func() {
defer g.wgConn.Done()
h(c, err)
// })
Expand Down Expand Up @@ -300,160 +296,6 @@ func (g *Engine) OnStop(h func()) {
g.onStopOnNoRace(h)
}

// After used as time.After.
func (g *Engine) After(timeout time.Duration) <-chan time.Time {
c := make(chan time.Time, 1)
g.afterFunc(timeout, func() {
c <- time.Now()
})
return c
}

// AfterFunc used as time.AfterFunc.
func (g *Engine) AfterFunc(timeout time.Duration, f func()) *Timer {
ht := g.afterFunc(timeout, f)
return &Timer{htimer: ht}
}

func (g *Engine) atOnce(f func()) {
if f != nil {
g.mux.Lock()
g.callings = append(g.callings, f)
g.mux.Unlock()
select {
case g.chCalling <- struct{}{}:
default:
}
}
}

func (g *Engine) afterFunc(timeout time.Duration, f func()) *htimer {
g.mux.Lock()

now := time.Now()
it := &htimer{
index: len(g.timers),
expire: now.Add(timeout),
f: f,
parent: g,
}

heap.Push(&g.timers, it)
if g.timers[0] == it {
g.trigger.Reset(timeout)
}

g.mux.Unlock()

return it
}

func (g *Engine) removeTimer(it *htimer) {
g.mux.Lock()
defer g.mux.Unlock()

index := it.index
if index < 0 || index >= len(g.timers) {
return
}

if g.timers[index] == it {
heap.Remove(&g.timers, index)
if len(g.timers) > 0 {
if index == 0 {
g.trigger.Reset(time.Until(g.timers[0].expire))
}
} else {
g.trigger.Reset(timeForever)
}
}
}

// ResetTimer removes a timer.
func (g *Engine) resetTimer(it *htimer) {
g.mux.Lock()
defer g.mux.Unlock()

index := it.index
if index < 0 || index >= len(g.timers) {
return
}

if g.timers[index] == it {
heap.Fix(&g.timers, index)
if index == 0 || it.index == 0 {
g.trigger.Reset(time.Until(g.timers[0].expire))
}
}
}

func (g *Engine) timerLoop() {
defer g.Done()
logging.Debug("NBIO[%v] timer start", g.Name)
defer logging.Debug("NBIO[%v] timer stopped", g.Name)
for {
select {
case <-g.chCalling:
for {
g.mux.Lock()
if len(g.callings) == 0 {
g.callings = nil
g.mux.Unlock()
break
}
f := g.callings[0]
g.callings = g.callings[1:]
g.mux.Unlock()
func() {
defer func() {
err := recover()
if err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
logging.Error("NBIO[%v] exec call failed: %v\n%v\n", g.Name, err, *(*string)(unsafe.Pointer(&buf)))
}
}()
f()
}()
}
case <-g.trigger.C:
for {
g.mux.Lock()
if g.timers.Len() == 0 {
g.trigger.Reset(timeForever)
g.mux.Unlock()
break
}
now := time.Now()
it := g.timers[0]
if now.After(it.expire) {
heap.Remove(&g.timers, it.index)
g.mux.Unlock()
func() {
defer func() {
err := recover()
if err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
logging.Error("NBIO[%v] exec timer failed: %v\n%v\n", g.Name, err, *(*string)(unsafe.Pointer(&buf)))
}
}()
it.f()
}()
} else {
g.trigger.Reset(it.expire.Sub(now))
g.mux.Unlock()
break
}
}
case <-g.chTimer:
return
}
}
}

// PollerBuffer returns Poller's buffer by Conn, can be used on linux/bsd.
func (g *Engine) PollerBuffer(c *Conn) []byte {
return noRaceGetReadBufferFromPoller(c)
Expand Down
Loading

0 comments on commit cb1a3bf

Please sign in to comment.