Skip to content

Commit

Permalink
Merge pull request #84 from mycognosist/request_blobs_from_ebt_msgs
Browse files Browse the repository at this point in the history
Request blobs from EBT msgs
  • Loading branch information
mycognosist authored Dec 26, 2023
2 parents 087baa0 + 738017f commit 0897be2
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 35 deletions.
39 changes: 8 additions & 31 deletions solar/src/actors/muxrpc/history_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use kuska_ssb::{
rpc,
};
use log::{debug, info, warn};
use once_cell::sync::Lazy;
use regex::Regex;

use crate::{
actors::muxrpc::{
blobs_get::RpcBlobsGetEvent,
handler::{RpcHandler, RpcInput},
actors::{
muxrpc::{
blobs_get::RpcBlobsGetEvent,
handler::{RpcHandler, RpcInput},
},
replication::blobs,
},
broker::{BrokerEvent, BrokerMessage, ChBrokerSend, Destination},
config::{PEERS_TO_REPLICATE, RESYNC_CONFIG, SECRET_CONFIG},
Expand All @@ -26,10 +27,6 @@ use crate::{
Result,
};

/// Regex pattern used to match blob references.
pub static BLOB_REGEX: Lazy<Regex> =
Lazy::new(|| Regex::new(r"(&[0-9A-Za-z/+=]*.sha256)").unwrap());

#[derive(Debug)]
struct HistoryStreamRequest {
req_no: i32,
Expand Down Expand Up @@ -174,22 +171,6 @@ where
Ok(false)
}

/// Extract blob references from post-type messages.
fn extract_blob_refs(&mut self, msg: &Message) -> Vec<String> {
let mut refs = Vec::new();

let msg = serde_json::from_value(msg.content().clone());

if let Ok(dto::content::TypedMessage::Post { text, .. }) = msg {
for cap in BLOB_REGEX.captures_iter(&text) {
let key = cap.get(0).unwrap().as_str().to_owned();
refs.push(key);
}
}

refs
}

/// Process an incoming MUXRPC response. The response is expected to
/// contain an SSB message.
async fn recv_rpc_response(
Expand Down Expand Up @@ -236,13 +217,9 @@ where
// Extract blob references from the received message and
// request those blobs if they are not already in the local
// blobstore.
for key in self.extract_blob_refs(&msg) {
for key in blobs::extract_blob_refs(&msg) {
if !BLOB_STORE.read().await.exists(&key) {
let event = RpcBlobsGetEvent(dto::BlobsGetIn {
key,
size: None,
max: None,
});
let event = RpcBlobsGetEvent(dto::BlobsGetIn::new(key));
let broker_msg = BrokerEvent::new(
Destination::Broadcast,
BrokerMessage::RpcBlobsGet(event),
Expand Down
23 changes: 23 additions & 0 deletions solar/src/actors/replication/blobs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use kuska_ssb::{api::dto::content::TypedMessage, feed::Message};
use once_cell::sync::Lazy;
use regex::Regex;

/// Regex pattern used to match blob references.
pub static BLOB_REGEX: Lazy<Regex> =
Lazy::new(|| Regex::new(r"(&[0-9A-Za-z/+=]*.sha256)").unwrap());

/// Extract blob references from post-type messages.
pub fn extract_blob_refs(msg: &Message) -> Vec<String> {
let mut refs = Vec::new();

let msg = serde_json::from_value(msg.content().clone());

if let Ok(TypedMessage::Post { text, .. }) = msg {
for cap in BLOB_REGEX.captures_iter(&text) {
let key = cap.get(0).unwrap().as_str().to_owned();
refs.push(key);
}
}

refs
}
30 changes: 26 additions & 4 deletions solar/src/actors/replication/ebt/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,26 @@ use std::{

use async_std::task;
use futures::{select_biased, FutureExt, SinkExt, StreamExt};
use kuska_ssb::{api::dto::content::SsbId, crypto::ToSsbId, feed::Message};
use kuska_ssb::{
api::dto::{content::SsbId, BlobsGetIn},
crypto::ToSsbId,
feed::Message,
};
use log::{debug, error, trace, warn};
use serde_json::Value;

use crate::{
actors::{
muxrpc::ReqNo,
muxrpc::{ReqNo, RpcBlobsGetEvent},
network::{connection::ConnectionData, connection_manager::ConnectionEvent},
replication::ebt::{clock, replicator, EncodedClockValue, VectorClock},
replication::{
blobs,
ebt::{clock, replicator, EncodedClockValue, VectorClock},
},
},
broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, BROKER},
config::PEERS_TO_REPLICATE,
node::KV_STORE,
node::{BLOB_STORE, KV_STORE},
Result,
};

Expand Down Expand Up @@ -354,6 +361,21 @@ impl EbtManager {
msg.sequence(),
msg.author()
);

// Create channel to send messages to broker.
let mut ch_broker = BROKER.lock().await.create_sender();

// Extract blob references from the received message and
// request those blobs if they are not already in the local
// blobstore.
for key in blobs::extract_blob_refs(&msg) {
if !BLOB_STORE.read().await.exists(&key) {
let event = RpcBlobsGetEvent(BlobsGetIn::new(key));
let broker_msg =
BrokerEvent::new(Destination::Broadcast, BrokerMessage::RpcBlobsGet(event));
ch_broker.send(broker_msg).await?;
}
}
} else {
warn!(
"Received out-of-order message from {}; received: {}, expected: {} + 1",
Expand Down
1 change: 1 addition & 0 deletions solar/src/actors/replication/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod blobs;
pub mod classic;
pub mod config;
pub mod ebt;

0 comments on commit 0897be2

Please sign in to comment.