
Enforce user defined unique keys for aggregates
erikrozendaal committed Jan 6, 2025
1 parent 75e153b commit 9aa5cc9
Showing 7 changed files with 84 additions and 4 deletions.
19 changes: 19 additions & 0 deletions db/sequent_pgsql.sql
@@ -135,6 +135,7 @@ DECLARE
_provided_events_partition_key aggregates.events_partition_key%TYPE;
_events_partition_key aggregates.events_partition_key%TYPE;
_snapshot_outdated_at aggregates_that_need_snapshots.snapshot_outdated_at%TYPE;
_unique_keys jsonb;
BEGIN
_command_id = store_command(_command);

@@ -165,6 +166,7 @@ BEGIN
_aggregate_id = _aggregate->>'aggregate_id';
_provided_events_partition_key = _aggregate->>'events_partition_key';
_snapshot_outdated_at = _aggregate->>'snapshot_outdated_at';
_unique_keys = COALESCE(_aggregate->'unique_keys', '{}'::jsonb);

SELECT * INTO _aggregate_row FROM aggregates WHERE aggregate_id = _aggregate_id;
_events_partition_key = COALESCE(_provided_events_partition_key, _aggregate_row.events_partition_key, '');
@@ -179,6 +181,23 @@
DO UPDATE SET events_partition_key = EXCLUDED.events_partition_key
WHERE aggregates.events_partition_key IS DISTINCT FROM EXCLUDED.events_partition_key;

DELETE FROM aggregate_unique_keys AS target
WHERE target.aggregate_id = _aggregate_id
AND NOT (_unique_keys ? target.scope);

BEGIN
INSERT INTO aggregate_unique_keys AS target (aggregate_id, scope, key)
SELECT _aggregate_id, key, value
FROM jsonb_each(_unique_keys) AS x
ON CONFLICT (aggregate_id, scope) DO UPDATE
SET key = EXCLUDED.key
WHERE target.key <> EXCLUDED.key;
EXCEPTION
WHEN unique_violation THEN
RAISE unique_violation
USING MESSAGE = 'duplicate aggregate key value for aggregate ' || _aggregate_id || ' (' || SQLERRM || ')';
END;

INSERT INTO events (partition_key, aggregate_id, sequence_number, created_at, command_id, event_type_id, event_json)
SELECT _events_partition_key,
_aggregate_id,
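For context: the payload this procedure receives is a plain JSON object per aggregate, where each entry of unique_keys maps a scope to an arbitrary JSON value, and jsonb_each turns every entry into one row of aggregate_unique_keys. A minimal Ruby sketch of the relevant shape (illustrative only, not the full aggregate serialization):

# Illustrative slice of the aggregate hash handed to the stored procedure.
# Scopes no longer present are deleted, changed key values are updated in
# place, and a clash with another aggregate's (scope, key) pair raises
# unique_violation with a 'duplicate aggregate key value' message.
aggregate = {
  'aggregate_id' => 'f8b1f1a4-0000-4000-8000-000000000000', # hypothetical id
  'events_partition_key' => '',
  'unique_keys' => {
    'email' => 'bob@example.com', # scope => key value
  },
}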
5 changes: 5 additions & 0 deletions db/sequent_schema_indexes.sql
@@ -14,6 +14,11 @@ CREATE INDEX events_event_type_id_idx ON events (event_type_id);
ALTER TABLE aggregates
ADD FOREIGN KEY (aggregate_type_id) REFERENCES aggregate_types (id) ON UPDATE CASCADE;

ALTER TABLE aggregate_unique_keys
ADD PRIMARY KEY (aggregate_id, scope),
ADD UNIQUE (scope, key),
ADD FOREIGN KEY (aggregate_id) REFERENCES aggregates (aggregate_id) ON UPDATE CASCADE ON DELETE CASCADE;

ALTER TABLE events
ADD FOREIGN KEY (partition_key, aggregate_id) REFERENCES aggregates (events_partition_key, aggregate_id)
ON UPDATE CASCADE ON DELETE RESTRICT;
6 changes: 6 additions & 0 deletions db/sequent_schema_tables.sql
@@ -25,6 +25,12 @@ CREATE TABLE aggregates (
created_at timestamp with time zone NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (aggregate_id);

CREATE TABLE aggregate_unique_keys (
aggregate_id uuid NOT NULL,
scope text NOT NULL,
key jsonb NOT NULL
);

CREATE TABLE events (
aggregate_id uuid NOT NULL,
partition_key text NOT NULL DEFAULT '',
5 changes: 5 additions & 0 deletions lib/sequent/core/aggregate_root.rb
@@ -98,12 +98,17 @@ def to_s
"#{self.class.name}: #{@id}"
end

def unique_keys
{}
end

def event_stream
EventStream.new(
aggregate_type: self.class.name,
aggregate_id: id,
events_partition_key: events_partition_key,
snapshot_outdated_at: snapshot_outdated? ? Time.now : nil,
unique_keys:,
)
end

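Aggregates opt in by overriding unique_keys; the empty-hash default leaves existing aggregates unaffected. A minimal sketch of an opting-in aggregate, assuming a hypothetical User aggregate and UserCreated event (the pattern matches the spec added in this commit):

class UserCreated < Sequent::Core::Event
  attrs email: String
end

class User < Sequent::Core::AggregateRoot
  def initialize(id, email)
    super(id)
    apply UserCreated, email: email
  end

  # Each entry is enforced database-wide: no two aggregates may hold the
  # same key value within the same scope (here the 'email' scope).
  def unique_keys
    { email: @email }
  end

  on UserCreated do |event|
    @email = event.email
  end
end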
11 changes: 9 additions & 2 deletions lib/sequent/core/event_store.rb
@@ -18,6 +18,9 @@ class EventStore
class OptimisticLockingError < RuntimeError
end

class AggregateKeyNotUniqueError < RuntimeError
end

class DeserializeEventError < RuntimeError
attr_reader :event_hash

@@ -250,8 +253,12 @@ def store_events(command, streams_with_events = [])
Sequent::Core::Oj.dump(events),
],
)
rescue ActiveRecord::RecordNotUnique
raise OptimisticLockingError
rescue ActiveRecord::RecordNotUnique => e
if e.message =~ /duplicate aggregate key value/
raise AggregateKeyNotUniqueError
else
raise OptimisticLockingError
end
end

def convert_timestamp(timestamp)
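Callers can now tell a uniqueness conflict apart from a concurrency conflict. A sketch of handling both around a commit, inside a command handler where command is in scope (EmailAlreadyRegisteredError is a hypothetical domain error, not part of Sequent):

class EmailAlreadyRegisteredError < StandardError; end # hypothetical

begin
  Sequent.aggregate_repository.commit(command)
rescue Sequent::Core::EventStore::AggregateKeyNotUniqueError
  # Another aggregate already claimed one of this aggregate's unique keys
  # (for example its email); translate it into a domain-level error.
  raise EmailAlreadyRegisteredError
rescue Sequent::Core::EventStore::OptimisticLockingError
  # A concurrent transaction modified the same aggregate; retrying is safe.
  raise
end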
11 changes: 9 additions & 2 deletions lib/sequent/core/stream_record.rb
@@ -4,8 +4,15 @@

module Sequent
module Core
EventStream = Data.define(:aggregate_type, :aggregate_id, :events_partition_key, :snapshot_outdated_at) do
def initialize(aggregate_type:, aggregate_id:, events_partition_key: '', snapshot_outdated_at: nil)
EventStream = Data.define(
:aggregate_type,
:aggregate_id,
:events_partition_key,
:snapshot_outdated_at,
:unique_keys,
) do
def initialize(aggregate_type:, aggregate_id:, events_partition_key: '', snapshot_outdated_at: nil,
unique_keys: {})
super
end
end
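Because unique_keys defaults to {}, existing EventStream call sites keep working unchanged. A sketch of both forms (the 'User' aggregate type is hypothetical):

stream = Sequent::Core::EventStream.new(
  aggregate_type: 'User',
  aggregate_id: Sequent.new_uuid,
  unique_keys: { email: 'bob@example.com' },
)
stream.unique_keys  # => {email: "bob@example.com"}

# Omitting the new keyword still works and yields the empty default:
Sequent::Core::EventStream.new(
  aggregate_type: 'User',
  aggregate_id: Sequent.new_uuid,
).unique_keys       # => {}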
31 changes: 31 additions & 0 deletions spec/lib/sequent/core/aggregate_repository_spec.rb
@@ -481,5 +481,36 @@ def ping
expect(aggregate.first.pinged).to eq(2)
end
end

context 'with unique keys' do
class DummyWithUniqueKeysCreated < Sequent::Core::Event
attrs unique_keys: Object
end

class DummyAggregateWithUniqueKeys < Sequent::Core::AggregateRoot
def initialize(id, unique_keys)
super(id)
apply DummyWithUniqueKeysCreated, unique_keys:
end

def unique_keys
@unique_keys || {}
end

on DummyWithUniqueKeysCreated do |event|
@unique_keys = event.unique_keys
end
end

it 'enforces key uniqueness with the same scope' do
dummy1 = DummyAggregateWithUniqueKeys.new(Sequent.new_uuid, {email: '[email protected]'})
dummy2 = DummyAggregateWithUniqueKeys.new(Sequent.new_uuid, {email: '[email protected]'})
Sequent.aggregate_repository.add_aggregate(dummy1)
Sequent.aggregate_repository.add_aggregate(dummy2)

expect { Sequent.aggregate_repository.commit(DummyCommand.new) }
.to raise_error Sequent::Core::EventStore::AggregateKeyNotUniqueError
end
end
end
end
