Skip to content

Commit

Permalink
[ZODB4] Backport the way MVCC is handled from ZODB5
Browse files Browse the repository at this point in the history
This backports to ZODB4 Connection ZODB5's approach to handle MVCC via
always calling storage.loadBefore() instead of "load for latest version
+ loadBefore if we were notified of database changes" approach.

Why?
----

Short answer: because Wendelin.core 2 needs to know at which particular
database state application-level ZODB connection is viewing the
database, and it is hard to implement such functionality correctly
without this backport. Please see appendix for the explanation.

What
----

This backports to ZODB4 the minimum necessary part of upstream commit 227953b
(Simplify MVCC by determining transaction start time using lastTransaction) +
follow-up correctness fixes:

zopefoundation#50
zopefoundation#56
zopefoundation#291
zopefoundation#307

In short:

- a Connection is always opened with explicitly corresponding to a particular database revision
- Connection uses only loadBefore with that revision to load objects
- every time a Connection is (re)opened, the result of queued invalidations and
  explicit query to storage.lastTransaction is carefully merged to refresh
  Connection's idea about which database state it corresponds to.

The "careful" in last point is important. Historically ZODB5 was first reworked
in commit 227953b (zopefoundation#56) to always
call lastTransaction to refresh state of Connection view. Since there
was no proper synchronisation with respect to process of handling
invalidations, that lead to data corruption issue due to race in
Connection.open() vs invalidations:

zopefoundation#290

That race and data corruption was fixed in commit b5895a5
(zopefoundation#291) by way of avoiding
lastTransaction call and relying only on invalidations channel when
refreshing Connection view.

This fix in turn led to another data corruption issue because in
presence of client-server reconnections, ZODB storage drivers can partly
skip notifying client with detailed invalidation messages:

zopefoundation#291 (comment)

A fix to that issue (zopefoundation#307)
proposed to change back to query storage for lastTransaction on every
Connection refresh, but to implement careful merging of lastTransaction
result and data from invalidation channel. However it was found that the
"careful merging" can work correctly only if we require from storage
drivers a particular ordering of invalidation events wrt lastTransaction
return and result:

zopefoundation#307 (comment)

While ZEO was already complying with that requirements, NEO had to be
fixed to support that:

zopefoundation#307 (comment)
https://lab.nexedi.com/nexedi/neoppod/commit/a7d101ec
https://lab.nexedi.com/nexedi/neoppod/commit/96a5c01f

Patch details
-------------

We change Connection._txn_time to be a "before" for the database state
to which Connection view corresponds. This state is hooked to be
initialized and updated in Connection._flush_invalidations - the
function that is called from both explicit Connection (re)open and at
transaction boundaries via Connection.afterCompletion hook.

Objects loading is factored into Connection._load which replaces old
"load + check invalidated + fallback to loadBefore" game in
Connection._setstate.

Connection.open now calls Connection._flush_invalidations
unconditionally - even if it was global cache reset event - because
besides invalidation flushes the latter is now responsible for querying
storage lastTransaction.

TmpStore - a "storage" that provides runtime support for savepoints - is
refactored correspondingly to delegate loading of original objects back
to underlying Connection.

DB.close is modified - similarly to ZODB5 - to release DB's Connections
carefully with preventing connections from DB poll from implicitly
starting new transactions via afterCompletion hook.

ZODB.nxd_patches is introduced to indicate to client software that this
particular patch is present and can be relied upon.

Tests are updated correspondingly. In 227953b Jim talks about
converting many tests - because

	"Lots of tests didn't clean up databases and connections properly"

and because new MVCC approach

	"makes database and connection hygiene a bit more important,
	especially for tests, because a connection will continue to interact
	with storages if it isn't properly closed, which can lead to errors if
	the storage is closed."

but finally implementing automatic cleanup at transaction boundaries
because there are too many tests to fix. We backport only automatic
cleanup + necessary explicit test fixes to keep the diff minimal.

All tests pass. This includes tests for ZODB itself, ZEO and NEO test
over hereby modified ZODB(*), my test programs from

zopefoundation#290	and
zopefoundation/ZEO#155

and ERP5 tests. Upcoming wendelin.core 2 also work with this change.

(*) ZEO, NEO and ERP5 tests fail sometimes, but there is no regression
here because ZEO, NEO and ERP5 tests are failing regularly, and in the
same way, even with unmodified ZODB.

Appendix. zconn_at
------------------

This appendix provides motivation for the backport:

For wendelin.core v2 we need a way to know at which particular database
state application-level ZODB connection is viewing the database. Knowing
that state, WCFS client library interacts with WCFS filesystem server
and, in simple terms, requests the server to provide data as of that
particular database state. Let us call the function that for a client
ZODB connection returns database state corresponding to its database
view zconn_at.

Now here is the problem: zconn_at is relatively easy to implement for
ZODB5 - see e.g. here:

https://lab.nexedi.com/nexedi/wendelin.core/blob/v0.13-54-ga6a8f5b/lib/zodb.py#L142-181
https://lab.nexedi.com/nexedi/wendelin.core/commit/3bd82127

however, for ZODB4, since its operational models is not
directly MVCC, it is not that straightforward. Still, even for older
ZODB4, for every client connection, there _is_ such at that corresponds
to that connection view of the database.

We need ZODB4 support, because ZODB4 is currently the version that
Nexedi uses, and my understanding is that it will stay like this for not
a small time. I have the feeling that ZODB5 was reworked in better
direction, but without caring enough about quality which resulted in
concurrency bugs with data corruption effects like

zopefoundation#290
zopefoundation/ZEO#155
etc.

Even though the first one is now fixed (but it broke other parts and so
both ZODB had to be fixed again _and_ NEO had to be fixed for that ZODB
fix to work currently), I feel that upgrading into ZODB5 for Nexedi will
require some non-negligible amount of QA work, and thus it is better if
we move step-by-step - even if we eventually upgrade to ZODB5 - it is
better we first migrate wendelin.core 1 -> wendelin.core 2 with keeping
current version of ZODB.

Now please note what would happen if zconn_at gives, even a bit, wrong
answer: wcfs client will ask wcfs server to provide array data as of
different database state compared to current on-client ZODB connection.
This will result in that data accessed via ZBigArray will _not_
correspond to all other data accessed via regular ZODB mechanism.
It is, in other words, a data corruptions.
In some scenarios it can be harmless, but generally it is the worst
that can happen to a database.

It is good to keep in mind ZODB issue290 when imagining corner cases
that zconn_at has to deal with. Even though that issue is ZODB5 only, it
shows what kind of bugs it can be in zconn_at implementation for ZODB4.

Just for the reference: in Wendelin-based systems there is usually constant
stream of database ingestions coming from many places simultaneously. Plus many
activities crunching on the DB at the same time as well. And the more clients a
system handles, the more there will be level-of-concurrency increase. This
means that the problem of correctly handling concurrency issues in zconn_at is
not purely theoretical, but has direct relation to our systems.

--------

With this backport, zconn_at for ZODB4 becomes trivial and robust to implement:

https://lab.nexedi.com/kirr/wendelin.core/blob/484071b3/lib/zodb.py#L183-195

I would like to thank Joshua Wölfel whose internship helped this topic
to shape up:

https://www.erp5.com/project_section/wendelin-ia-project/forum/Joshua-internship-D8b7NNhWfz

/cc @Nexedi, @jwolf083
Signed-off-by: Kirill Smelkov <[email protected]>
  • Loading branch information
navytux committed Nov 30, 2020
1 parent 4011637 commit 8e7eab3
Show file tree
Hide file tree
Showing 11 changed files with 237 additions and 204 deletions.
156 changes: 65 additions & 91 deletions src/ZODB/Connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import sys
import tempfile
import threading
import traceback
import warnings
import os
import time
Expand Down Expand Up @@ -199,10 +200,12 @@ def __init__(self, db, cache_size=400, before=None, cache_size_bytes=0):
# _conflicts).
self._conflicts = {}

# _ltid stores last transaction received via invalidate from storage.
self._ltid = None

# _txn_time stores the upper bound on transactions visible to
# this connection. That is, all object revisions must be
# written before _txn_time. If it is None, then the current
# revisions are acceptable.
# written before _txn_time.
self._txn_time = None

# To support importFile(), implemented in the ExportImport base
Expand Down Expand Up @@ -238,6 +241,23 @@ def add(self, obj):
elif obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar)

# _load loads object data according to connection view of the database.
def _load(self, oid):
return self._loadFrom(oid, self._storage)
def _loadFrom(self, oid, storage): # -> (data, serial) | POSKeyError | ReadConflicError
if self._mvcc_storage:
data, serial = storage.load(oid)
else:
# NOTE: loadBefore raises POSKeyError if oid is recorded as deleted as of <._txn_time
r = storage.loadBefore(oid, self._txn_time)
if r is None:
# oid is not present at all os of <._txn_time
raise ReadConflictError(oid)
data, serial, _ = r

self._load_count += 1
return data, serial

def get(self, oid):
"""Return the persistent object with oid 'oid'."""
if self.opened is None:
Expand All @@ -253,7 +273,7 @@ def get(self, oid):
if obj is not None:
return obj

p, serial = self._storage.load(oid, '')
p, serial = self._load(oid)
obj = self._reader.getGhost(p)

# Avoid infiniate loop if obj tries to load its state before
Expand Down Expand Up @@ -358,14 +378,16 @@ def invalidate(self, tid, oids):
if self.before is not None:
# This is a historical connection. Invalidations are irrelevant.
return
if tid is None: # ZEO can still call invalidate(tid=None) from ClientStorage.finish_verification
self._log.warning("invalidate(tid=None) called:\n%s" % ''.join(traceback.format_stack()))
self.invalidateCache()
return
self._inv_lock.acquire()
try:
if self._txn_time is None:
self._txn_time = tid
elif (tid is not None) and (tid < self._txn_time):
if tid < self._ltid:
raise AssertionError("invalidations out of order, %r < %r"
% (tid, self._txn_time))

% (tid, self._ltid))
self._ltid = tid
self._invalidated.update(oids)
finally:
self._inv_lock.release()
Expand Down Expand Up @@ -500,7 +522,7 @@ def _tpc_cleanup(self):
self._registered_objects = []
self._creating.clear()

# Process pending invalidations.
# Process pending invalidations and query storage for lastTransaction.
def _flush_invalidations(self):
if self._mvcc_storage:
# Poll the storage for invalidations.
Expand All @@ -512,7 +534,19 @@ def _flush_invalidations(self):
elif invalidated:
self._cache.invalidate(invalidated)

self._inv_lock.acquire()
self._inv_lock.acquire()

else:
# Storage implementations don't always call invalidate() when
# the last TID changes, e.g. after network reconnection,
# so we still have to poll.
ltid = self._storage.lastTransaction()
# But at this precise moment, a transaction may be committed and
# we have already received the new tid, along with invalidations.
self._inv_lock.acquire()
# So we must pick the greatest value.
self._txn_time = p64(u64(max(ltid, self._ltid)) + 1)

try:
# Non-ghostifiable objects may need to read when they are
# invalidated, so we'll quickly just replace the
Expand Down Expand Up @@ -544,7 +578,6 @@ def _flush_invalidations(self):

invalidated = dict.fromkeys(self._invalidated)
self._invalidated = set()
self._txn_time = None
if self._invalidatedCache:
self._invalidatedCache = False
invalidated = self._cache.cache_data.copy()
Expand Down Expand Up @@ -920,16 +953,6 @@ def _setstate(self, obj, oid):
# as a performance optimization for the pure-Python persistent implementation
# where accessing an attribute involves __getattribute__ calls

# The control flow is complicated here to avoid loading an
# object revision that we are sure we aren't going to use. As
# a result, invalidation tests occur before and after the
# load. We can only be sure about invalidations after the
# load.

# If an object has been invalidated, among the cases to consider:
# - Try MVCC
# - Raise ConflictError.

if self.before is not None:
# Load data that was current before the time we have.
before = self.before
Expand All @@ -939,29 +962,7 @@ def _setstate(self, obj, oid):
p, serial, end = t

else:
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.

if self._invalidatedCache:
raise ReadConflictError()

if (oid in self._invalidated):
self._load_before_or_conflict(obj)
return

p, serial = self._storage.load(oid, '')
self._load_count += 1

self._inv_lock.acquire()
try:
invalid = oid in self._invalidated
finally:
self._inv_lock.release()

if invalid:
self._load_before_or_conflict(obj)
return
p, serial = self._load(oid)

self._reader.setGhostState(obj, p)
obj._p_serial = serial
Expand All @@ -973,43 +974,6 @@ def _setstate(self, obj, oid):
obj._p_blob_uncommitted = None
obj._p_blob_committed = self._storage.loadBlob(oid, serial)

def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not self._setstate_noncurrent(obj):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)

def _setstate_noncurrent(self, obj):
"""Set state using non-current data.
Return True if state was available, False if not.
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.

assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start

# MVCC Blob support
if isinstance(obj, Blob):
obj._p_blob_uncommitted = None
obj._p_blob_committed = self._storage.loadBlob(obj._p_oid, start)

return True

def register(self, obj):
"""Register obj with the current transaction manager.
Expand Down Expand Up @@ -1094,8 +1058,7 @@ def open(self, transaction_manager=None, delegate=True):
if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache.
self._resetCache()
else:
self._flush_invalidations()
self._flush_invalidations()

transaction_manager.registerSynch(self)

Expand Down Expand Up @@ -1170,7 +1133,7 @@ def exchange(self, old, new):

def savepoint(self):
if self._savepoint_storage is None:
tmpstore = TmpStore(self._normal_storage)
tmpstore = TmpStore(self)
self._savepoint_storage = tmpstore
self._storage = self._savepoint_storage

Expand Down Expand Up @@ -1234,7 +1197,7 @@ def _commit_savepoint(self, transaction):
# that that the next attribute access of its name
# unghostify it, which will cause its blob data
# to be reattached "cleanly"
self.invalidate(None, (oid, ))
self._cache.invalidate(oid)
else:
s = self._storage.store(oid, serial, data,
'', transaction)
Expand Down Expand Up @@ -1292,13 +1255,14 @@ class TmpStore:
"""A storage-like thing to support savepoints."""


def __init__(self, storage):
self._storage = storage
def __init__(self, conn):
self._conn = conn
self._storage = conn._normal_storage
for method in (
'getName', 'new_oid', 'getSize', 'sortKey', 'loadBefore',
'getName', 'new_oid', 'getSize', 'sortKey',
'isReadOnly'
):
setattr(self, method, getattr(storage, method))
setattr(self, method, getattr(self._storage, method))

self._file = tempfile.TemporaryFile(prefix='TmpStore')
# position: current file position
Expand All @@ -1319,10 +1283,13 @@ def close(self):
remove_committed_dir(self._blob_dir)
self._blob_dir = None

def load(self, oid, version):
def _load(self, oid):
return self._conn._loadFrom(oid, self._storage)

def load(self, oid, version=''):
pos = self.index.get(oid)
if pos is None:
return self._storage.load(oid, '')
return self._load(oid)
self._file.seek(pos)
h = self._file.read(8)
oidlen = u64(h)
Expand All @@ -1334,6 +1301,13 @@ def load(self, oid, version):
serial = h[:8]
return self._file.read(size), serial

def loadBefore(self, oid, before):
if before != self._conn._txn_time:
raise ValueError('TmpStore.loadBefore called with before != conn._txn_time')

p, serial = self.load(oid)
return p, serial, None # NOTE next_serial is ignored by caller

def store(self, oid, serial, data, version, transaction):
# we have this funny signature so we can reuse the normal non-commit
# commit logic
Expand Down
23 changes: 18 additions & 5 deletions src/ZODB/DB.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,12 +626,25 @@ def close(self):
noop = lambda *a: None
self.close = noop

# go over all connections and prepare them to handle last txn.abort()
txn_managers = set() # of conn.transaction_manager
@self._connectionMap
def _(c):
if c.transaction_manager is not None:
c.transaction_manager.abort()
c.afterCompletion = c.newTransaction = c.close = noop
c._release_resources()
def _(conn):
if conn.transaction_manager is not None:
for c in six.itervalues(conn.connections):
# Prevent connections from implicitly starting new
# transactions.
c.afterCompletion = c.newTransaction = noop
txn_managers.add(conn.transaction_manager)
conn.close = noop
conn._release_resources()

# abort transaction managers for all above connections
# call txn.abort only after all connections are prepared, as else - if
# we call txn.abort() above - some connections could be not yet
# prepared and with still active afterCompletion callback.
for transaction_manager in txn_managers:
transaction_manager.abort()

self.storage.close()
del self.storage
Expand Down
12 changes: 12 additions & 0 deletions src/ZODB/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,15 @@
del mapping, list, sys

from ZODB.DB import DB, connection

# set of changes backported by Nexedi.
nxd_patches = {
# Rework Connection MVCC implementation to always call
# storage.loadBefore(zconn._txn_time) to load objects.
# storage.load() is no longer called at all.
# https://github.com/zopefoundation/ZODB/issues/50
# https://github.com/zopefoundation/ZODB/pull/56
# https://github.com/zopefoundation/ZODB/pull/307
# ...
'conn:MVCC-via-loadBefore-only',
}
2 changes: 1 addition & 1 deletion src/ZODB/tests/PackableStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,5 +776,5 @@ def setup(test):

return doctest.DocFileSuite(
'IExternalGC.test',
setUp=setup, tearDown=zope.testing.setupstack.tearDown,
setUp=setup, tearDown=ZODB.tests.util.tearDown,
checker=ZODB.tests.util.checker)
8 changes: 4 additions & 4 deletions src/ZODB/tests/dbopen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ While it's boring, it's important to verify that the same relationships
hold if the default pool size is overridden.

>>> handler.clear()
>>> st.close()
>>> db.close()
>>> st = Storage()
>>> PS = 2 # smaller pool size
>>> db = DB(st, pool_size=PS)
Expand Down Expand Up @@ -117,7 +117,7 @@ We can change the pool size on the fly:
Enough of that.

>>> handler.clear()
>>> st.close()
>>> db.close()

More interesting is the stack-like nature of connection reuse. So long as
we keep opening new connections, and keep them alive, all connections
Expand Down Expand Up @@ -256,7 +256,7 @@ Nothing in that last block should have logged any msgs:
If "too many" connections are open, then closing one may kick an older
closed one out of the available connection stack.

>>> st.close()
>>> db.close()
>>> st = Storage()
>>> db = DB(st, pool_size=3)
>>> conns = [db.open() for dummy in range(6)]
Expand Down Expand Up @@ -324,7 +324,7 @@ gc to reclaim the Connection and its cache eventually works, but that can
take "a long time" and caches can hold on to many objects, and limited
resources (like RDB connections), for the duration.

>>> st.close()
>>> db.close()
>>> st = Storage()
>>> db = DB(st, pool_size=2)
>>> conn0 = db.open()
Expand Down
Loading

0 comments on commit 8e7eab3

Please sign in to comment.