Skip to content

Commit

Permalink
fix(embedded/store): handle key mapping in ongoing txs
Browse files Browse the repository at this point in the history
Signed-off-by: Jeronimo Irazabal <[email protected]>
  • Loading branch information
jeroiraz committed Nov 2, 2023
1 parent e055bcf commit 7978a12
Showing 1 changed file with 32 additions and 20 deletions.
52 changes: 32 additions & 20 deletions embedded/store/ongoing_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,30 +275,42 @@ func (tx *OngoingTx) set(key []byte, md *KVMetadata, value []byte, hashValue [sh
IsValueTruncated: isValueTruncated,
}

// updates are not needed because valueRef are resolved with the "interceptor"
if !tx.IsWriteOnly() && !isKeyUpdate && (md == nil || !md.NonIndexable()) {
// vLen=0 + vOff=0 + vHash=0 + txmdLen=0 + kvmdLen=0
var indexedValue [lszSize + offsetSize + sha256.Size + sszSize + sszSize]byte
// vLen=0 + vOff=0 + vHash=0 + txmdLen=0 + kvmdLen=0
var indexedValue [lszSize + offsetSize + sha256.Size + sszSize + sszSize]byte

tx.st.indexersMux.RLock()
defer tx.st.indexersMux.RUnlock()
tx.st.indexersMux.RLock()
defer tx.st.indexersMux.RUnlock()

for _, indexer := range tx.st.indexers {
if !hasPrefix(key, indexer.SourcePrefix()) || (!isTransient && indexer.spec.SourceEntryMapper != nil) {
continue
}
for _, indexer := range tx.st.indexers {
if isTransient && !hasPrefix(key, indexer.TargetPrefix()) {
continue
}

if !isTransient && (!hasPrefix(key, indexer.SourcePrefix()) || indexer.spec.SourceEntryMapper != nil) {
continue
}

var targetKey []byte

if isTransient {
targetKey = key
} else {
// map the key, get the snapshot for mapped key, set
sourceKey, err := mapKey(key, value, indexer.spec.SourceEntryMapper)
if err != nil {
return err
}

targetKey, err := mapKey(sourceKey, value, indexer.spec.TargetEntryMapper)
targetKey, err = mapKey(sourceKey, value, indexer.spec.TargetEntryMapper)
if err != nil {
return err
}
}

isIndexable := md == nil || !md.NonIndexable()

// updates are not needed because valueRef are resolved with the "interceptor"
if !tx.IsWriteOnly() && !isKeyUpdate && isIndexable {
snap, err := tx.snap(targetKey)
if err != nil {
return err
Expand All @@ -308,17 +320,17 @@ func (tx *OngoingTx) set(key []byte, md *KVMetadata, value []byte, hashValue [sh
if err != nil {
return err
}
}

if !bytes.Equal(key, targetKey) {
tkid := sha256.Sum256(targetKey)
if !bytes.Equal(key, targetKey) {
kid := sha256.Sum256(targetKey)
keyRef, isKeyUpdate := tx.entriesByKey[kid]

if isTransient {
tx.transientEntries[len(tx.entriesByKey)] = e
tx.entriesByKey[tkid] = len(tx.entriesByKey)
} else {
tx.entries = append(tx.entries, e)
tx.entriesByKey[tkid] = len(tx.entries) - 1
}
if isKeyUpdate {
tx.transientEntries[keyRef] = e
} else {
tx.transientEntries[len(tx.entriesByKey)] = e
tx.entriesByKey[kid] = len(tx.entriesByKey)
}
}
}
Expand Down

0 comments on commit 7978a12

Please sign in to comment.