Skip to content

Commit

Permalink
Add namespaces to modusdb (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
jairad26 authored Dec 7, 2024
1 parent 94ee90c commit 8b34f2b
Show file tree
Hide file tree
Showing 9 changed files with 581 additions and 179 deletions.
230 changes: 77 additions & 153 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,41 @@ import (

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgo/v240/protos/api"
"github.com/dgraph-io/dgraph/v24/dql"
"github.com/dgraph-io/dgraph/v24/edgraph"
"github.com/dgraph-io/dgraph/v24/posting"
"github.com/dgraph-io/dgraph/v24/protos/pb"
"github.com/dgraph-io/dgraph/v24/query"
"github.com/dgraph-io/dgraph/v24/schema"
"github.com/dgraph-io/dgraph/v24/worker"
"github.com/dgraph-io/dgraph/v24/x"
"github.com/dgraph-io/ristretto/v2/z"
)

var (
// This ensures that we only have one instance of modusdb in this process.
singeton atomic.Bool
// This ensures that we only have one instance of modusDB in this process.
singleton atomic.Bool

ErrSingletonOnly = errors.New("only one modusdb instance is supported")
ErrEmptyDataDir = errors.New("data directory is required")
ErrDBClosed = errors.New("modusdb instance is closed")
ErrSingletonOnly = errors.New("only one modusDB instance is supported")
ErrEmptyDataDir = errors.New("data directory is required")
ErrClosedDB = errors.New("modusDB instance is closed")
ErrNonExistentNamespace = errors.New("namespace does not exist")
)

// DB is an instance of modusdb.
// For now, we only support one instance of modusdb per process.
// DB is an instance of modusDB.
// For now, we only support one instance of modusDB per process.
type DB struct {
mutex sync.RWMutex
isOpen bool

z *zero

// points to default / 0 / galaxy namespace
gxy *Namespace
}

// New returns a new modusdb instance.
// New returns a new modusDB instance.
func New(conf Config) (*DB, error) {
// Ensure that we do not create another instance of modusdb in the same process
if !singeton.CompareAndSwap(false, true) {
// Ensure that we do not create another instance of modusDB in the same process
if !singleton.CompareAndSwap(false, true) {
return nil, ErrSingletonOnly
}

Expand Down Expand Up @@ -74,198 +76,120 @@ func New(conf Config) (*DB, error) {
}

x.UpdateHealthStatus(true)

db.gxy = &Namespace{id: 0, db: db}
return db, nil
}

// Close closes the modusdb instance.
func (db *DB) Close() {
db.mutex.Lock()
defer db.mutex.Unlock()
func (db *DB) CreateNamespace() (*Namespace, error) {
db.mutex.RLock()
defer db.mutex.RUnlock()

if !db.isOpen {
return
return nil, ErrClosedDB
}

if !singeton.CompareAndSwap(true, false) {
panic("modusdb instance was not properly opened")
startTs, err := db.z.nextTs()
if err != nil {
return nil, err
}
nsID, err := db.z.nextNS()
if err != nil {
return nil, err
}

db.isOpen = false
x.UpdateHealthStatus(false)
posting.Cleanup()
worker.State.Dispose()
if err := worker.ApplyInitialSchema(nsID, startTs); err != nil {
return nil, fmt.Errorf("error applying initial schema: %w", err)
}
for _, pred := range schema.State().Predicates() {
worker.InitTablet(pred)
}

return &Namespace{id: nsID, db: db}, nil
}

// DropAll drops all the data and schema in the modusdb instance.
func (db *DB) DropAll(ctx context.Context) error {
db.mutex.Lock()
defer db.mutex.Unlock()
func (db *DB) GetNamespace(nsID uint64) (*Namespace, error) {
db.mutex.RLock()
defer db.mutex.RUnlock()

if !db.isOpen {
return ErrDBClosed
return nil, ErrClosedDB
}

p := &pb.Proposal{Mutations: &pb.Mutations{
GroupId: 1,
DropOp: pb.Mutations_ALL,
}}
if err := worker.ApplyMutations(ctx, p); err != nil {
return fmt.Errorf("error applying mutation: %w", err)
}
if err := db.reset(); err != nil {
return fmt.Errorf("error resetting db: %w", err)
if nsID > db.z.lastNS {
return nil, ErrNonExistentNamespace
}

// TODO: insert drop record
return nil
// TODO: when delete namespace is implemented, check if the namespace exists

return &Namespace{id: nsID, db: db}, nil
}

// DropData drops all the data in the modusdb instance.
func (db *DB) DropData(ctx context.Context) error {
// DropAll drops all the data and schema in the modusDB instance.
func (db *DB) DropAll(ctx context.Context) error {
db.mutex.Lock()
defer db.mutex.Unlock()

if !db.isOpen {
return ErrDBClosed
return ErrClosedDB
}

p := &pb.Proposal{Mutations: &pb.Mutations{
GroupId: 1,
DropOp: pb.Mutations_DATA,
DropOp: pb.Mutations_ALL,
}}
if err := worker.ApplyMutations(ctx, p); err != nil {
return fmt.Errorf("error applying mutation: %w", err)
}
if err := db.reset(); err != nil {
return fmt.Errorf("error resetting db: %w", err)
}

// TODO: insert drop record
// TODO: should we reset back the timestamp as well?
return nil
}

func (db *DB) AlterSchema(ctx context.Context, sch string) error {
db.mutex.Lock()
defer db.mutex.Unlock()

if !db.isOpen {
return ErrDBClosed
}

sc, err := schema.ParseWithNamespace(sch, 0)
if err != nil {
return fmt.Errorf("error parsing schema: %w", err)
}
for _, pred := range sc.Preds {
worker.InitTablet(pred.Predicate)
}
func (db *DB) DropData(ctx context.Context) error {
return db.gxy.DropData(ctx)
}

startTs, err := db.z.nextTs()
if err != nil {
return err
}
func (db *DB) AlterSchema(ctx context.Context, sch string) error {
return db.gxy.AlterSchema(ctx, sch)
}

p := &pb.Proposal{Mutations: &pb.Mutations{
GroupId: 1,
StartTs: startTs,
Schema: sc.Preds,
Types: sc.Types,
}}
if err := worker.ApplyMutations(ctx, p); err != nil {
return fmt.Errorf("error applying mutation: %w", err)
}
return nil
func (db *DB) Query(ctx context.Context, q string) (*api.Response, error) {
return db.gxy.Query(ctx, q)
}

func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64, error) {
if len(ms) == 0 {
return nil, nil
}
return db.gxy.Mutate(ctx, ms)
}

dms := make([]*dql.Mutation, 0, len(ms))
for _, mu := range ms {
dm, err := edgraph.ParseMutationObject(mu, false)
if err != nil {
return nil, fmt.Errorf("error parsing mutation: %w", err)
}
dms = append(dms, dm)
}
newUids, err := query.ExtractBlankUIDs(ctx, dms)
if err != nil {
return nil, err
}
if len(newUids) > 0 {
num := &pb.Num{Val: uint64(len(newUids)), Type: pb.Num_UID}
res, err := db.z.nextUIDs(num)
if err != nil {
return nil, err
}
func (db *DB) Load(ctx context.Context, schemaPath, dataPath string) error {
return db.gxy.Load(ctx, schemaPath, dataPath)
}

curId := res.StartId
for k := range newUids {
x.AssertTruef(curId != 0 && curId <= res.EndId, "not enough uids generated")
newUids[k] = curId
curId++
}
}
edges, err := query.ToDirectedEdges(dms, newUids)
if err != nil {
return nil, err
}
ctx = x.AttachNamespace(ctx, 0)
func (db *DB) LoadData(inCtx context.Context, dataDir string) error {
return db.gxy.LoadData(inCtx, dataDir)
}

// Close closes the modusDB instance.
func (db *DB) Close() {
db.mutex.Lock()
defer db.mutex.Unlock()

if !db.isOpen {
return nil, ErrDBClosed
}

startTs, err := db.z.nextTs()
if err != nil {
return nil, err
}
commitTs, err := db.z.nextTs()
if err != nil {
return nil, err
}

m := &pb.Mutations{
GroupId: 1,
StartTs: startTs,
Edges: edges,
}
m.Edges, err = query.ExpandEdges(ctx, m)
if err != nil {
return nil, fmt.Errorf("error expanding edges: %w", err)
}

for _, edge := range m.Edges {
worker.InitTablet(edge.Attr)
}

p := &pb.Proposal{Mutations: m, StartTs: startTs}
if err := worker.ApplyMutations(ctx, p); err != nil {
return nil, err
return
}

return newUids, worker.ApplyCommited(ctx, &pb.OracleDelta{
Txns: []*pb.TxnStatus{{StartTs: startTs, CommitTs: commitTs}},
})
}

// Query performs query or mutation or upsert on the given modusdb instance.
func (db *DB) Query(ctx context.Context, query string) (*api.Response, error) {
db.mutex.RLock()
defer db.mutex.RUnlock()

if !db.isOpen {
return nil, ErrDBClosed
if !singleton.CompareAndSwap(true, false) {
panic("modusDB instance was not properly opened")
}

return (&edgraph.Server{}).QueryNoGrpc(ctx, &api.Request{
ReadOnly: true,
Query: query,
StartTs: db.z.readTs(),
})
db.isOpen = false
x.UpdateHealthStatus(false)
posting.Cleanup()
worker.State.Dispose()
}

func (db *DB) reset() error {
Expand All @@ -275,7 +199,7 @@ func (db *DB) reset() error {
}

if !restart {
if err := worker.ApplyInitialSchema(); err != nil {
if err := worker.ApplyInitialSchema(0, 1); err != nil {
return fmt.Errorf("error applying initial schema: %w", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ require (
github.com/cavaliergopher/grab/v3 v3.0.1
github.com/dgraph-io/badger/v4 v4.4.0
github.com/dgraph-io/dgo/v240 v240.0.1
github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241127082017-fd45b3875e3f
github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241202011806-64256ce6cac9
github.com/dgraph-io/ristretto/v2 v2.0.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
golang.org/x/sync v0.9.0
)

Expand All @@ -25,7 +25,7 @@ require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.15.0 // indirect
github.com/bits-and-blooms/bitset v1.17.0 // indirect
github.com/blevesearch/bleve/v2 v2.4.3 // indirect
github.com/blevesearch/bleve_index_api v1.1.12 // indirect
github.com/blevesearch/geo v0.1.20 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bits-and-blooms/bitset v1.15.0 h1:DiCRMscZsGyYePE9AR3sVhKqUXCt5IZvkX5AfAc5xLQ=
github.com/bits-and-blooms/bitset v1.15.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bitset v1.17.0 h1:1X2TS7aHz1ELcC0yU1y2stUs/0ig5oMU6STFZGrhvHI=
github.com/bits-and-blooms/bitset v1.17.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blevesearch/bleve/v2 v2.4.3 h1:XDYj+1prgX84L2Cf+V3ojrOPqXxy0qxyd2uLMmeuD+4=
github.com/blevesearch/bleve/v2 v2.4.3/go.mod h1:hEPDPrbYw3vyrm5VOa36GyS4bHWuIf4Fflp7460QQXY=
Expand Down Expand Up @@ -138,8 +138,8 @@ github.com/dgraph-io/badger/v4 v4.4.0 h1:rA48XiDynZLyMdlaJl67p9+lqfqwxlgKtCpYLAi
github.com/dgraph-io/badger/v4 v4.4.0/go.mod h1:sONMmPPfbnj9FPwS/etCqky/ULth6CQJuAZSuWCmixE=
github.com/dgraph-io/dgo/v240 v240.0.1 h1:R0d9Cao3MOghrC9RVXshw6v8Jr/IjKgU2mK9sR9nclc=
github.com/dgraph-io/dgo/v240 v240.0.1/go.mod h1:urpjhWGdYVSVQAwd000iu4wHyHPpuHpwJ7aILsuGF5A=
github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241127082017-fd45b3875e3f h1:cFVPh3MdiAQWUoK8d6+Xj2z6+FNe/JzUAqNRNjQGSlI=
github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241127082017-fd45b3875e3f/go.mod h1:rPhzF3u8SjgEDp5v3tJI05f/8O8TUJiZ5/a6gYwj0qc=
github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241202011806-64256ce6cac9 h1:6ink3iffWaAqHCOX8j35oC8+K2oiDFLwsyNSd40YmBQ=
github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241202011806-64256ce6cac9/go.mod h1:2e/yPl+J7eEl9eaeeYSGMwuiQAxVh9x2Gm1SsaVvM3o=
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis=
github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
Expand Down Expand Up @@ -610,8 +610,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
Expand Down
Loading

0 comments on commit 8b34f2b

Please sign in to comment.