Skip to content

Commit

Permalink
Merge pull request #48 from ethpandaops/feat/db-connections
Browse files Browse the repository at this point in the history
feat(server): add config for db connections
  • Loading branch information
Savid authored Feb 13, 2023
2 parents 1968a2d + 13f3fad commit fdc6429
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 22 deletions.
2 changes: 2 additions & 0 deletions docs/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ Server requires a single `yaml` config file. An example file can be found [here]
| services.coordinator.persistence | object | | Persistence configuration |
| services.coordinator.persistence.driverName | string | `postgres` | Persistence driver name (`postgres`) |
| services.coordinator.persistence.connectionString | string | | Connection string for the persistence driver |
| services.coordinator.persistence.maxIdleConns | int | `2` | The maximum number of connections in the idle connection pool. `0` means no idle connections are retained. |
| services.coordinator.persistence.maxOpenConns | int | `0` | The maximum number of open connections to the database. `0` means unlimited connections. |
| services.coordinator.persistence.maxQueueSize | int | `51200` | The maximum queue size to buffer items for delayed processing. If the queue gets full it drops the items |
| services.coordinator.persistence.batchTimeout | string | `5s` | The maximum duration for constructing a batch. Processor forcefully sends available items when timeout is reached |
| services.coordinator.persistence.exportTimeout | string | `30s` | The maximum duration for exporting items. If the timeout is reached, the export will be cancelled |
Expand Down
2 changes: 2 additions & 0 deletions example_server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ services:
persistence:
driverName: postgres
connectionString: postgres://postgres:password@localhost:5432/xatu?sslmode=disable
maxIdleConns: 2 # 0 = no idle connections are retained
maxOpenConns: 0 # 0 = unlimited
maxQueueSize: 51200
batchTimeout: 5s
exportTimeout: 30s
Expand Down
15 changes: 9 additions & 6 deletions pkg/server/service/coordinator/persistence/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,24 @@ func New(ctx context.Context, log logrus.FieldLogger, conf *Config) (*Client, er
return c, nil
}

func (e *Client) Start(ctx context.Context) error {
db, err := sql.Open(string(e.config.DriverName), e.config.ConnectionString)
func (c *Client) Start(ctx context.Context) error {
db, err := sql.Open(string(c.config.DriverName), c.config.ConnectionString)
if err != nil {
return err
}

e.db = db
db.SetMaxIdleConns(c.config.MaxIdleConns)
db.SetMaxOpenConns(c.config.MaxOpenConns)

c.db = db

return nil
}

func (e *Client) Stop(ctx context.Context) error {
if e.db == nil {
func (c *Client) Stop(ctx context.Context) error {
if c.db == nil {
return nil
}

return e.db.Close()
return c.db.Close()
}
2 changes: 2 additions & 0 deletions pkg/server/service/coordinator/persistence/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
type Config struct {
ConnectionString string `yaml:"connectionString"`
DriverName DriverName `yaml:"driverName"`
MaxIdleConns int `yaml:"maxIdleConns" default:"2"`
MaxOpenConns int `yaml:"maxOpenConns" default:"0"`
MaxQueueSize int `yaml:"maxQueueSize" default:"51200"`
BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"`
ExportTimeout time.Duration `yaml:"exportTimeout" default:"30s"`
Expand Down
14 changes: 7 additions & 7 deletions pkg/server/service/coordinator/persistence/node_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type NodeRecordExporter struct {
client *Client
}

func (e *Client) InsertNodeRecords(ctx context.Context, records []*node.Record) error {
func (c *Client) InsertNodeRecords(ctx context.Context, records []*node.Record) error {
values := make([]interface{}, len(records))
for i, record := range records {
values[i] = record
Expand All @@ -24,7 +24,7 @@ func (e *Client) InsertNodeRecords(ctx context.Context, records []*node.Record)
sb := nodeRecordStruct.InsertInto("node_record", values...)
sql, args := sb.Build()
sql += " ON CONFLICT (enr) DO NOTHING;"
_, err := e.db.Exec(sql, args...)
_, err := c.db.Exec(sql, args...)

return err
}
Expand Down Expand Up @@ -54,18 +54,18 @@ func (e *NodeRecordExporter) sendUpstream(ctx context.Context, items []*node.Rec
return e.client.InsertNodeRecords(ctx, items)
}

func (e *Client) UpdateNodeRecord(ctx context.Context, record *node.Record) error {
func (c *Client) UpdateNodeRecord(ctx context.Context, record *node.Record) error {
sb := nodeRecordStruct.Update("node_record", record)
sb.Where(sb.E("enr", record.Enr))

sql, args := sb.Build()

_, err := e.db.Exec(sql, args...)
_, err := c.db.Exec(sql, args...)

return err
}

func (e *Client) CheckoutStalledExecutionNodeRecords(ctx context.Context, limit int) ([]*node.Record, error) {
func (c *Client) CheckoutStalledExecutionNodeRecords(ctx context.Context, limit int) ([]*node.Record, error) {
sb := nodeRecordStruct.SelectFrom("node_record")

sb.Where("eth2 IS NULL")
Expand All @@ -78,7 +78,7 @@ func (e *Client) CheckoutStalledExecutionNodeRecords(ctx context.Context, limit

sql, args := sb.Build()

rows, err := e.db.Query(sql, args...)
rows, err := c.db.Query(sql, args...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func (e *Client) CheckoutStalledExecutionNodeRecords(ctx context.Context, limit
ub.Where(ub.In("enr", enrs...))
sql, args = ub.Build()

_, err = e.db.Exec(sql, args...)
_, err = c.db.Exec(sql, args...)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type AvailableExecutionNodeRecord struct {

var availableExecutionNodeRecordStruct = sqlbuilder.NewStruct(new(AvailableExecutionNodeRecord)).For(sqlbuilder.PostgreSQL)

func (e *Client) UpsertNodeRecordActivities(ctx context.Context, activities []*node.Activity) error {
func (c *Client) UpsertNodeRecordActivities(ctx context.Context, activities []*node.Activity) error {
values := make([]interface{}, len(activities))

for i, activity := range activities {
Expand All @@ -37,12 +37,12 @@ func (e *Client) UpsertNodeRecordActivities(ctx context.Context, activities []*n
sqlQuery, args := ub.Build()
sqlQuery += " ON CONFLICT ON CONSTRAINT c_unique DO UPDATE SET update_time = EXCLUDED.update_time, connected = EXCLUDED.connected"

_, err := e.db.Exec(sqlQuery, args...)
_, err := c.db.Exec(sqlQuery, args...)

return err
}

func (e *Client) ListAvailableExecutionNodeRecords(ctx context.Context, clientID string, ignoredNodeRecords []string, networkIDs []uint64, forkIDHashes [][]byte, limit int) ([]*string, error) {
func (c *Client) ListAvailableExecutionNodeRecords(ctx context.Context, clientID string, ignoredNodeRecords []string, networkIDs []uint64, forkIDHashes [][]byte, limit int) ([]*string, error) {
inr := make([]interface{}, 0, len(ignoredNodeRecords))
for _, enr := range ignoredNodeRecords {
inr = append(inr, enr)
Expand Down Expand Up @@ -115,7 +115,7 @@ func (e *Client) ListAvailableExecutionNodeRecords(ctx context.Context, clientID

args[0] = subArgs[0]

rows, err := e.db.Query(sqlQuery, args...)
rows, err := c.db.Query(sqlQuery, args...)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

var nodeRecordExecutionStruct = sqlbuilder.NewStruct(new(node.Execution)).For(sqlbuilder.PostgreSQL)

func (e *Client) InsertNodeRecordExecution(ctx context.Context, record *node.Execution) error {
func (c *Client) InsertNodeRecordExecution(ctx context.Context, record *node.Execution) error {
ib := nodeRecordExecutionStruct.InsertInto("node_record_execution")

items := []interface{}{
Expand All @@ -32,16 +32,16 @@ func (e *Client) InsertNodeRecordExecution(ctx context.Context, record *node.Exe

sql, args := ib.Build()

_, err := e.db.Exec(sql, args...)
_, err := c.db.Exec(sql, args...)

if err != nil {
e.log.WithError(err).Error("failed to insert node record execution")
c.log.WithError(err).Error("failed to insert node record execution")
}

return err
}

func (e *Client) ListNodeRecordExecutions(ctx context.Context, networkIDs []uint64, forkIDHashes [][]byte, limit int) ([]*node.Execution, error) {
func (c *Client) ListNodeRecordExecutions(ctx context.Context, networkIDs []uint64, forkIDHashes [][]byte, limit int) ([]*node.Execution, error) {
sb := nodeRecordExecutionStruct.SelectFrom("node_record_execution")

if len(networkIDs) > 0 {
Expand All @@ -67,7 +67,7 @@ func (e *Client) ListNodeRecordExecutions(ctx context.Context, networkIDs []uint

sql, args := sb.Build()

rows, err := e.db.Query(sql, args...)
rows, err := c.db.Query(sql, args...)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit fdc6429

Please sign in to comment.