Skip to content

Commit

Permalink
Fix sender start (#260)
Browse files Browse the repository at this point in the history
  • Loading branch information
OrlandoCo authored Oct 27, 2020
1 parent cdd5c17 commit aa741aa
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 83 deletions.
9 changes: 9 additions & 0 deletions pkg/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,15 @@ func (r *router) addSender(p *WebRTCTransport, rr *receiverRouter) error {
log.Errorf("Error closing sender: %s", err)
}
})
for _, t := range p.pc.GetTransceivers() {
if t.Sender() != nil && t.Sender().Track().SSRC() == ssrc {
p.pendingSenders.PushBack(&pendingSender{
transceiver: t,
sender: sender,
})
break
}
}
p.AddSender(rr.stream, sender)
recv.AddSender(sender)
return nil
Expand Down
135 changes: 52 additions & 83 deletions pkg/webrtctransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"github.com/gammazero/deque"

"github.com/bep/debounce"
"github.com/lucsky/cuid"
log "github.com/pion/ion-log"
Expand All @@ -24,21 +26,26 @@ type WebRTCTransportConfig struct {
// WebRTCTransport represents a sfu peer connection
type WebRTCTransport struct {
id string
ctx context.Context
cancel context.CancelFunc
pc *webrtc.PeerConnection
me MediaEngine
mu sync.RWMutex
candidates []webrtc.ICECandidateInit
ctx context.Context
cancel context.CancelFunc
router Router
session *Session
mids map[string]Sender
senders map[string][]Sender
router Router
candidates []webrtc.ICECandidateInit
pendingSenders deque.Deque
onTrackHandler func(*webrtc.Track, *webrtc.RTPReceiver)

subOnce sync.Once
}

type pendingSender struct {
transceiver *webrtc.RTPTransceiver
sender Sender
}

// NewWebRTCTransport creates a new WebRTCTransport
func NewWebRTCTransport(ctx context.Context, session *Session, me MediaEngine, cfg WebRTCTransportConfig) (*WebRTCTransport, error) {
api := webrtc.NewAPI(webrtc.WithMediaEngine(me.MediaEngine), webrtc.WithSettingEngine(cfg.setting))
Expand All @@ -59,9 +66,9 @@ func NewWebRTCTransport(ctx context.Context, session *Session, me MediaEngine, c
me: me,
session: session,
router: newRouter(pc, id, cfg.router),
mids: make(map[string]Sender),
senders: make(map[string][]Sender),
}
p.pendingSenders.SetMinCapacity(2)

// Add transport to the session
session.AddTransport(p)
Expand Down Expand Up @@ -124,58 +131,12 @@ func NewWebRTCTransport(ctx context.Context, session *Session, me MediaEngine, c

// CreateOffer generates the localDescription
func (p *WebRTCTransport) CreateOffer() (webrtc.SessionDescription, error) {
offer, err := p.pc.CreateOffer(nil)
if err != nil {
return webrtc.SessionDescription{}, err
}
parsed := sdp.SessionDescription{}
if err := parsed.Unmarshal([]byte(offer.SDP)); err == nil {
for _, md := range parsed.MediaDescriptions {
if md.MediaName.Media != mediaNameAudio && md.MediaName.Media != mediaNameVideo {
continue
}
var msid, mid string

for _, att := range md.Attributes {
switch att.Key {
case sdp.AttrKeyMID:
mid = att.Value
if len(msid) > 0 {
break
}
case sdp.AttrKeyMsid:
msid = att.Value
if len(mid) > 0 {
break
}
}
}
if len(msid) > 0 && len(mid) > 0 {
split := strings.Split(msid, " ")
sid := split[0]
tid := split[1]
// find sender for mid
for _, sender := range p.senders[sid] {
if sender.Track().ID() == tid {
p.mids[mid] = sender
}
}
}
}
}

return offer, nil
return p.pc.CreateOffer(nil)
}

// SetLocalDescription sets the SessionDescription of the remote peer
func (p *WebRTCTransport) SetLocalDescription(desc webrtc.SessionDescription) error {
err := p.pc.SetLocalDescription(desc)
if err != nil {
log.Errorf("SetLocalDescription error: %v", err)
return err
}

return nil
return p.pc.SetLocalDescription(desc)
}

// CreateAnswer generates the localDescription
Expand Down Expand Up @@ -212,42 +173,50 @@ func (p *WebRTCTransport) SetRemoteDescription(desc webrtc.SessionDescription) e
p.candidates = nil
}

for _, md := range pd.MediaDescriptions {
if md.MediaName.Media != mediaNameAudio && md.MediaName.Media != mediaNameVideo {
continue
}
var (
ext int
id string
)

for _, att := range md.Attributes {
if att.Key == sdp.AttrKeyMID {
if p.mids[att.Value] != nil {
p.mids[att.Value].Start()
// remove mid mapping in case transceiver is reused later
p.mids[att.Value] = nil
switch desc.Type {
case webrtc.SDPTypeAnswer:
if p.pendingSenders.Len() != 0 {
for _, md := range pd.MediaDescriptions {
if mid, ok := md.Attribute(sdp.AttrKeyMID); ok {
for i := 0; i < p.pendingSenders.Len(); i++ {
ps := p.pendingSenders.PopFront().(*pendingSender)
if ps.transceiver.Mid() == mid {
ps.sender.Start()
} else {
p.pendingSenders.PushBack(ps)
}
}
}
}

if att.Key == sdp.AttrKeyExtMap && strings.HasSuffix(att.Value, sdp.TransportCCURI) {
ext, _ = strconv.Atoi(att.Value[:1])
if len(id) > 0 {
break
}
}
case webrtc.SDPTypeOffer:
for _, md := range pd.MediaDescriptions {
if md.MediaName.Media != mediaNameAudio && md.MediaName.Media != mediaNameVideo {
continue
}
if att.Key == sdp.AttrKeyMsid {
v := strings.Split(att.Value, " ")
id = v[len(v)-1]
if ext != 0 {
break
var (
ext int
id string
)
for _, att := range md.Attributes {
if att.Key == sdp.AttrKeyExtMap && strings.HasSuffix(att.Value, sdp.TransportCCURI) {
ext, _ = strconv.Atoi(att.Value[:1])
if len(id) > 0 {
break
}
}
if att.Key == sdp.AttrKeyMsid {
v := strings.Split(att.Value, " ")
id = v[len(v)-1]
if ext != 0 {
break
}
}
}
}
p.router.AddTWCCExt(id, ext)
p.router.AddTWCCExt(id, ext)

}
}

return nil
}

Expand Down

0 comments on commit aa741aa

Please sign in to comment.