Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve packet handling #63

Merged
merged 5 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ tracing-flame = "0.2.0"
ndarray = "0.15.6"
jemallocator = "0.5.4"
jemalloc-ctl = "0.5.4"
libc = "0.2.153"
fastrand = "2.0.2"

# removing this because jemalloc-ctl is nice for memory stats also
# https://github.com/rust-lang/rust-analyzer/issues/1441#issuecomment-509506279
Expand All @@ -68,6 +70,7 @@ redundant_feature_names = "deny"
wildcard_dependencies = "deny"

restriction = { level = "deny", priority = -1 }
semicolon_outside_block = "allow"
missing_docs_in_private_items = "allow"
question_mark_used = "allow"
print_stdout = "allow"
Expand Down Expand Up @@ -117,6 +120,7 @@ pedantic = { level = "deny", priority = -1 }
uninlined_format_args = "allow" # consider denying; this is allowed because Copilot often generates code that triggers this lint
needless_pass_by_value = "allow" # consider denying
cast_lossless = "allow"
cast_sign_loss = "allow"
cast_possible_truncation = "allow" # consider denying
cast_precision_loss = "allow" # consider denying
missing_errors_doc = "allow" # consider denying
Expand Down
1 change: 0 additions & 1 deletion server/src/bits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ impl BitStorage {
vec![0; calculated_length]
};

#[allow(clippy::cast_sign_loss)]
Ok(Self {
data: using_data,
bits,
Expand Down
1 change: 0 additions & 1 deletion server/src/bounding_box.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ impl BoundingBox {
}
}

#[allow(clippy::cast_sign_loss)]
const fn idx(location: IVec2) -> Index2D {
Index2D {
x: location.x,
Expand Down
92 changes: 84 additions & 8 deletions server/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
#![allow(clippy::module_name_repetitions)]

use std::{borrow::Cow, cell::UnsafeCell, collections::BTreeSet, io, io::ErrorKind, sync::Mutex};
use std::{
borrow::Cow,
cell::UnsafeCell,
collections::BTreeSet,
io,
io::ErrorKind,
os::fd::{AsRawFd, RawFd},
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
};

use anyhow::{ensure, Context};
use base64::Engine;
Expand Down Expand Up @@ -31,6 +43,8 @@ use valence_registry::{BiomeRegistry, RegistryCodec};

use crate::GLOBAL;

const DEFAULT_SPEED: u32 = 1024 * 1024;

const READ_BUF_SIZE: usize = 4096;

/// The Minecraft protocol version this library currently targets.
Expand Down Expand Up @@ -66,6 +80,7 @@ pub struct Io {

pub struct IoWrite {
write: OwnedWriteHalf<TcpStream>,
raw_fd: RawFd,
}

pub struct IoRead {
Expand All @@ -76,9 +91,17 @@ pub struct IoRead {
pub struct WriterComm {
tx: flume::Sender<bytes::Bytes>,
enc: PacketEncoder,

/// Approximate speed that the other side can receive the data that this sends.
/// Measured in bytes/second.
speed_mib_per_second: Arc<AtomicU32>,
}

impl WriterComm {
pub fn speed_mib_per_second(&self) -> u32 {
self.speed_mib_per_second.load(Ordering::Relaxed)
}

pub fn serialize<P>(&mut self, pkt: &P) -> anyhow::Result<bytes::Bytes>
where
P: valence_protocol::Packet + Encode,
Expand Down Expand Up @@ -117,10 +140,9 @@ impl WriterComm {
}

pub fn send_keep_alive(&mut self) -> anyhow::Result<()> {
// todo: handle error
let pkt = valence_protocol::packets::play::KeepAliveS2c {
// todo: this might be inefficient
id: rand::random(),
// The ID can be set to zero because it doesn't matter
id: 0,
};

self.send_packet(&pkt)?;
Expand Down Expand Up @@ -212,6 +234,36 @@ impl IoWrite {

Ok(())
}

andrewgazelka marked this conversation as resolved.
Show resolved Hide resolved
/// This function returns the number of bytes in the TCP send queue that have
/// not yet been transmitted to the network.
///
/// It utilizes the `ioctl` system call with the `TIOCOUTQ` operation on Unix-like systems to
/// query this information. The function safely checks the `ioctl` return value to ensure no
/// errors occur during the operation.
///
/// If running on non-Unix systems, it currently returns `0` by default.
///
/// Proper error handling for `ioctl` failures should be added, and support for other operating
/// systems needs to be considered for portability.
pub(crate) fn queued_send(&self) -> libc::c_int {
if cfg!(unix) {
let mut value: libc::c_int = 0;
// SAFETY: raw_fd is valid since the TcpStream is still alive, and value is valid to
// write to
unsafe {
// TODO: Handle ioctl error properly
assert_ne!(
libc::ioctl(self.raw_fd, libc::TIOCOUTQ, core::ptr::addr_of_mut!(value)),
-1
);
}
value
} else {
// TODO: Support getting queued send for other OS
0
}
}
}

pub struct Packets {
Expand Down Expand Up @@ -273,7 +325,6 @@ impl Io {

let mut bytes_slice = &*bytes;
let slice = &mut bytes_slice;
#[allow(clippy::cast_sign_loss)]
let length = VarInt::decode_partial(slice).unwrap() as usize;

let slice_len = bytes_slice.len();
Expand Down Expand Up @@ -307,7 +358,6 @@ impl Io {

let HandshakeC2s {
protocol_version,
server_port,
next_state,
..
} = self.recv_packet().await?;
Expand All @@ -318,7 +368,6 @@ impl Io {
protocol_version.0 == PROTOCOL_VERSION,
"expected protocol version {PROTOCOL_VERSION}, got {version}"
);
ensure!(server_port == 25565, "expected server port 25565");

match next_state {
HandshakeNextState::Status => self.server_status().await?,
Expand Down Expand Up @@ -356,14 +405,18 @@ impl Io {
// bound at 1024 packets
let (s2c_tx, s2c_rx) = flume::unbounded();

let raw_fd = self.stream.as_raw_fd();
let (read, write) = self.stream.into_split();

let speed = Arc::new(AtomicU32::new(DEFAULT_SPEED));

let writer_comm = WriterComm {
tx: s2c_tx,
enc: self.enc,
speed_mib_per_second: Arc::clone(&speed),
};

let mut io_write = IoWrite { write };
let mut io_write = IoWrite { write, raw_fd };

let mut io_read = IoRead {
stream: read,
Expand All @@ -381,11 +434,34 @@ impl Io {
});

monoio::spawn(async move {
let mut past_queued_send = 0;
let mut past_instant = Instant::now();
while let Ok(bytes) = s2c_rx.recv_async().await {
let len = bytes.len();
if let Err(e) = io_write.send_packet(bytes).await {
error!("{e:?}");
break;
}
let elapsed = past_instant.elapsed();

// todo: clarify why 1 second?
if elapsed > Duration::from_secs(1) {
andrewgazelka marked this conversation as resolved.
Show resolved Hide resolved
let queued_send = io_write.queued_send();
speed.store(
((past_queued_send - queued_send) as f32 / elapsed.as_secs_f32()) as u32,
Ordering::Relaxed,
);
past_queued_send = io_write.queued_send();
past_instant = Instant::now();
} else {
// This will make the estimated speed slightly lower than the actual speed, but
// it makes measuring speed more practical because the server will send packets
// to the client more often than 1 second
#[allow(clippy::cast_possible_wrap)]
{
past_queued_send += len as libc::c_int;
}
}
}
});

Expand Down
17 changes: 13 additions & 4 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use valence_protocol::math::DVec3;
use crate::{
bounding_box::BoundingBox,
io::{server, ClientConnection, Packets, GLOBAL_PACKETS},
singleton::{encoder::Encoder, player_lookup::PlayerLookup},
singleton::player_lookup::PlayerLookup,
};

mod global;
Expand All @@ -47,6 +47,13 @@ struct Player {
packets: Packets,
name: Box<str>,
last_keep_alive_sent: Instant,

/// Set to true if a keep alive has been sent to the client and the client hasn't responded.
unresponded_keep_alive: bool,

/// The player's ping. This is likely higher than the player's real ping.
ping: Duration,

locale: Option<String>,
}

Expand Down Expand Up @@ -115,6 +122,9 @@ fn bytes_to_mb(bytes: usize) -> f64 {
#[derive(Event)]
struct Gametick;

#[derive(Event)]
struct BroadcastPackets;

static GLOBAL: global::Global = global::Global {
player_count: AtomicU32::new(0),
};
Expand Down Expand Up @@ -171,6 +181,7 @@ impl Game {
world.add_handler(system::entity_detect_collisions);
world.add_handler(system::reset_bounding_boxes);

world.add_handler(system::broadcast_packets);
world.add_handler(system::keep_alive);
world.add_handler(process_packets);
world.add_handler(system::tps_message);
Expand All @@ -179,9 +190,6 @@ impl Game {
let bounding_boxes = world.spawn();
world.insert(bounding_boxes, bounding_box::EntityBoundingBoxes::default());

let encoder = world.spawn();
world.insert(encoder, Encoder);

let lookup = world.spawn();
world.insert(lookup, PlayerLookup::default());

Expand Down Expand Up @@ -269,6 +277,7 @@ impl Game {
}

self.world.send(Gametick);
self.world.send(BroadcastPackets);

self.req_packets.send(()).unwrap();

Expand Down
12 changes: 11 additions & 1 deletion server/src/packets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ fn update_selected_slot(mut data: &[u8]) -> anyhow::Result<()> {
Ok(())
}

fn keep_alive(player: &mut Player) -> anyhow::Result<()> {
ensure!(
!player.unresponded_keep_alive,
"keep alive sent unexpectedly"
);
player.unresponded_keep_alive = false;
player.ping = player.last_keep_alive_sent.elapsed();
Ok(())
}

#[derive(Debug, Copy, Clone)]
enum HybridPos {
Absolute(f64),
Expand Down Expand Up @@ -244,7 +254,7 @@ pub fn switch(
play::ClientCommandC2s::ID => player_command(data),
play::UpdatePlayerAbilitiesC2s::ID => update_player_abilities(data)?,
play::UpdateSelectedSlotC2s::ID => update_selected_slot(data)?,
play::KeepAliveC2s::ID => (), // todo: implement
play::KeepAliveC2s::ID => keep_alive(player)?,
play::CommandExecutionC2s::ID => chat_command(data, player, full_entity_pose, sender)?,
_ => warn!("unknown packet id: 0x{:02X}", id),
}
Expand Down
Loading