Skip to content

Commit

Permalink
feat (MapAction, MapEvent) replace PersistEvent on event store
Browse files Browse the repository at this point in the history
  • Loading branch information
pentateu committed Jun 1, 2020
1 parent ad5049e commit d7e3b8e
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 133 deletions.
150 changes: 76 additions & 74 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ Make you life easy when doing CQRS back-ends :)
If you need some info on what CQRS is please [click here :)](https://blog.knoldus.com/cqrs-and-event-sourcing/)

## Imports

At minimum you will need ...

```go
import (
"github.com/moleculer-go/cqrs"
Expand All @@ -17,22 +19,21 @@ import (

## Event Store


![Event Store](docs/CQRS-persist-event.jpg "Event Store")
1) The first piece is the event store which is responsible to store the events (aka commands) and also propagate events to be processed and stored in aggregates.

1.1 - The event store uses an adaptor to store the data allowing you to choose between multiple data stores. In this example we use SQLite. Once the event is saved for later processing the event store returns a success response. This operation goal is to be fast and reliable.
1. The first piece is the event store which is responsible to store the events (aka commands) and also propagate events to be processed and stored in aggregates.

1.2 - The event store will dispatch each event and any service that subscribes to the event "profile.updated" will be able to process is it.
1.1 - The event store uses an adaptor to store the data allowing you to choose between multiple data stores. In this example we use SQLite. Once the event is saved for later processing the event store returns a success response. This operation goal is to be fast and reliable.

2) The second piece is the service that will expose the action to be triggered by an api call.
1.2 - The event store will dispatch each event and any service that subscribes to the event "profile.updated" will be able to process is it.

The action is responsible to perform validation and modification to the payload before saving the event. The action should not call any expensive service/action/lib at this stage since the objective is to save the event (aka write the command) as soon as possible. Complicated or expensive rules should be done later when the event is dispatched.
2. The second piece is the service that will expose the action to be triggered by an api call.

In this case the "profile.update" action maps directly to the event store using ```events.PersistEvent("profile.updated")```.
The action is responsible to perform validation and modification to the payload before saving the event. The action should not call any expensive service/action/lib at this stage since the objective is to save the event (aka write the command) as soon as possible. Complicated or expensive rules should be done later when the event is dispatched.

3) The third piece is the Aggregate which is where you will store your data. More details on that later.
In this case the "profile.update" action maps directly to the event store using ```events.MapAction("update", "profile.updated")```.

3. The third piece is the Aggregate which is where you will store your data. More details on that later.

```go
//1. event store
Expand All @@ -43,23 +44,22 @@ var Service = moleculer.ServiceSchema{
Name: "profile",
Mixins: []moleculer.Mixin{events.Mixin(), propertiesAggregate.Mixin(), summaryAggregate.Mixin()},
Actions: []moleculer.Action{
{
Name: "update",
Handler: events.PersistEvent("profile.updated"),
},
events.MapAction("update", "profile.updated"),
//** Incomplete profile service **//
}
}
```

## Aggregates

Aggregates are where you store your data in the way you want to consume it. Basically you will query your aggregates.

If you have a screen that need to display the user profile you need an aggregate for that.

If you want some statistics (total, totalMarried, totalStudents) by country you need an aggregate for that.

![Aggregates](docs/CQRS-aggregates.jpg "Aggregates")

```go
//3. create an aggregate (simple table to store computed values)
var summaryAggregate = cqrs.Aggregate(
Expand All @@ -79,10 +79,7 @@ var Service = moleculer.ServiceSchema{
Mixins: []moleculer.Mixin{events.Mixin(), propertiesAggregate.Mixin(), summaryAggregate.Mixin()},
Actions: []moleculer.Action{
//2. profile.update action just persists the payload in the store
{
Name: "update",
Handler: events.PersistEvent("profile.updated"),
},
events.MapAction("update", "profile.updated"),
{
Name: "transformProfileSummary",
Handler: transformProfileSummary,
Expand All @@ -97,10 +94,11 @@ var Service = moleculer.ServiceSchema{
## Transformations
Between your event and your aggregates you need to transform the data.
In a traditional app where in the controller or business object you have assemble/transform all the data from the request to the format acceptable in your database.
In a traditional app where in the controller or business object you have assemble/transform all the data from the request to the format acceptable in your database.
This exactly the same thing you do here, but you start with the event and end with a aggregate record that will be wither: be created, update or removed.
The example bellow will create or update a summary record.
```go
//2.1 transforms the contents of the event "profile.created"
// into a profile summary update.
Expand Down Expand Up @@ -148,7 +146,6 @@ profile := <-bkr.Call("profile.update", M{

```
### Store Adaptor
```go
Expand All @@ -160,7 +157,7 @@ func storeFactory(fields ...map[string]interface{}) cqrs.StoreFactory {
fields = append(fields, cqrsFields)
//creates an store adapter, in this case SQLite
//the store adapter is use to store data of the eventStore, so here is where
//you define the back end of your eventStore.
//you define the back end of your eventStore.
// You could use mongo.MongoAdapter to have your eventStore backed by mongo DB.
return &sqlite.Adapter{
URI: "file:memory:?mode=memory",
Expand All @@ -173,25 +170,23 @@ func storeFactory(fields ...map[string]interface{}) cqrs.StoreFactory {
```
### Storing Events
```go
// service property has an action create that will store the event "property.created" with the payload
// sent to the action.
var Service = moleculer.ServiceSchema{
Name: "property",
Mixins: []moleculer.Mixin{events.Mixin(), propertiesAggregate.Mixin(), summaryAggregate.Mixin()},
Actions: []moleculer.Action{
{
Name: "create",

// events.PersistEvent() creates a moleculer Action handle that will respond to the action.
// when the action property.create is invoked the eventStore will save the event an respond back with an standard answer.
Handler: events.PersistEvent("property.created"),
},
// events.MapAction() creates a moleculer.Action with a handler that will respond to the action.
// when the action property.create is invoked the eventStore will save the event an respond back with an standard answer.
events.MapAction("create", "property.created"),
//...

```
### Pumping and handling events
```go
//same service as above, now is to demonstrate how you handle events dispatched by the event pump.
var Service = moleculer.ServiceSchema{
Expand Down Expand Up @@ -221,65 +216,72 @@ var Service = moleculer.ServiceSchema{
Aggregates are database tables, elastic search indexes and etc. Simple as that. They store the "calculated values" or the "current state of the system" :)
Basically there are never queries on events. Events are dispatched and any moleculer services listen to the them.
Aggregates have an api to map event -> transformation -> aggregate action
```go
propertiesAggregate.On("property.created").Create("property.transformProperty"),
```
The code above is listening to event "property.created", it will use the transformation action "property.transformProperty" and it will invoke the aggregate action create.
```go
summaryAggregate.On("property.created").Update("property.transformCountrySummary"),
```
The code above is listening to event "property.created", it will use the transformation action "property.transformCountrySummary" and it will invoke the aggregate action Update.
### Define your Aggregates
Aggregate are tables of data and depending on the data store they need an schema.
In this case with SQLite we need to specify the fields in our aggregate.
```go
var summaryAggregate = cqrs.Aggregate(
"profileSummaryAggregate",
storeFactory(map[string]interface{}{
"countryCode": "string",
"total": "integer",
"beachCity": "integer",
"mountain": "integer",
}),
cqrs.NoSnapshot,
```
The code above is listening to event "property.created", it will use the transformation action "property.transformProperty" and it will invoke the aggregate action create.
```go
summaryAggregate.On("property.created").Update("property.transformCountrySummary"),
```
The code above is listening to event "property.created", it will use the transformation action "property.transformCountrySummary" and it will invoke the aggregate action Update.
### Define your Aggregates
Aggregate are tables of data and depending on the data store they need an schema.
In this case with SQLite we need to specify the fields in our aggregate.
```go
var summaryAggregate = cqrs.Aggregate(
"profileSummaryAggregate",
storeFactory(map[string]interface{}{
"countryCode": "string",
"total": "integer",
"beachCity": "integer",
"mountain": "integer",
}),
cqrs.NoSnapshot,
).Snapshot("propertyEventStore")
```
An aggregate contain all the actions available to any store (https://github.com/moleculer-go/store)
So you can just do:
```go
summaries := <-bkr.Call("profileSummaryAggregate.find", M{})
```
```
An aggregate contain all the actions available to any store (https://github.com/moleculer-go/store)
So you can just do:
```go
summaries := <-bkr.Call("profileSummaryAggregate.find", M{})
```
## Snapshots
Snapshots are a work in progress. basic feature is implemented and now under stress test.
snapshotName := aggregate.snapshot():
- aggregate stops listening to events -> pause aggregate changes :)
- create a snapshot event -> new events will continue to be recorded in the events store after this point! = Write is enabled.
- ** no changes are happening on aggregates ** but reads continue happily.
- backup aggregates -> aggregate.backup(snapshotName) (SQLLite -> basicaly copy files :) )
- process all events since the snapshot and start listening to events again.
- done.
- Error scenarios:
- if backup fails the event is marked as failed and is ignored when trying to restore events.
- Rationale:
---> Since you created an event about the start of the snapshot at the same moment you stop processing events for that aggregate. this event should point to the backup file. so it can be used when restoring the snapshot.
The restore an snapshot is also very simple
aggregate.restore(snapshotName)
- find backup files using snapshotName and locate snapshot event in the event store.
- ** at this stage the event store might be receiving new events -> write is enabled **
- backup is restored.
- read is enabled :)
- events start processing from the snapshot moment
- ** system takes a while to catch up **
- system is eventually consistent :)
- aggregate stops listening to events -> pause aggregate changes :)
- create a snapshot event -> new events will continue to be recorded in the events store after this point! = Write is enabled.
- ** no changes are happening on aggregates ** but reads continue happily.
- backup aggregates -> aggregate.backup(snapshotName) (SQLLite -> basicaly copy files :) )
- process all events since the snapshot and start listening to events again.
- done.
- Error scenarios:
- if backup fails the event is marked as failed and is ignored when trying to restore events.
- Rationale:
---> Since you created an event about the start of the snapshot at the same moment you stop processing events for that aggregate. this event should point to the backup file. so it can be used when restoring the snapshot.
The restore an snapshot is also very simple
aggregate.restore(snapshotName)
- find backup files using snapshotName and locate snapshot event in the event store.
- ** at this stage the event store might be receiving new events -> write is enabled **
- backup is restored.
- read is enabled :)
- events start processing from the snapshot moment
- ** system takes a while to catch up **
- system is eventually consistent :)
## References
Diagram Source: https://app.creately.com/diagram/1hz4OEfdfxM/edit
Diagram Source: https://app.creately.com/diagram/1hz4OEfdfxM/edit
5 changes: 4 additions & 1 deletion cqrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ type ManyTransformer func(context moleculer.Context, params moleculer.Payload) [

type EventStorer interface {
Mixin() moleculer.Mixin
PersistEvent(eventName string, extraParams ...map[string]interface{}) moleculer.ActionHandler

MapEvent(eventName string, extraParams ...map[string]interface{}) moleculer.Event

MapAction(actionName string, eventName string, extraParams ...map[string]interface{}) moleculer.Action

//try to move these to a interface just around snapshoter
// StartSnapshot(snapshotName string, aggregateMetadata map[string]interface{}) error
Expand Down
15 changes: 3 additions & 12 deletions cqrs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ var _ = Describe("CQRS Pluggin", func() {
},
},
Actions: []moleculer.Action{
{
Name: "create",
Handler: eventStore.PersistEvent("property.created"),
},
eventStore.MapAction("create", "property.created"),
},
}
bkr := broker.New(&moleculer.Config{
Expand Down Expand Up @@ -192,10 +189,7 @@ var _ = Describe("CQRS Pluggin", func() {
Name: "user",
Mixins: []moleculer.Mixin{eventStore.Mixin()},
Actions: []moleculer.Action{
{
Name: "create",
Handler: eventStore.PersistEvent("user.created", M{"tag": "valueX"}),
},
eventStore.MapAction("create", "user.created", M{"tag": "valueX"}),
},
}
bkr := broker.New(&moleculer.Config{
Expand Down Expand Up @@ -258,10 +252,7 @@ var _ = Describe("CQRS Pluggin", func() {
Name: "property",
Mixins: []moleculer.Mixin{eventStore.Mixin()},
Actions: []moleculer.Action{
{
Name: "create",
Handler: eventStore.PersistEvent("property.created"),
},
eventStore.MapAction("create", "property.created"),
},
}
bkr := broker.New(&moleculer.Config{
Expand Down
76 changes: 48 additions & 28 deletions eventStore.service.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,37 +232,57 @@ func (e *eventStore) createEventStoreService() {
}
}

// PersistEvent receives eventName and extraParams and returns an action handler
func (e *eventStore) saveEvent(c moleculer.Context, p moleculer.Payload, eventName string, extraParams ...map[string]interface{}) interface{} {
event := M{
"event": eventName,
"created": time.Now().Unix(),
"status": StatusCreated,
"eventType": TypeCommand,
}
//merge event with params
extra := e.parseExtraParams(extraParams)
if len(extra) > 0 {
for name, value := range extra {
event[name] = value
}
}
event["payload"] = e.serializer.PayloadToBytes(p)

//save to the event store
r := <-c.Call(e.eventStoreService.Name+".create", event)
if r.IsError() {
c.Emit(
eventName+".failed",
payload.Empty().Add("error", r).Add("event", event),
)
return r
}
return r
}

// MapAction receives actionName, eventName and extraParams and returns an moleculer.Action with an action handler
// that saves the payload as an event record inside the event store.
// extraParams are label=value to be saved in the event record.
// if it fails to save the event to the store it emits the event eventName.failed
func (e *eventStore) PersistEvent(eventName string, extraParams ...map[string]interface{}) moleculer.ActionHandler {
return func(c moleculer.Context, p moleculer.Payload) interface{} {
event := M{
"event": eventName,
"created": time.Now().Unix(),
"status": StatusCreated,
"eventType": TypeCommand,
}
//merge event with params
extra := e.parseExtraParams(extraParams)
if len(extra) > 0 {
for name, value := range extra {
event[name] = value
}
}
event["payload"] = e.serializer.PayloadToBytes(p)

//save to the event store
r := <-c.Call(e.eventStoreService.Name+".create", event)
if r.IsError() {
c.Emit(
eventName+".failed",
payload.Empty().Add("error", r).Add("event", event),
)
return r
}
return r
func (e *eventStore) MapAction(actionName, eventName string, extraParams ...map[string]interface{}) moleculer.Action {
return moleculer.Action{
Name: actionName,
Handler: func(c moleculer.Context, p moleculer.Payload) interface{} {
return e.saveEvent(c, p, eventName, extraParams...)
},
}
}

// MapEvent receives eventName and extraParams and returns an moleculer.Action with an action handler
// that saves the payload as an event record inside the event store.
// extraParams are label=value to be saved in the event record.
// if it fails to save the event to the store it emits the event eventName.failed
func (e *eventStore) MapEvent(eventName string, extraParams ...map[string]interface{}) moleculer.Event {
return moleculer.Event{
Name: eventName,
Handler: func(c moleculer.Context, p moleculer.Payload) {
e.saveEvent(c, p, eventName, extraParams...)
},
}
}

Expand Down
Loading

0 comments on commit d7e3b8e

Please sign in to comment.