Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: settings clone #35

Merged
merged 5 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ WORKDIR /app
RUN apt-get update -y && apt-get install -y ca-certificates libssl-dev libpq-dev --no-install-recommends && rm -rf /var/lib/apt/lists/*

COPY --from=builder /usr/src/autopulse/target/release/autopulse /usr/local/bin/
COPY default.toml default.toml

EXPOSE 2875

Expand Down
4 changes: 3 additions & 1 deletion Dockerfile.alpine
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ COPY README.md README.md
# Database support: postgres, sqlite
ARG ENABLED_FEATURES="postgres,sqlite"

# Add file to include
ADD default.toml default.toml

RUN cargo build --release --target x86_64-unknown-linux-musl --no-default-features --features ${ENABLED_FEATURES}

FROM alpine AS runtime
Expand All @@ -51,7 +54,6 @@ WORKDIR /app
# RUN addgroup -S user && adduser -S user -G user

COPY --from=builder /app/target/x86_64-unknown-linux-musl/release/autopulse /usr/local/bin/
COPY default.toml default.toml

# RUN chown -R user:user /app

Expand Down
2 changes: 1 addition & 1 deletion src/db/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl AnyConnection {
PRAGMA synchronous = NORMAL; -- fsync only in critical moments
PRAGMA wal_autocheckpoint = 1000; -- write WAL changes back every 1000 pages, for an in average 1MB WAL file. May affect readers if number is increased
PRAGMA wal_checkpoint(TRUNCATE); -- free some space by truncating possibly massive WAL files from the last run.
PRAGMA busy_timeout = 250; -- sleep if the database is busy
PRAGMA busy_timeout = 5000; -- sleep if the database is busy
PRAGMA foreign_keys = ON; -- enforce foreign keys
").map_err(ConnectionError::CouldntSetupConfiguration)?;
}
Expand Down
18 changes: 8 additions & 10 deletions src/service/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@ use crate::{
};
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, error};

#[derive(Clone)]
pub struct PulseManager {
pub settings: Settings,
pub settings: Arc<Settings>,
pub pool: DbPool,
pub webhooks: Arc<WebhookManager>,
}

impl PulseManager {
pub fn new(settings: Settings, pool: DbPool) -> Self {
let settings = Arc::new(settings);

Self {
settings: settings.clone(),
pool,
Expand Down Expand Up @@ -77,12 +78,12 @@ impl PulseManager {
}

pub fn start(&self) -> tokio::task::JoinHandle<()> {
let settings = self.settings.clone();
let pool = self.pool.clone();
let webhooks = self.webhooks.clone();
let settings = self.settings.clone();

tokio::spawn(async move {
let runner = PulseRunner::new(Arc::new(RwLock::new(settings)), pool, webhooks);
let runner = PulseRunner::new(settings, pool, webhooks);
let mut timer = tokio::time::interval(std::time::Duration::from_secs(1));

loop {
Expand Down Expand Up @@ -114,7 +115,6 @@ impl PulseManager {
let (global_tx, mut global_rx) = tokio::sync::mpsc::unbounded_channel();

let settings = self.settings.clone();
let settings = Arc::new(settings);

for (name, trigger) in settings.triggers.clone() {
if let Trigger::Notify(service) = trigger {
Expand All @@ -129,14 +129,10 @@ impl PulseManager {
}
});

let settings_clone = settings.clone();

tokio::spawn(async move {
while let Some(file_path) = rx.recv().await {
if let Err(e) = global_tx.send((name.clone(), file_path)) {
error!("unable to send notify event: {:?}", e);
} else {
settings_clone.triggers.get(&name).unwrap().tick();
}
}
});
Expand All @@ -161,8 +157,10 @@ impl PulseManager {

manager
.webhooks
.add_event(EventType::New, Some(name), &[file_path])
.add_event(EventType::New, Some(name.clone()), &[file_path])
.await;

settings.triggers.get(&name).unwrap().tick();
}
})
}
Expand Down
36 changes: 9 additions & 27 deletions src/service/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,16 @@ use crate::{
};
use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::RwLock;
use tracing::{error, info, warn};

pub(super) struct PulseRunner {
webhooks: Arc<WebhookManager>,
settings: Arc<RwLock<Settings>>,
settings: Arc<Settings>,
pool: DbPool,
}

impl PulseRunner {
pub const fn new(
settings: Arc<RwLock<Settings>>,
pool: DbPool,
webhooks: Arc<WebhookManager>,
) -> Self {
pub const fn new(settings: Arc<Settings>, pool: DbPool, webhooks: Arc<WebhookManager>) -> Self {
Self {
webhooks,
settings,
Expand All @@ -35,14 +30,10 @@ impl PulseRunner {
}

async fn update_found_status(&self) -> anyhow::Result<()> {
let read_settings = self.settings.read().await;

if !read_settings.opts.check_path {
if !self.settings.opts.check_path {
return Ok(());
}

drop(read_settings);

let mut found_files = vec![];
let mut mismatched_files = vec![];

Expand Down Expand Up @@ -111,9 +102,7 @@ impl PulseRunner {
pub async fn update_process_status(&self) -> anyhow::Result<()> {
let mut conn = get_conn(&self.pool);

let read_settings = self.settings.read().await;

let tickable = read_settings.get_tickable_triggers();
let tickable = self.settings.get_tickable_triggers();

let base_query = scan_events
.filter(process_status.ne::<String>(ProcessStatus::Complete.into()))
Expand All @@ -126,7 +115,7 @@ impl PulseRunner {
// filter by trigger in tickable
.filter(event_source.eq_any(tickable));

let mut evs = if read_settings.opts.check_path {
let mut evs = if self.settings.opts.check_path {
base_query
.filter(found_status.eq::<String>(FoundStatus::Found.into()))
.load::<ScanEvent>(&mut conn)?
Expand All @@ -138,8 +127,6 @@ impl PulseRunner {
return Ok(());
}

drop(read_settings);

let (processed, retrying, failed) = self.process_events(&mut evs).await?;

if !processed.is_empty() {
Expand Down Expand Up @@ -186,11 +173,10 @@ impl PulseRunner {
evs: &mut [ScanEvent],
) -> anyhow::Result<(Vec<String>, Vec<String>, Vec<String>)> {
let mut failed_ids = vec![];
let mut rw_settings = self.settings.write().await;

let trigger_settings = rw_settings.triggers.clone();
let trigger_settings = &self.settings.triggers;

for (name, target) in rw_settings.targets.iter_mut() {
for (name, target) in self.settings.targets.iter() {
let evs = evs
.iter_mut()
.filter(|x| !x.get_targets_hit().contains(name))
Expand Down Expand Up @@ -239,7 +225,7 @@ impl PulseRunner {
if failed_ids.contains(&ev.id) {
ev.failed_times += 1;

if ev.failed_times >= rw_settings.opts.max_retries {
if ev.failed_times >= self.settings.opts.max_retries {
ev.process_status = ProcessStatus::Failed.into();
ev.next_retry_at = None;
failed.push(conn.save_changes(ev)?.file_path.clone());
Expand All @@ -265,12 +251,8 @@ impl PulseRunner {
async fn cleanup(&self) -> anyhow::Result<()> {
let mut conn = get_conn(&self.pool);

let read_settings = self.settings.read().await;

let time_before_cleanup = chrono::Utc::now().naive_utc()
- chrono::Duration::days(read_settings.opts.cleanup_days as i64);

drop(read_settings);
- chrono::Duration::days(self.settings.opts.cleanup_days as i64);

let delete_not_found = diesel::delete(
scan_events
Expand Down
2 changes: 1 addition & 1 deletion src/service/targets/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl Command {
}

impl TargetProcess for Command {
async fn process(&mut self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
let mut succeded = Vec::new();

for ev in evs {
Expand Down
2 changes: 1 addition & 1 deletion src/service/targets/emby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl Emby {
}

impl TargetProcess for Emby {
async fn process<'a>(&mut self, evs: &[&'a ScanEvent]) -> anyhow::Result<Vec<String>> {
async fn process<'a>(&self, evs: &[&'a ScanEvent]) -> anyhow::Result<Vec<String>> {
let libraries = self.libraries().await?;

let mut succeded = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion src/service/targets/fileflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl FileFlows {
}

impl TargetProcess for FileFlows {
async fn process<'a>(&mut self, evs: &[&'a ScanEvent]) -> anyhow::Result<Vec<String>> {
async fn process<'a>(&self, evs: &[&'a ScanEvent]) -> anyhow::Result<Vec<String>> {
let mut succeeded = Vec::new();

for ev in evs {
Expand Down
2 changes: 1 addition & 1 deletion src/service/targets/plex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Plex {
}

impl TargetProcess for Plex {
async fn process<'a>(&mut self, evs: &[&'a ScanEvent]) -> anyhow::Result<Vec<String>> {
async fn process<'a>(&self, evs: &[&'a ScanEvent]) -> anyhow::Result<Vec<String>> {
let libraries = self.libraries().await?;

let mut succeeded = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion src/service/targets/tdarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Tdarr {
}

impl TargetProcess for Tdarr {
async fn process<'a>(&mut self, evs: &[&'a ScanEvent]) -> anyhow::Result<Vec<String>> {
async fn process<'a>(&self, evs: &[&'a ScanEvent]) -> anyhow::Result<Vec<String>> {
let mut succeeded = Vec::new();

self.scan(evs).await?;
Expand Down
1 change: 0 additions & 1 deletion src/service/triggers/lidarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::utils::{
use serde::Deserialize;

#[derive(Deserialize, Clone)]

pub struct Lidarr {
/// Rewrite path
pub rewrite: Option<Rewrite>,
Expand Down
1 change: 0 additions & 1 deletion src/service/triggers/manual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::utils::{settings::Rewrite, timer::Timer};
use serde::Deserialize;

#[derive(Deserialize, Clone)]

pub struct Manual {
/// Rewrite path
pub rewrite: Option<Rewrite>,
Expand Down
1 change: 0 additions & 1 deletion src/service/triggers/radarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::utils::{
use serde::Deserialize;

#[derive(Deserialize, Clone)]

pub struct Radarr {
/// Rewrite path
pub rewrite: Option<Rewrite>,
Expand Down
1 change: 0 additions & 1 deletion src/service/triggers/readarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::utils::{
use serde::Deserialize;

#[derive(Deserialize, Clone)]

pub struct Readarr {
/// Rewrite path
pub rewrite: Option<Rewrite>,
Expand Down
1 change: 0 additions & 1 deletion src/service/triggers/sonarr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::utils::{
use serde::Deserialize;

#[derive(Deserialize, Clone)]

pub struct Sonarr {
/// Rewrite path
pub rewrite: Option<Rewrite>,
Expand Down
4 changes: 2 additions & 2 deletions src/service/webhooks/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ impl EventType {

#[derive(Clone)]
pub struct WebhookManager {
settings: Settings,
settings: Arc<Settings>,
queue: Arc<RwLock<WebhookQueue>>,
}

impl WebhookManager {
pub fn new(settings: Settings) -> Self {
pub fn new(settings: Arc<Settings>) -> Self {
Self {
settings,
queue: Arc::new(RwLock::new(HashMap::new())),
Expand Down
11 changes: 7 additions & 4 deletions src/utils/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
webhooks::{discord::DiscordWebhook, WebhookBatch},
},
};
use config::{Config, File};
use config::{Config, FileFormat};
use serde::Deserialize;
use std::collections::HashMap;

Expand Down Expand Up @@ -99,7 +99,10 @@ pub struct Settings {
impl Settings {
pub fn get_settings(optional_config_file: Option<String>) -> anyhow::Result<Self> {
let mut settings = Config::builder()
.add_source(File::with_name("default.toml"))
.add_source(config::File::from_str(
include_str!("../../default.toml"),
FileFormat::Toml,
))
.add_source(config::File::with_name("config").required(false))
.add_source(config::Environment::with_prefix("AUTOPULSE").separator("__"));

Expand Down Expand Up @@ -229,7 +232,7 @@ impl Webhook {

pub trait TargetProcess {
fn process<'a>(
&mut self,
&self,
evs: &[&'a ScanEvent],
) -> impl std::future::Future<Output = anyhow::Result<Vec<String>>> + Send;
}
Expand All @@ -256,7 +259,7 @@ pub enum Target {
}

impl Target {
pub async fn process(&mut self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
pub async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
match self {
Self::Plex(p) => p.process(evs).await,
Self::Jellyfin(j) => j.process(evs).await,
Expand Down
3 changes: 1 addition & 2 deletions src/utils/timer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use serde::Deserialize;
use std::{
sync::{Arc, Mutex},
time::Duration,
};

use serde::Deserialize;

#[derive(Deserialize, Clone)]
pub struct Timer {
#[serde(skip)]
Expand Down
Loading