diff --git a/datachannel.go b/datachannel.go index 218ebfbec08..8c8870084fa 100644 --- a/datachannel.go +++ b/datachannel.go @@ -446,7 +446,7 @@ func (d *DataChannel) ensureOpen() error { // Please refer to the data-channels-detach example and the // pion/datachannel documentation for the correct way to handle the // resulting DataChannel object. -func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) { +func (d *DataChannel) Detach() (datachannel.ReadWriteCloserDeadliner, error) { d.mu.Lock() if !d.api.settingEngine.detach.DataChannels { diff --git a/go.mod b/go.mod index 15965bddb4c..7c916426ef4 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/pion/webrtc/v4 go 1.20 require ( - github.com/pion/datachannel v1.5.9 + github.com/pion/datachannel v1.5.10 github.com/pion/dtls/v3 v3.0.4 github.com/pion/ice/v4 v4.0.3 github.com/pion/interceptor v0.1.37 @@ -11,7 +11,7 @@ require ( github.com/pion/randutil v0.1.0 github.com/pion/rtcp v1.2.14 github.com/pion/rtp v1.8.9 - github.com/pion/sctp v1.8.34 + github.com/pion/sctp v1.8.35 github.com/pion/sdp/v3 v3.0.9 github.com/pion/srtp/v3 v3.0.4 github.com/pion/stun/v3 v3.0.0 diff --git a/go.sum b/go.sum index 03fc40f741a..8ac0572ce6f 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE= github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= -github.com/pion/datachannel v1.5.9 h1:LpIWAOYPyDrXtU+BW7X0Yt/vGtYxtXQ8ql7dFfYUVZA= -github.com/pion/datachannel v1.5.9/go.mod h1:kDUuk4CU4Uxp82NH4LQZbISULkX/HtzKa4P7ldf9izE= +github.com/pion/datachannel v1.5.10 h1:ly0Q26K1i6ZkGf42W7D4hQYR90pZwzFOjTq5AuCKk4o= +github.com/pion/datachannel v1.5.10/go.mod h1:p/jJfC9arb29W7WrxyKbepTU20CFgyx5oLo8Rs4Py/M= github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U= github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg= github.com/pion/ice/v4 v4.0.3 h1:9s5rI1WKzF5DRqhJ+Id8bls/8PzM7mau0mj1WZb4IXE= @@ -53,8 +53,8 @@ github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE= github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= github.com/pion/rtp v1.8.9 h1:E2HX740TZKaqdcPmf4pw6ZZuG8u5RlMMt+l3dxeu6Wk= github.com/pion/rtp v1.8.9/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= -github.com/pion/sctp v1.8.34 h1:rCuD3m53i0oGxCSp7FLQKvqVx0Nf5AUAHhMRXTTQjBc= -github.com/pion/sctp v1.8.34/go.mod h1:yWkCClkXlzVW7BXfI2PjrUGBwUI0CjXJBkhLt+sdo4U= +github.com/pion/sctp v1.8.35 h1:qwtKvNK1Wc5tHMIYgTDJhfZk7vATGVHhXbUDfHbYwzA= +github.com/pion/sctp v1.8.35/go.mod h1:EcXP8zCYVTRy3W9xtOF7wJm1L1aXfKRQzaM33SjQlzg= github.com/pion/sdp/v3 v3.0.9 h1:pX++dCHoHUwq43kuwf3PyJfHlwIj4hXA7Vrifiq0IJY= github.com/pion/sdp/v3 v3.0.9/go.mod h1:B5xmvENq5IXJimIO4zfp6LAe1fD9N+kFv+V/1lOdz8M= github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M= diff --git a/peerconnection.go b/peerconnection.go index 5ac160c9afd..9db31f560ab 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1483,7 +1483,7 @@ func (pc *PeerConnection) startSCTP() { // Start sctp if err := pc.sctpTransport.Start(SCTPCapabilities{ MaxMessageSize: 0, - }); err != nil { + }, WithSCTPBlockWrite(pc.api.settingEngine.detach.DataChannels && pc.api.settingEngine.dataChannelBlockWrite)); err != nil { pc.log.Warnf("Failed to start SCTP: %s", err) if err = pc.sctpTransport.Stop(); err != nil { pc.log.Warnf("Failed to stop SCTPTransport: %s", err) diff --git a/sctptransport.go b/sctptransport.go index 68f40b9079d..bd6ee30b314 100644 --- a/sctptransport.go +++ b/sctptransport.go @@ -95,15 +95,33 @@ func (r *SCTPTransport) GetCapabilities() SCTPCapabilities { } } +type SCTPOptions struct { + blockWrite bool +} + +type SCTPOption func(*SCTPOptions) + +// WithSCTPBlockWrite sets the blockWrite option for the SCTPTransport. +func WithSCTPBlockWrite(blockWrite bool) SCTPOption { + return func(o *SCTPOptions) { + o.blockWrite = blockWrite + } +} + // Start the SCTPTransport. Since both local and remote parties must mutually // create an SCTPTransport, SCTP SO (Simultaneous Open) is used to establish // a connection over SCTP. -func (r *SCTPTransport) Start(SCTPCapabilities) error { +func (r *SCTPTransport) Start(_ SCTPCapabilities, opts ...SCTPOption) error { if r.isStarted { return nil } r.isStarted = true + var options SCTPOptions + for _, o := range opts { + o(&options) + } + dtlsTransport := r.Transport() if dtlsTransport == nil || dtlsTransport.conn == nil { return errSCTPTransportDTLS @@ -114,6 +132,7 @@ func (r *SCTPTransport) Start(SCTPCapabilities) error { EnableZeroChecksum: r.api.settingEngine.sctp.enableZeroChecksum, LoggerFactory: r.api.settingEngine.LoggerFactory, RTOMax: float64(r.api.settingEngine.sctp.rtoMax) / float64(time.Millisecond), + BlockWrite: options.blockWrite, }) if err != nil { return err diff --git a/settingengine.go b/settingengine.go index fb2c40bc5fc..edfb1549bc8 100644 --- a/settingengine.go +++ b/settingengine.go @@ -103,6 +103,7 @@ type SettingEngine struct { iceMaxBindingRequests *uint16 fireOnTrackBeforeFirstRTP bool disableCloseByDTLS bool + dataChannelBlockWrite bool } // getReceiveMTU returns the configured MTU. If SettingEngine's MTU is configured to 0 it returns the default @@ -121,6 +122,12 @@ func (e *SettingEngine) DetachDataChannels() { e.detach.DataChannels = true } +// EnableDataChannelBlockWrite allows data channels to block on write, +// it only works if DetachDataChannels is enabled +func (e *SettingEngine) EnableDataChannelBlockWrite(nonblockWrite bool) { + e.dataChannelBlockWrite = nonblockWrite +} + // SetSRTPProtectionProfiles allows the user to override the default SRTP Protection Profiles // The default srtp protection profiles are provided by the function `defaultSrtpProtectionProfiles` func (e *SettingEngine) SetSRTPProtectionProfiles(profiles ...dtls.SRTPProtectionProfile) { diff --git a/settingengine_test.go b/settingengine_test.go index 8b786859fc7..f35451052fa 100644 --- a/settingengine_test.go +++ b/settingengine_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/pion/datachannel" "github.com/pion/dtls/v3/pkg/crypto/elliptic" "github.com/pion/dtls/v3/pkg/protocol/handshake" "github.com/pion/ice/v4" @@ -417,3 +418,45 @@ func TestDisableCloseByDTLS(t *testing.T) { assert.True(t, offer.ConnectionState() == PeerConnectionStateConnected) assert.NoError(t, offer.Close()) } + +func TestEnableDataChannelBlockWrite(t *testing.T) { + lim := test.TimeOut(time.Second * 30) + defer lim.Stop() + + report := test.CheckRoutines(t) + defer report() + + s := SettingEngine{} + s.DetachDataChannels() + s.EnableDataChannelBlockWrite(true) + s.SetSCTPMaxReceiveBufferSize(1500) + + offer, answer, err := NewAPI(WithSettingEngine(s)).newPair(Configuration{}) + assert.NoError(t, err) + + dc, err := offer.CreateDataChannel("data", nil) + assert.NoError(t, err) + detachChan := make(chan datachannel.ReadWriteCloserDeadliner, 1) + dc.OnOpen(func() { + detached, err1 := dc.Detach() + assert.NoError(t, err1) + detachChan <- detached + }) + + assert.NoError(t, signalPair(offer, answer)) + untilConnectionState(PeerConnectionStateConnected, offer, answer).Wait() + + // write should block and return deadline exceeded since the receiver is not reading + // and the buffer size is 1500 bytes + rawDC := <-detachChan + assert.NoError(t, rawDC.SetWriteDeadline(time.Now().Add(time.Second))) + buf := make([]byte, 1000) + for i := 0; i < 10; i++ { + _, err = rawDC.Write(buf) + if err != nil { + break + } + } + assert.ErrorIs(t, err, context.DeadlineExceeded) + closePairNow(t, offer, answer) +}