Skip to content

Commit

Permalink
MB-57888: New apis for index update
Browse files Browse the repository at this point in the history
  • Loading branch information
Likith101 committed Jan 9, 2025
1 parent 88f6374 commit 62ad148
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 68 deletions.
18 changes: 18 additions & 0 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,24 @@ func OpenUsing(path string, runtimeConfig map[string]interface{}) (Index, error)
return openIndexUsing(path, runtimeConfig)
}

// Update opens the existing index at the specified path and applies the
// mapping supplied in newParams; no runtime configuration overrides are used.
// The mapping used when the index was created is overwritten by the supplied
// mapping for all Index/Search operations.
// An error is returned, without any changes to the index, if an unupdatable
// mapping is provided.
func Update(path string, newParams string) (Index, error) {
	// nil runtimeConfig: rely solely on the settings persisted with the index.
	idx, err := updateIndexUsing(path, nil, newParams)
	return idx, err
}

// UpdateUsing opens the existing index at the specified path and applies the
// mapping supplied in newParams.
// The mapping used when the index was created is overwritten by the supplied
// mapping for all Index/Search operations.
// Entries in runtimeConfig can override settings persisted when the kvstore
// was created.
// An error is returned, without any changes to the index, if an unupdatable
// mapping is provided.
func UpdateUsing(path string, runtimeConfig map[string]interface{}, newParams string) (Index, error) {
	idx, err := updateIndexUsing(path, runtimeConfig, newParams)
	return idx, err
}

// Builder is a limited interface, used to build indexes in an offline mode.
// Items cannot be updated or deleted, and the caller MUST ensure a document is
// indexed only once.
Expand Down
2 changes: 1 addition & 1 deletion index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ func (s *Scorch) loadSnapshot(snapshot *bolt.Bucket) (*IndexSnapshot, error) {
segmentBucket := snapshot.Bucket(k)
if segmentBucket == nil {
_ = rv.DecRef()
return nil, fmt.Errorf("segment key, but bucket missing % x", k)
return nil, fmt.Errorf("segment key, but bucket missing %x", k)
}
segmentSnapshot, err := s.loadSegment(segmentBucket)
if err != nil {
Expand Down
43 changes: 27 additions & 16 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const Version uint8 = 2

var ErrClosed = fmt.Errorf("scorch closed")

var mappingInternalKey = []byte("_mapping")
var MappingInternalKey = []byte("_mapping")

type Scorch struct {
nextSegmentID uint64
Expand Down Expand Up @@ -886,15 +886,27 @@ func (s *Scorch) FireIndexEvent() {
s.fireEvent(EventKindIndexStart, 0)
}

// Updates bolt db with the given field info. Existing field info already in bolt
// will be merged before persisting. The index mapping is also overwritten both
// in bolt as well as the index snapshot
func (s *Scorch) UpdateFields(fieldInfo map[string]*index.FieldInfo, mappingBytes []byte) error {

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 892 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo
err := s.updateBolt(fieldInfo, mappingBytes)
// Switch from pointer to value to marshal into a json for storage
updatedFields := make(map[string]index.FieldInfo)

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 894 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo
for field, info := range fieldInfo {
updatedFields[field] = *info
}
err := s.updateBolt(updatedFields, mappingBytes)
if err != nil {
return err
}
s.root.m.Lock()
s.root.updatedFields = updatedFields
s.root.m.Unlock()
return nil
}

func (s *Scorch) updateBolt(fieldInfo map[string]*index.FieldInfo, mappingBytes []byte) error {
// Merge and update deleted field info and rewrite index mapping
func (s *Scorch) updateBolt(fieldInfo map[string]index.FieldInfo, mappingBytes []byte) error {

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 909 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo
return s.rootBolt.Update(func(tx *bolt.Tx) error {
snapshots := tx.Bucket(boltSnapshotsBucket)
if snapshots == nil {
Expand All @@ -910,20 +922,20 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.FieldInfo, mappingBytes
}
snapshot := snapshots.Bucket(k)
cc := snapshot.Cursor()
for kk, _ := cc.First(); kk != nil; kk, _ = c.Next() {
if k[0] == boltInternalKey[0] {
internalBucket := snapshot.Bucket(k)
for kk, _ := cc.First(); kk != nil; kk, _ = cc.Next() {
if kk[0] == boltInternalKey[0] {
internalBucket := snapshot.Bucket(kk)
if internalBucket == nil {
return fmt.Errorf("segment key, but bucket missing % x", k)
return fmt.Errorf("segment key, but bucket missing %x", kk)
}
err = internalBucket.Put(mappingInternalKey, mappingBytes)
err = internalBucket.Put(MappingInternalKey, mappingBytes)
if err != nil {
return err
}
} else if k[0] != boltMetaDataKey[0] {
segmentBucket := snapshot.Bucket(k)
} else if kk[0] != boltMetaDataKey[0] {
segmentBucket := snapshot.Bucket(kk)
if segmentBucket == nil {
return fmt.Errorf("segment key, but bucket missing % x", k)
return fmt.Errorf("segment key, but bucket missing %x", kk)
}
var updatedFields map[string]index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.FieldInfo

Check failure on line 940 in index/scorch/scorch.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.FieldInfo
updatedFieldBytes := segmentBucket.Get(boltUpdatedFieldsKey)
Expand All @@ -932,11 +944,11 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.FieldInfo, mappingBytes
if err != nil {
return fmt.Errorf("error reading updated field bytes: %v", err)
}
for field, info := range fieldInfo {
updatedFields[field] = info
}
} else {
updatedFields = make(map[string]index.FieldInfo)
}
for field, info := range fieldInfo {
updatedFields[field] = *info
updatedFields = fieldInfo
}
b, err := json.Marshal(updatedFields)
if err != nil {
Expand All @@ -949,7 +961,6 @@ func (s *Scorch) updateBolt(fieldInfo map[string]*index.FieldInfo, mappingBytes
}
}
}

return nil
})
}
10 changes: 8 additions & 2 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,12 @@ func (is *IndexSnapshot) Document(id string) (rv index.Document, err error) {
// Keeping that TODO for now until we have a cleaner way.
rvd.StoredFieldsSize += uint64(len(val))

// Skip fields that are supposed to have deleted store values
if info, ok := is.updatedFields[name]; ok &&
(info.All || info.Store) {
return true
}

// copy value, array positions to preserve them beyond the scope of this callback
value := append([]byte(nil), val...)
arrayPos := append([]uint64(nil), pos...)
Expand Down Expand Up @@ -612,6 +614,8 @@ func (is *IndexSnapshot) TermFieldReader(ctx context.Context, term []byte, field

var dict segment.TermDictionary
var err error

// Skip fields that are supposed to have no indexing
if info, ok := is.updatedFields[field]; ok &&
(info.Index || info.All) {
dict = nil
Expand All @@ -621,6 +625,7 @@ func (is *IndexSnapshot) TermFieldReader(ctx context.Context, term []byte, field
if err != nil {
return nil, err
}

if dictStats, ok := dict.(segment.DiskStatsReporter); ok {
bytesRead := dictStats.BytesRead()
rv.incrementBytesRead(bytesRead)
Expand Down Expand Up @@ -765,6 +770,7 @@ func (is *IndexSnapshot) documentVisitFieldTermsOnSegment(
}
}

// Filter out fields that are supposed to have no doc values
var filteredFields []string
for _, field := range vFields {
if info, ok := is.updatedFields[field]; ok &&
Expand Down Expand Up @@ -797,8 +803,8 @@ func (is *IndexSnapshot) documentVisitFieldTermsOnSegment(
}
}

if ssvOk && ssv != nil && len(vFields) > 0 {
dvs, err = ssv.VisitDocValues(localDocNum, fields, visitor, dvs)
if ssvOk && ssv != nil && len(filteredFields) > 0 {
dvs, err = ssv.VisitDocValues(localDocNum, filteredFields, visitor, dvs)
if err != nil {
return nil, nil, err
}
Expand Down
124 changes: 105 additions & 19 deletions index_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ type indexImpl struct {

const storePath = "store"

var mappingInternalKey = []byte("_mapping")

const SearchQueryStartCallbackKey = "_search_query_start_callback_key"
const SearchQueryEndCallbackKey = "_search_query_end_callback_key"

Expand Down Expand Up @@ -129,7 +127,7 @@ func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string,
if err != nil {
return nil, err
}
err = rv.i.SetInternal(mappingInternalKey, mappingBytes)
err = rv.i.SetInternal(scorch.MappingInternalKey, mappingBytes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -164,25 +162,110 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
storeConfig = map[string]interface{}{}
}

storeConfig["path"] = indexStorePath(path)
storeConfig["create_if_missing"] = false
storeConfig["error_if_exists"] = false
for rck, rcv := range runtimeConfig {
storeConfig[rck] = rcv
}

// open the index
indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType)
if indexTypeConstructor == nil {
return nil, ErrorUnknownIndexType
}

rv.i, err = indexTypeConstructor(rv.meta.Storage, storeConfig, Config.analysisQueue)
if err != nil {
return nil, err
}
err = rv.i.Open()
if err != nil {
return nil, err
}
defer func(rv *indexImpl) {
if !rv.open {
rv.i.Close()
}
}(rv)

// now load the mapping
indexReader, err := rv.i.Reader()
if err != nil {
return nil, err
}
defer func() {
if cerr := indexReader.Close(); cerr != nil && err == nil {
err = cerr
}
}()

mappingBytes, err := indexReader.GetInternal(scorch.MappingInternalKey)
if err != nil {
return nil, err
}

var im *mapping.IndexMappingImpl
err = util.UnmarshalJSON(mappingBytes, &im)
if err != nil {
return nil, fmt.Errorf("error parsing mapping JSON: %v\nmapping contents:\n%s", err, string(mappingBytes))
}

// mark the index as open
rv.mutex.Lock()
defer rv.mutex.Unlock()
rv.open = true

// validate the mapping
err = im.Validate()
if err != nil {
// note even if the mapping is invalid
// we still return an open usable index
return rv, err
}

rv.m = im
indexStats.Register(rv)
return rv, err
}

func updateIndexUsing(path string, runtimeConfig map[string]interface{}, newParams string) (rv *indexImpl, err error) {
rv = &indexImpl{
path: path,
name: path,
}
rv.stats = &IndexStat{i: rv}

rv.meta, err = openIndexMeta(path)
if err != nil {
return nil, err
}

// backwards compatibility if index type is missing
if rv.meta.IndexType == "" {
rv.meta.IndexType = upsidedown.Name
}

storeConfig := rv.meta.Config
if storeConfig == nil {
storeConfig = map[string]interface{}{}
}

var um *mapping.IndexMappingImpl
var umBytes []byte

if len(newParams) == 0 {
return nil, fmt.Errorf(("updated mapping is empty"))
}

err = util.UnmarshalJSON([]byte(newParams), &um)
if err != nil {
return nil, fmt.Errorf("error parsing updated mapping JSON: %v\nmapping contents:\n%s", err, newParams)
}

storeConfig["path"] = indexStorePath(path)
storeConfig["create_if_missing"] = false
storeConfig["error_if_exists"] = false
for rck, rcv := range runtimeConfig {
if rck == "mapping" {
if val, ok := rcv.([]byte); ok {
err = util.UnmarshalJSON(val, &um)
if err != nil {
return nil, fmt.Errorf("error parsing updated mapping JSON: %v\nmapping contents:\n%s", err, val)
}
umBytes = val
} else {
return nil, fmt.Errorf("error typecasting updated mapping JSON\nmapping contents: %v", rcv)
}
continue
}
storeConfig[rck] = rcv
}

Expand All @@ -196,6 +279,7 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
if err != nil {
return nil, err
}

err = rv.i.Open()
if err != nil {
return nil, err
Expand All @@ -217,7 +301,7 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
}
}()

mappingBytes, err := indexReader.GetInternal(mappingInternalKey)
mappingBytes, err := indexReader.GetInternal(scorch.MappingInternalKey)
if err != nil {
return nil, err
}
Expand All @@ -241,6 +325,7 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
return rv, err
}

// Validate and update the index with the new mapping
if um != nil {
ui, ok := rv.i.(index.UpdateIndex)
if !ok {
Expand All @@ -252,15 +337,16 @@ func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *inde
return rv, err
}

fieldInfo, err := deletedFields(im, um)
fieldInfo, err := DeletedFields(im, um)
if err != nil {
return rv, err
}

err = ui.UpdateFields(fieldInfo, umBytes)
err = ui.UpdateFields(fieldInfo, []byte(newParams))
if err != nil {
return rv, err
}
im = um
}

rv.m = im
Expand Down
Loading

0 comments on commit 62ad148

Please sign in to comment.