Skip to content

Commit

Permalink
fix(core): improve algorithm performance that sort audio and video fr…
Browse files Browse the repository at this point in the history
…ames by timestamp for FLV/RTMP
  • Loading branch information
ThibaultBee committed Nov 23, 2023
1 parent 6d446ca commit 55c7dd1
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (C) 2023 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.internal.muxers

import io.github.thibaultbee.streampack.internal.data.Packet
import io.github.thibaultbee.streampack.internal.utils.SyncQueue
import io.github.thibaultbee.streampack.logger.Logger

/**
* An abstract class that implements [IMuxer] and output frames in their natural order.
*
* Frames are not in order because audio and video frames are encoded in parallel and video encoding takes more time.
* So some new audio frame could arrive sooner than older video frame.
*
* Some protocols (like RTMP) require frames to be in order. Use this class for this kind of protocols.
* If the protocol doesn't need ordered frames, use [IMuxer] directly.
*
* Don't call [IMuxerListener.onOutputFrame] directly, use [queue] instead.
* Don't forget to call [stopStream] at the end of [IMuxer.stopStream] implementation.
*/
abstract class AbstractSortedMuxer : IMuxer {
private val syncQueue =
SyncQueue(
{ packet1, packet2 -> packet1.ts.compareTo(packet2.ts) },
object : SyncQueue.Listener<Packet> {
override fun onElement(element: Packet) {
listener?.onOutputFrame(element)
}
})

fun queue(packet: Packet) {
syncQueue.add(packet, packet.isVideo)
}

/**
* To be called at the end of [stopStream] implementation.
*/
override fun stopStream() {
syncQueue.clear()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.github.thibaultbee.streampack.internal.data.Frame
import io.github.thibaultbee.streampack.internal.data.Packet
import io.github.thibaultbee.streampack.internal.data.PacketType
import io.github.thibaultbee.streampack.internal.interfaces.ISourceOrientationProvider
import io.github.thibaultbee.streampack.internal.muxers.IMuxer
import io.github.thibaultbee.streampack.internal.muxers.AbstractSortedMuxer
import io.github.thibaultbee.streampack.internal.muxers.IMuxerListener
import io.github.thibaultbee.streampack.internal.muxers.flv.tags.AVTagsFactory
import io.github.thibaultbee.streampack.internal.muxers.flv.tags.FlvHeader
Expand All @@ -33,7 +33,7 @@ class FlvMuxer(
override var listener: IMuxerListener? = null,
initialStreams: List<Config>? = null,
private val writeToFile: Boolean,
) : IMuxer {
) : AbstractSortedMuxer() {
override val helper = FlvMuxerHelper()
private val streams = mutableListOf<Config>()
private val hasAudio: Boolean
Expand Down Expand Up @@ -74,7 +74,7 @@ class FlvMuxer(
frame.pts -= startUpTime!!
val flvTags = AVTagsFactory(frame, streams[streamPid]).build()
flvTags.forEach {
listener?.onOutputFrame(
queue(
Packet(
it.write(), frame.pts, if (frame.isVideo) {
PacketType.VIDEO
Expand Down Expand Up @@ -122,6 +122,7 @@ class FlvMuxer(
startUpTime = null
hasFirstFrame = false
streams.clear()
super.stopStream()
}

override fun release() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (C) 2023 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.internal.utils

import java.util.PriorityQueue
import java.util.concurrent.Executors

/**
* A synchronized queue that allows to add elements in order. Elements are stored till a sync
* element is added. When a sync element is added, all elements that are comparatively lower are
* sent to the listener.
*
* The purpose of this class is to allow to put elements in order.
*
* @param E the type of elements held in this collection
* @param comparator the comparator that will be used to order the elements
* @param listener the listener that will be called when a sync element is added
*/
class SyncQueue<E>(
private val comparator: Comparator<in E>,
private val listener: Listener<E>
) {
private val priorityQueue: PriorityQueue<E> = PriorityQueue(comparator)

private val executor = Executors.newSingleThreadExecutor()

val size: Int
get() = priorityQueue.size

fun add(element: E, isSync: Boolean = false) {
if (isSync) {
var polledElement: E? = pollIf(comparator, element)
while (polledElement != null) {
listener.onElement(polledElement)
polledElement = pollIf(comparator, element)
}
// Send sync element
listener.onElement(element)
} else {
synchronized(this) {
priorityQueue.add(element)
}
}
}

private fun pollIf(comparator: Comparator<in E>, comparableElement: E): E? {
return synchronized(this) {
val element = priorityQueue.peek()
if ((element != null) && comparator.compare(element, comparableElement) <= 0) {
priorityQueue.poll()
}
null
}
}

fun clear() {
synchronized(this) {
priorityQueue.clear()
}
}

interface Listener<E> {
/**
* Called when element is polled.
*/
fun onElement(element: E)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (C) 2023 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.internal.utils

import org.junit.Assert.assertEquals
import org.junit.Test

class SyncQueueTest {
@Test
fun `test sync 1 element`() {
val syncQueue =
SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener<Int> {
override fun onElement(element: Int) {
assertEquals(1, element)
}
})
syncQueue.add(1, true)
assertEquals(0, syncQueue.size)
}

@Test
fun `test already sorted elements`() {
var i = 1
val syncQueue =
SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener<Int> {
override fun onElement(element: Int) {
assertEquals(i++, element)
}
})
syncQueue.add(1)
syncQueue.add(2)
syncQueue.add(3)
syncQueue.add(4, isSync = true)

assertEquals(5, i)
assertEquals(0, syncQueue.size)
}

@Test
fun `test equals elements`() {
val syncQueue =
SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener<Int> {
override fun onElement(element: Int) {
assertEquals(1, element)
}
})
syncQueue.add(1)
syncQueue.add(1, isSync = true)

assertEquals(0, syncQueue.size)
}

@Test
fun `test not sorted elements`() {
var i = 1
val syncQueue =
SyncQueue({ o1, o2 -> o1.compareTo(o2) }, object : SyncQueue.Listener<Int> {
override fun onElement(element: Int) {
assertEquals(i++, element)
}
})
syncQueue.add(1)
syncQueue.add(2)
syncQueue.add(3)
syncQueue.add(5)
syncQueue.add(6)
syncQueue.add(4, isSync = true)

assertEquals(5, i)
assertEquals(2, syncQueue.size)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import video.api.rtmpdroid.Rtmp
import java.security.InvalidParameterException

class RtmpProducer(
private val coroutineDispatcher: CoroutineDispatcher = Dispatchers.IO,
private val hasAudio: Boolean = true,
private val hasVideo: Boolean = true,
private val coroutineDispatcher: CoroutineDispatcher = Dispatchers.IO
) :
ILiveEndpoint {
override var onConnectionListener: OnConnectionListener? = null
Expand All @@ -40,8 +38,6 @@ class RtmpProducer(
override val isConnected: Boolean
get() = _isConnected

private val audioPacketQueue = mutableListOf<Packet>()

/**
* Sets/gets supported video codecs.
*/
Expand All @@ -68,7 +64,6 @@ class RtmpProducer(
withContext(coroutineDispatcher) {
try {
isOnError = false
audioPacketQueue.clear()
socket.connect("$url live=1 flashver=FMLE/3.0\\20(compatible;\\20FMSc/1.0)")
_isConnected = true
onConnectionListener?.onSuccess()
Expand Down Expand Up @@ -101,31 +96,7 @@ class RtmpProducer(
}

try {

if (hasAudio && hasVideo) {
/**
* Audio and video packets are received out of timestamp order. We need to reorder them.
* We suppose that video packets arrive after audio packets.
* We store audio packets in a queue and send them before video packets.
*/
if (packet.isAudio) {
// Store audio packet to send them later
audioPacketQueue.add(packet)
} else {
// Send audio packets
val audioPackets = audioPacketQueue.filter {
it.ts <= packet.ts
}

audioPackets.forEach { socket.write(it.buffer) }
audioPacketQueue.removeAll(audioPackets)

// Send video packet
socket.write(packet.buffer)
}
} else {
socket.write(packet.buffer)
}
socket.write(packet.buffer)
} catch (e: Exception) {
disconnect()
isOnError = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class AudioOnlyRtmpLiveStreamer(
) : BaseAudioOnlyLiveStreamer(
context = context,
muxer = FlvMuxer(writeToFile = false),
endpoint = RtmpProducer(hasAudio = true, hasVideo = false),
endpoint = RtmpProducer(),
initialOnErrorListener = initialOnErrorListener,
initialOnConnectionListener = initialOnConnectionListener
)
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class CameraRtmpLiveStreamer(
context = context,
enableAudio = enableAudio,
muxer = FlvMuxer(writeToFile = false),
endpoint = RtmpProducer(hasAudio = enableAudio, hasVideo = true),
endpoint = RtmpProducer(),
initialOnErrorListener = initialOnErrorListener,
initialOnConnectionListener = initialOnConnectionListener
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ScreenRecorderRtmpLiveStreamer(
context = context,
enableAudio = enableAudio,
muxer = FlvMuxer(writeToFile = false),
endpoint = RtmpProducer(hasAudio = enableAudio, hasVideo = true),
endpoint = RtmpProducer(),
initialOnErrorListener = initialOnErrorListener,
initialOnConnectionListener = initialOnConnectionListener
) {
Expand Down

0 comments on commit 55c7dd1

Please sign in to comment.