Skip to content

Commit

Permalink
feat: optimizing yamux wasm feature
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Oct 18, 2024
1 parent f2f669a commit cefce8a
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 30 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
name: Github Action

on:
pull_request: # trigger on pull requests
pull_request: # trigger on pull requests
push:
branches:
- master # trigger on push to master
- master # trigger on push to master

jobs:
test:
name: Test
runs-on: ${{ matrix.os }}
strategy:
matrix:
build: [ linux, macos, windows ]
build: [linux, macos, windows]
include:
- build: linux
os: ubuntu-latest
Expand Down Expand Up @@ -63,6 +63,7 @@ jobs:
run: |
echo "stable" > rust-toolchain
rustup target add wasm32-unknown-unknown
rustup target add wasm32-wasip1
make features-check
fuzz_test:
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ features-check:
# required wasm32-unknown-unknown target
$(Change_Work_Path) && cargo build --features wasm-timer,unstable --no-default-features --target=wasm32-unknown-unknown
$(Change_Work_Path) && cargo build --features wasm-timer,unstable,secio-async-trait --no-default-features --target=wasm32-unknown-unknown
# required wasm32-wasip1 target
$(Change_Work_Path) && cargo build --features wasm-timer,unstable --no-default-features --target=wasm32-wasip1
$(Change_Work_Path) && cargo build --features wasm-timer,unstable,secio-async-trait --no-default-features --target=wasm32-wasip1
bench_p2p:
cd bench && cargo run --release

Expand Down
2 changes: 1 addition & 1 deletion secio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ molecule = "0.8.0"

unsigned-varint = "0.8"
bs58 = "0.5.0"
secp256k1 = "0.29"
secp256k1 = "0.30"
rand = "0.8"

[target.'cfg(unix)'.dependencies]
Expand Down
2 changes: 1 addition & 1 deletion tentacle/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl<K> InnerService<K>
where
K: KeyProvider,
{
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
#[cfg(not(target_family = "wasm"))]
fn spawn_listener(&mut self, incoming: MultiIncoming, listen_address: Multiaddr) {
let listener = Listener {
inner: incoming,
Expand Down
3 changes: 3 additions & 0 deletions yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ nohash-hasher = "0.2"

futures-timer = { version = "3.0.2", optional = true }

[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies]
web-time = "1.1.0"

[dev-dependencies]
env_logger = "0.6"
rand = "0.8"
Expand Down
74 changes: 49 additions & 25 deletions yamux/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ use std::{
task::{Context, Poll},
time::Duration,
};
#[cfg(target_family = "wasm")]

#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
use timer::Instant;
/// wasm-unknown-unkown brower doesn't support time get, must use browser timer instead
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
use web_time::Instant;

use futures::{
channel::mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender},
Expand All @@ -35,13 +39,6 @@ use timer::{interval, Interval};
const BUF_SHRINK_THRESHOLD: usize = u8::MAX as usize;
const TIMEOUT: Duration = Duration::from_secs(30);

/// wasm doesn't support time get, must use browser timer instead
/// But we can simulate it with `futures-timer`.
/// So, I implemented a global time dependent on `futures-timer`,
/// Because in the browser environment, it is always single-threaded, so feel free to be unsafe
#[cfg(target_family = "wasm")]
static mut TIME: Instant = Instant::from_u64(0);

/// The session
pub struct Session<T> {
// Framed low level raw stream
Expand Down Expand Up @@ -109,6 +106,10 @@ pub struct Session<T> {
control_receiver: Receiver<Command>,

keepalive: Option<Interval>,
/// wasi use time mock to recording time changes
/// yamux's timeout statistics are session independent
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
time_mock: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}

/// Session type, client or server
Expand Down Expand Up @@ -149,8 +150,18 @@ where
raw_stream,
FrameCodec::default().max_frame_size(config.max_stream_window_size),
);
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
let time_mock = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let keepalive = if config.enable_keepalive {
Some(interval(config.keepalive_interval))
#[cfg(not(all(target_family = "wasm", not(target_os = "unknown"))))]
let interval = interval(config.keepalive_interval);

#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
let mut interval = interval(config.keepalive_interval);
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
interval.mock_instant(time_mock.clone());

Some(interval)
} else {
None
};
Expand All @@ -174,6 +185,8 @@ where
control_sender,
control_receiver,
keepalive,
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
time_mock,
}
}

Expand Down Expand Up @@ -276,7 +289,7 @@ where
if self
.pings
.iter()
.any(|(_id, time)| Instant::now().saturating_duration_since(*time) > TIMEOUT)
.any(|(_id, time)| ping_at.saturating_duration_since(*time) > TIMEOUT)
{
return Err(io::ErrorKind::TimedOut.into());
}
Expand Down Expand Up @@ -636,7 +649,13 @@ where
// Assume that remote peer has gone away and this session should be closed.
self.remote_go_away = true;
} else {
self.keep_alive(cx, Instant::now())?;
#[cfg(not(all(target_family = "wasm", not(target_os = "unknown"))))]
let now = Instant::now();
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
let now = Instant::from_u64(
self.time_mock.load(std::sync::atomic::Ordering::Acquire) as u64,
);
self.keep_alive(cx, now)?;
}
}
Poll::Ready(None) => {
Expand Down Expand Up @@ -731,7 +750,7 @@ mod timer {
}
}

#[cfg(target_family = "wasm")]
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
pub use wasm_mock::Instant;

#[cfg(feature = "generic-timer")]
Expand All @@ -747,15 +766,27 @@ mod timer {
pub struct Interval {
delay: Delay,
period: Duration,
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
mock_instant: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}

impl Interval {
fn new(period: Duration) -> Self {
Self {
delay: Delay::new(period),
period,
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
mock_instant: Default::default(),
}
}

#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
pub fn mock_instant(
&mut self,
mock_instant: std::sync::Arc<std::sync::atomic::AtomicUsize>,
) {
self.mock_instant = mock_instant;
}
}

impl Stream for Interval {
Expand All @@ -766,10 +797,11 @@ mod timer {
Poll::Ready(_) => {
let dur = self.period;
self.delay.reset(dur);
#[cfg(target_family = "wasm")]
unsafe {
super::super::TIME += dur;
}
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
self.mock_instant.fetch_add(
dur.as_millis() as usize,
std::sync::atomic::Ordering::AcqRel,
);
Poll::Ready(Some(()))
}
Poll::Pending => Poll::Pending,
Expand All @@ -784,7 +816,7 @@ mod timer {
}
}

#[cfg(target_family = "wasm")]
#[cfg(all(target_family = "wasm", not(target_os = "unknown")))]
#[allow(dead_code)]
mod wasm_mock {
use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd};
Expand Down Expand Up @@ -824,21 +856,13 @@ mod timer {
Instant { inner: val }
}

pub fn now() -> Instant {
unsafe { super::super::TIME }
}

pub fn duration_since(&self, earlier: Instant) -> Duration {
*self - earlier
}

pub fn saturating_duration_since(&self, earlier: Instant) -> Duration {
*self - earlier
}

pub fn elapsed(&self) -> Duration {
Instant::now() - *self
}
}

impl Add<Duration> for Instant {
Expand Down

0 comments on commit cefce8a

Please sign in to comment.