Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zjg555543 committed Dec 16, 2024
1 parent 340e7c8 commit c849c53
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 107 deletions.
13 changes: 7 additions & 6 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"fmt"
"github.com/0xPolygon/cdk/db/types"
"math/big"
"net/http"
"os"
Expand Down Expand Up @@ -824,15 +825,15 @@ func runSqliteServiceIfNeeded(
if isNeeded([]string{
cdkcommon.AGGREGATOR},
components) {
dbPath[sqldb.AGG_TX_MGR] = cfg.Aggregator.EthTxManager.StoragePath
dbPath[sqldb.AGG_SYNC] = cfg.Aggregator.Synchronizer.SQLDB.DataSource
dbPath[sqldb.AGG_REORG_L1] = cfg.ReorgDetectorL1.DBPath
dbPath[types.AggTxMgr] = cfg.Aggregator.EthTxManager.StoragePath
dbPath[types.AggSync] = cfg.Aggregator.Synchronizer.SQLDB.DataSource
dbPath[types.AggReorgL1] = cfg.ReorgDetectorL1.DBPath
} else if isNeeded([]string{
cdkcommon.SEQUENCE_SENDER},
components) {
dbPath[sqldb.SEQS_TX_MGR] = cfg.SequenceSender.EthTxManager.StoragePath
dbPath[sqldb.SEQS_L1_TREE] = cfg.L1InfoTreeSync.DBPath
dbPath[sqldb.SEQS_REORG_L1] = cfg.ReorgDetectorL1.DBPath
dbPath[types.SeqsTxMgr] = cfg.SequenceSender.EthTxManager.StoragePath
dbPath[types.SeqsL1Tree] = cfg.L1InfoTreeSync.DBPath
dbPath[types.SeqsReorgL1] = cfg.ReorgDetectorL1.DBPath
} else {
log.Warn("No need to start sqlite service")
return
Expand Down
9 changes: 9 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,13 @@ SaveCertificatesToFilesPath = ""
MaxRetriesStoreCertificate = 3
DelayBeetweenRetries = "60s"
KeepCertificatesHistory = true
[Sqlite]
Host = "0.0.0.0"
Port = 8081
ReadTimeout = "2s"
WriteTimeout = "2s"
AuthMethodList = "select,insert,update,delete"
MaxRequestsPerIPAndSecond = 500
`
157 changes: 56 additions & 101 deletions db/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,41 @@ package db

import (
"context"
dbSql "database/sql"
"errors"
"fmt"
"strings"
"time"

"database/sql"
"github.com/0xPolygon/cdk-rpc/rpc"
"github.com/0xPolygon/cdk/db/types"
"github.com/0xPolygon/cdk/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

const (
NAME = "sqlite"
meterName = "github.com/0xPolygon/cdk/sqlite/service"

METHOD_SELECT = "select"
METHOD_INSERT = "insert"
METHOD_UPDATE = "update"
METHOD_DELETE = "delete"

LIMIT_SQL_LEN = 6

zeroHex = "0x0"

SEQS_L1_TREE = "seqs_l1tree"
SEQS_TX_MGR = "seqs_txmgr"
SEQS_REORG_L1 = "seqs_reorg_l1"

AGG_SYNC = "agg_sync"
AGG_TX_MGR = "agg_txmgr"
AGG_REORG_L1 = "agg_reorg_l1"
)

type SqliteEndpoints struct {
logger *log.Logger
meter metric.Meter
readTimeout time.Duration
writeTimeout time.Duration

authMethods []string

dbMaps map[string]string
sqlDBs map[string]*dbSql.DB
sqlDBs map[string]*sql.DB
}

func CreateSqliteService(
cfg Config,
dbMaps map[string]string,
) *rpc.Server {
logger := log.WithFields("module", NAME)
logger := log.WithFields("module", types.NAME)

meter := otel.Meter(meterName)
methodList := strings.Split(cfg.AuthMethodList, ",")
log.Info(fmt.Sprintf("Sqlite service method auth list: %s", methodList))
for _, s := range methodList {
methodList = append(methodList, s)
}
log.Info(fmt.Sprintf("Sqlite service dbMaps: %v", dbMaps))
sqlDBs := make(map[string]*dbSql.DB)
sqlDBs := make(map[string]*sql.DB)
for k, dbPath := range dbMaps {
log.Info(fmt.Sprintf("Sqlite service: %s, %s", k, dbPath))
db, err := NewSQLiteDB(dbPath)
Expand All @@ -75,10 +49,9 @@ func CreateSqliteService(

services := []rpc.Service{
{
Name: NAME,
Name: types.NAME,
Service: &SqliteEndpoints{
logger: logger,
meter: meter,
readTimeout: cfg.ReadTimeout.Duration,
writeTimeout: cfg.WriteTimeout.Duration,
authMethods: methodList,
Expand All @@ -97,23 +70,19 @@ func CreateSqliteService(
}, services, rpc.WithLogger(logger.GetSugaredLogger()))
}

type A struct {
Fields map[string]interface{}
}

func (b *SqliteEndpoints) Select(
db string,
sql string,
dbName string,
sqlCmd string,
) (interface{}, rpc.Error) {
err, dbCon := b.checkAndGetDB(db, sql, METHOD_SELECT)
err, dbCon := b.checkAndGetDB(dbName, sqlCmd, types.MethodSelect)
if err != nil {
return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("check params invalid: %s", err.Error()))
return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("check params invalid: %s", err.Error()))
}
ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()
rows, err := dbCon.QueryContext(ctx, sql)
rows, err := dbCon.QueryContext(ctx, sqlCmd)
if err != nil {
if errors.Is(err, dbSql.ErrNoRows) {
if errors.Is(err, sql.ErrNoRows) {
return nil, rpc.NewRPCError(
rpc.DefaultErrorCode, fmt.Sprintf("No rows"), ErrNotFound)
}
Expand All @@ -129,45 +98,67 @@ func (b *SqliteEndpoints) Select(
}

func (b *SqliteEndpoints) Insert(
dbName string,
sqlCmd string,
) (interface{}, rpc.Error) {
return b.alterMethod(types.MethodInsert, dbName, sqlCmd)
}

func (b *SqliteEndpoints) Delete(
dbName string,
sqlCmd string,
) (interface{}, rpc.Error) {
return b.alterMethod(types.MethodDelete, dbName, sqlCmd)
}

func (b *SqliteEndpoints) Update(
dbName string,
sqlCmd string,
) (interface{}, rpc.Error) {
return b.alterMethod(types.MethodUpdate, dbName, sqlCmd)
}

func (b *SqliteEndpoints) alterMethod(
method string,
db string,
sql string,
) (interface{}, rpc.Error) {
log.Info(fmt.Sprintf("Sqlite service insert: %s, %s", db, sql))
err, dbCon := b.checkAndGetDB(db, sql, METHOD_INSERT)
log.Info(fmt.Sprintf("Sqlite: %s, %s", db, sql))
err, dbCon := b.checkAndGetDB(db, sql, method)
if err != nil {
return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("check params invalid: %s", err.Error()))
return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("check params invalid: %s", err.Error()))
}
ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()

tx, err := NewTx(ctx, dbCon)
if err != nil {
log.Error(fmt.Sprintf("failed to create tx: %s", err))
return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to create tx: %s", err))
return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to create tx: %s", err))
}
shouldRollback := true
defer func() {
if shouldRollback {
if errRllbck := tx.Rollback(); errRllbck != nil {
log.Errorf("error while rolling back tx %v", errRllbck)
if errRollback := tx.Rollback(); errRollback != nil {
log.Errorf("error while rolling back tx %v", errRollback)
}
}
}()

ct, err := tx.Exec(sql)
if err != nil {
log.Error(fmt.Sprintf("failed to exec: %s", err))
return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to exec: %s", err))
return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to exec: %s", err))
}
count, err := ct.RowsAffected()
if err != nil {
log.Error(fmt.Sprintf("failed to get rows affected: %s", err))
return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to get rows affected: %s", err))
return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to get rows affected: %s", err))
}

if err := tx.Commit(); err != nil {
log.Error(fmt.Sprintf("failed to commit: %s", err))
return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to commit: %s", err))
return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to commit: %s", err))
}
shouldRollback = false

Expand All @@ -176,45 +167,20 @@ func (b *SqliteEndpoints) Insert(
}, nil
}

func (b *SqliteEndpoints) Update(
db string,
sql string,
) (interface{}, rpc.Error) {
ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()
c, merr := b.meter.Int64Counter("claim_proof")
if merr != nil {
b.logger.Warnf("failed to create claim_proof counter: %s", merr)
}
c.Add(ctx, 1)

return types.SqliteData{}, nil
}

func (b *SqliteEndpoints) Delete(
db string,
sql string,
) (interface{}, rpc.Error) {
log.Info(fmt.Sprintf("Sqlite service Delete: %s, %s", db, sql))

return types.SqliteData{}, nil
}

func (b *SqliteEndpoints) GetDbs() (interface{}, rpc.Error) {
//var dbList []string
dbList := make(map[string][]string)
ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()
for k, dbPath := range b.dbMaps {
log.Info(fmt.Sprintf("Sqlite service: %s, %s", k, dbPath))
sql := "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name;"
err, dbCon := b.checkAndGetDB(k, sql, METHOD_SELECT)
sqlCmd := "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name;"
err, dbCon := b.checkAndGetDB(k, sqlCmd, types.MethodSelect)
if err != nil {
return zeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("%s", err.Error()))
return types.ZeroHex, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("%s", err.Error()))
}
ctx, cancel := context.WithTimeout(context.Background(), b.readTimeout)
defer cancel()
rows, err := dbCon.QueryContext(ctx, sql)
rows, err := dbCon.QueryContext(ctx, sqlCmd)
if err != nil {
if errors.Is(err, dbSql.ErrNoRows) {
if errors.Is(err, sql.ErrNoRows) {
return nil, rpc.NewRPCError(
rpc.DefaultErrorCode, fmt.Sprintf("No rows"), ErrNotFound)
}
Expand All @@ -225,24 +191,21 @@ func (b *SqliteEndpoints) GetDbs() (interface{}, rpc.Error) {
if err != nil {
return nil, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("failed to get results: %s", err.Error()))
}

dbList[k] = result
}

return dbList, nil
}

func (b *SqliteEndpoints) checkAndGetDB(db string, sql string, method string) (error, *dbSql.DB) {
log.Info(fmt.Sprintf("Sqlite endpoints, check db:%v,sql:%v,method:%v", db, sql, method))
if len(sql) <= LIMIT_SQL_LEN {
func (b *SqliteEndpoints) checkAndGetDB(db string, sql string, method string) (error, *sql.DB) {

Check failure on line 200 in db/service.go

View workflow job for this annotation

GitHub Actions / lint

ST1008: error should be returned as the last argument (stylecheck)
log.Info(fmt.Sprintf("Sqlite check db:%v, sql:%v, method:%v", db, sql, method))
if len(sql) <= types.LimitSqlLen {
return fmt.Errorf("sql length is too short"), nil
}

sqlMethod := strings.ToLower(sql[:6])
if sqlMethod != method {
return fmt.Errorf("sql method is not valid"), nil
}

found := false
for _, str := range b.authMethods {
if str == method {
Expand All @@ -253,48 +216,41 @@ func (b *SqliteEndpoints) checkAndGetDB(db string, sql string, method string) (e
if !found {
return fmt.Errorf("sql method is not authorized"), nil
}

dbCon, ok := b.sqlDBs[db]
if !ok {
return fmt.Errorf("sql db is not valid"), nil
}
return nil, dbCon
}

func getResults(rows *dbSql.Rows) (error, []A) {
var result []A
func getResults(rows *sql.Rows) (error, []types.QueryData) {

Check failure on line 226 in db/service.go

View workflow job for this annotation

GitHub Actions / lint

ST1008: error should be returned as the last argument (stylecheck)
var result []types.QueryData
columns, err := rows.Columns()
if err != nil {
log.Error(fmt.Sprintf("Failed to get columns: %v", err))
return err, nil
}
for rows.Next() {
record := A{Fields: make(map[string]interface{})}

record := types.QueryData{Fields: make(map[string]interface{})}
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for i := range values {
valuePtrs[i] = &values[i]
}

if err := rows.Scan(valuePtrs...); err != nil {
log.Error("Failed to scan row: %v", err)
return err, nil
}

for i, colName := range columns {
record.Fields[colName] = values[i]
}

result = append(result, record)
}

return nil, result
}

func getTables(rows *dbSql.Rows) (error, []string) {
func getTables(rows *sql.Rows) (error, []string) {

Check failure on line 252 in db/service.go

View workflow job for this annotation

GitHub Actions / lint

ST1008: error should be returned as the last argument (stylecheck)
var result []string

for rows.Next() {
var tableName string
if err := rows.Scan(&tableName); err != nil {
Expand All @@ -303,6 +259,5 @@ func getTables(rows *dbSql.Rows) (error, []string) {
log.Info(fmt.Sprintf("Table name: %s", tableName))
result = append(result, tableName)
}

return nil, result
}
Loading

0 comments on commit c849c53

Please sign in to comment.