From d7e3b8e45297c727af5702fc40f4766af2e5d528 Mon Sep 17 00:00:00 2001 From: Rafael Date: Mon, 1 Jun 2020 18:00:32 +1200 Subject: [PATCH] feat (MapAction, MapEvent) replace PersistEvent on event store --- README.md | 150 ++++++++++++------------ cqrs.go | 5 +- cqrs_test.go | 15 +-- eventStore.service.go | 76 +++++++----- examples/property/property.coverprofile | 28 ++--- examples/property/property.service.go | 5 +- 6 files changed, 146 insertions(+), 133 deletions(-) diff --git a/README.md b/README.md index 6fe5130..9bc19ec 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,9 @@ Make you life easy when doing CQRS back-ends :) If you need some info on what CQRS is please [click here :)](https://blog.knoldus.com/cqrs-and-event-sourcing/) ## Imports + At minimum you will need ... + ```go import ( "github.com/moleculer-go/cqrs" @@ -17,22 +19,21 @@ import ( ## Event Store - ![Event Store](docs/CQRS-persist-event.jpg "Event Store") -1) The first piece is the event store which is responsible to store the events (aka commands) and also propagate events to be processed and stored in aggregates. - 1.1 - The event store uses an adaptor to store the data allowing you to choose between multiple data stores. In this example we use SQLite. Once the event is saved for later processing the event store returns a success response. This operation goal is to be fast and reliable. +1. The first piece is the event store which is responsible to store the events (aka commands) and also propagate events to be processed and stored in aggregates. - 1.2 - The event store will dispatch each event and any service that subscribes to the event "profile.updated" will be able to process is it. + 1.1 - The event store uses an adaptor to store the data allowing you to choose between multiple data stores. In this example we use SQLite. Once the event is saved for later processing the event store returns a success response. This operation goal is to be fast and reliable. -2) The second piece is the service that will expose the action to be triggered by an api call. + 1.2 - The event store will dispatch each event and any service that subscribes to the event "profile.updated" will be able to process is it. - The action is responsible to perform validation and modification to the payload before saving the event. The action should not call any expensive service/action/lib at this stage since the objective is to save the event (aka write the command) as soon as possible. Complicated or expensive rules should be done later when the event is dispatched. +2. The second piece is the service that will expose the action to be triggered by an api call. - In this case the "profile.update" action maps directly to the event store using ```events.PersistEvent("profile.updated")```. + The action is responsible to perform validation and modification to the payload before saving the event. The action should not call any expensive service/action/lib at this stage since the objective is to save the event (aka write the command) as soon as possible. Complicated or expensive rules should be done later when the event is dispatched. -3) The third piece is the Aggregate which is where you will store your data. More details on that later. + In this case the "profile.update" action maps directly to the event store using ```events.MapAction("update", "profile.updated")```. +3. The third piece is the Aggregate which is where you will store your data. More details on that later. ```go //1. event store @@ -43,16 +44,14 @@ var Service = moleculer.ServiceSchema{ Name: "profile", Mixins: []moleculer.Mixin{events.Mixin(), propertiesAggregate.Mixin(), summaryAggregate.Mixin()}, Actions: []moleculer.Action{ - { - Name: "update", - Handler: events.PersistEvent("profile.updated"), - }, + events.MapAction("update", "profile.updated"), //** Incomplete profile service **// } } ``` ## Aggregates + Aggregates are where you store your data in the way you want to consume it. Basically you will query your aggregates. If you have a screen that need to display the user profile you need an aggregate for that. @@ -60,6 +59,7 @@ If you have a screen that need to display the user profile you need an aggregate If you want some statistics (total, totalMarried, totalStudents) by country you need an aggregate for that. ![Aggregates](docs/CQRS-aggregates.jpg "Aggregates") + ```go //3. create an aggregate (simple table to store computed values) var summaryAggregate = cqrs.Aggregate( @@ -79,10 +79,7 @@ var Service = moleculer.ServiceSchema{ Mixins: []moleculer.Mixin{events.Mixin(), propertiesAggregate.Mixin(), summaryAggregate.Mixin()}, Actions: []moleculer.Action{ //2. profile.update action just persists the payload in the store - { - Name: "update", - Handler: events.PersistEvent("profile.updated"), - }, + events.MapAction("update", "profile.updated"), { Name: "transformProfileSummary", Handler: transformProfileSummary, @@ -97,10 +94,11 @@ var Service = moleculer.ServiceSchema{ ## Transformations Between your event and your aggregates you need to transform the data. -In a traditional app where in the controller or business object you have assemble/transform all the data from the request to the format acceptable in your database. +In a traditional app where in the controller or business object you have assemble/transform all the data from the request to the format acceptable in your database. This exactly the same thing you do here, but you start with the event and end with a aggregate record that will be wither: be created, update or removed. The example bellow will create or update a summary record. + ```go //2.1 transforms the contents of the event "profile.created" // into a profile summary update. @@ -148,7 +146,6 @@ profile := <-bkr.Call("profile.update", M{ ``` - ### Store Adaptor ```go @@ -160,7 +157,7 @@ func storeFactory(fields ...map[string]interface{}) cqrs.StoreFactory { fields = append(fields, cqrsFields) //creates an store adapter, in this case SQLite //the store adapter is use to store data of the eventStore, so here is where - //you define the back end of your eventStore. + //you define the back end of your eventStore. // You could use mongo.MongoAdapter to have your eventStore backed by mongo DB. return &sqlite.Adapter{ URI: "file:memory:?mode=memory", @@ -173,6 +170,7 @@ func storeFactory(fields ...map[string]interface{}) cqrs.StoreFactory { ``` ### Storing Events + ```go // service property has an action create that will store the event "property.created" with the payload // sent to the action. @@ -180,18 +178,15 @@ var Service = moleculer.ServiceSchema{ Name: "property", Mixins: []moleculer.Mixin{events.Mixin(), propertiesAggregate.Mixin(), summaryAggregate.Mixin()}, Actions: []moleculer.Action{ - { - Name: "create", - - // events.PersistEvent() creates a moleculer Action handle that will respond to the action. - // when the action property.create is invoked the eventStore will save the event an respond back with an standard answer. - Handler: events.PersistEvent("property.created"), - }, + // events.MapAction() creates a moleculer.Action with a handler that will respond to the action. + // when the action property.create is invoked the eventStore will save the event an respond back with an standard answer. + events.MapAction("create", "property.created"), //... ``` ### Pumping and handling events + ```go //same service as above, now is to demonstrate how you handle events dispatched by the event pump. var Service = moleculer.ServiceSchema{ @@ -221,65 +216,72 @@ var Service = moleculer.ServiceSchema{ Aggregates are database tables, elastic search indexes and etc. Simple as that. They store the "calculated values" or the "current state of the system" :) Basically there are never queries on events. Events are dispatched and any moleculer services listen to the them. Aggregates have an api to map event -> transformation -> aggregate action + ```go propertiesAggregate.On("property.created").Create("property.transformProperty"), - ``` - The code above is listening to event "property.created", it will use the transformation action "property.transformProperty" and it will invoke the aggregate action create. - ```go - summaryAggregate.On("property.created").Update("property.transformCountrySummary"), - ``` - The code above is listening to event "property.created", it will use the transformation action "property.transformCountrySummary" and it will invoke the aggregate action Update. - - ### Define your Aggregates - Aggregate are tables of data and depending on the data store they need an schema. - In this case with SQLite we need to specify the fields in our aggregate. - ```go - var summaryAggregate = cqrs.Aggregate( - "profileSummaryAggregate", - storeFactory(map[string]interface{}{ - "countryCode": "string", - "total": "integer", - "beachCity": "integer", - "mountain": "integer", - }), - cqrs.NoSnapshot, +``` + +The code above is listening to event "property.created", it will use the transformation action "property.transformProperty" and it will invoke the aggregate action create. + +```go +summaryAggregate.On("property.created").Update("property.transformCountrySummary"), +``` + +The code above is listening to event "property.created", it will use the transformation action "property.transformCountrySummary" and it will invoke the aggregate action Update. + +### Define your Aggregates + +Aggregate are tables of data and depending on the data store they need an schema. +In this case with SQLite we need to specify the fields in our aggregate. + +```go +var summaryAggregate = cqrs.Aggregate( +"profileSummaryAggregate", +storeFactory(map[string]interface{}{ +"countryCode": "string", +"total": "integer", +"beachCity": "integer", +"mountain": "integer", +}), +cqrs.NoSnapshot, ).Snapshot("propertyEventStore") - ``` - An aggregate contain all the actions available to any store (https://github.com/moleculer-go/store) - So you can just do: - ```go - summaries := <-bkr.Call("profileSummaryAggregate.find", M{}) - ``` +``` + +An aggregate contain all the actions available to any store (https://github.com/moleculer-go/store) +So you can just do: +```go +summaries := <-bkr.Call("profileSummaryAggregate.find", M{}) +``` ## Snapshots Snapshots are a work in progress. basic feature is implemented and now under stress test. snapshotName := aggregate.snapshot(): - - aggregate stops listening to events -> pause aggregate changes :) - - create a snapshot event -> new events will continue to be recorded in the events store after this point! = Write is enabled. - - ** no changes are happening on aggregates ** but reads continue happily. - - backup aggregates -> aggregate.backup(snapshotName) (SQLLite -> basicaly copy files :) ) - - process all events since the snapshot and start listening to events again. - - done. - - Error scenarios: - - if backup fails the event is marked as failed and is ignored when trying to restore events. - - Rationale: - ---> Since you created an event about the start of the snapshot at the same moment you stop processing events for that aggregate. this event should point to the backup file. so it can be used when restoring the snapshot. - - - The restore an snapshot is also very simple - aggregate.restore(snapshotName) - - find backup files using snapshotName and locate snapshot event in the event store. - - ** at this stage the event store might be receiving new events -> write is enabled ** - - backup is restored. - - read is enabled :) - - events start processing from the snapshot moment - - ** system takes a while to catch up ** - - system is eventually consistent :) +- aggregate stops listening to events -> pause aggregate changes :) +- create a snapshot event -> new events will continue to be recorded in the events store after this point! = Write is enabled. +- ** no changes are happening on aggregates ** but reads continue happily. +- backup aggregates -> aggregate.backup(snapshotName) (SQLLite -> basicaly copy files :) ) +- process all events since the snapshot and start listening to events again. +- done. +- Error scenarios: +- if backup fails the event is marked as failed and is ignored when trying to restore events. +- Rationale: + ---> Since you created an event about the start of the snapshot at the same moment you stop processing events for that aggregate. this event should point to the backup file. so it can be used when restoring the snapshot. + +The restore an snapshot is also very simple +aggregate.restore(snapshotName) + +- find backup files using snapshotName and locate snapshot event in the event store. +- ** at this stage the event store might be receiving new events -> write is enabled ** +- backup is restored. +- read is enabled :) +- events start processing from the snapshot moment +- ** system takes a while to catch up ** +- system is eventually consistent :) ## References -Diagram Source: https://app.creately.com/diagram/1hz4OEfdfxM/edit \ No newline at end of file +Diagram Source: https://app.creately.com/diagram/1hz4OEfdfxM/edit diff --git a/cqrs.go b/cqrs.go index 2b1ab87..cfb73f7 100644 --- a/cqrs.go +++ b/cqrs.go @@ -10,7 +10,10 @@ type ManyTransformer func(context moleculer.Context, params moleculer.Payload) [ type EventStorer interface { Mixin() moleculer.Mixin - PersistEvent(eventName string, extraParams ...map[string]interface{}) moleculer.ActionHandler + + MapEvent(eventName string, extraParams ...map[string]interface{}) moleculer.Event + + MapAction(actionName string, eventName string, extraParams ...map[string]interface{}) moleculer.Action //try to move these to a interface just around snapshoter // StartSnapshot(snapshotName string, aggregateMetadata map[string]interface{}) error diff --git a/cqrs_test.go b/cqrs_test.go index b4a27b9..740812a 100644 --- a/cqrs_test.go +++ b/cqrs_test.go @@ -65,10 +65,7 @@ var _ = Describe("CQRS Pluggin", func() { }, }, Actions: []moleculer.Action{ - { - Name: "create", - Handler: eventStore.PersistEvent("property.created"), - }, + eventStore.MapAction("create", "property.created"), }, } bkr := broker.New(&moleculer.Config{ @@ -192,10 +189,7 @@ var _ = Describe("CQRS Pluggin", func() { Name: "user", Mixins: []moleculer.Mixin{eventStore.Mixin()}, Actions: []moleculer.Action{ - { - Name: "create", - Handler: eventStore.PersistEvent("user.created", M{"tag": "valueX"}), - }, + eventStore.MapAction("create", "user.created", M{"tag": "valueX"}), }, } bkr := broker.New(&moleculer.Config{ @@ -258,10 +252,7 @@ var _ = Describe("CQRS Pluggin", func() { Name: "property", Mixins: []moleculer.Mixin{eventStore.Mixin()}, Actions: []moleculer.Action{ - { - Name: "create", - Handler: eventStore.PersistEvent("property.created"), - }, + eventStore.MapAction("create", "property.created"), }, } bkr := broker.New(&moleculer.Config{ diff --git a/eventStore.service.go b/eventStore.service.go index e121e15..948f018 100644 --- a/eventStore.service.go +++ b/eventStore.service.go @@ -232,37 +232,57 @@ func (e *eventStore) createEventStoreService() { } } -// PersistEvent receives eventName and extraParams and returns an action handler +func (e *eventStore) saveEvent(c moleculer.Context, p moleculer.Payload, eventName string, extraParams ...map[string]interface{}) interface{} { + event := M{ + "event": eventName, + "created": time.Now().Unix(), + "status": StatusCreated, + "eventType": TypeCommand, + } + //merge event with params + extra := e.parseExtraParams(extraParams) + if len(extra) > 0 { + for name, value := range extra { + event[name] = value + } + } + event["payload"] = e.serializer.PayloadToBytes(p) + + //save to the event store + r := <-c.Call(e.eventStoreService.Name+".create", event) + if r.IsError() { + c.Emit( + eventName+".failed", + payload.Empty().Add("error", r).Add("event", event), + ) + return r + } + return r +} + +// MapAction receives actionName, eventName and extraParams and returns an moleculer.Action with an action handler // that saves the payload as an event record inside the event store. // extraParams are label=value to be saved in the event record. // if it fails to save the event to the store it emits the event eventName.failed -func (e *eventStore) PersistEvent(eventName string, extraParams ...map[string]interface{}) moleculer.ActionHandler { - return func(c moleculer.Context, p moleculer.Payload) interface{} { - event := M{ - "event": eventName, - "created": time.Now().Unix(), - "status": StatusCreated, - "eventType": TypeCommand, - } - //merge event with params - extra := e.parseExtraParams(extraParams) - if len(extra) > 0 { - for name, value := range extra { - event[name] = value - } - } - event["payload"] = e.serializer.PayloadToBytes(p) - - //save to the event store - r := <-c.Call(e.eventStoreService.Name+".create", event) - if r.IsError() { - c.Emit( - eventName+".failed", - payload.Empty().Add("error", r).Add("event", event), - ) - return r - } - return r +func (e *eventStore) MapAction(actionName, eventName string, extraParams ...map[string]interface{}) moleculer.Action { + return moleculer.Action{ + Name: actionName, + Handler: func(c moleculer.Context, p moleculer.Payload) interface{} { + return e.saveEvent(c, p, eventName, extraParams...) + }, + } +} + +// MapEvent receives eventName and extraParams and returns an moleculer.Action with an action handler +// that saves the payload as an event record inside the event store. +// extraParams are label=value to be saved in the event record. +// if it fails to save the event to the store it emits the event eventName.failed +func (e *eventStore) MapEvent(eventName string, extraParams ...map[string]interface{}) moleculer.Event { + return moleculer.Event{ + Name: eventName, + Handler: func(c moleculer.Context, p moleculer.Payload) { + e.saveEvent(c, p, eventName, extraParams...) + }, } } diff --git a/examples/property/property.coverprofile b/examples/property/property.coverprofile index 1b6f879..7b8aa39 100644 --- a/examples/property/property.coverprofile +++ b/examples/property/property.coverprofile @@ -1,17 +1,17 @@ mode: atomic github.com/moleculer-go/cqrs/examples/property/property.service.go:13.71,14.86 1 3 github.com/moleculer-go/cqrs/examples/property/property.service.go:14.86,21.3 2 3 -github.com/moleculer-go/cqrs/examples/property/property.service.go:92.88,94.2 1 1 -github.com/moleculer-go/cqrs/examples/property/property.service.go:98.94,104.23 4 1 -github.com/moleculer-go/cqrs/examples/property/property.service.go:115.2,116.27 2 1 -github.com/moleculer-go/cqrs/examples/property/property.service.go:119.2,119.26 1 1 -github.com/moleculer-go/cqrs/examples/property/property.service.go:122.2,123.15 2 1 -github.com/moleculer-go/cqrs/examples/property/property.service.go:104.23,107.3 2 0 -github.com/moleculer-go/cqrs/examples/property/property.service.go:107.8,114.3 1 1 -github.com/moleculer-go/cqrs/examples/property/property.service.go:116.27,118.3 1 0 -github.com/moleculer-go/cqrs/examples/property/property.service.go:119.26,121.3 1 1 -github.com/moleculer-go/cqrs/examples/property/property.service.go:126.51,128.2 1 1 -github.com/moleculer-go/cqrs/examples/property/property.service.go:130.50,132.2 1 1 -github.com/moleculer-go/cqrs/examples/property/property.service.go:135.71,136.66 1 0 -github.com/moleculer-go/cqrs/examples/property/property.service.go:136.66,137.44 1 0 -github.com/moleculer-go/cqrs/examples/property/property.service.go:137.44,139.4 1 0 +github.com/moleculer-go/cqrs/examples/property/property.service.go:89.88,91.2 1 1 +github.com/moleculer-go/cqrs/examples/property/property.service.go:95.94,101.23 4 1 +github.com/moleculer-go/cqrs/examples/property/property.service.go:112.2,113.27 2 1 +github.com/moleculer-go/cqrs/examples/property/property.service.go:116.2,116.26 1 1 +github.com/moleculer-go/cqrs/examples/property/property.service.go:119.2,120.15 2 1 +github.com/moleculer-go/cqrs/examples/property/property.service.go:101.23,104.3 2 0 +github.com/moleculer-go/cqrs/examples/property/property.service.go:104.8,111.3 1 1 +github.com/moleculer-go/cqrs/examples/property/property.service.go:113.27,115.3 1 0 +github.com/moleculer-go/cqrs/examples/property/property.service.go:116.26,118.3 1 1 +github.com/moleculer-go/cqrs/examples/property/property.service.go:123.51,125.2 1 1 +github.com/moleculer-go/cqrs/examples/property/property.service.go:127.50,129.2 1 1 +github.com/moleculer-go/cqrs/examples/property/property.service.go:132.71,133.66 1 0 +github.com/moleculer-go/cqrs/examples/property/property.service.go:133.66,134.44 1 0 +github.com/moleculer-go/cqrs/examples/property/property.service.go:134.44,136.4 1 0 diff --git a/examples/property/property.service.go b/examples/property/property.service.go index 4e6ee34..e831cdb 100644 --- a/examples/property/property.service.go +++ b/examples/property/property.service.go @@ -68,10 +68,7 @@ var Service = moleculer.ServiceSchema{ Name: "property", Mixins: []moleculer.Mixin{events.Mixin(), propertiesAggregate.Mixin(), summaryAggregate.Mixin()}, Actions: []moleculer.Action{ - { - Name: "create", - Handler: events.PersistEvent("property.created"), - }, + events.MapAction("create", "property.created"), { Name: "transformProperty", Handler: transformProperty,