diff --git a/crates/dispatch2/src/object.rs b/crates/dispatch2/src/object.rs index 5547ee2e7..a165afcb4 100644 --- a/crates/dispatch2/src/object.rs +++ b/crates/dispatch2/src/object.rs @@ -1,5 +1,10 @@ //! Dispatch object definition. +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; + use super::{ffi::*, queue::Queue, utils::function_wrapper, QualityOfServiceClass}; /// Error returned by [DispatchObject::set_target_queue]. @@ -23,7 +28,7 @@ pub enum QualityOfServiceClassFloorError { #[derive(Debug)] pub struct DispatchObject { object: *mut T, - is_activated: bool, + is_activated: Arc, } impl DispatchObject { @@ -35,7 +40,7 @@ impl DispatchObject { pub unsafe fn new_owned(object: *mut T) -> Self { Self { object, - is_activated: false, + is_activated: Arc::new(AtomicBool::new(false)), } } @@ -47,7 +52,7 @@ impl DispatchObject { pub unsafe fn new_shared(object: *mut T) -> Self { let result = Self { object, - is_activated: false, + is_activated: Arc::new(AtomicBool::new(false)), }; // Safety: We own a reference to the object. @@ -80,7 +85,7 @@ impl DispatchObject { /// /// - DispatchObject should be a queue or queue source. pub unsafe fn set_target_queue(&self, queue: &Queue) -> Result<(), TargetQueueError> { - if self.is_activated { + if self.is_activated.load(Ordering::SeqCst) { return Err(TargetQueueError::ObjectAlreadyActive); } @@ -120,12 +125,15 @@ impl DispatchObject { /// Activate the object. pub fn activate(&mut self) { - // Safety: object cannot be null. - unsafe { - dispatch_activate(self.as_raw().cast()); + if let Ok(true) = + self.is_activated + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + { + // Safety: object cannot be null. + unsafe { + dispatch_activate(self.as_raw().cast()); + } } - - self.is_activated = true; } /// Suspend the invocation of functions on the object. @@ -169,3 +177,9 @@ impl Drop for DispatchObject { } } } + +// Safety: dispatch object can be safely moved between threads. +unsafe impl Send for DispatchObject {} + +// Safety: dispatch object can be shared between threads. +unsafe impl Sync for DispatchObject {}