Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to go-based SQLite driver (cvd) #129

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
)

func BenchmarkLDBQueryBaseline(b *testing.B) {
b.StopTimer()

ctx := context.TODO()

tmpDir, err := ioutil.TempDir("", "")
Expand Down Expand Up @@ -51,8 +49,8 @@ func BenchmarkLDBQueryBaseline(b *testing.B) {
b.Fatalf("Unexpected error preparing query: %v", err)
}

b.StartTimer()

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
rows, err := prepQ.QueryContext(ctx, "foo")
if err != nil {
Expand Down Expand Up @@ -123,6 +121,8 @@ func BenchmarkGetRowByKey(b *testing.B) {
r: r,
}

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
var row benchKVRow
found, err := benchSetup.r.GetRowByKey(benchSetup.ctx, &row, "foo", "bar", "foo")
Expand Down
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,3 @@ services:
MYSQL_DATABASE: ctldb
MYSQL_USER: ctldb
MYSQL_PASSWORD: ctldbpw
mem_limit: 536870912
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/segmentio/cli v0.5.1
github.com/segmentio/conf v1.1.0
github.com/segmentio/errors-go v1.0.0
github.com/segmentio/events/v2 v2.3.2
github.com/segmentio/go-sqlite3 v1.12.0
github.com/segmentio/stats/v4 v4.6.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ github.com/segmentio/cli v0.5.1 h1:Xhtnmp0LrF+JHQTTV4Q58S79gG8JKXO4MMniyqc+XZs=
github.com/segmentio/cli v0.5.1/go.mod h1:qz2M+DqXgYnjKLTrcI80MoGQsI6xT0wXCozfBAtF/iI=
github.com/segmentio/conf v1.1.0 h1:3d8AaXnQNLCze/UpZ31pwDpDj+tmb2FIwroOtqCYNBY=
github.com/segmentio/conf v1.1.0/go.mod h1:Y3B9O/PqqWqjyxyWWseyj/quPEtMu1zDp/kVbSWWaB0=
github.com/segmentio/errors-go v1.0.0 h1:B4mbo4hP3+XffV1GhwyAcHlvWoZtYdTyc3BOVPxspTQ=
github.com/segmentio/errors-go v1.0.0/go.mod h1:RDVEREUrpa4/jM8rt5KsQpu+JoXPi6i07vG7m4tX0MY=
github.com/segmentio/events/v2 v2.3.2 h1:J73yVqYtnLWZD3Oqef82fYPZhfpRfQGiOvBes+OohoY=
github.com/segmentio/events/v2 v2.3.2/go.mod h1:9HY7dFOCKoPQx3hUBXYim6I4hqaZWtSGWJ4IYAMxtkM=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e h1:uO75wNGioszjmIzcY/tvdDYKRLVvzggtAmmJkn9j4GQ=
Expand Down
26 changes: 13 additions & 13 deletions ldb_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ctlstore
import (
"context"
"database/sql"
"errors"
"fmt"
"os"
"path/filepath"
Expand All @@ -11,7 +12,6 @@ import (
"sync"
"time"

"github.com/segmentio/errors-go"
"github.com/segmentio/events/v2"
"github.com/segmentio/stats/v4"

Expand Down Expand Up @@ -64,14 +64,14 @@ func newVersionedLDBReader(dirPath string) (*LDBReader, error) {
// To initialize this reader, we must first load an LDB:
last, err := lookupLastLDBSync(dirPath)
if err != nil {
return nil, errors.Wrap(err, "checking last ldb sync")
return nil, fmt.Errorf("checking last ldb sync: %w", err)
}
if last == 0 {
return nil, fmt.Errorf("no LDB in path (%s)", dirPath)
}
err = reader.switchLDB(dirPath, last)
if err != nil {
return nil, errors.Wrap(err, "switching ldbs")
return nil, fmt.Errorf("switching ldbs: %w", err)
}

// Then we can defer to the watcher goroutine to swap this
Expand Down Expand Up @@ -133,7 +133,7 @@ func (reader *LDBReader) GetLedgerLatency(ctx context.Context) (time.Duration, e
case err == sql.ErrNoRows:
return 0, ErrNoLedgerUpdates
case err != nil:
return 0, errors.Wrap(err, "get ledger latency")
return 0, fmt.Errorf("get ledger latency: %w", err)
default:
return time.Now().Sub(timestamp), nil
}
Expand Down Expand Up @@ -281,7 +281,7 @@ func (reader *LDBReader) GetRowByKey(
if err != nil {
// See NOTE above about why this cache is getting cleared
reader.invalidatePKCache(ldbTable) // assumes RLock is held
err = errors.Wrap(err, "query target row error")
err = fmt.Errorf("query target row error: %w", err)
return
}
defer rows.Close()
Expand All @@ -306,7 +306,7 @@ func (reader *LDBReader) GetRowByKey(
err = scanFunc(rows)

if err != nil {
err = errors.Wrap(err, "target row scan error")
err = fmt.Errorf("target row scan error: %w", err)
} else {
err = rows.Err()
}
Expand Down Expand Up @@ -430,7 +430,7 @@ func (reader *LDBReader) getPrimaryKey(ctx context.Context, ldbTable string) (sc
const qs = "SELECT name,type FROM pragma_table_info(?) WHERE pk > 0 ORDER BY pk ASC"
rows, err := reader.Db.QueryContext(ctx, qs, ldbTable)
if err != nil {
return schema.PrimaryKeyZero, errors.Wrap(err, "query pragma_table_info error")
return schema.PrimaryKeyZero, fmt.Errorf("query pragma_table_info error: %w", err)
}
defer rows.Close()

Expand All @@ -441,14 +441,14 @@ func (reader *LDBReader) getPrimaryKey(ctx context.Context, ldbTable string) (sc
var ftString string
err = rows.Scan(&name, &ftString)
if err != nil {
return schema.PrimaryKeyZero, errors.WithStack(err)
return schema.PrimaryKeyZero, fmt.Errorf("scan: %w", err)
}
rawFieldNames = append(rawFieldNames, name)
rawFieldTypes = append(rawFieldTypes, ftString)
}
err = rows.Err()
if err != nil {
return schema.PrimaryKeyZero, errors.WithStack(err)
return schema.PrimaryKeyZero, fmt.Errorf("rows err: %w", err)
}

pk, err := schema.NewPKFromRawNamesAndTypes(rawFieldNames, rawFieldTypes)
Expand Down Expand Up @@ -622,14 +622,14 @@ func (reader *LDBReader) switchLDB(dirPath string, timestamp int64) error {

db, err := newLDB(fullPath)
if err != nil {
return errors.Wrap(err, "new ldb")
return fmt.Errorf("new ldb: %w", err)
}

reader.mu.Lock()
defer reader.mu.Unlock()

if err = reader.closeDB(); err != nil {
return errors.Wrap(err, "closing db")
return fmt.Errorf("closing db: %w", err)
}

reader.Db = db
Expand Down Expand Up @@ -668,7 +668,7 @@ func lookupLastLDBSync(dirPath string) (int64, error) {
// dirPath + ["<timestamp>", ldb.DefaultLDBFilename]
localPath, err := filepath.Rel(dirPath, filePath)
if err != nil {
return errors.Wrapf(err, "base path (%s)", filePath)
return fmt.Errorf("base path (%s): %w", filePath, err)
}
fields := strings.Split(localPath, "/")

Expand All @@ -691,7 +691,7 @@ func lookupLastLDBSync(dirPath string) (int64, error) {
return nil
})
if err != nil {
return 0, errors.Wrap(err, "filepath walk")
return 0, fmt.Errorf("filepath walk: %w", err)
}

return lastSync, nil
Expand Down
3 changes: 2 additions & 1 deletion ldb_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ctlstore
import (
"context"
"database/sql"
"errors"
"fmt"
"io/ioutil"
"os"
Expand All @@ -11,7 +12,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"

"github.com/segmentio/ctlstore/pkg/ldb"
"github.com/segmentio/ctlstore/pkg/ldbwriter"
"github.com/segmentio/ctlstore/pkg/schema"
Expand Down
4 changes: 2 additions & 2 deletions pkg/changelog/changelog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package changelog

import (
"encoding/json"
"fmt"

"github.com/pkg/errors"
"github.com/segmentio/events/v2"
)

Expand Down Expand Up @@ -42,7 +42,7 @@ func (w *ChangelogWriter) WriteChange(e ChangelogEntry) error {

bytes, err := json.Marshal(structure)
if err != nil {
return errors.Wrap(err, "error marshalling json")
return fmt.Errorf("error marshalling json: %w", err)
}

events.Debug("changelogWriter.WriteChange: %{family}s.%{table}s => %{key}v",
Expand Down
5 changes: 2 additions & 3 deletions pkg/cmd/ctlstore-cli/cmd/read_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"text/tabwriter"
"time"

"github.com/pkg/errors"
"github.com/segmentio/cli"
"github.com/segmentio/ctlstore"
)
Expand Down Expand Up @@ -48,7 +47,7 @@ var cliReadKeys = &cli.CommandFunc{
}
reader, err := ctlstore.ReaderForPath(ldbPath)
if err != nil {
return errors.Wrap(err, "ldb reader for path")
return fmt.Errorf("ldb reader for path: %w", err)
}
defer reader.Close()
resMap := make(map[string]interface{})
Expand Down Expand Up @@ -108,7 +107,7 @@ func parseKey(key string) (interface{}, error) {
}
hex, err := hex.DecodeString(parts[1])
if err != nil {
return nil, errors.Errorf("could not parse '%s' as hex", parts[1])
return nil, fmt.Errorf("could not parse '%s' as hex", parts[1])
}
return hex, nil
}
4 changes: 3 additions & 1 deletion pkg/cmd/ctlstore-cli/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import "github.com/segmentio/ctlstore/pkg/cmd/ctlstore-cli/cmd"
import (
"github.com/segmentio/ctlstore/pkg/cmd/ctlstore-cli/cmd"
)

func main() {
cmd.Execute()
Expand Down
13 changes: 6 additions & 7 deletions pkg/cmd/ctlstore-mutator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/segmentio/conf"
"github.com/segmentio/ctlstore/pkg/utils"
"github.com/segmentio/errors-go"
)

type config struct {
Expand Down Expand Up @@ -99,15 +98,15 @@ func main() {
}
b, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "marshaling payload")
return fmt.Errorf("marshaling payload: %w", err)
}
req, err := http.NewRequest("POST", executiveURL+"/families/"+cfg.FamilyName+"/mutations", bytes.NewReader(b))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("ctlstore-writer", cfg.WriterName)
req.Header.Set("ctlstore-secret", cfg.WriterSecret)
resp, err := client.Do(req)
if err != nil {
return errors.Wrap(err, "making mutation request")
return fmt.Errorf("making mutation request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
Expand Down Expand Up @@ -145,12 +144,12 @@ func setup(cfg config, url string) error {

req, err := http.NewRequest("POST", url+"/families/"+cfg.FamilyName, nil)
if err != nil {
return errors.Wrap(err, "create family request")
return fmt.Errorf("create family request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
res, err = client.Do(req)
if err != nil {
return errors.Wrap(err, "making faily request")
return fmt.Errorf("making faily request: %w", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusConflict {
Expand All @@ -175,12 +174,12 @@ func setup(cfg config, url string) error {
}
req, err = http.NewRequest("POST", url+"/families/"+cfg.FamilyName+"/tables/"+cfg.TableName, utils.NewJsonReader(tableDef))
if err != nil {
return errors.Wrap(err, "create family request")
return fmt.Errorf("create family request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
res, err = client.Do(req)
if err != nil {
return errors.Wrap(err, "making table request")
return fmt.Errorf("making table request: %w", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusConflict {
Expand Down
10 changes: 5 additions & 5 deletions pkg/cmd/ctlstore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"net/http"
"os"
Expand All @@ -10,7 +11,6 @@ import (
"time"

"github.com/segmentio/conf"
"github.com/segmentio/errors-go"
"github.com/segmentio/events/v2"
_ "github.com/segmentio/events/v2/sigevents"
"github.com/segmentio/stats/v4"
Expand Down Expand Up @@ -291,12 +291,12 @@ func supervisor(ctx context.Context, args []string) {
})
defer teardown()
if err := utils.EnsureDirForFile(cliCfg.ReflectorConfig.LDBPath); err != nil {
return errors.Wrap(err, "ensure ldb dir")
return fmt.Errorf("ensure ldb dir: %w", err)
}

reflector, err := newReflector(cliCfg.ReflectorConfig, true)
if err != nil {
return errors.Wrap(err, "build supervisor reflector")
return fmt.Errorf("build supervisor reflector: %w", err)
}

supervisor, err := supervisorpkg.SupervisorFromConfig(supervisorpkg.SupervisorConfig{
Expand All @@ -306,7 +306,7 @@ func supervisor(ctx context.Context, args []string) {
Reflector: reflector, // compose the reflector, since it will start with the supervisor
})
if err != nil {
return errors.Wrap(err, "start supervisor")
return fmt.Errorf("start supervisor: %w", err)
}
defer supervisor.Close()
supervisor.Start(ctx)
Expand Down Expand Up @@ -403,7 +403,7 @@ func executive(ctx context.Context, args []string) {
defer executive.Close()

if err := executive.Start(ctx, cliCfg.Bind); err != nil {
if errors.Cause(err) != context.Canceled {
if errors.Is(err, context.Canceled) {
errs.IncrDefault(stats.T("op", "service shutdown"))
}
events.Log("executive quit: %v", err)
Expand Down
27 changes: 27 additions & 0 deletions pkg/ctldb/ctldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,33 @@ CREATE TABLE locks (

INSERT INTO locks VALUES('ledger', 0);

` + LimiterDBSchemaUp,
"sqlite": `

CREATE TABLE families (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(191) NOT NULL UNIQUE
);

CREATE TABLE mutators (
writer VARCHAR(191) NOT NULL PRIMARY KEY,
secret VARCHAR(255),
cookie BLOB(1024) NOT NULL,
clock INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE ctlstore_dml_ledger (
seq INTEGER PRIMARY KEY AUTOINCREMENT,
leader_ts DATETIME DEFAULT CURRENT_TIMESTAMP,
statement TEXT NOT NULL
);

CREATE TABLE locks (
id VARCHAR(191) NOT NULL PRIMARY KEY,
clock INTEGER NOT NULL DEFAULT 0
);

INSERT INTO locks VALUES('ledger', 0);
` + LimiterDBSchemaUp,
"sqlite3": `

Expand Down
4 changes: 3 additions & 1 deletion pkg/ctldb/ctldb_test_helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ctldb

import "testing"
import (
"testing"
)

// This configuration comes from the docker-compose.yml file
const testCtlDBRawDSN = "ctldb:ctldbpw@tcp(localhost:3306)/ctldb"
Expand Down
Loading
Loading