Skip to content

Commit

Permalink
[IMP] connector_extension: run of exporter refactor. New decorator at…
Browse files Browse the repository at this point in the history
…omic created to lock relation to do the _run logic in a different transaction
  • Loading branch information
KNVx committed Oct 19, 2023
1 parent ceaab5c commit fa436b6
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 71 deletions.
86 changes: 86 additions & 0 deletions connector_extension/common/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright NuoBiT Solutions - Kilian Niubo <[email protected]>
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import hashlib
import logging
import struct

_logger = logging.getLogger(__name__)


def get_int_lock(lock):
hasher = hashlib.sha1(str(lock).encode())

Check warning on line 12 in connector_extension/common/database.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/common/database.py#L12

Added line #L12 was not covered by tests
# pg_lock accepts an int8 so we build an hash composed with
# contextual information and we throw away some bits
return struct.unpack("q", hasher.digest()[:8])

Check warning on line 15 in connector_extension/common/database.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/common/database.py#L15

Added line #L15 was not covered by tests


def session_pg_try_advisory_lock(env, lock):
"""Try to acquire a Postgres session advisory lock.
The function tries to acquire a lock, returns a boolean indicating
if it could be obtained or not. An acquired lock is released at the
advisory unlock.
A typical use is to acquire a lock at the beginning of an importer
to prevent 2 jobs to do the same import at the same time. Since the
record doesn't exist yet, we can't put a lock on a record, so we put
an advisory lock.
Example:
- Job 1 imports Partner A
- Job 2 imports Partner B
- Partner A has a category X which happens not to exist yet
- Partner B has a category X which happens not to exist yet
- Job 1 import category X as a dependency
- Job 2 import category X as a dependency
Since both jobs are executed concurrently, they both create a record
for category X so we have duplicated records. With this lock:
- Job 1 imports Partner A, it acquires a lock for this partner
- Job 2 imports Partner B, it acquires a lock for this partner
- Partner A has a category X which happens not to exist yet
- Partner B has a category X which happens not to exist yet
- Job 1 import category X as a dependency, it acquires a lock for
this category
- Job 2 import category X as a dependency, try to acquire a lock
but can't, Job 2 is retried later, and when it is retried, it
sees the category X created by Job 1.
The lock is acquired until the end of the transaction.
Usage example:
::
lock_name = 'import_record({}, {}, {}, {})'.format(
self.backend_record._name,
self.backend_record.id,
self.model._name,
self.external_id,
)
if pg_try_advisory_lock(lock_name):
# do sync
else:
raise RetryableJobError('Could not acquire advisory lock',
seconds=2,
ignore_retry=True)
:param env: the Odoo Environment
:param lock: The lock name. Can be anything convertible to a
string. It needs to represents what should not be synchronized
concurrently so usually the string will contain at least: the
action, the backend type, the backend id, the model name, the
external id
:return True/False whether lock was acquired.
"""
int_lock = get_int_lock(lock)
env.cr.execute("SELECT pg_try_advisory_lock(%s);", (int_lock))
return env.cr.fetchone()[0]

Check warning on line 80 in connector_extension/common/database.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/common/database.py#L78-L80

Added lines #L78 - L80 were not covered by tests


def session_pg_advisory_unlock(env, lock):
int_lock = get_int_lock(lock)
env.cr.execute("SELECT pg_advisory_unlock(%s);", (int_lock))
return env.cr.fetchone()[0]

Check warning on line 86 in connector_extension/common/database.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/common/database.py#L84-L86

Added lines #L84 - L86 were not covered by tests
1 change: 1 addition & 0 deletions connector_extension/components/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from . import adapter
from . import binder
from . import core
from . import exporter
from . import importer
from . import mapper
36 changes: 18 additions & 18 deletions connector_extension/components/binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,24 +296,24 @@ def bind_export(self, external_data, relation):
relation_id = relation

external_id = self.dict2id(external_data, in_field=False)
with self._retry_unique_violation():
binding = self.model.with_context(connector_no_export=True).create(
{
self._backend_field: self.backend_record.id,
self._odoo_field: relation_id,
self._sync_date_field: fields.Datetime.now(),
**self.id2dict(external_id, in_field=True),
**self._additional_external_binding_fields(external_data),
}
)
# Eager commit to avoid having 2 jobs
# exporting at the same time. The constraint
# will pop if an other job already created
# the same binding. It will be caught and
# raise a RetryableJobError.
if not odoo.tools.config["test_enable"]:
self.env.cr.commit() # pylint: disable=E8102
return binding
binding = self.model.with_context(connector_no_export=True).create(

Check warning on line 299 in connector_extension/components/binder.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/binder.py#L299

Added line #L299 was not covered by tests
{
self._backend_field: self.backend_record.id,
self._odoo_field: relation_id,
self._sync_date_field: fields.Datetime.now(),
**self.id2dict(external_id, in_field=True),
**self._additional_external_binding_fields(external_data),
}
)

# Eager commit to avoid having 2 jobs
# exporting at the same time. The constraint
# will pop if an other job already created
# the same binding. It will be caught and
# raise a RetryableJobError.
if not odoo.tools.config["test_enable"]:
self.env.cr.commit() # pylint: disable=E8102
return binding

Check warning on line 316 in connector_extension/components/binder.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/binder.py#L315-L316

Added lines #L315 - L316 were not covered by tests

@api.model
def _additional_external_binding_fields(self, external_data):
Expand Down
53 changes: 53 additions & 0 deletions connector_extension/components/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright NuoBiT Solutions - Kilian Niubo <[email protected]>
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
from odoo.addons.component.core import AbstractComponent
from odoo.addons.queue_job.exception import RetryableJobError

from ..common.database import session_pg_advisory_unlock, session_pg_try_advisory_lock


class BaseConnectorComponent(AbstractComponent):
_inherit = "base.connector"

def session_advisory_lock_or_retry(self, lock, retry_seconds=1):
"""Acquire a Postgres session advisory lock or retry job
When the lock cannot be acquired, it raises a
:exc:`odoo.addons.queue_job.exception.RetryableJobError` so the job
is retried after n ``retry_seconds``.
Usage example:
.. code-block:: python
lock_name = 'import_record({}, {}, {}, {})'.format(
self.backend_record._name,
self.backend_record.id,
self.model._name,
self.external_id,
)
self.session_advisory_lock_or_retry(lock_name, retry_seconds=2)
See :func:`odoo.addons.connector.connector.session_pg_try_advisory_lock` for
details.
:param lock: The lock name. Can be anything convertible to a
string. It needs to represent what should not be synchronized
concurrently, usually the string will contain at least: the
action, the backend name, the backend id, the model name, the
external id
:param retry_seconds: number of seconds after which a job should
be retried when the lock cannot be acquired.
"""
if not session_pg_try_advisory_lock(self.env, lock):
raise RetryableJobError(

Check warning on line 43 in connector_extension/components/core.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/core.py#L43

Added line #L43 was not covered by tests
"Could not acquire advisory lock",
seconds=retry_seconds,
ignore_retry=True,
)

def session_pg_advisory_unlock(
self,
lock,
):
return session_pg_advisory_unlock(self.env, lock)

Check warning on line 53 in connector_extension/components/core.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/core.py#L53

Added line #L53 was not covered by tests
124 changes: 71 additions & 53 deletions connector_extension/components/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

import psycopg2

from odoo import _, fields
import odoo
from odoo import _, api, fields

from odoo.addons.component.core import AbstractComponent
from odoo.addons.connector.exception import RetryableJobError
Expand All @@ -25,56 +26,82 @@ class GenericDirectExporter(AbstractComponent):
def _mapper_options(self, binding):
return {"binding": binding}

def run(self, relation, internal_fields=None):
"""Run the synchronization
:param binding: binding record to export
"""
now_fmt = fields.Datetime.now()
def _get_lock_name(self, relation):
lock_name = "export_record({}, {}, {}, {})".format(

Check warning on line 30 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L30

Added line #L30 was not covered by tests
self.backend_record._name,
self.backend_record.id,
relation._name,
relation.id,
)
return lock_name

Check warning on line 36 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L36

Added line #L36 was not covered by tests

def atomic(func): # noqa: B902
def wrapper(self, now_fmt, relation, always, internal_fields):
lock_name = self._get_lock_name(relation)
self.session_advisory_lock_or_retry(lock_name)
try:

Check warning on line 42 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L40-L42

Added lines #L40 - L42 were not covered by tests
with odoo.registry(self.env.cr.dbname).cursor() as new_cr:
new_env = api.Environment(new_cr, self.env.uid, self.env.context)
new_backend_record = new_env[self.backend_record._name].browse(

Check warning on line 45 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L44-L45

Added lines #L44 - L45 were not covered by tests
self.backend_record.id
)
with new_backend_record.work_on(self.model._name) as work:
new_self = work.component(self._usage)
result = func(

Check warning on line 50 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L49-L50

Added lines #L49 - L50 were not covered by tests
new_self, now_fmt, relation, always, internal_fields
)
finally:
self.session_pg_advisory_unlock(lock_name)
return result

Check warning on line 55 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L54-L55

Added lines #L54 - L55 were not covered by tests

return wrapper

@atomic
def _run(self, now_fmt, relation, always, internal_fields):
result = None
# get binding from real record
binding = self.binder_for().wrap_record(relation)

# if not binding, try to link to existing external record with
# the same alternate key and create/update binding
if not binding:
binding = (
self.binder_for().to_binding_from_internal_key(relation) or binding
)

if not binding:
internal_fields = None # should be created with all the fields
if self._has_to_skip(binding, relation):
return _("Nothing to export")

# export the missing linked resources
self._export_dependencies(relation)

# prevent other jobs to export the same record
# will be released on commit (or rollback)
self._lock(relation)

map_record = self.mapper.map_record(relation)

# passing info to the mapper
opts = self._mapper_options(binding)
if binding:
values = self._update_data(map_record, fields=internal_fields, **opts)
if values:
external_id = self.binder_for().dict2id(binding, in_field=True)
result = self._update(external_id, values)
else:
values = self._create_data(map_record, fields=internal_fields, **opts)
if values:
external_data = self._create(values)
binding = self.binder_for().bind_export(external_data, relation)
if not values:
result = _("Nothing to export")
if not result:
result = _("Record exported with ID %s on Backend.") % "external_id"
self._after_export()
binding[self.binder_for()._sync_date_field] = now_fmt
return result
if always or not binding:
if binding:
values = self._update_data(map_record, fields=internal_fields, **opts)

Check warning on line 76 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L76

Added line #L76 was not covered by tests
if values:
external_id = self.binder_for().dict2id(binding, in_field=True)
result = self._update(external_id, values)

Check warning on line 79 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L78-L79

Added lines #L78 - L79 were not covered by tests
else:
values = self._create_data(map_record, fields=internal_fields, **opts)

Check warning on line 81 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L81

Added line #L81 was not covered by tests
if values:
external_data = self._create(values)
binding = self.binder_for().bind_export(external_data, relation)

Check warning on line 84 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L83-L84

Added lines #L83 - L84 were not covered by tests
if not values:
result = _("Nothing to export")

Check warning on line 86 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L86

Added line #L86 was not covered by tests
if not result:
result = _("Record exported with ID %s on Backend.") % "external_id"
self._after_export()
binding[self.binder_for()._sync_date_field] = now_fmt
return result

Check warning on line 91 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L88-L91

Added lines #L88 - L91 were not covered by tests

def run(self, relation, always=True, internal_fields=None):
"""Run the synchronization
:param binding: binding record to export
"""
now_fmt = fields.Datetime.now()

Check warning on line 98 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L98

Added line #L98 was not covered by tests
if self._has_to_skip(relation):
return _("Nothing to export")
self._export_dependencies(relation)
return self._run(

Check warning on line 102 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L100-L102

Added lines #L100 - L102 were not covered by tests
now_fmt, relation, always=always, internal_fields=internal_fields
)

def _after_export(self):
"""Can do several actions after exporting a record on the backend"""
Expand Down Expand Up @@ -111,10 +138,12 @@ def _lock(self, record):
raise RetryableJobError(
"A concurrent job is already exporting the same record "
"(%s with id %s). The job will be retried later."
% (self.model._name, record.id)
% (self.model._name, record.id),
seconds=5,
ignore_retry=True,
) from e

def _has_to_skip(self, binding, relation):
def _has_to_skip(self, relation):
"""Return True if the export can be skipped"""
return False

Expand Down Expand Up @@ -192,19 +221,8 @@ def _export_dependency(
pass extra values for this binding
:type binding_extra_vals: dict
"""
if not relation:
return

binding = None
if not always:
rel_binder = self.binder_for(binding_model)
binding = rel_binder.wrap_record(relation)
if not binding:
binding = rel_binder.to_binding_from_internal_key(relation)

if always or not binding:
exporter = self.component(usage=component_usage, model_name=binding_model)
exporter.run(relation)
exporter = self.component(usage=component_usage, model_name=binding_model)
exporter.run(relation, always=always)

Check warning on line 225 in connector_extension/components/exporter.py

View check run for this annotation

Codecov / codecov/patch

connector_extension/components/exporter.py#L224-L225

Added lines #L224 - L225 were not covered by tests

def _export_dependencies(self, relation):
"""Export the dependencies for the record"""
Expand Down

0 comments on commit fa436b6

Please sign in to comment.