Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
Finish billing
Browse files Browse the repository at this point in the history
  • Loading branch information
Geometrically committed Oct 10, 2024
1 parent 4d10a43 commit b9ce67c
Show file tree
Hide file tree
Showing 6 changed files with 467 additions and 324 deletions.
10 changes: 6 additions & 4 deletions migrations/20240923163452_charges-fix.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ CREATE TABLE charges (
price_id bigint REFERENCES products_prices NOT NULL,
amount bigint NOT NULL,
currency_code text NOT NULL,
subscription_id bigint NULL,
interval text NULL,
status varchar(255) NOT NULL,
due timestamptz DEFAULT CURRENT_TIMESTAMP NOT NULL,
last_attempt timestamptz NOT NULL
last_attempt timestamptz NULL,
charge_type text NOT NULL,
subscription_id bigint NULL,
subscription_interval text NULL
);

ALTER TABLE users_subscriptions DROP COLUMN last_charge;
ALTER TABLE users_subscriptions ADD COLUMN metadata jsonb NULL;
ALTER TABLE users_subscriptions ADD COLUMN metadata jsonb NULL;
ALTER TABLE users_subscriptions DROP COLUMN expires;
71 changes: 56 additions & 15 deletions src/database/models/charge_item.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::database::models::{
ChargeId, DatabaseError, ProductPriceId, UserId, UserSubscriptionId,
};
use crate::models::billing::{ChargeStatus, PriceDuration};
use crate::models::billing::{ChargeStatus, ChargeType, PriceDuration};
use chrono::{DateTime, Utc};
use std::convert::{TryFrom, TryInto};

Expand All @@ -11,11 +11,13 @@ pub struct ChargeItem {
pub price_id: ProductPriceId,
pub amount: i64,
pub currency_code: String,
pub subscription_id: Option<UserSubscriptionId>,
pub interval: Option<PriceDuration>,
pub status: ChargeStatus,
pub due: DateTime<Utc>,
pub last_attempt: Option<DateTime<Utc>>,

pub type_: ChargeType,
pub subscription_id: Option<UserSubscriptionId>,
pub subscription_interval: Option<PriceDuration>,
}

struct ChargeResult {
Expand All @@ -24,11 +26,12 @@ struct ChargeResult {
price_id: i64,
amount: i64,
currency_code: String,
subscription_id: Option<i64>,
interval: Option<String>,
status: String,
due: DateTime<Utc>,
last_attempt: Option<DateTime<Utc>>,
charge_type: String,
subscription_id: Option<i64>,
subscription_interval: Option<String>,
}

impl TryFrom<ChargeResult> for ChargeItem {
Expand All @@ -41,11 +44,14 @@ impl TryFrom<ChargeResult> for ChargeItem {
price_id: ProductPriceId(r.price_id),
amount: r.amount,
currency_code: r.currency_code,
subscription_id: r.subscription_id.map(UserSubscriptionId),
interval: r.interval.map(|x| serde_json::from_str(&x)).transpose()?,
status: serde_json::from_str(&r.status)?,
status: ChargeStatus::from_string(&r.status),
due: r.due,
last_attempt: r.last_attempt,
type_: ChargeType::from_string(&r.charge_type),
subscription_id: r.subscription_id.map(UserSubscriptionId),
subscription_interval: r
.subscription_interval
.map(|x| PriceDuration::from_string(&*x)),
})
}
}
Expand All @@ -55,7 +61,7 @@ macro_rules! select_charges_with_predicate {
sqlx::query_as!(
ChargeResult,
r#"
SELECT id, user_id, price_id, amount, currency_code, subscription_id, interval, status, due, last_attempt
SELECT id, user_id, price_id, amount, currency_code, status, due, last_attempt, charge_type, subscription_id, subscription_interval
FROM charges
"#
+ $predicate,
Expand All @@ -71,24 +77,27 @@ impl ChargeItem {
) -> Result<ChargeId, DatabaseError> {
sqlx::query!(
r#"
INSERT INTO charges (id, user_id, price_id, amount, currency_code, subscription_id, interval, status, due, last_attempt)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
INSERT INTO charges (id, user_id, price_id, amount, currency_code, charge_type, status, due, last_attempt, subscription_id, subscription_interval)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (id)
DO UPDATE
SET status = EXCLUDED.status,
last_attempt = EXCLUDED.last_attempt,
due = EXCLUDED.due
due = EXCLUDED.due,
subscription_id = EXCLUDED.subscription_id,
subscription_interval = EXCLUDED.subscription_interval
"#,
self.id.0,
self.user_id.0,
self.price_id.0,
self.amount,
self.currency_code,
self.subscription_id.map(|x| x.0),
self.interval.map(|x| x.as_str()),
self.type_.as_str(),
self.status.as_str(),
self.due,
self.last_attempt,
self.subscription_id.map(|x| x.0),
self.subscription_interval.map(|x| x.as_str()),
)
.execute(&mut **transaction)
.await?;
Expand All @@ -113,7 +122,7 @@ impl ChargeItem {
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<ChargeItem>, DatabaseError> {
let user_id = user_id.0;
let res = select_charges_with_predicate!("WHERE user_id = $1", user_id)
let res = select_charges_with_predicate!("WHERE user_id = $1 ORDER BY due DESC", user_id)
.fetch_all(exec)
.await?;

Expand All @@ -123,6 +132,21 @@ impl ChargeItem {
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}

pub async fn get_open_subscription(
user_subscription_id: UserSubscriptionId,
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Option<ChargeItem>, DatabaseError> {
let user_subscription_id = user_subscription_id.0;
let res = select_charges_with_predicate!(
"WHERE subscription_id = $1 AND (status = 'open' OR status = 'cancelled')",
user_subscription_id
)
.fetch_optional(exec)
.await?;

Ok(res.and_then(|r| r.try_into().ok()))
}

pub async fn get_chargeable(
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<ChargeItem>, DatabaseError> {
Expand All @@ -137,4 +161,21 @@ impl ChargeItem {
.map(|r| r.try_into())
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}

pub async fn remove(
id: ChargeId,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), DatabaseError> {
sqlx::query!(
"
DELETE FROM charges
WHERE id = $1
",
id.0 as i64
)
.execute(&mut **transaction)
.await?;

Ok(())
}
}
58 changes: 30 additions & 28 deletions src/database/models/user_subscription_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub struct UserSubscriptionItem {
pub price_id: ProductPriceId,
pub interval: PriceDuration,
pub created: DateTime<Utc>,
pub expires: DateTime<Utc>,
pub status: SubscriptionStatus,
pub metadata: Option<SubscriptionMetadata>,
}
Expand All @@ -21,7 +20,6 @@ struct UserSubscriptionResult {
price_id: i64,
interval: String,
pub created: DateTime<Utc>,
pub expires: DateTime<Utc>,
pub status: String,
pub metadata: serde_json::Value,
}
Expand All @@ -32,8 +30,8 @@ macro_rules! select_user_subscriptions_with_predicate {
UserSubscriptionResult,
r#"
SELECT
id, user_id, price_id, interval, created, expires, status, metadata
FROM users_subscriptions
us.id, us.user_id, us.price_id, us.interval, us.created, us.status, us.metadata
FROM users_subscriptions us
"#
+ $predicate,
$param
Expand All @@ -51,7 +49,6 @@ impl TryFrom<UserSubscriptionResult> for UserSubscriptionItem {
price_id: ProductPriceId(r.price_id),
interval: PriceDuration::from_string(&r.interval),
created: r.created,
expires: r.expires,
status: SubscriptionStatus::from_string(&r.status),
metadata: serde_json::from_value(r.metadata)?,
})
Expand All @@ -73,7 +70,7 @@ impl UserSubscriptionItem {
let ids = ids.iter().map(|id| id.0).collect_vec();
let ids_ref: &[i64] = &ids;
let results =
select_user_subscriptions_with_predicate!("WHERE id = ANY($1::bigint[])", ids_ref)
select_user_subscriptions_with_predicate!("WHERE us.id = ANY($1::bigint[])", ids_ref)
.fetch_all(exec)
.await?;

Expand All @@ -88,7 +85,7 @@ impl UserSubscriptionItem {
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<UserSubscriptionItem>, DatabaseError> {
let user_id = user_id.0;
let results = select_user_subscriptions_with_predicate!("WHERE user_id = $1", user_id)
let results = select_user_subscriptions_with_predicate!("WHERE us.user_id = $1", user_id)
.fetch_all(exec)
.await?;

Expand All @@ -98,22 +95,45 @@ impl UserSubscriptionItem {
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}

pub async fn get_all_unprovision(
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<UserSubscriptionItem>, DatabaseError> {
let now = Utc::now();
let results = select_user_subscriptions_with_predicate!(
"
INNER JOIN charges c
ON c.subscription_id = us.id
AND (
(c.status = 'cancelled' AND c.due < $1) OR
(c.status = 'failed' AND c.last_attempt < $1 - INTERVAL '2 days')
)
",
now
)
.fetch_all(exec)
.await?;

Ok(results
.into_iter()
.map(|r| r.try_into())
.collect::<Result<Vec<_>, serde_json::Error>>()?)
}

pub async fn upsert(
&self,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), DatabaseError> {
sqlx::query!(
"
INSERT INTO users_subscriptions (
id, user_id, price_id, interval, created, expires, status, metadata
id, user_id, price_id, interval, created, status, metadata
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8
$1, $2, $3, $4, $5, $6, $7
)
ON CONFLICT (id)
DO UPDATE
SET interval = EXCLUDED.interval,
expires = EXCLUDED.expires,
status = EXCLUDED.status,
price_id = EXCLUDED.price_id,
metadata = EXCLUDED.metadata
Expand All @@ -123,7 +143,6 @@ impl UserSubscriptionItem {
self.price_id.0,
self.interval.as_str(),
self.created,
self.expires,
self.status.as_str(),
serde_json::to_value(&self.metadata)?,
)
Expand All @@ -132,21 +151,4 @@ impl UserSubscriptionItem {

Ok(())
}

pub async fn remove(
id: UserSubscriptionId,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<(), DatabaseError> {
sqlx::query!(
"
DELETE FROM users_subscriptions
WHERE id = $1
",
id.0 as i64
)
.execute(&mut **transaction)
.await?;

Ok(())
}
}
27 changes: 18 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,24 @@ pub fn app_setup(
}

let stripe_client = stripe::Client::new(dotenvy::var("STRIPE_API_KEY").unwrap());
// {
// let pool_ref = pool.clone();
// let redis_ref = redis_pool.clone();
// let stripe_client_ref = stripe_client.clone();
//
// actix_rt::spawn(async move {
// routes::internal::billing::task(stripe_client_ref, pool_ref, redis_ref).await;
// });
// }
{
let pool_ref = pool.clone();
let redis_ref = redis_pool.clone();
let stripe_client_ref = stripe_client.clone();

actix_rt::spawn(async move {
routes::internal::billing::task(stripe_client_ref, pool_ref, redis_ref).await;
});
}

{
let pool_ref = pool.clone();
let redis_ref = redis_pool.clone();

actix_rt::spawn(async move {
routes::internal::billing::subscription_task(pool_ref, redis_ref).await;
});
}

let ip_salt = Pepper {
pepper: models::ids::Base62Id(models::ids::random_base62(11)).to_string(),
Expand Down
Loading

0 comments on commit b9ce67c

Please sign in to comment.