diff --git a/pkg/editor/pool/block.go b/pkg/editor/pool/block.go index 4dacaa0..38fd801 100644 --- a/pkg/editor/pool/block.go +++ b/pkg/editor/pool/block.go @@ -16,12 +16,12 @@ import ( type SynchronizedBlock struct { sync.Mutex - blockPath string - fname string - block *block.Block - changed bool - toSyncFile bool - toReadFile bool + blockPath string + fname string + block *block.Block + changed bool + toFile bool + fromFile bool } func NewEmptySynchronizedBlock(blockPath string) (*SynchronizedBlock, error) { @@ -32,19 +32,38 @@ func NewEmptySynchronizedBlock(blockPath string) (*SynchronizedBlock, error) { return &SynchronizedBlock{blockPath: blockPath}, nil } +func NewUnSynchronizedBlock(blockPath string, block *block.Block) (*SynchronizedBlock, error) { + err := os.MkdirAll(blockPath, 0777) + if err != nil { + return nil, err + } + sb, err := NewEmptySynchronizedBlock(blockPath) + if err != nil { + return nil, err + } + sb.block = block + sb.toFile = true + return sb, nil +} + func (sb *SynchronizedBlock) Sync(db *sql.DB, note *Note) (*block.Block, error) { sb.Lock() defer sb.Unlock() - if !sb.toReadFile { + if !sb.fromFile { err := sb.write() if err != nil { return nil, err } - sb.toSyncFile = false + sb.toFile = false return nil, nil } + err := local.InsertBlock(db, ¬e.ID, sb.block) + if err != nil { + return nil, err + } + fname, err := latestFileName(sb.blockPath) if err != nil { return nil, err @@ -57,8 +76,8 @@ func (sb *SynchronizedBlock) Sync(db *sql.DB, note *Note) (*block.Block, error) return nil, err } } - sb.toReadFile = false - sb.toSyncFile = false + sb.fromFile = false + sb.toFile = false sb.changed = false return nil, nil } @@ -79,8 +98,8 @@ func (sb *SynchronizedBlock) Sync(db *sql.DB, note *Note) (*block.Block, error) if err != nil { return nil, err } - sb.toReadFile = false - sb.toSyncFile = false + sb.fromFile = false + sb.toFile = false sb.changed = false } diff --git a/pkg/editor/pool/note.go b/pkg/editor/pool/note.go index dda253f..d696d57 100644 --- a/pkg/editor/pool/note.go +++ b/pkg/editor/pool/note.go @@ -38,12 +38,12 @@ type Note struct { } type Blocks map[common.BlockID]*SynchronizedBlock -func NewNote(path string, replicaID common.ReplicaID, interval time.Duration) (*Note, error) { +func NewNote(db *sql.DB, replicaID common.ReplicaID, path string, interval time.Duration) (*Note, error) { noteID, err := readNoteID(path) if err != nil { return nil, err } - err = nav.NewNoteBlockIfNeeded(path, noteID.String()) + err = nav.NewNoteBlockIfNeeded(db, path, noteID.String()) if err != nil { return nil, err } @@ -93,30 +93,30 @@ func (note *Note) Contribute(ctx context.Context, bytes []byte) error { return err } db := ctx.Value(identifier.DB).(*sql.DB) - blockIDs, err := local.Insert(db, ¬e.ID, ctrbs) + blockIDs, err := local.InsertCTRBs(db, ¬e.ID, ctrbs) if err != nil { return err } + + updated := make(map[common.BlockID]bool) + for _, ctrb := range ctrbs { sb, ok := note.blocks[ctrb.BlockID] if !ok { blockPath := filepath.Join(note.path, "blocks", ctrb.BlockID.String()) - err := os.MkdirAll(blockPath, 0777) - if err != nil { - return err - } - sb, err = NewEmptySynchronizedBlock(blockPath) + sb, err = NewUnSynchronizedBlock(blockPath, ctrb.Operations.BINS) if err != nil { return err } - sb.block = ctrb.Operations.BINS note.blocks[ctrb.BlockID] = sb } err := sb.Apply(*ctrb) if err != nil { return err } + updated[ctrb.BlockID] = true } + return note.flagSync(ctx, blockIDs, false) } @@ -138,10 +138,10 @@ func (note *Note) flagSync(ctx context.Context, blockIDs []*uuid.UUID, isFS bool } note.blocks[*blockID] = sb } - if isFS && !sb.toReadFile { - sb.toReadFile = true + if isFS && !sb.fromFile { + sb.fromFile = true } - sb.toSyncFile = true + sb.toFile = true } note.wg.Add(note.dt.Add(func() { @@ -154,7 +154,7 @@ func (note *Note) flagSync(ctx context.Context, blockIDs []*uuid.UUID, isFS bool func (note *Note) sync(ctx context.Context) { db := ctx.Value(identifier.DB).(*sql.DB) for blockID, sb := range note.blocks { - if !sb.toSyncFile { + if !sb.toFile && !sb.fromFile { continue } block, err := sb.Sync(db, note) @@ -206,6 +206,7 @@ func readBlocks(path string) (Blocks, error) { if err != nil { return nil, err } + var wg sync.WaitGroup var mutex sync.Mutex blocks := make(Blocks) @@ -224,11 +225,13 @@ func readBlocks(path string) (Blocks, error) { } sb.fname = fname sb.block = block + sb.fromFile = true mutex.Lock() blocks[sb.block.BlockID] = sb mutex.Unlock() }(bid) } wg.Wait() + return blocks, nil } diff --git a/pkg/editor/pool/pool.go b/pkg/editor/pool/pool.go index 6131c1e..412a94b 100644 --- a/pkg/editor/pool/pool.go +++ b/pkg/editor/pool/pool.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "path/filepath" "strings" "sync" "time" @@ -15,6 +16,7 @@ import ( "github.com/notebox/nb-crdt-go/common" "github.com/notebox/nbfm/pkg/config" "github.com/notebox/nbfm/pkg/identifier" + local "github.com/notebox/nbfm/pkg/local/note" ) type Path = string @@ -42,7 +44,9 @@ func (pool *Pool) Open(ctx context.Context, path string) (*Note, error) { if ok { note.wg.Add(1) } else { - note, err = NewNote(path, ctx.Value(identifier.ReplicaID).(uint32), pool.syncInterval) + db := ctx.Value(identifier.DB).(*sql.DB) + replicaID := ctx.Value(identifier.ReplicaID).(common.ReplicaID) + note, err = NewNote(db, replicaID, path, pool.syncInterval) if err != nil { return nil, err } @@ -50,12 +54,28 @@ func (pool *Pool) Open(ctx context.Context, path string) (*Note, error) { note.wg = new(sync.WaitGroup) note.wg.Add(1) - for _, sb := range note.blocks { - sb.toSyncFile, err = sb.update(ctx.Value(identifier.DB).(*sql.DB), note) + + cached, err := local.SelectBlocks(db, ¬e.ID) + if err != nil { + return nil, err + } + blocksPath := filepath.Join(path, "blocks") + for _, b := range cached { + sb, ok := note.blocks[b.BlockID] + if ok { + sb.toFile, err = sb.update(db, note) + if err != nil { + return nil, err + } + continue + } + sb, err = NewUnSynchronizedBlock(filepath.Join(blocksPath, b.BlockID.String()), b) if err != nil { return nil, err } + note.blocks[b.BlockID] = sb } + note.wg.Add(1) note.sync(ctx) @@ -68,7 +88,7 @@ func (pool *Pool) Open(ctx context.Context, path string) (*Note, error) { }() go pool.watch(ctx, note) - err := note.watcher.AddRecursive(note.path) + err = note.watcher.AddRecursive(note.path) if err != nil { return nil, err } diff --git a/pkg/local/note/ctrb.go b/pkg/local/note/ctrb.go index 69fc3a1..acb93a5 100644 --- a/pkg/local/note/ctrb.go +++ b/pkg/local/note/ctrb.go @@ -35,6 +35,13 @@ func Prepare(db *sql.DB, path string) error { ); CREATE INDEX IF NOT EXISTS idx_nb_note_ctrbs_ids ON nb_note_ctrbs (note_id, block_id); CREATE INDEX IF NOT EXISTS idx_nb_note_ctrbs_block_nonce ON nb_note_ctrbs (block_nonce); + CREATE TABLE IF NOT EXISTS nb_note_blocks ( + note_id TEXT NOT NULL, + block_id TEXT NOT NULL, + data BLOB NOT NULL, + UNIQUE (note_id, block_id) + ); + CREATE INDEX IF NOT EXISTS idx_nb_note_blocks_ids ON nb_note_blocks (note_id, block_id); `) return err } @@ -48,7 +55,7 @@ func isInstalled(db *sql.DB) (bool, error) { return rows.Next(), nil } -func Insert(db *sql.DB, noteID *uuid.UUID, contributions []*block.Contribution) ([]*uuid.UUID, error) { +func InsertCTRBs(db *sql.DB, noteID *uuid.UUID, contributions []*block.Contribution) ([]*uuid.UUID, error) { var blockIDs []*uuid.UUID q := "INSERT INTO nb_note_ctrbs (note_id, block_id, block_nonce, text_nonce, replica_id, timestamp, ops) VALUES" v := []any{} @@ -72,6 +79,65 @@ func Insert(db *sql.DB, noteID *uuid.UUID, contributions []*block.Contribution) return blockIDs, err } +func InsertBlock(db *sql.DB, noteID *uuid.UUID, block *block.Block) error { + data, err := json.Marshal(block) + if err != nil { + return err + } + _, err = db.Exec("INSERT OR REPLACE INTO nb_note_blocks (note_id, block_id, data) VALUES (?, ?, ?)", noteID, block.BlockID, data) + return err +} + +func SelectBlockData(db *sql.DB, noteID *uuid.UUID, blockID *uuid.UUID) ([]byte, error) { + rows, err := db.Query("SELECT data FROM nb_note_blocks WHERE note_id = ? AND block_id LIMIT 1", noteID, blockID) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + defer rows.Close() + + for rows.Next() { + var data []byte + err := rows.Scan(&data) + if err != nil { + return nil, err + } + return data, nil + } + return nil, nil +} + +func SelectBlocks(db *sql.DB, noteID *uuid.UUID) ([]*block.Block, error) { + var blocks []*block.Block + + rows, err := db.Query("SELECT data FROM nb_note_blocks WHERE note_id = ?", noteID) + if err != nil { + if err == sql.ErrNoRows { + return blocks, nil + } + return nil, err + } + defer rows.Close() + + for rows.Next() { + var data []byte + err := rows.Scan(&data) + if err != nil { + return nil, err + } + var b block.Block + err = json.Unmarshal(data, &b) + if err != nil { + return nil, err + } + blocks = append(blocks, &b) + } + + return blocks, nil +} + func SelectAllAfter(db *sql.DB, replicaID uint32, noteID *uuid.UUID, blockID *uuid.UUID, blockNonce common.Nonce, textNonce common.Nonce) ([]*block.Contribution, error) { rows, err := db.Query("SELECT block_id, block_nonce, text_nonce, replica_id, timestamp, ops FROM nb_note_ctrbs WHERE replica_id = ? AND note_id = ? AND block_id = ? AND (block_nonce > ? OR (block_nonce = ? AND text_nonce > ?)) ORDER BY block_nonce ASC", replicaID, noteID, blockID, blockNonce, blockNonce, textNonce) if err != nil { diff --git a/pkg/nav/exec.go b/pkg/nav/exec.go index f57286e..37e57f4 100644 --- a/pkg/nav/exec.go +++ b/pkg/nav/exec.go @@ -1,6 +1,7 @@ package nav import ( + "database/sql" "fmt" "os" "path/filepath" @@ -9,6 +10,7 @@ import ( "github.com/google/uuid" "github.com/notebox/nbfm/pkg/config" + local "github.com/notebox/nbfm/pkg/local/note" ) func DeleteFile(path string) error { @@ -27,7 +29,7 @@ func AddFile(path string) error { return err } noteIDStr := noteID.String() - err = NewNoteBlockIfNeeded(path, noteIDStr) + err = NewNoteBlockIfNeeded(nil, path, noteIDStr) if err != nil { return err } @@ -65,7 +67,7 @@ func MoveFile(src, dst string) error { return sysMoveFile(src, dst) } -func NewNoteBlockIfNeeded(path, noteIDStr string) error { +func NewNoteBlockIfNeeded(db *sql.DB, path, noteIDStr string) error { noteBlockPath := filepath.Join(path, "blocks", noteIDStr, NewBlockFileName()) dirPath := filepath.Dir(noteBlockPath) if _, err := os.Stat(dirPath); !os.IsNotExist(err) { @@ -76,7 +78,23 @@ func NewNoteBlockIfNeeded(path, noteIDStr string) error { if err != nil { return err } - return os.WriteFile(noteBlockPath, []byte(fmt.Sprintf(`["%s",{},[[0,0,1]],{"TYPE":[null,"NOTE"]},false,[]]`, noteIDStr)), 0777) + + noteID, err := uuid.Parse(noteIDStr) + if err != nil { + return err + } + var data []byte + if db != nil { + data, err = local.SelectBlockData(db, ¬eID, ¬eID) + if err != nil { + return err + } + } + if data == nil { + data = []byte(fmt.Sprintf(`["%s",{},[[0,0,1]],{"TYPE":[null,"NOTE"]},false,[]]`, noteIDStr)) + } + + return os.WriteFile(noteBlockPath, data, 0777) } func NewBlockFileName() string {