Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service layering #307

Merged
merged 37 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
e0006c0
WIP: Working Box<dyn Stream>> based Service but with stream & middlew…
ximon18 Apr 14, 2024
4aec8bd
WIP: Working Box<dyn Stream> based Service andmandatory middleware, w…
ximon18 Apr 15, 2024
724d670
WIP: Working Box<dyn Stream> based Service and mandatory middleware, …
ximon18 Apr 15, 2024
4cc2bd5
Compilation fix.
ximon18 Apr 15, 2024
32b4623
Remove commented out code.
ximon18 Apr 15, 2024
c6682b6
WIP: Working Box<dyn Stream> based Service and mandatory middleware (…
ximon18 Apr 16, 2024
8a406d3
Factor `MiddlewareStream` out to new util.rs.
ximon18 Apr 16, 2024
98fcb65
Adds Service style versions of the EDNS and Cookie middleware too. Al…
ximon18 Apr 16, 2024
3dc28be
Remove the old middleware processors, pocessor trait, chain and build…
ximon18 Apr 16, 2024
0618b82
Remove diagnostic trace message accidentally left behind.
ximon18 Apr 23, 2024
20220a0
Test fixes: A single test stream should yield only a single message, …
ximon18 Apr 29, 2024
7549bd0
- Layered service based middleware.
ximon18 Apr 30, 2024
9feccce
cargo fmt.
ximon18 Apr 30, 2024
3a09c12
Merge branch 'main' into service-layering
ximon18 Apr 30, 2024
592304b
FIX: DisconnectWithFlush ignored in Stream Connection handler.
ximon18 Apr 30, 2024
f4a1e0b
Update syntax in broken doc tests.
ximon18 Apr 30, 2024
d236fff
Add a bare bones demo of an RFC 9567 monitoring agent implemented usi…
ximon18 Apr 30, 2024
31e5d7c
Fix broken RustDoc references.
ximon18 Apr 30, 2024
a45a09f
Fix compilation error "`main` function not found in crate `serve_rfc9…
ximon18 Apr 30, 2024
429de28
Fix compilation error "use of unstable library feature 'option_as_sli…
ximon18 Apr 30, 2024
e4b9d58
Merge branch 'main' into service-layering.
ximon18 Apr 30, 2024
ce0f029
FIX: Cookie middleware doesn't do post-processing, so actually suppor…
ximon18 Apr 30, 2024
4313cfd
Files missed from previous commit.
ximon18 Apr 30, 2024
e991c59
Minor cleanup.
ximon18 Apr 30, 2024
d98401e
Merge branch 'main' into service-layering
ximon18 Jun 12, 2024
7b765dd
Compilation fix.
ximon18 Jun 12, 2024
f5e39f7
Merge branch 'main' into service-layering
ximon18 Jun 26, 2024
5dd9b0e
Merge branch 'main' into service-layering
ximon18 Jun 27, 2024
558aa41
Merge branch 'main' into service-layering
ximon18 Jul 9, 2024
eac8e22
Merge branch 'main' into service-layering
ximon18 Jul 23, 2024
16f369b
RustDoc fixes and additions, minor code tweaks and removal of comment…
ximon18 Jul 23, 2024
def12fd
RustDoc fixes.
ximon18 Jul 23, 2024
0513f0b
Remove RFC 9567 example. as it has nothing to do with the other chang…
ximon18 Jul 23, 2024
47a3a15
cargo fmt.
ximon18 Jul 23, 2024
d18cd26
Merge branch 'main' into service-layering
ximon18 Jul 23, 2024
21a7f0b
Merge branch 'main' into service-layering
ximon18 Jul 24, 2024
c6ec451
Merge branch 'main' into service-layering
ximon18 Jul 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ name = "domain"
path = "src/lib.rs"

[dependencies]
octseq = { version = "0.5.2-dev", git = "https://github.com/NLnetLabs/octseq.git", rev ="3f7797f4274af0a52e66105250ee1186ff2ab6ac", default-features = false }
time = { version = "0.3.1", default-features = false }

octseq = { version = "0.5.2-dev", git = "https://github.com/NLnetLabs/octseq.git", rev ="3f7797f4274af0a52e66105250ee1186ff2ab6ac", default-features = false }
time = { version = "0.3.1", default-features = false }
rand = { version = "0.8", optional = true }
arc-swap = { version = "1.7.0", optional = true }
bytes = { version = "1.0", optional = true, default-features = false }
Expand Down
248 changes: 40 additions & 208 deletions examples/serve-zone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@
//!
//! dig @127.0.0.1 -p 8053 AXFR example.com

use std::future::{pending, ready, Future};
use std::future::pending;
use std::io::BufReader;
use std::process::exit;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;

use domain::base::iana::{Opcode, Rcode};
use domain::base::message_builder::AdditionalBuilder;
use domain::base::{Message, Name, Rtype, ToName};
use domain::base::iana::Rcode;
use domain::base::ToName;
use domain::net::server::buf::VecBufSource;
use domain::net::server::dgram::DgramServer;
use domain::net::server::message::Request;
use domain::net::server::service::{
CallResult, ServiceError, Transaction, TransactionStream,
};
#[cfg(feature = "siphasher")]
use domain::net::server::middleware::cookies::CookiesMiddlewareSvc;
use domain::net::server::middleware::edns::EdnsMiddlewareSvc;
use domain::net::server::middleware::mandatory::MandatoryMiddlewareSvc;
use domain::net::server::service::{CallResult, ServiceResult};
use domain::net::server::stream::StreamServer;
use domain::net::server::util::{mk_builder_for_target, service_fn};
use domain::zonefile::inplace;
use domain::zonetree::{Answer, Rrset};
use domain::zonetree::Answer;
use domain::zonetree::{Zone, ZoneTree};
use octseq::OctetsBuilder;
use tokio::net::{TcpListener, UdpSocket};
use tracing_subscriber::EnvFilter;

Expand All @@ -51,13 +51,13 @@ async fn main() {
.ok();

// Populate a zone tree with test data
let mut zones = ZoneTree::new();
let zone_bytes = include_bytes!("../test-data/zonefiles/nsd-example.txt");
let mut zone_bytes = BufReader::new(&zone_bytes[..]);

// We're reading from static data so this cannot fail due to I/O error.
// Don't handle errors that shouldn't happen, keep the example focused
// on what we want to demonstrate.
let mut zones = ZoneTree::new();
let reader =
inplace::Zonefile::load(&mut zone_bytes).unwrap_or_else(|err| {
eprintln!("Error reading zone file bytes: {err}");
Expand All @@ -77,7 +77,13 @@ async fn main() {
let zones = Arc::new(zones);

let addr = "127.0.0.1:8053";
let svc = Arc::new(service_fn(my_service, zones));
let svc = service_fn(my_service, zones);

#[cfg(feature = "siphasher")]
let svc = CookiesMiddlewareSvc::<Vec<u8>, _>::with_random_secret(svc);
let svc = EdnsMiddlewareSvc::<Vec<u8>, _>::new(svc);
let svc = MandatoryMiddlewareSvc::<Vec<u8>, _>::new(svc);
let svc = Arc::new(svc);

let sock = UdpSocket::bind(addr).await.unwrap();
let sock = Arc::new(sock);
Expand All @@ -97,21 +103,32 @@ async fn main() {

tokio::spawn(async move { tcp_srv.run().await });

eprintln!("Ready");

tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(5000)).await;
for (i, metrics) in udp_metrics.iter().enumerate() {
eprintln!(
"Server status: UDP[{i}]: #conn={:?}, #in-flight={}, #pending-writes={}, #msgs-recvd={}, #msgs-sent={}",
metrics.num_connections(),
metrics.num_inflight_requests(),
metrics.num_pending_writes(),
metrics.num_received_requests(),
metrics.num_sent_responses(),
);

let mut udp_num_connections = 0;
let mut udp_num_inflight_requests = 0;
let mut udp_num_pending_writes = 0;
let mut udp_num_received_requests = 0;
let mut udp_num_sent_responses = 0;

for metrics in udp_metrics.iter() {
udp_num_connections += metrics.num_connections();
udp_num_inflight_requests += metrics.num_inflight_requests();
udp_num_pending_writes += metrics.num_pending_writes();
udp_num_received_requests += metrics.num_received_requests();
udp_num_sent_responses += metrics.num_sent_responses();
}
eprintln!(
"Server status: TCP: #conn={:?}, #in-flight={}, #pending-writes={}, #msgs-recvd={}, #msgs-sent={}",
"Server status: #conn/#in-flight/#pending-writes/#msgs-recvd/#msgs-sent: UDP={}/{}/{}/{}/{} TCP={}/{}/{}/{}/{}",
udp_num_connections,
udp_num_inflight_requests,
udp_num_pending_writes,
udp_num_received_requests,
udp_num_sent_responses,
tcp_metrics.num_connections(),
tcp_metrics.num_inflight_requests(),
tcp_metrics.num_pending_writes(),
Expand All @@ -128,30 +145,7 @@ async fn main() {
fn my_service(
request: Request<Vec<u8>>,
zones: Arc<ZoneTree>,
) -> Result<
Transaction<
Vec<u8>,
impl Future<Output = Result<CallResult<Vec<u8>>, ServiceError>> + Send,
>,
ServiceError,
> {
let qtype = request.message().sole_question().unwrap().qtype();
match qtype {
Rtype::AXFR if request.transport_ctx().is_non_udp() => {
let fut = handle_axfr_request(request, zones);
Ok(Transaction::stream(Box::pin(fut)))
}
_ => {
let fut = handle_non_axfr_request(request, zones);
Ok(Transaction::single(fut))
}
}
}

async fn handle_non_axfr_request(
request: Request<Vec<u8>>,
zones: Arc<ZoneTree>,
) -> Result<CallResult<Vec<u8>>, ServiceError> {
) -> ServiceResult<Vec<u8>> {
let question = request.message().sole_question().unwrap();
let zone = zones
.find_zone(question.qname(), question.qclass())
Expand All @@ -169,165 +163,3 @@ async fn handle_non_axfr_request(
let additional = answer.to_message(request.message(), builder);
Ok(CallResult::new(additional))
}

async fn handle_axfr_request(
request: Request<Vec<u8>>,
zones: Arc<ZoneTree>,
) -> TransactionStream<Result<CallResult<Vec<u8>>, ServiceError>> {
let mut stream = TransactionStream::default();

// Look up the zone for the queried name.
let question = request.message().sole_question().unwrap();
let zone = zones
.find_zone(question.qname(), question.qclass())
.map(|zone| zone.read());

// If not found, return an NXDOMAIN error response.
let Some(zone) = zone else {
let answer = Answer::new(Rcode::NXDOMAIN);
add_to_stream(answer, request.message(), &mut stream);
return stream;
};

// https://datatracker.ietf.org/doc/html/rfc5936#section-2.2
// 2.2: AXFR Response
//
// "An AXFR response that is transferring the zone's contents
// will consist of a series (which could be a series of
// length 1) of DNS messages. In such a series, the first
// message MUST begin with the SOA resource record of the
// zone, and the last message MUST conclude with the same SOA
// resource record. Intermediate messages MUST NOT contain
// the SOA resource record. The AXFR server MUST copy the
// Question section from the corresponding AXFR query message
// into the first response message's Question section. For
// subsequent messages, it MAY do the same or leave the
// Question section empty."

// Get the SOA record as AXFR transfers must start and end with the SOA
// record. If not found, return a SERVFAIL error response.
let qname = question.qname().to_bytes();
let Ok(soa_answer) = zone.query(qname, Rtype::SOA) else {
let answer = Answer::new(Rcode::SERVFAIL);
add_to_stream(answer, request.message(), &mut stream);
return stream;
};

// Push the begin SOA response message into the stream
add_to_stream(soa_answer.clone(), request.message(), &mut stream);

// "The AXFR protocol treats the zone contents as an unordered
// collection (or to use the mathematical term, a "set") of
// RRs. Except for the requirement that the transfer must
// begin and end with the SOA RR, there is no requirement to
// send the RRs in any particular order or grouped into
// response messages in any particular way. Although servers
// typically do attempt to send related RRs (such as the RRs
// forming an RRset, and the RRsets of a name) as a
// contiguous group or, when message space allows, in the
// same response message, they are not required to do so, and
// clients MUST accept any ordering and grouping of the
// non-SOA RRs. Each RR SHOULD be transmitted only once, and
// AXFR clients MUST ignore any duplicate RRs received.
//
// Each AXFR response message SHOULD contain a sufficient
// number of RRs to reasonably amortize the per-message
// overhead, up to the largest number that will fit within a
// DNS message (taking the required content of the other
// sections into account, as described below).
//
// Some old AXFR clients expect each response message to
// contain only a single RR. To interoperate with such
// clients, the server MAY restrict response messages to a
// single RR. As there is no standard way to automatically
// detect such clients, this typically requires manual
// configuration at the server."

let stream = Arc::new(Mutex::new(stream));
let cloned_stream = stream.clone();
let cloned_msg = request.message().clone();

let op = Box::new(move |owner: Name<_>, rrset: &Rrset| {
if rrset.rtype() != Rtype::SOA {
let builder = mk_builder_for_target();
let mut answer =
builder.start_answer(&cloned_msg, Rcode::NOERROR).unwrap();
for item in rrset.data() {
answer.push((owner.clone(), rrset.ttl(), item)).unwrap();
}

let additional = answer.additional();
let mut stream = cloned_stream.lock().unwrap();
add_additional_to_stream(additional, &cloned_msg, &mut stream);
}
});
zone.walk(op);

let mutex = Arc::try_unwrap(stream).unwrap();
let mut stream = mutex.into_inner().unwrap();

// Push the end SOA response message into the stream
add_to_stream(soa_answer, request.message(), &mut stream);

stream
}

#[allow(clippy::type_complexity)]
fn add_to_stream(
answer: Answer,
msg: &Message<Vec<u8>>,
stream: &mut TransactionStream<Result<CallResult<Vec<u8>>, ServiceError>>,
) {
let builder = mk_builder_for_target();
let additional = answer.to_message(msg, builder);
add_additional_to_stream(additional, msg, stream);
}

#[allow(clippy::type_complexity)]
fn add_additional_to_stream(
mut additional: AdditionalBuilder<domain::base::StreamTarget<Vec<u8>>>,
msg: &Message<Vec<u8>>,
stream: &mut TransactionStream<Result<CallResult<Vec<u8>>, ServiceError>>,
) {
set_axfr_header(msg, &mut additional);
stream.push(ready(Ok(CallResult::new(additional))));
}

fn set_axfr_header<Target>(
msg: &Message<Vec<u8>>,
additional: &mut AdditionalBuilder<Target>,
) where
Target: AsMut<[u8]>,
Target: OctetsBuilder,
{
// https://datatracker.ietf.org/doc/html/rfc5936#section-2.2.1
// 2.2.1: Header Values
//
// "These are the DNS message header values for AXFR responses.
//
// ID MUST be copied from request -- see Note a)
//
// QR MUST be 1 (Response)
//
// OPCODE MUST be 0 (Standard Query)
//
// Flags:
// AA normally 1 -- see Note b)
// TC MUST be 0 (Not truncated)
// RD RECOMMENDED: copy request's value; MAY be set to 0
// RA SHOULD be 0 -- see Note c)
// Z "mbz" -- see Note d)
// AD "mbz" -- see Note d)
// CD "mbz" -- see Note d)"
let header = additional.header_mut();
header.set_id(msg.header().id());
header.set_qr(true);
header.set_opcode(Opcode::QUERY);
header.set_aa(true);
header.set_tc(false);
header.set_rd(msg.header().rd());
header.set_ra(false);
header.set_z(false);
header.set_ad(false);
header.set_cd(false);
}
Loading