Skip to content

Commit

Permalink
CI enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
ekoutanov committed Apr 25, 2020
1 parent 026ac40 commit 76210d7
Show file tree
Hide file tree
Showing 36 changed files with 4,933 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.vscode
.DS_Store
*.cer
*.pem
/bin
/log*
22 changes: 22 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
language: go

go:
- 1.13.x
- 1.14.x

services:
- docker

before_install:
- |
docker run --name kafka --rm -d -p 2181:2181 -p 9092:9092 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
obsidiandynamics/kafka
- go get -u -v all

script:
- make
- make int

after_success:
- bash <(curl -s https://codecov.io/bash)
32 changes: 32 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
default: build test

all: test lint

build: dirs
go build -race -o bin ./...

test: dirs
go test ./... -race -count=1 -coverprofile=bin/coverage.out

soaktest: dirs
SOAK_CMD="make test" sh/soak.sh

int: FORCE
GOLABELS=int go test -timeout 180s -v -race -count=1 ./int

soakint: FORCE
SOAK_CMD="make int" sh/soak.sh

dirs:
mkdir -p bin

lint:
golint ./...

clean:
rm -rf bin

list: FORCE
@$(MAKE) -pRrq -f $(lastword $(MAKEFILE_LIST)) : 2>/dev/null | awk -v RS= -F: '/^# File/,/^# Finished Make data base/ {if ($$1 !~ "^[#.]") {print $$1}}' | sort | egrep -v -e '^[^[:alnum:]]' -e '^$@$$'

FORCE:
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
<img src="https://image.flaticon.com/icons/svg/2427/2427017.svg" width="90px" alt="logo"/> `goharvest`
===
![Go version](https://img.shields.io/github/go-mod/go-version/obsidiandynamics/goharvest)
[![Build](https://travis-ci.org/obsidiandynamics/goharvest.svg?branch=master) ](https://travis-ci.org/obsidiandynamics/goharvest#)
![Release](https://img.shields.io/github/v/release/obsidiandynamics/goharvest?color=ff69b4)
[![Codecov](https://codecov.io/gh/obsidiandynamics/goharvest/branch/master/graph/badge.svg)](https://codecov.io/gh/obsidiandynamics/goharvest)
[![Go Report Card](https://goreportcard.com/badge/github.com/obsidiandynamics/goharvest)](https://goreportcard.com/report/github.com/obsidiandynamics/goharvest)
[![Total alerts](https://img.shields.io/lgtm/alerts/g/obsidiandynamics/goharvest.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/obsidiandynamics/goharvest/alerts/)
[![GoDoc Reference](https://img.shields.io/badge/docs-GoDoc-blue.svg)](https://pkg.go.dev/github.com/obsidiandynamics/goharvest?tab=doc)

Implementation of the [Transactional outbox](https://microservices.io/patterns/data/transactional-outbox.html) pattern for Postgres and Kafka.

Expand Down
88 changes: 88 additions & 0 deletions battery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package goharvest

import (
"hash/fnv"
)

type cell struct {
records chan OutboxRecord
done chan int
}

func (c cell) stop() {
close(c.records)
}

func (c cell) await() {
<-c.done
}

func (c cell) enqueue(rec OutboxRecord) bool {
select {
case <-c.done:
return false
case c.records <- rec:
return true
}
}

type cellHandler func(records chan OutboxRecord)

func newCell(buffer int, handler cellHandler) cell {
c := cell{
records: make(chan OutboxRecord),
done: make(chan int),
}
go func() {
defer close(c.done)
handler(c.records)
}()
return c
}

type battery interface {
stop()
await()
shutdown()
enqueue(rec OutboxRecord) bool
}

type concurrentBattery []cell

func (b *concurrentBattery) stop() {
for _, c := range *b {
c.stop()
}
}

func (b *concurrentBattery) await() {
for _, c := range *b {
c.await()
}
}

func (b *concurrentBattery) shutdown() {
b.stop()
b.await()
}

func (b *concurrentBattery) enqueue(rec OutboxRecord) bool {
if length := len(*b); length > 1 {
return (*b)[hash(rec.KafkaKey)%uint32(length)].enqueue(rec)
}
return (*b)[0].enqueue(rec)
}

func newConcurrentBattery(concurrency int, buffer int, handler cellHandler) *concurrentBattery {
b := make(concurrentBattery, concurrency)
for i := 0; i < concurrency; i++ {
b[i] = newCell(buffer, handler)
}
return &b
}

func hash(str string) uint32 {
algorithm := fnv.New32a()
algorithm.Write([]byte(str))
return algorithm.Sum32()
}
43 changes: 43 additions & 0 deletions battery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package goharvest

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestEnqueue_concurrencyOf1(t *testing.T) {
enqueued := make(chan OutboxRecord)
b := newConcurrentBattery(1, 0, func(records chan OutboxRecord) {
for rec := range records {
enqueued <- rec
}
})
defer b.shutdown()

rec := OutboxRecord{}
assert.True(t, b.enqueue(rec))
assert.Equal(t, rec, <-enqueued)
}

func TestEnqueue_concurrencyOf2(t *testing.T) {
enqueued := make(chan OutboxRecord)
b := newConcurrentBattery(2, 0, func(records chan OutboxRecord) {
for rec := range records {
enqueued <- rec
}
})
defer b.shutdown()

rec := OutboxRecord{}
assert.True(t, b.enqueue(rec))
assert.Equal(t, rec, <-enqueued)
}

func TestEnqueue_afterDone(t *testing.T) {
b := newConcurrentBattery(2, 0, func(records chan OutboxRecord) {})
b.await()

assert.False(t, b.enqueue(OutboxRecord{}))
b.stop()
}
65 changes: 65 additions & 0 deletions cmd/goharvest_example/example_main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package main

import (
"database/sql"

"github.com/obsidiandynamics/goharvest"
"github.com/obsidiandynamics/libstdgo/scribe"
scribelogrus "github.com/obsidiandynamics/libstdgo/scribe/logrus"
"github.com/sirupsen/logrus"
)

func main() {
const dataSource = "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable"

// Optional: Ensure the database table exists before we start harvesting.
func() {
db, err := sql.Open("postgres", dataSource)
if err != nil {
panic(err)
}
defer db.Close()

_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS outbox (
id BIGSERIAL PRIMARY KEY,
create_time TIMESTAMP WITH TIME ZONE NOT NULL,
kafka_topic VARCHAR(249) NOT NULL,
kafka_key VARCHAR(100) NOT NULL, -- pick your own key size
kafka_value VARCHAR(10000), -- pick your own value size
kafka_header_keys TEXT[] NOT NULL,
kafka_header_values TEXT[] NOT NULL,
leader_id UUID
)
`)
if err != nil {
panic(err)
}
}()

// Configure the harvester. It will use its own database connections under the hood.
log := logrus.StandardLogger()
log.SetLevel(logrus.DebugLevel)
config := goharvest.Config{
BaseKafkaConfig: goharvest.KafkaConfigMap{
"bootstrap.servers": "localhost:9092",
},
DataSource: dataSource,
Scribe: scribe.New(scribelogrus.Bind()),
}

// Create a new harvester.
harvest, err := goharvest.New(config)
if err != nil {
panic(err)
}

// Start it.
err = harvest.Start()
if err != nil {
panic(err)
}

// Wait indefinitely for it to end.
log.Fatal(harvest.Await())
}
105 changes: 105 additions & 0 deletions cmd/pump/pump_main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"database/sql"
"flag"
"fmt"
"log"
"math/rand"
"strconv"
"time"

"github.com/obsidiandynamics/goharvest"
"github.com/obsidiandynamics/goharvest/metric"
"github.com/obsidiandynamics/goharvest/stasher"
)

const recordsPerTxn = 20

func main() {
var keys, records, interval int
var dataSource, outboxTable, kafkaTopic string
var blank bool
flag.IntVar(&keys, "keys", -1, "Number of unique keys")
flag.IntVar(&records, "records", -1, "Number of records to generate")
flag.IntVar(&interval, "interval", 0, "Write interval (in milliseconds")
flag.StringVar(&dataSource, "ds", "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable", "Data source")
flag.StringVar(&outboxTable, "outbox", "outbox", "Outbox table name")
flag.StringVar(&kafkaTopic, "topic", "pump", "Kafka output topic name")
flag.BoolVar(&blank, "blank", false, "Generate blank records (nil value)")
flag.Parse()

errorFunc := func(field string) {
flag.PrintDefaults()
panic(fmt.Errorf("required '-%s' has not been set", field))
}
if keys == -1 {
errorFunc("keys")
}
if records == -1 {
errorFunc("records")
}

fmt.Printf("Starting stasher; keys: %d, records: %d, interval: %d ms\n", keys, records, interval)
fmt.Printf(" Data source: %s\n", dataSource)
fmt.Printf(" Outbox table name: %s\n", outboxTable)

db, err := sql.Open("postgres", dataSource)
if err != nil {
panic(err)
}
defer db.Close()

st := stasher.NewStasher(outboxTable)

meter := metric.NewMeter("pump", 5*time.Second)

var tx *sql.Tx
var pre stasher.PreStash
for i := 0; i < records; i++ {
if i%recordsPerTxn == 0 {
finaliseTx(tx)

tx, err = db.Begin()
if err != nil {
panic(err)
}
pre, err = st.Prepare(tx)
if err != nil {
panic(err)
}
}

rand := rand.Uint64()
var value *string
if !blank {
value = goharvest.StringPtr(fmt.Sprintf("value-%x", rand))
}

rec := goharvest.OutboxRecord{
KafkaTopic: kafkaTopic,
KafkaKey: fmt.Sprintf("key-%x", rand%uint64(keys)),
KafkaValue: value,
KafkaHeaders: goharvest.KafkaHeaders{
goharvest.KafkaHeader{Key: "Seq", Value: strconv.Itoa(i)},
},
}
err := pre.Stash(rec)
if err != nil {
panic(err)
}
time.Sleep(time.Duration(interval * int(time.Millisecond)))
meter.Add(1)
meter.MaybeStatsLog(log.Printf)
}
finaliseTx(tx)
}

func finaliseTx(tx *sql.Tx) {
if tx != nil {
err := tx.Commit()
if err != nil {
panic(err)
}
}
}
Loading

0 comments on commit 76210d7

Please sign in to comment.