Skip to content

Commit

Permalink
Do not leak crossbeam types in the channel-api
Browse files Browse the repository at this point in the history
  • Loading branch information
lu-zero committed Dec 4, 2023
1 parent 891eaad commit 30c510e
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 14 deletions.
101 changes: 88 additions & 13 deletions src/api/channel/data.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// Copyright (c) 2018-2021, The rav1e contributors. All rights reserved
// Copyright (c) 2018-2023, The rav1e contributors. All rights reserved
//
// This source code is subject to the terms of the BSD 2 Clause License and
// the Alliance for Open Media Patent License 1.0. If the BSD 2 Clause License
// was not distributed with this source code in the LICENSE file, you can
// obtain it at www.aomedia.org/license/software. If the Alliance for Open
// Media Patent License 1.0 was not distributed with this source code in the
// PATENTS file, you can obtain it at www.aomedia.org/license/patent.
#![allow(missing_docs)]

use crate::api::color::*;
use crate::api::config::EncoderConfig;
Expand All @@ -17,11 +16,87 @@ use crate::frame::*;
use crate::util::Pixel;

use bitstream_io::*;
use crossbeam::channel::*;
use crossbeam::channel::{Receiver, Sender};
use thiserror::Error;

use std::io;
use std::sync::Arc;

/// An error returned from the `send` methods.
///
/// The message could not be sent because the channel is disconnected.
///
/// The error contains the message so it can be recovered.
#[derive(PartialEq, Eq, Clone, Copy, Error)]
#[error("sending on a disconnected channel")]
pub struct SendError<T>(pub T);

/// An error returned from the `try_send` methods.
///
/// The error contains the message being sent so it can be recovered.
#[derive(PartialEq, Eq, Clone, Copy, Error)]
pub enum TrySendError<T> {
/// The message could not be sent because the channel is full.
#[error("sending on a full channel")]
Full(T),

/// The message could not be sent because the channel is disconnected.
#[error("sending on a disconnected channel")]
Disconnected(T),
}

/// An error returned from the `recv` methods.
///
/// A message could not be received because the channel is empty and disconnected.
///
#[derive(PartialEq, Eq, Clone, Copy, Debug, Error)]
#[error("receiving on an empty and disconnected channel")]
pub struct RecvError;

/// An error returned from the `try_recv` methods.
///
#[derive(PartialEq, Eq, Clone, Copy, Debug, Error)]
pub enum TryRecvError {
/// A message could not be received because the channel is empty.
#[error("receiving on an empty channel")]
Empty,

/// The message could not be received because the channel is empty and disconnected.
#[error("receiving on an empty and disconnected channel")]
Disconnected,
}

impl<T> SendError<T> {
fn from(value: crossbeam::channel::SendError<T>) -> Self {
Self(value.0)
}
}

impl<T> TrySendError<T> {
fn from(value: crossbeam::channel::TrySendError<T>) -> Self {
use crossbeam::channel::TrySendError::*;
match value {
Full(v) => TrySendError::Full(v),
Disconnected(v) => TrySendError::Disconnected(v),
}
}
}
impl RecvError {
fn from(_: crossbeam::channel::RecvError) -> Self {
RecvError
}
}

impl TryRecvError {
fn from(value: crossbeam::channel::TryRecvError) -> Self {
use crossbeam::channel::TryRecvError::*;
match value {
Empty => TryRecvError::Empty,
Disconnected => TryRecvError::Disconnected,
}
}
}

/// Endpoint to send previous-pass statistics data
pub struct RcDataSender {
pub(crate) sender: Sender<RcData>,
Expand All @@ -45,7 +120,7 @@ impl RcDataSender {
if self.limit <= self.count {
Err(TrySendError::Full(data))
} else {
let r = self.sender.try_send(data);
let r = self.sender.try_send(data).map_err(TrySendError::from);
if r.is_ok() {
self.count += 1;
}
Expand All @@ -60,7 +135,7 @@ impl RcDataSender {
if self.limit <= self.count {
Err(SendError(data))
} else {
let r = self.sender.send(data);
let r = self.sender.send(data).map_err(SendError::from);
if r.is_ok() {
self.count += 1;
}
Expand Down Expand Up @@ -94,7 +169,7 @@ impl RcDataReceiver {
/// - `TryRecvError::Empty` if the channel is currently empty.
/// - `TryRecvError::Disconnected` if the channel is empty and has been disconnected.
pub fn try_recv(&self) -> Result<RcData, TryRecvError> {
self.0.try_recv()
self.0.try_recv().map_err(TryRecvError::from)
}

/// Blocks the current thread until a message is received or the channel is empty and
Expand All @@ -111,15 +186,15 @@ impl RcDataReceiver {
///
/// - `RecvError` if the channel is empty and has been disconnected.
pub fn recv(&self) -> Result<RcData, RecvError> {
self.0.recv()
self.0.recv().map_err(RecvError::from)
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn iter(&self) -> Iter<RcData> {
pub fn iter<'a>(&'a self) -> impl Iterator<Item = RcData> + 'a {
self.0.iter()
}

Expand Down Expand Up @@ -161,7 +236,7 @@ impl<T: Pixel> FrameSender<T> {
if self.limit <= self.count {
Err(TrySendError::Full(frame.into()))
} else {
let r = self.sender.try_send(frame.into());
let r = self.sender.try_send(frame.into()).map_err(TrySendError::from);
if r.is_ok() {
self.count += 1;
}
Expand All @@ -178,7 +253,7 @@ impl<T: Pixel> FrameSender<T> {
if self.limit <= self.count {
Err(SendError(frame.into()))
} else {
let r = self.sender.send(frame.into());
let r = self.sender.send(frame.into()).map_err(SendError::from);
if r.is_ok() {
self.count += 1;
}
Expand Down Expand Up @@ -227,7 +302,7 @@ impl<T: Pixel> PacketReceiver<T> {
/// - `TryRecvError::Empty` if the channel is currently empty.
/// - `TryRecvError::Disconnected` if the channel is empty and has been disconnected.
pub fn try_recv(&self) -> Result<Packet<T>, TryRecvError> {
self.receiver.try_recv()
self.receiver.try_recv().map_err(TryRecvError::from)
}
/// Blocks the current thread until a message is received or the channel is empty and
/// disconnected.
Expand All @@ -243,15 +318,15 @@ impl<T: Pixel> PacketReceiver<T> {
///
/// - `RecvError` if the channel is empty and has been disconnected.
pub fn recv(&self) -> Result<Packet<T>, RecvError> {
self.receiver.recv()
self.receiver.recv().map_err(RecvError::from)
}
pub fn len(&self) -> usize {
self.receiver.len()
}
pub fn is_empty(&self) -> bool {
self.receiver.is_empty()
}
pub fn iter(&self) -> Iter<Packet<T>> {
pub fn iter<'a>(&'a self) -> impl Iterator<Item = Packet<T>> + 'a {
self.receiver.iter()
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/api/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ use rayon::ThreadPool;
use std::sync::Arc;

mod data;
pub use data::*;
pub use data::{
FrameInput, FrameSender, PacketReceiver, PassDataChannel, RcDataReceiver,
RcDataSender, RecvError, SendError, TryRecvError, TrySendError,
VideoDataChannel,
};

mod by_gop;
pub use by_gop::*;
Expand Down

0 comments on commit 30c510e

Please sign in to comment.