diff --git a/solar/src/actors/muxrpc/history_stream.rs b/solar/src/actors/muxrpc/history_stream.rs index 466cd4f..5fcae43 100644 --- a/solar/src/actors/muxrpc/history_stream.rs +++ b/solar/src/actors/muxrpc/history_stream.rs @@ -9,13 +9,14 @@ use kuska_ssb::{ rpc, }; use log::{debug, info, warn}; -use once_cell::sync::Lazy; -use regex::Regex; use crate::{ - actors::muxrpc::{ - blobs_get::RpcBlobsGetEvent, - handler::{RpcHandler, RpcInput}, + actors::{ + muxrpc::{ + blobs_get::RpcBlobsGetEvent, + handler::{RpcHandler, RpcInput}, + }, + replication::blobs, }, broker::{BrokerEvent, BrokerMessage, ChBrokerSend, Destination}, config::{PEERS_TO_REPLICATE, RESYNC_CONFIG, SECRET_CONFIG}, @@ -26,10 +27,6 @@ use crate::{ Result, }; -/// Regex pattern used to match blob references. -pub static BLOB_REGEX: Lazy = - Lazy::new(|| Regex::new(r"(&[0-9A-Za-z/+=]*.sha256)").unwrap()); - #[derive(Debug)] struct HistoryStreamRequest { req_no: i32, @@ -174,22 +171,6 @@ where Ok(false) } - /// Extract blob references from post-type messages. - fn extract_blob_refs(&mut self, msg: &Message) -> Vec { - let mut refs = Vec::new(); - - let msg = serde_json::from_value(msg.content().clone()); - - if let Ok(dto::content::TypedMessage::Post { text, .. }) = msg { - for cap in BLOB_REGEX.captures_iter(&text) { - let key = cap.get(0).unwrap().as_str().to_owned(); - refs.push(key); - } - } - - refs - } - /// Process an incoming MUXRPC response. The response is expected to /// contain an SSB message. async fn recv_rpc_response( @@ -236,13 +217,9 @@ where // Extract blob references from the received message and // request those blobs if they are not already in the local // blobstore. - for key in self.extract_blob_refs(&msg) { + for key in blobs::extract_blob_refs(&msg) { if !BLOB_STORE.read().await.exists(&key) { - let event = RpcBlobsGetEvent(dto::BlobsGetIn { - key, - size: None, - max: None, - }); + let event = RpcBlobsGetEvent(dto::BlobsGetIn::new(key)); let broker_msg = BrokerEvent::new( Destination::Broadcast, BrokerMessage::RpcBlobsGet(event), diff --git a/solar/src/actors/replication/blobs.rs b/solar/src/actors/replication/blobs.rs new file mode 100644 index 0000000..d0c8622 --- /dev/null +++ b/solar/src/actors/replication/blobs.rs @@ -0,0 +1,23 @@ +use kuska_ssb::{api::dto::content::TypedMessage, feed::Message}; +use once_cell::sync::Lazy; +use regex::Regex; + +/// Regex pattern used to match blob references. +pub static BLOB_REGEX: Lazy = + Lazy::new(|| Regex::new(r"(&[0-9A-Za-z/+=]*.sha256)").unwrap()); + +/// Extract blob references from post-type messages. +pub fn extract_blob_refs(msg: &Message) -> Vec { + let mut refs = Vec::new(); + + let msg = serde_json::from_value(msg.content().clone()); + + if let Ok(TypedMessage::Post { text, .. }) = msg { + for cap in BLOB_REGEX.captures_iter(&text) { + let key = cap.get(0).unwrap().as_str().to_owned(); + refs.push(key); + } + } + + refs +} diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 62982c8..7172a8a 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -14,19 +14,26 @@ use std::{ use async_std::task; use futures::{select_biased, FutureExt, SinkExt, StreamExt}; -use kuska_ssb::{api::dto::content::SsbId, crypto::ToSsbId, feed::Message}; +use kuska_ssb::{ + api::dto::{content::SsbId, BlobsGetIn}, + crypto::ToSsbId, + feed::Message, +}; use log::{debug, error, trace, warn}; use serde_json::Value; use crate::{ actors::{ - muxrpc::ReqNo, + muxrpc::{ReqNo, RpcBlobsGetEvent}, network::{connection::ConnectionData, connection_manager::ConnectionEvent}, - replication::ebt::{clock, replicator, EncodedClockValue, VectorClock}, + replication::{ + blobs, + ebt::{clock, replicator, EncodedClockValue, VectorClock}, + }, }, broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, BROKER}, config::PEERS_TO_REPLICATE, - node::KV_STORE, + node::{BLOB_STORE, KV_STORE}, Result, }; @@ -354,6 +361,21 @@ impl EbtManager { msg.sequence(), msg.author() ); + + // Create channel to send messages to broker. + let mut ch_broker = BROKER.lock().await.create_sender(); + + // Extract blob references from the received message and + // request those blobs if they are not already in the local + // blobstore. + for key in blobs::extract_blob_refs(&msg) { + if !BLOB_STORE.read().await.exists(&key) { + let event = RpcBlobsGetEvent(BlobsGetIn::new(key)); + let broker_msg = + BrokerEvent::new(Destination::Broadcast, BrokerMessage::RpcBlobsGet(event)); + ch_broker.send(broker_msg).await?; + } + } } else { warn!( "Received out-of-order message from {}; received: {}, expected: {} + 1", diff --git a/solar/src/actors/replication/mod.rs b/solar/src/actors/replication/mod.rs index f80eeb9..e99e2de 100644 --- a/solar/src/actors/replication/mod.rs +++ b/solar/src/actors/replication/mod.rs @@ -1,3 +1,4 @@ +pub mod blobs; pub mod classic; pub mod config; pub mod ebt;