From 7978a12bf6cab2c8e0dba353c8fea31434391021 Mon Sep 17 00:00:00 2001 From: Jeronimo Irazabal Date: Fri, 3 Nov 2023 00:21:58 +0100 Subject: [PATCH] fix(embedded/store): handle key mapping in ongoing txs Signed-off-by: Jeronimo Irazabal --- embedded/store/ongoing_tx.go | 52 ++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/embedded/store/ongoing_tx.go b/embedded/store/ongoing_tx.go index dfe7221558..c077464b44 100644 --- a/embedded/store/ongoing_tx.go +++ b/embedded/store/ongoing_tx.go @@ -275,30 +275,42 @@ func (tx *OngoingTx) set(key []byte, md *KVMetadata, value []byte, hashValue [sh IsValueTruncated: isValueTruncated, } - // updates are not needed because valueRef are resolved with the "interceptor" - if !tx.IsWriteOnly() && !isKeyUpdate && (md == nil || !md.NonIndexable()) { - // vLen=0 + vOff=0 + vHash=0 + txmdLen=0 + kvmdLen=0 - var indexedValue [lszSize + offsetSize + sha256.Size + sszSize + sszSize]byte + // vLen=0 + vOff=0 + vHash=0 + txmdLen=0 + kvmdLen=0 + var indexedValue [lszSize + offsetSize + sha256.Size + sszSize + sszSize]byte - tx.st.indexersMux.RLock() - defer tx.st.indexersMux.RUnlock() + tx.st.indexersMux.RLock() + defer tx.st.indexersMux.RUnlock() - for _, indexer := range tx.st.indexers { - if !hasPrefix(key, indexer.SourcePrefix()) || (!isTransient && indexer.spec.SourceEntryMapper != nil) { - continue - } + for _, indexer := range tx.st.indexers { + if isTransient && !hasPrefix(key, indexer.TargetPrefix()) { + continue + } + + if !isTransient && (!hasPrefix(key, indexer.SourcePrefix()) || indexer.spec.SourceEntryMapper != nil) { + continue + } + + var targetKey []byte + if isTransient { + targetKey = key + } else { // map the key, get the snapshot for mapped key, set sourceKey, err := mapKey(key, value, indexer.spec.SourceEntryMapper) if err != nil { return err } - targetKey, err := mapKey(sourceKey, value, indexer.spec.TargetEntryMapper) + targetKey, err = mapKey(sourceKey, value, indexer.spec.TargetEntryMapper) if err != nil { return err } + } + isIndexable := md == nil || !md.NonIndexable() + + // updates are not needed because valueRef are resolved with the "interceptor" + if !tx.IsWriteOnly() && !isKeyUpdate && isIndexable { snap, err := tx.snap(targetKey) if err != nil { return err @@ -308,17 +320,17 @@ func (tx *OngoingTx) set(key []byte, md *KVMetadata, value []byte, hashValue [sh if err != nil { return err } + } - if !bytes.Equal(key, targetKey) { - tkid := sha256.Sum256(targetKey) + if !bytes.Equal(key, targetKey) { + kid := sha256.Sum256(targetKey) + keyRef, isKeyUpdate := tx.entriesByKey[kid] - if isTransient { - tx.transientEntries[len(tx.entriesByKey)] = e - tx.entriesByKey[tkid] = len(tx.entriesByKey) - } else { - tx.entries = append(tx.entries, e) - tx.entriesByKey[tkid] = len(tx.entries) - 1 - } + if isKeyUpdate { + tx.transientEntries[keyRef] = e + } else { + tx.transientEntries[len(tx.entriesByKey)] = e + tx.entriesByKey[kid] = len(tx.entriesByKey) } } }