Skip to content

Commit

Permalink
fix: re-registering interest on Windows (#274)
Browse files Browse the repository at this point in the history
  • Loading branch information
EliseZeroTwo committed Nov 5, 2024
1 parent 90c9c3e commit c30db89
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 30 deletions.
41 changes: 18 additions & 23 deletions monoio/src/driver/legacy/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ impl Poller {
token: mio::Token,
interests: mio::Interest,
) -> std::io::Result<()> {
if state.inner.is_none() {
let mut state_inner = state.inner.lock().unwrap();
if state_inner.inner.is_none() {
let flags = interests_to_afd_flags(interests);

let inner = {
Expand All @@ -143,9 +144,9 @@ impl Poller {

self.queue_state(inner.clone());
unsafe { self.update_sockets_events_if_polling()? };
state.inner = Some(inner);
state.token = token;
state.interest = interests;
state_inner.inner = Some(inner);
state_inner.token = token;
state_inner.interest = interests;

Ok(())
} else {
Expand All @@ -155,37 +156,31 @@ impl Poller {

pub fn reregister(
&self,
state: &mut SocketState,
state: Pin<Arc<Mutex<SockState>>>,
token: mio::Token,
interests: mio::Interest,
) -> std::io::Result<()> {
if let Some(inner) = state.inner.as_mut() {
{
let event = Event {
flags: interests_to_afd_flags(interests),
data: token.0 as u64,
};

inner.lock().unwrap().set_event(event);
}

state.token = token;
state.interest = interests;
{
let event = Event {
flags: interests_to_afd_flags(interests),
data: token.0 as u64,
};

self.queue_state(inner.clone());
unsafe { self.update_sockets_events_if_polling() }
} else {
Err(std::io::ErrorKind::NotFound.into())
state.lock().unwrap().set_event(event);
}

self.queue_state(state.clone());
unsafe { self.update_sockets_events_if_polling() }
}

pub fn deregister(&mut self, state: &mut SocketState) -> std::io::Result<()> {
if let Some(inner) = state.inner.as_mut() {
let mut state_inner = state.inner.lock().unwrap();
if let Some(inner) = state_inner.inner.as_mut() {
{
let mut sock_state = inner.lock().unwrap();
sock_state.mark_delete();
}
state.inner = None;
state_inner.inner = None;
Ok(())
} else {
Err(std::io::ErrorKind::NotFound.into())
Expand Down
17 changes: 12 additions & 5 deletions monoio/src/driver/legacy/iocp/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,27 @@ pub enum SockPollStatus {
}

#[derive(Debug)]
pub struct SocketState {
pub socket: RawSocket,
pub struct SocketStateInner {
pub inner: Option<Pin<Arc<Mutex<SockState>>>>,
pub token: mio::Token,
pub interest: mio::Interest,
}

#[derive(Debug)]
pub struct SocketState {
pub socket: RawSocket,
pub inner: Arc<Mutex<SocketStateInner>>,
}

impl SocketState {
pub fn new(socket: RawSocket) -> Self {
Self {
socket,
inner: None,
token: mio::Token(0),
interest: mio::Interest::READABLE,
inner: Arc::new(Mutex::new(SocketStateInner {
inner: None,
token: mio::Token(0),
interest: mio::Interest::READABLE,
}))
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion monoio/src/driver/legacy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl LegacyDriver {
interest: mio::Interest,
) -> io::Result<usize> {
let inner = unsafe { &mut *this.get() };
let io = ScheduledIo::default();
let io = ScheduledIo::new(state.inner.clone());
let token = inner.io_dispatch.insert(io);

match inner.poll.register(state, mio::Token(token), interest) {
Expand Down Expand Up @@ -303,6 +303,21 @@ impl LegacyInner {
flags: 0,
}),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
#[cfg(windows)]
{
if let Some((sock_state, token, interest)) = {
let socket_state_lock = ref_mut.state.lock().unwrap();
socket_state_lock.inner.clone().map(|inner| (inner, socket_state_lock.token, socket_state_lock.interest))
} {
if let Err(e) = inner.poll.reregister(sock_state, token, interest) {
return Poll::Ready(CompletionMeta {
result: Err(e),
flags: 0,
});
}
}
}

ref_mut.clear_readiness(direction.mask());
ref_mut.set_waker(cx, direction);
Poll::Pending
Expand Down
9 changes: 8 additions & 1 deletion monoio/src/driver/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@ pub(crate) struct ScheduledIo {
reader: Option<Waker>,
/// Waker used for AsyncWrite.
writer: Option<Waker>,

#[cfg(windows)]
pub state: std::sync::Arc<std::sync::Mutex<super::legacy::iocp::SocketStateInner>>,
}


#[cfg(not(windows))]
impl Default for ScheduledIo {
#[inline]
fn default() -> Self {
Expand All @@ -19,11 +24,13 @@ impl Default for ScheduledIo {
}

impl ScheduledIo {
pub(crate) const fn new() -> Self {
pub(crate) const fn new(#[cfg(windows)] state: std::sync::Arc<std::sync::Mutex<super::legacy::iocp::SocketStateInner>>) -> Self {
Self {
readiness: Ready::EMPTY,
reader: None,
writer: None,
#[cfg(windows)]
state,
}
}

Expand Down

0 comments on commit c30db89

Please sign in to comment.