Skip to content

Commit

Permalink
Merge branch 'main' into feat/closed-captions
Browse files Browse the repository at this point in the history
# Conflicts:
#	dogfooding/lib/screens/home_screen.dart
#	packages/stream_video/lib/src/call/call.dart
#	packages/stream_video/lib/src/call/session/call_session.dart
#	packages/stream_video/lib/src/models/call_preferences.dart
#	packages/stream_video/lib/src/models/models.dart
#	packages/stream_video/lib/src/sfu/data/events/sfu_event_mapper_extensions.dart
#	packages/stream_video/lib/src/webrtc/rtc_manager.dart
#	packages/stream_video_flutter/CHANGELOG.md
  • Loading branch information
Brazol committed Jan 15, 2025
2 parents 11f1ca6 + d4cfa22 commit 4b2e2ec
Show file tree
Hide file tree
Showing 11 changed files with 423 additions and 381 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -443,5 +443,3 @@ class TranscriptionSettingsResponseModeEnumTypeTransformer {
/// Singleton [TranscriptionSettingsResponseModeEnumTypeTransformer] instance.
static TranscriptionSettingsResponseModeEnumTypeTransformer? _instance;
}


194 changes: 54 additions & 140 deletions packages/stream_video/lib/src/call/call.dart
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ import '../utils/subscriptions.dart';
import '../webrtc/media/media_constraints.dart';
import '../webrtc/model/rtc_video_dimension.dart';
import '../webrtc/model/rtc_video_parameters.dart';
import '../webrtc/model/stats/rtc_codec.dart';
import '../webrtc/model/stats/rtc_ice_candidate_pair.dart';
import '../webrtc/model/stats/rtc_inbound_rtp_video_stream.dart';
import '../webrtc/model/stats/rtc_outbound_rtp_video_stream.dart';
import '../webrtc/peer_type.dart';
import '../webrtc/rtc_manager.dart';
import '../webrtc/rtc_media_device/rtc_media_device.dart';
import '../webrtc/rtc_media_device/rtc_media_device_notifier.dart';
Expand All @@ -61,7 +56,9 @@ import 'permissions/permissions_manager.dart';
import 'session/call_session.dart';
import 'session/call_session_factory.dart';
import 'session/dynascale_manager.dart';
import 'sfu_stats_reporter.dart';
import 'state/call_state_notifier.dart';
import 'stats_reporter.dart';

typedef OnCallPermissionRequest = void Function(
StreamCallPermissionRequestEvent,
Expand Down Expand Up @@ -250,6 +247,9 @@ class Call {
CallSession? _session;
CallSession? _previousSession;

int? _statsReportingIntervalMs;
SfuStatsReporter? _sfuStatsReporter;

int _reconnectAttempts = 0;
Duration _fastReconnectDeadline = Duration.zero;
SfuReconnectionStrategy _reconnectStrategy =
Expand All @@ -268,8 +268,10 @@ class Call {

StateEmitter<CallState> get state => _stateManager.callStateStream;

SharedEmitter<CallStats> get stats => _stats;
late final _stats = MutableSharedEmitterImpl<CallStats>();
SharedEmitter<({CallStats publisherStats, CallStats subscriberStats})>
get stats => _stats;
late final _stats = MutableSharedEmitterImpl<
({CallStats publisherStats, CallStats subscriberStats})>();

SharedEmitter<StreamCallEvent> get callEvents => _callEvents;
final _callEvents = MutableSharedEmitterImpl<StreamCallEvent>();
Expand Down Expand Up @@ -625,6 +627,7 @@ class Call {

return _callJoinLock.synchronized(() async {
_logger.d(() => '[join] options: $_connectOptions');
final connectionTimeStopwatch = Stopwatch()..start();

final validation =
await _stateManager.validateUserId(_streamVideo.currentUser.id);
Expand Down Expand Up @@ -671,7 +674,6 @@ class Call {
}

_credentials = joinedResult.data;

_previousSession = _session;

final isWsHealthy = _previousSession?.sfuWS.isConnected ?? false;
Expand Down Expand Up @@ -751,6 +753,15 @@ class Call {
);
}

// make sure we only track connection timing if we are not calling this method as part of a migration flow
connectionTimeStopwatch.stop();
if (!performingMigration) {
await _sfuStatsReporter?.sendSfuStats(
reconnectionStrategy: _reconnectStrategy,
connectionTimeMs: connectionTimeStopwatch.elapsedMilliseconds,
);
}

if (performingRejoin) {
_logger.v(() => '[join] leaving previous session');
_previousSession?.leave(
Expand All @@ -773,6 +784,10 @@ class Call {
}

_logger.v(() => '[join] completed');

// reset the reconnect strategy to unspecified after a successful reconnection
_reconnectStrategy = SfuReconnectionStrategy.unspecified;

return const Result.success(none);
});
}
Expand All @@ -789,6 +804,7 @@ class Call {
final prevState = _stateManager.callState;

if (credentials == null ||
_statsReportingIntervalMs == null ||
_reconnectStrategy == SfuReconnectionStrategy.rejoin ||
_reconnectStrategy == SfuReconnectionStrategy.migrate) {
_logger.w(() => '[joinIfNeeded] joining');
Expand All @@ -804,8 +820,7 @@ class Call {
return joinedResult.fold(
success: (success) {
_credentials = success.data.credentials;
_session?.rtcManager
?.updateReportingInterval(success.data.reportingIntervalMs);
_statsReportingIntervalMs = success.data.reportingIntervalMs;

return Result.success(success.data.credentials);
},
Expand Down Expand Up @@ -921,29 +936,18 @@ class Call {

_session = session;

_subscriptions.cancel(_idSessionEvents);
_sfuStatsReporter?.stop();
_subscriptions.cancel(_idSessionStats);
_subscriptions.cancel(_idSessionEvents);

_subscriptions.add(
_idSessionEvents,
session.events.listen((event) {
// _logger.log(
// event.logPriority,
// () => '[listenSfuEvent] event.type: ${event.runtimeType}',
// );
event.mapToCallEvent(state.value).emitIfNotNull(_callEvents);
_onSfuEvent(event);
}),
);

_subscriptions.add(
_idSessionStats,
session.stats.listen((stats) {
_stats.emit(stats);
_processStats(stats);
}),
);

var localStats = state.value.localStats ?? LocalStats.empty();
localStats = localStats.copyWith(
sfu: session.config.sfuUrl,
Expand All @@ -968,6 +972,23 @@ class Call {
},
);

_subscriptions.add(
_idSessionStats,
StatsReporter(
rtcManager: session.rtcManager!,
stateManager: _stateManager,
).run(interval: _preferences.callStatsReportingInterval).listen((stats) {
_stats.emit(stats);
}),
);

if (_statsReportingIntervalMs != null) {
_sfuStatsReporter = SfuStatsReporter(
callSession: session,
stateManager: _stateManager,
)..run(interval: Duration(milliseconds: _statsReportingIntervalMs!));
}

return result.fold(
success: (success) {
_logger.v(() => '[startSession] success: $success');
Expand All @@ -981,119 +1002,6 @@ class Call {
);
}

void _processStats(CallStats stats) {
var publisherStats =
state.value.publisherStats ?? PeerConnectionStats.empty();
var subscriberStats =
state.value.subscriberStats ?? PeerConnectionStats.empty();

if (stats.peerType == StreamPeerType.publisher) {
final allStats = stats.stats
.whereType<RtcOutboundRtpVideoStream>()
.map(MediaStatsInfo.fromRtcOutboundRtpVideoStream);

final mediaStats = allStats.firstWhereOrNull(
(s) => s.width != null && s.height != null && s.fps != null,
);

final jitterInMs = ((mediaStats?.jitter ?? 0) * 1000).toInt();
final resolution = mediaStats != null
? '${mediaStats.width} x ${mediaStats.height} @ ${mediaStats.fps}fps'
: null;

var activeOutbound = allStats.toList();

if (publisherStats.outboundMediaStats.isNotEmpty) {
activeOutbound = activeOutbound
.where(
(s) =>
publisherStats.outboundMediaStats.none((i) => s.id == i.id) ||
publisherStats.outboundMediaStats
.firstWhere((i) => i.id == s.id)
.bytesSent !=
s.bytesSent,
)
.toList();
}

final codec = stats.stats
.whereType<RtcCodec>()
.where((c) => c.mimeType?.startsWith('video') ?? false)
.where((c) => activeOutbound.any((s) => s.videoCodecId == c.id))
.map((c) => c.mimeType?.replaceFirst('video/', ''))
.where((c) => c != null)
.cast<String>()
.toList();

publisherStats = publisherStats.copyWith(
resolution: resolution,
qualityDropReason: mediaStats?.qualityLimit,
jitterInMs: jitterInMs,
videoCodec: codec,
outboundMediaStats: allStats.toList(),
);
}

final inboudRtpVideo =
stats.stats.whereType<RtcInboundRtpVideoStream>().firstOrNull;

if (stats.peerType == StreamPeerType.subscriber && inboudRtpVideo != null) {
final jitterInMs = ((inboudRtpVideo.jitter ?? 0) * 1000).toInt();
final resolution = inboudRtpVideo.frameWidth != null &&
inboudRtpVideo.frameHeight != null &&
inboudRtpVideo.framesPerSecond != null
? '${inboudRtpVideo.frameWidth} x ${inboudRtpVideo.frameHeight} @ ${inboudRtpVideo.framesPerSecond}fps'
: null;

final codecStats = stats.stats
.whereType<RtcCodec>()
.where((c) => c.mimeType?.startsWith('video') ?? false)
.firstOrNull;

final codec = codecStats?.mimeType?.replaceFirst('video/', '');

subscriberStats = subscriberStats.copyWith(
resolution: resolution,
jitterInMs: jitterInMs,
videoCodec: codec != null ? [codec] : [],
);
}

final candidatePair =
stats.stats.whereType<RtcIceCandidatePair>().firstOrNull;
if (candidatePair != null) {
final latency = candidatePair.currentRoundTripTime;
final outgoingBitrate = candidatePair.availableOutgoingBitrate;
final incomingBitrate = candidatePair.availableIncomingBitrate;

if (stats.peerType == StreamPeerType.publisher) {
publisherStats = publisherStats.copyWith(
latency: latency != null ? (latency * 1000).toInt() : null,
bitrateKbps: outgoingBitrate != null ? outgoingBitrate / 1000 : null,
);
} else {
subscriberStats = subscriberStats.copyWith(
bitrateKbps: incomingBitrate != null ? incomingBitrate / 1000 : null,
);
}
}

var latencyHistory = state.value.latencyHistory;
if (stats.peerType == StreamPeerType.publisher &&
publisherStats.latency != null) {
latencyHistory = [
...state.value.latencyHistory.reversed.take(19).toList().reversed,
publisherStats.latency!,
];
}

_stateManager.lifecycleCallStats(
publisherStats: publisherStats,
subscriberStats: subscriberStats,
latencyHistory: latencyHistory,
);
}

Future<void> _onSfuEvent(SfuEvent sfuEvent) async {
if (sfuEvent is SfuParticipantLeftEvent) {
final callParticipants = [...state.value.callParticipants]..removeWhere(
Expand Down Expand Up @@ -1235,6 +1143,8 @@ class Call {
}

Future<void> _reconnectMigrate() async {
final migrateTimeStopwatch = Stopwatch()..start();

_reconnectStrategy = SfuReconnectionStrategy.migrate;
await _join();
final result = await _session?.waitForMigrationComplete();
Expand All @@ -1249,6 +1159,12 @@ class Call {
_reconnectStrategy = SfuReconnectionStrategy.rejoin;
},
);

migrateTimeStopwatch.stop();
await _sfuStatsReporter?.sendSfuStats(
connectionTimeMs: migrateTimeStopwatch.elapsedMilliseconds,
reconnectionStrategy: _reconnectStrategy,
);
}

Future<InternetStatus> _awaitNetworkAvailable() async {
Expand Down Expand Up @@ -1335,13 +1251,11 @@ class Call {
Future<void> _clear(String src) async {
_logger.d(() => '[clear] src: $src');

for (final timer in _reactionTimers) {
for (final timer in [..._reactionTimers, ..._captionsTimers.values]) {
timer.cancel();
}

for (final timer in _captionsTimers.values) {
timer.cancel();
}
_sfuStatsReporter?.stop();

_subscriptions.cancelAll();
_cancelables.cancelAll();
Expand Down
Loading

0 comments on commit 4b2e2ec

Please sign in to comment.