Skip to content

Commit

Permalink
feat: report connection times to SFU stats (#818)
Browse files Browse the repository at this point in the history
* Video filters

* tweak

* fix

* custom filter added

* fix

* tweaks

* Codec negotiation

* client publish options added

* force TF build

* dep fix

* replace duplicate resource bundle name

* fixed target name

* use xcode 16 to distribte

* podfile test

* remove duplicate bundle

* remove resource bundle 2

* podfile fix

* fixes

* tweaks

* don't log stats request

* setting publish options when reconnecting

* log tweaks

* dogfooding: disable autocorrect in call id input

* announce tracks change for reconnect

* enable env switcher

* log tweak

* added codec to track info

* added muted to track info

* revert temp changes

* Report connection times

* publish option id added to trackinfo

* fmtp line

* fix

* fix for race between negotiation and initial track creation

* cleanup

* tweaks

* fix telemetry reporting

* stopwatch used instead of datetime

* comment removed
  • Loading branch information
Brazol authored Jan 15, 2025
1 parent 32f8517 commit d4cfa22
Show file tree
Hide file tree
Showing 10 changed files with 418 additions and 373 deletions.
188 changes: 53 additions & 135 deletions packages/stream_video/lib/src/call/call.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,16 @@ import '../utils/cancelables.dart';
import '../utils/extensions.dart';
import '../utils/future.dart';
import '../utils/standard.dart';
import '../webrtc/model/stats/rtc_codec.dart';
import '../webrtc/model/stats/rtc_ice_candidate_pair.dart';
import '../webrtc/model/stats/rtc_inbound_rtp_video_stream.dart';
import '../webrtc/model/stats/rtc_outbound_rtp_video_stream.dart';
import '../webrtc/rtc_manager.dart';
import '../webrtc/sdp/editor/sdp_editor_impl.dart';
import '../webrtc/sdp/policy/sdp_policy.dart';
import '../ws/ws.dart';
import 'permissions/permissions_manager.dart';
import 'session/call_session.dart';
import 'session/call_session_factory.dart';
import 'sfu_stats_reporter.dart';
import 'state/call_state_notifier.dart';
import 'stats_reporter.dart';

typedef OnCallPermissionRequest = void Function(
CoordinatorCallPermissionRequestEvent,
Expand Down Expand Up @@ -224,6 +222,9 @@ class Call {
CallSession? _session;
CallSession? _previousSession;

int? _statsReportingIntervalMs;
SfuStatsReporter? _sfuStatsReporter;

int _reconnectAttempts = 0;
Duration _fastReconnectDeadline = Duration.zero;
SfuReconnectionStrategy _reconnectStrategy =
Expand All @@ -241,8 +242,10 @@ class Call {

StateEmitter<CallState> get state => _stateManager.callStateStream;

SharedEmitter<CallStats> get stats => _stats;
late final _stats = MutableSharedEmitterImpl<CallStats>();
SharedEmitter<({CallStats publisherStats, CallStats subscriberStats})>
get stats => _stats;
late final _stats = MutableSharedEmitterImpl<
({CallStats publisherStats, CallStats subscriberStats})>();

SharedEmitter<StreamCallEvent> get callEvents => _callEvents;
final _callEvents = MutableSharedEmitterImpl<StreamCallEvent>();
Expand Down Expand Up @@ -578,6 +581,7 @@ class Call {

return _callJoinLock.synchronized(() async {
_logger.d(() => '[join] options: $_connectOptions');
final connectionTimeStopwatch = Stopwatch()..start();

final validation =
await _stateManager.validateUserId(_streamVideo.currentUser.id);
Expand Down Expand Up @@ -624,7 +628,6 @@ class Call {
}

_credentials = joinedResult.data;

_previousSession = _session;

final isWsHealthy = _previousSession?.sfuWS.isConnected ?? false;
Expand Down Expand Up @@ -704,6 +707,15 @@ class Call {
);
}

// make sure we only track connection timing if we are not calling this method as part of a migration flow
connectionTimeStopwatch.stop();
if (!performingMigration) {
await _sfuStatsReporter?.sendSfuStats(
reconnectionStrategy: _reconnectStrategy,
connectionTimeMs: connectionTimeStopwatch.elapsedMilliseconds,
);
}

if (performingRejoin) {
_logger.v(() => '[join] leaving previous session');
_previousSession?.leave(
Expand All @@ -726,6 +738,10 @@ class Call {
}

_logger.v(() => '[join] completed');

// reset the reconnect strategy to unspecified after a successful reconnection
_reconnectStrategy = SfuReconnectionStrategy.unspecified;

return const Result.success(none);
});
}
Expand All @@ -742,6 +758,7 @@ class Call {
final prevState = _stateManager.callState;

if (credentials == null ||
_statsReportingIntervalMs == null ||
_reconnectStrategy == SfuReconnectionStrategy.rejoin ||
_reconnectStrategy == SfuReconnectionStrategy.migrate) {
_logger.w(() => '[joinIfNeeded] joining');
Expand All @@ -757,8 +774,7 @@ class Call {
return joinedResult.fold(
success: (success) {
_credentials = success.data.credentials;
_session?.rtcManager
?.updateReportingInterval(success.data.reportingIntervalMs);
_statsReportingIntervalMs = success.data.reportingIntervalMs;

return Result.success(success.data.credentials);
},
Expand Down Expand Up @@ -874,29 +890,18 @@ class Call {

_session = session;

_subscriptions.cancel(_idSessionEvents);
_sfuStatsReporter?.stop();
_subscriptions.cancel(_idSessionStats);
_subscriptions.cancel(_idSessionEvents);

_subscriptions.add(
_idSessionEvents,
session.events.listen((event) {
// _logger.log(
// event.logPriority,
// () => '[listenSfuEvent] event.type: ${event.runtimeType}',
// );
event.mapToCallEvent(state.value).emitIfNotNull(_callEvents);
_onSfuEvent(event);
}),
);

_subscriptions.add(
_idSessionStats,
session.stats.listen((stats) {
_stats.emit(stats);
_processStats(stats);
}),
);

var localStats = state.value.localStats ?? LocalStats.empty();
localStats = localStats.copyWith(
sfu: session.config.sfuUrl,
Expand All @@ -921,6 +926,23 @@ class Call {
},
);

_subscriptions.add(
_idSessionStats,
StatsReporter(
rtcManager: session.rtcManager!,
stateManager: _stateManager,
).run(interval: _preferences.callStatsReportingInterval).listen((stats) {
_stats.emit(stats);
}),
);

if (_statsReportingIntervalMs != null) {
_sfuStatsReporter = SfuStatsReporter(
callSession: session,
stateManager: _stateManager,
)..run(interval: Duration(milliseconds: _statsReportingIntervalMs!));
}

return result.fold(
success: (success) {
_logger.v(() => '[startSession] success: $success');
Expand All @@ -934,119 +956,6 @@ class Call {
);
}

void _processStats(CallStats stats) {
var publisherStats =
state.value.publisherStats ?? PeerConnectionStats.empty();
var subscriberStats =
state.value.subscriberStats ?? PeerConnectionStats.empty();

if (stats.peerType == StreamPeerType.publisher) {
final allStats = stats.stats
.whereType<RtcOutboundRtpVideoStream>()
.map(MediaStatsInfo.fromRtcOutboundRtpVideoStream);

final mediaStats = allStats.firstWhereOrNull(
(s) => s.width != null && s.height != null && s.fps != null,
);

final jitterInMs = ((mediaStats?.jitter ?? 0) * 1000).toInt();
final resolution = mediaStats != null
? '${mediaStats.width} x ${mediaStats.height} @ ${mediaStats.fps}fps'
: null;

var activeOutbound = allStats.toList();

if (publisherStats.outboundMediaStats.isNotEmpty) {
activeOutbound = activeOutbound
.where(
(s) =>
publisherStats.outboundMediaStats.none((i) => s.id == i.id) ||
publisherStats.outboundMediaStats
.firstWhere((i) => i.id == s.id)
.bytesSent !=
s.bytesSent,
)
.toList();
}

final codec = stats.stats
.whereType<RtcCodec>()
.where((c) => c.mimeType?.startsWith('video') ?? false)
.where((c) => activeOutbound.any((s) => s.videoCodecId == c.id))
.map((c) => c.mimeType?.replaceFirst('video/', ''))
.where((c) => c != null)
.cast<String>()
.toList();

publisherStats = publisherStats.copyWith(
resolution: resolution,
qualityDropReason: mediaStats?.qualityLimit,
jitterInMs: jitterInMs,
videoCodec: codec,
outboundMediaStats: allStats.toList(),
);
}

final inboudRtpVideo =
stats.stats.whereType<RtcInboundRtpVideoStream>().firstOrNull;

if (stats.peerType == StreamPeerType.subscriber && inboudRtpVideo != null) {
final jitterInMs = ((inboudRtpVideo.jitter ?? 0) * 1000).toInt();
final resolution = inboudRtpVideo.frameWidth != null &&
inboudRtpVideo.frameHeight != null &&
inboudRtpVideo.framesPerSecond != null
? '${inboudRtpVideo.frameWidth} x ${inboudRtpVideo.frameHeight} @ ${inboudRtpVideo.framesPerSecond}fps'
: null;

final codecStats = stats.stats
.whereType<RtcCodec>()
.where((c) => c.mimeType?.startsWith('video') ?? false)
.firstOrNull;

final codec = codecStats?.mimeType?.replaceFirst('video/', '');

subscriberStats = subscriberStats.copyWith(
resolution: resolution,
jitterInMs: jitterInMs,
videoCodec: codec != null ? [codec] : [],
);
}

final candidatePair =
stats.stats.whereType<RtcIceCandidatePair>().firstOrNull;
if (candidatePair != null) {
final latency = candidatePair.currentRoundTripTime;
final outgoingBitrate = candidatePair.availableOutgoingBitrate;
final incomingBitrate = candidatePair.availableIncomingBitrate;

if (stats.peerType == StreamPeerType.publisher) {
publisherStats = publisherStats.copyWith(
latency: latency != null ? (latency * 1000).toInt() : null,
bitrateKbps: outgoingBitrate != null ? outgoingBitrate / 1000 : null,
);
} else {
subscriberStats = subscriberStats.copyWith(
bitrateKbps: incomingBitrate != null ? incomingBitrate / 1000 : null,
);
}
}

var latencyHistory = state.value.latencyHistory;
if (stats.peerType == StreamPeerType.publisher &&
publisherStats.latency != null) {
latencyHistory = [
...state.value.latencyHistory.reversed.take(19).toList().reversed,
publisherStats.latency!,
];
}

_stateManager.lifecycleCallStats(
publisherStats: publisherStats,
subscriberStats: subscriberStats,
latencyHistory: latencyHistory,
);
}

Future<void> _onSfuEvent(SfuEvent sfuEvent) async {
if (sfuEvent is SfuParticipantLeftEvent) {
final callParticipants = [...state.value.callParticipants]..removeWhere(
Expand Down Expand Up @@ -1188,6 +1097,8 @@ class Call {
}

Future<void> _reconnectMigrate() async {
final migrateTimeStopwatch = Stopwatch()..start();

_reconnectStrategy = SfuReconnectionStrategy.migrate;
await _join();
final result = await _session?.waitForMigrationComplete();
Expand All @@ -1202,6 +1113,12 @@ class Call {
_reconnectStrategy = SfuReconnectionStrategy.rejoin;
},
);

migrateTimeStopwatch.stop();
await _sfuStatsReporter?.sendSfuStats(
connectionTimeMs: migrateTimeStopwatch.elapsedMilliseconds,
reconnectionStrategy: _reconnectStrategy,
);
}

Future<InternetStatus> _awaitNetworkAvailable() async {
Expand Down Expand Up @@ -1292,6 +1209,7 @@ class Call {
timer.cancel();
}

_sfuStatsReporter?.stop();
_subscriptions.cancelAll();
_cancelables.cancelAll();
await _session?.dispose();
Expand Down
Loading

0 comments on commit d4cfa22

Please sign in to comment.