Skip to content

Commit

Permalink
Merge pull request #78 from traP-jp/feat/#77-spmc-channel
Browse files Browse the repository at this point in the history
✨ spmc
  • Loading branch information
comavius authored Dec 20, 2024
2 parents e4da096 + 9a87591 commit f428e8f
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 1 deletion.
5 changes: 4 additions & 1 deletion src/container.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![allow(clippy::module_inception)]
pub mod container;
use crate::custom_rc::FileLink;
use crate::custom_rc::SymlinkLink;
use crate::remote_exec::ExecutionOutput;
Expand All @@ -18,3 +17,7 @@ pub trait Container {
file_links: HashMap<PathBuf, MutexGuard<'a, FileLinkType>>,
) -> Result<ExecutionOutput>;
}

pub trait ContainerFactory<ContainerType: Container, Priority: Ord> {
async fn get(&self, priority: Priority) -> Result<ContainerType>;
}
86 changes: 86 additions & 0 deletions src/submission_logic/spmc_oneshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::collections::BinaryHeap;
mod sender_with_ord;
use sender_with_ord::OneshotSenderWithOrd;
use tokio::sync::Mutex;
use std::sync::Arc;


/// single-producer multi consumer oneshot channel
pub struct Spmc<PriorityType: Ord, ItemType> {
heap: BinaryHeap<OneshotSenderWithOrd<PriorityType, ItemType>>,
_phantom: std::marker::PhantomData<ItemType>,
}


impl <
PriorityType: Ord,
ItemType,
> Spmc<PriorityType, ItemType> {
fn new() -> Self {
Self {
heap: BinaryHeap::new(),
_phantom: std::marker::PhantomData,
}
}
}


/// sender of single-producer multi consumer oneshot channel
pub struct SpmcSender<PriorityType: Ord, ItemType> {
channel: Arc<Mutex<Spmc<PriorityType, ItemType>>>,
}

impl <
PriorityType: Ord,
ItemType,
> SpmcSender<PriorityType, ItemType> {
async fn send(&self, item: ItemType) -> Result<(), ItemType> {
let sender = {
let mut heap = self.channel.lock().await;
match heap.heap.pop() {
Some(OneshotSenderWithOrd { sender: top_sender, .. }) => {
Some(top_sender)
},
None => None,
}
};
match sender {
Some(sender) => {
sender.send(item)
},
None => {
Err(item)
},
}
}
}

/// receiver factory of single-producer multi consumer oneshot channel
pub struct SpmcReceiverFactory<PriorityType: Ord, ItemType> {
channel: Arc<Mutex<Spmc<PriorityType, ItemType>>>,
}

impl <
PriorityType: Ord,
ItemType,
> SpmcReceiverFactory<PriorityType, ItemType> {
async fn new_receiver(&self, priority: PriorityType) -> SpmcReceiver<ItemType> {
let (sender, receiver) = tokio::sync::oneshot::channel();
let sender_with_ord = OneshotSenderWithOrd {
priority,
sender,
};
let mut heap = self.channel.lock().await;
heap.heap.push(sender_with_ord);
receiver
}
}

/// receiver of single-producer multi consumer oneshot channel
pub type SpmcReceiver<ItemType> = tokio::sync::oneshot::Receiver<ItemType>;


pub fn channel<PriorityType: Ord, ItemType>() -> (SpmcSender<PriorityType, ItemType>, SpmcReceiverFactory<PriorityType, ItemType>) {
let channel = Arc::new(Mutex::new(Spmc::new()));
(SpmcSender { channel: channel.clone() }, SpmcReceiverFactory { channel })
}
38 changes: 38 additions & 0 deletions src/submission_logic/spmc_oneshot/sender_with_ord.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use uuid::Uuid;
use tokio::sync::oneshot::Sender;
pub struct OneshotSenderWithOrd<PriorityType: Ord, ItemType> {
pub priority: PriorityType,
pub sender: Sender<ItemType>,
}

impl <
PriorityType: Ord,
ItemType,
> PartialEq for OneshotSenderWithOrd<PriorityType, ItemType> {
fn eq(&self, other: &Self) -> bool {
self.priority == other.priority
}
}

impl <
PriorityType: Ord,
ItemType,
> Eq for OneshotSenderWithOrd<PriorityType, ItemType> {}

impl <
PriorityType: Ord,
ItemType,
> PartialOrd for OneshotSenderWithOrd<PriorityType, ItemType> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.priority.cmp(&other.priority))
}
}

impl <
PriorityType: Ord,
ItemType,
> Ord for OneshotSenderWithOrd<PriorityType, ItemType> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.priority.cmp(&other.priority)
}
}

0 comments on commit f428e8f

Please sign in to comment.