Skip to content

Commit

Permalink
basic event sourcing
Browse files Browse the repository at this point in the history
  • Loading branch information
darky committed Dec 7, 2024
1 parent 6b726e2 commit 73650b3
Show file tree
Hide file tree
Showing 16 changed files with 366 additions and 20 deletions.
22 changes: 22 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"license": "MIT",
"dependencies": {
"pg": "^8.13.1",
"pg-cursor": "^2.12.1",
"robot3": "^1.0.1",
"throw": "^1.1.1",
"ts-pattern": "^5.5.0"
Expand All @@ -40,6 +41,7 @@
"@testcontainers/postgresql": "^10.16.0",
"@types/node": "^22.10.1",
"@types/pg": "^8.11.10",
"@types/pg-cursor": "^2.7.2",
"testcontainers": "^10.16.0",
"ts-fp-di": "^0.22.0",
"tsx": "^4.19.2",
Expand Down
46 changes: 46 additions & 0 deletions src/es.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import type { Pg } from './pg.ts'
import type { Adapter, Options, OutboxMessage, StartStop } from './types.ts'
import Cursor from 'pg-cursor'

export class Es implements StartStop {
private lastEventId = '0'

constructor(private readonly pg: Pg, private readonly adapter: Adapter, private readonly options: Options) {}

async start() {
await this.initSync()
}

async stop() {}

setLastEventId(index: OutboxMessage['id']) {
this.lastEventId = index
}

getLastEventId() {
return this.lastEventId
}

private async initSync() {
const client = await this.pg.getClient()
const cursor = client.query(
new Cursor(
`
select * from pg_trx_outbox
where is_event and id > $1
order by id
`,
[this.getLastEventId()]
)
)
while (true) {
const messages: OutboxMessage[] = await cursor.read(this.options.eventSourcingOptions?.initSyncBatchSize ?? 100)
if (!messages.length) {
break
}
await this.adapter.send(messages)
this.setLastEventId(messages.at(-1)!.id)
}
client.release()
}
}
7 changes: 6 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { P, match } from 'ts-pattern'
import { Pg } from './pg.ts'
import { Responder } from './responder.ts'
import { diDep, diExists, diHas } from 'ts-fp-di'
import { Es } from './es.ts'

export class PgTrxOutbox implements StartStop {
private pg: Pg
Expand All @@ -17,6 +18,7 @@ export class PgTrxOutbox implements StartStop {
private poller?: Poller
private notifier?: Notifier
private logical?: Logical
private es?: Es

constructor(options: Options) {
const opts: Options = {
Expand All @@ -30,7 +32,8 @@ export class PgTrxOutbox implements StartStop {
}
this.adapter = opts.adapter
this.pg = new Pg(opts)
this.transfer = new Transfer(opts, this.pg, this.adapter)
this.es = new Es(this.pg, this.adapter, opts)
this.transfer = new Transfer(opts, this.pg, this.adapter, this.es)
this.responder = new Responder(opts, this.pg)
const fsm = new FSM(opts, this.transfer)
match(opts.outboxOptions?.mode)
Expand All @@ -47,12 +50,14 @@ export class PgTrxOutbox implements StartStop {
await this.adapter.start()
await this.pg.start()
await this.responder.start()
await this.es?.start()
await this.poller?.start()
await this.notifier?.start()
await this.logical?.start()
}

async stop() {
await this.es?.stop()
await this.logical?.stop()
await this.notifier?.stop()
await this.poller?.stop()
Expand Down
44 changes: 25 additions & 19 deletions src/transfer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ import type { PoolClient } from 'pg'
import { Pg } from './pg.ts'
import type { Adapter, Options, OutboxMessage } from './types.ts'
import thr from 'throw'
import type { Es } from './es.ts'

export class Transfer {
constructor(private readonly options: Options, private readonly pg: Pg, private readonly adapter: Adapter) {}
constructor(
private readonly options: Options,
private readonly pg: Pg,
private readonly adapter: Adapter,
private readonly es: Es
) {}

async transferMessages(passedMessages: readonly OutboxMessage[] = []) {
let messages: readonly OutboxMessage[] = []
Expand Down Expand Up @@ -48,23 +54,22 @@ export class Transfer {
)
}
await this.updateToProcessed(client, ids, responses, errors, metas, processed, attempts, sinceAt)
this.es.setLastEventId(messages.at(-1)?.id ?? '0')
}
} catch (e) {
if ((e as { code: string }).code !== '55P03') {
if (messages.length) {
await this.updateToProcessed(
client,
messages.map(r => r.id),
messages.map(() => null),
messages.map(() => (e as Error).stack ?? (e as Error).message ?? e),
messages.map(() => null),
messages.map(() => true),
messages.map(m => m.attempts),
messages.map(m => m.since_at)
)
}
throw e
if (messages.length) {
await this.updateToProcessed(
client,
messages.map(r => r.id),
messages.map(() => null),
messages.map(() => (e as Error).stack ?? (e as Error).message ?? e),
messages.map(() => null),
messages.map(() => true),
messages.map(m => m.attempts),
messages.map(m => m.since_at)
)
}
throw e
} finally {
await client.query('commit')
client.release()
Expand All @@ -79,15 +84,16 @@ export class Transfer {
select * from pg_trx_outbox${
this.options.outboxOptions?.partition == null ? '' : `_${this.options.outboxOptions?.partition}`
}
where processed = false and (since_at is null or now() > since_at) ${
this.options.outboxOptions?.topicFilter?.length ? 'and topic = any($2)' : ''
}
where not is_event and processed = false and (since_at is null or now() > since_at) ${
this.options.outboxOptions?.topicFilter?.length ? 'and topic = any($3)' : ''
} or is_event and id > $2
order by id
limit $1
for update nowait
for update
`,
[
this.options.outboxOptions?.limit ?? 50,
this.es.getLastEventId(),
...(this.options.outboxOptions?.topicFilter?.length ? [this.options.outboxOptions?.topicFilter] : []),
]
)
Expand Down
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,10 @@ export type Options = {
*/
retryMaxAttempts?: number
}
eventSourcingOptions?: {
/**
* how much messages send for processing on init sync, default 100
*/
initSyncBatchSize?: number
}
}
1 change: 1 addition & 0 deletions test/adapters.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ beforeEach(async () => {
meta jsonb NULL,
context_id double precision NOT NULL DEFAULT random(),
attempts smallint NOT NULL DEFAULT 0,
is_event boolean NOT NULL DEFAULT false,
CONSTRAINT pg_trx_outbox_pk PRIMARY KEY (id)
);
`)
Expand Down
1 change: 1 addition & 0 deletions test/contextid.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ beforeEach(async () => {
meta jsonb NULL,
context_id double precision NOT NULL DEFAULT random(),
attempts smallint NOT NULL DEFAULT 0,
is_event boolean NOT NULL DEFAULT false,
CONSTRAINT pg_trx_outbox_pk PRIMARY KEY (id)
);
`)
Expand Down
1 change: 1 addition & 0 deletions test/error.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ beforeEach(async () => {
meta jsonb NULL,
context_id double precision NOT NULL DEFAULT random(),
attempts smallint NOT NULL DEFAULT 0,
is_event boolean NOT NULL DEFAULT false,
CONSTRAINT pg_trx_outbox_pk PRIMARY KEY (id)
);
`)
Expand Down
Loading

0 comments on commit 73650b3

Please sign in to comment.