Skip to content

Commit

Permalink
Merge branch 'dev' into teamgram2
Browse files Browse the repository at this point in the history
  • Loading branch information
wubenqi committed Nov 9, 2024
2 parents 6aaa49f + bdd3fb6 commit c3181f4
Show file tree
Hide file tree
Showing 14 changed files with 368 additions and 53 deletions.
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ func startGnetClient(t *testing.T, cli *Client, network, addr string, multicore,
}
if netDial {
var netConn net.Conn
netConn, err = NetDial(network, addr)
netConn, err = stdDial(network, addr)
require.NoError(t, err)
c, err = cli.EnrollContext(netConn, handler)
} else {
Expand Down
27 changes: 18 additions & 9 deletions internal/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,33 @@ import (
)

// Option is used for setting an option on socket.
type Option struct {
SetSockOpt func(int, int) error
Opt int
type Option[T int | string] struct {
SetSockOpt func(int, T) error
Opt T
}

func execSockOpts[T int | string](fd int, opts []Option[T]) error {
for _, opt := range opts {
if err := opt.SetSockOpt(fd, opt.Opt); err != nil {
return err
}
}
return nil
}

// TCPSocket calls the internal tcpSocket.
func TCPSocket(proto, addr string, passive bool, sockOpts ...Option) (int, net.Addr, error) {
return tcpSocket(proto, addr, passive, sockOpts...)
func TCPSocket(proto, addr string, passive bool, sockOptInts []Option[int], sockOptStrs []Option[string]) (int, net.Addr, error) {
return tcpSocket(proto, addr, passive, sockOptInts, sockOptStrs)
}

// UDPSocket calls the internal udpSocket.
func UDPSocket(proto, addr string, connect bool, sockOpts ...Option) (int, net.Addr, error) {
return udpSocket(proto, addr, connect, sockOpts...)
func UDPSocket(proto, addr string, connect bool, sockOptInts []Option[int], sockOptStrs []Option[string]) (int, net.Addr, error) {
return udpSocket(proto, addr, connect, sockOptInts, sockOptStrs)
}

// UnixSocket calls the internal udsSocket.
func UnixSocket(proto, addr string, passive bool, sockOpts ...Option) (int, net.Addr, error) {
return udsSocket(proto, addr, passive, sockOpts...)
func UnixSocket(proto, addr string, passive bool, sockOptInts []Option[int], sockOptStrs []Option[string]) (int, net.Addr, error) {
return udsSocket(proto, addr, passive, sockOptInts, sockOptStrs)
}

// Accept accepts the next incoming socket along with setting
Expand Down
26 changes: 26 additions & 0 deletions internal/socket/sockopts_bsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2024 The Gnet Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build dragonfly || freebsd || netbsd || openbsd
// +build dragonfly freebsd netbsd openbsd

package socket

import errorx "github.com/panjf2000/gnet/v2/pkg/errors"

// SetBindToDevice is not implemented on *BSD because there is
// no equivalent of Linux's SO_BINDTODEVICE.
func SetBindToDevice(_ int, _ string) error {
return errorx.ErrUnsupportedOp
}
8 changes: 8 additions & 0 deletions internal/socket/sockopts_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"os"

"golang.org/x/sys/unix"

errorx "github.com/panjf2000/gnet/v2/pkg/errors"
)

// SetKeepAlivePeriod sets whether the operating system should send
Expand Down Expand Up @@ -52,3 +54,9 @@ func SetKeepAlivePeriod(fd, secs int) error {

return os.NewSyscallError("setsockopt", unix.SetsockoptInt(fd, unix.IPPROTO_TCP, unix.TCP_KEEPCNT, 5))
}

// SetBindToDevice is not implemented on macOS because there is
// no equivalent of Linux's SO_BINDTODEVICE.
func SetBindToDevice(_ int, _ string) error {
return errorx.ErrUnsupportedOp
}
30 changes: 30 additions & 0 deletions internal/socket/sockopts_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2024 The Gnet Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package socket

import (
"os"

"golang.org/x/sys/unix"
)

// SetBindToDevice binds the socket to a specific network interface.
//
// SO_BINDTODEVICE on Linux works in both directions: only process packets
// received from the particular interface along with sending them through
// that interface, instead of following the default route.
func SetBindToDevice(fd int, ifname string) error {
return os.NewSyscallError("setsockopt", unix.BindToDevice(fd, ifname))
}
4 changes: 2 additions & 2 deletions internal/socket/sockopts_openbsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

package socket

import "golang.org/x/sys/unix"
import errorx "github.com/panjf2000/gnet/v2/pkg/errors"

// SetKeepAlivePeriod sets whether the operating system should send
// keep-alive messages on the connection and sets period between TCP keep-alive probes.
func SetKeepAlivePeriod(_, _ int) error {
// OpenBSD has no user-settable per-socket TCP keepalive options.
return unix.ENOPROTOOPT
return errorx.ErrUnsupportedOp
}
8 changes: 4 additions & 4 deletions internal/socket/sockopts_posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ func SetNoDelay(fd, noDelay int) error {
// SetRecvBuffer sets the size of the operating system's
// receive buffer associated with the connection.
func SetRecvBuffer(fd, size int) error {
return unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF, size)
return os.NewSyscallError("setsockopt", unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF, size))
}

// SetSendBuffer sets the size of the operating system's
// transmit buffer associated with the connection.
func SetSendBuffer(fd, size int) error {
return unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF, size)
return os.NewSyscallError("setsockopt", unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF, size))
}

// SetReuseAddr enables SO_REUSEADDR option on socket.
Expand All @@ -55,7 +55,7 @@ func SetReuseAddr(fd, reuseAddr int) error {

// SetIPv6Only restricts a IPv6 socket to only process IPv6 requests or both IPv4 and IPv6 requests.
func SetIPv6Only(fd, ipv6only int) error {
return unix.SetsockoptInt(fd, unix.IPPROTO_IPV6, unix.IPV6_V6ONLY, ipv6only)
return os.NewSyscallError("setsockopt", unix.SetsockoptInt(fd, unix.IPPROTO_IPV6, unix.IPV6_V6ONLY, ipv6only))
}

// SetLinger sets the behavior of Close on a connection which still
Expand All @@ -79,7 +79,7 @@ func SetLinger(fd, sec int) error {
l.Onoff = 0
l.Linger = 0
}
return unix.SetsockoptLinger(fd, syscall.SOL_SOCKET, syscall.SO_LINGER, &l)
return os.NewSyscallError("setsockopt", unix.SetsockoptLinger(fd, syscall.SOL_SOCKET, syscall.SO_LINGER, &l))
}

// SetMulticastMembership returns with a socket option function based on the IP
Expand Down
11 changes: 6 additions & 5 deletions internal/socket/tcp_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func determineTCPProto(proto string, addr *net.TCPAddr) (string, error) {

// tcpSocket creates an endpoint for communication and returns a file descriptor that refers to that endpoint.
// Argument `reusePort` indicates whether the SO_REUSEPORT flag will be assigned.
func tcpSocket(proto, addr string, passive bool, sockOpts ...Option) (fd int, netAddr net.Addr, err error) {
func tcpSocket(proto, addr string, passive bool, sockOptInts []Option[int], sockOptStrs []Option[string]) (fd int, netAddr net.Addr, err error) {
var (
family int
ipv6only bool
Expand Down Expand Up @@ -114,10 +114,11 @@ func tcpSocket(proto, addr string, passive bool, sockOpts ...Option) (fd int, ne
}
}

for _, sockOpt := range sockOpts {
if err = sockOpt.SetSockOpt(fd, sockOpt.Opt); err != nil {
return
}
if err = execSockOpts(fd, sockOptInts); err != nil {
return
}
if err = execSockOpts(fd, sockOptStrs); err != nil {
return
}

if passive {
Expand Down
11 changes: 6 additions & 5 deletions internal/socket/udp_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func determineUDPProto(proto string, addr *net.UDPAddr) (string, error) {

// udpSocket creates an endpoint for communication and returns a file descriptor that refers to that endpoint.
// Argument `reusePort` indicates whether the SO_REUSEPORT flag will be assigned.
func udpSocket(proto, addr string, connect bool, sockOpts ...Option) (fd int, netAddr net.Addr, err error) {
func udpSocket(proto, addr string, connect bool, sockOptInts []Option[int], sockOptStrs []Option[string]) (fd int, netAddr net.Addr, err error) {
var (
family int
ipv6only bool
Expand Down Expand Up @@ -117,10 +117,11 @@ func udpSocket(proto, addr string, connect bool, sockOpts ...Option) (fd int, ne
return
}

for _, sockOpt := range sockOpts {
if err = sockOpt.SetSockOpt(fd, sockOpt.Opt); err != nil {
return
}
if err = execSockOpts(fd, sockOptInts); err != nil {
return
}
if err = execSockOpts(fd, sockOptStrs); err != nil {
return
}

if connect {
Expand Down
11 changes: 6 additions & 5 deletions internal/socket/unix_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func GetUnixSockAddr(proto, addr string) (sa unix.Sockaddr, family int, unixAddr

// udsSocket creates an endpoint for communication and returns a file descriptor that refers to that endpoint.
// Argument `reusePort` indicates whether the SO_REUSEPORT flag will be assigned.
func udsSocket(proto, addr string, passive bool, sockOpts ...Option) (fd int, netAddr net.Addr, err error) {
func udsSocket(proto, addr string, passive bool, sockOptInts []Option[int], sockOptStrs []Option[string]) (fd int, netAddr net.Addr, err error) {
var (
family int
sa unix.Sockaddr
Expand All @@ -70,10 +70,11 @@ func udsSocket(proto, addr string, passive bool, sockOpts ...Option) (fd int, ne
}
}()

for _, sockOpt := range sockOpts {
if err = sockOpt.SetSockOpt(fd, sockOpt.Opt); err != nil {
return
}
if err = execSockOpts(fd, sockOptInts); err != nil {
return
}
if err = execSockOpts(fd, sockOptStrs); err != nil {
return
}

if passive {
Expand Down
44 changes: 26 additions & 18 deletions listener_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type listener struct {
fd int
addr net.Addr
address, network string
sockOpts []socket.Option
sockOptInts []socket.Option[int]
sockOptStrs []socket.Option[string]
pollAttachment *netpoll.PollAttachment // listener attachment for poller
}

Expand All @@ -52,14 +53,14 @@ func (ln *listener) dup() (int, error) {
func (ln *listener) normalize() (err error) {
switch ln.network {
case "tcp", "tcp4", "tcp6":
ln.fd, ln.addr, err = socket.TCPSocket(ln.network, ln.address, true, ln.sockOpts...)
ln.fd, ln.addr, err = socket.TCPSocket(ln.network, ln.address, true, ln.sockOptInts, ln.sockOptStrs)
ln.network = "tcp"
case "udp", "udp4", "udp6":
ln.fd, ln.addr, err = socket.UDPSocket(ln.network, ln.address, false, ln.sockOpts...)
ln.fd, ln.addr, err = socket.UDPSocket(ln.network, ln.address, false, ln.sockOptInts, ln.sockOptStrs)
ln.network = "udp"
case "unix":
_ = os.RemoveAll(ln.address)
ln.fd, ln.addr, err = socket.UnixSocket(ln.network, ln.address, true, ln.sockOpts...)
ln.fd, ln.addr, err = socket.UnixSocket(ln.network, ln.address, true, ln.sockOptInts, ln.sockOptStrs)
default:
err = errors.ErrUnsupportedProtocol
}
Expand All @@ -79,37 +80,44 @@ func (ln *listener) close() {
}

func initListener(network, addr string, options *Options) (l *listener, err error) {
var sockOpts []socket.Option
var (
sockOptInts []socket.Option[int]
sockOptStrs []socket.Option[string]
)
if options.ReusePort || strings.HasPrefix(network, "udp") {
sockOpt := socket.Option{SetSockOpt: socket.SetReuseport, Opt: 1}
sockOpts = append(sockOpts, sockOpt)
sockOpt := socket.Option[int]{SetSockOpt: socket.SetReuseport, Opt: 1}
sockOptInts = append(sockOptInts, sockOpt)
}
if options.ReuseAddr {
sockOpt := socket.Option{SetSockOpt: socket.SetReuseAddr, Opt: 1}
sockOpts = append(sockOpts, sockOpt)
sockOpt := socket.Option[int]{SetSockOpt: socket.SetReuseAddr, Opt: 1}
sockOptInts = append(sockOptInts, sockOpt)
}
if options.TCPNoDelay == TCPNoDelay && strings.HasPrefix(network, "tcp") {
sockOpt := socket.Option{SetSockOpt: socket.SetNoDelay, Opt: 1}
sockOpts = append(sockOpts, sockOpt)
sockOpt := socket.Option[int]{SetSockOpt: socket.SetNoDelay, Opt: 1}
sockOptInts = append(sockOptInts, sockOpt)
}
if options.SocketRecvBuffer > 0 {
sockOpt := socket.Option{SetSockOpt: socket.SetRecvBuffer, Opt: options.SocketRecvBuffer}
sockOpts = append(sockOpts, sockOpt)
sockOpt := socket.Option[int]{SetSockOpt: socket.SetRecvBuffer, Opt: options.SocketRecvBuffer}
sockOptInts = append(sockOptInts, sockOpt)
}
if options.SocketSendBuffer > 0 {
sockOpt := socket.Option{SetSockOpt: socket.SetSendBuffer, Opt: options.SocketSendBuffer}
sockOpts = append(sockOpts, sockOpt)
sockOpt := socket.Option[int]{SetSockOpt: socket.SetSendBuffer, Opt: options.SocketSendBuffer}
sockOptInts = append(sockOptInts, sockOpt)
}
if strings.HasPrefix(network, "udp") {
udpAddr, err := net.ResolveUDPAddr(network, addr)
if err == nil && udpAddr.IP.IsMulticast() {
if sockoptFn := socket.SetMulticastMembership(network, udpAddr); sockoptFn != nil {
sockOpt := socket.Option{SetSockOpt: sockoptFn, Opt: options.MulticastInterfaceIndex}
sockOpts = append(sockOpts, sockOpt)
sockOpt := socket.Option[int]{SetSockOpt: sockoptFn, Opt: options.MulticastInterfaceIndex}
sockOptInts = append(sockOptInts, sockOpt)
}
}
}
l = &listener{network: network, address: addr, sockOpts: sockOpts}
if options.BindToDevice != "" {
sockOpt := socket.Option[string]{SetSockOpt: socket.SetBindToDevice, Opt: options.BindToDevice}
sockOptStrs = append(sockOptStrs, sockOpt)
}
l = &listener{network: network, address: addr, sockOptInts: sockOptInts, sockOptStrs: sockOptStrs}
err = l.normalize()
return
}
18 changes: 17 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ type Options struct {
// MulticastInterfaceIndex is the index of the interface name where the multicast UDP addresses will be bound to.
MulticastInterfaceIndex int

// BindToDevice is the name of the interface to which the listening socket will be bound.
//
// It is only available on Linux at the moment, an error will therefore be returned when
// setting this option on non-linux platforms.
BindToDevice string

// ============================= Options for both server-side and client-side =============================

// ReadBufferCap is the maximum number of bytes that can be read from the remote when the readable event comes.
Expand Down Expand Up @@ -95,7 +101,7 @@ type Options struct {
// Ticker indicates whether the ticker has been set up.
Ticker bool

// TCPKeepAlive enable the TCP keep-alive mechanism (SO_KEEPALIVE) and set its value
// TCPKeepAlive enables the TCP keep-alive mechanism (SO_KEEPALIVE) and set its value
// on TCP_KEEPIDLE, 1/5 of its value on TCP_KEEPINTVL, and 5 on TCP_KEEPCNT.
TCPKeepAlive time.Duration

Expand Down Expand Up @@ -270,6 +276,16 @@ func WithMulticastInterfaceIndex(idx int) Option {
}
}

// WithBindToDevice sets the name of the interface to which the listening socket will be bound.
//
// It is only available on Linux at the moment, an error will therefore be returned when
// setting this option on non-linux platforms.
func WithBindToDevice(iface string) Option {
return func(opts *Options) {
opts.BindToDevice = iface
}
}

// WithEdgeTriggeredIO enables the edge-triggered I/O for the underlying epoll/kqueue event-loop.
func WithEdgeTriggeredIO(et bool) Option {
return func(opts *Options) {
Expand Down
Loading

0 comments on commit c3181f4

Please sign in to comment.