Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(migrations): introduce long-running migration system using Celery TASK-1394 #5379

Merged
merged 13 commits into from
Dec 18, 2024
Merged
1 change: 1 addition & 0 deletions dependencies/pip/dev_requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pytest
pytest-cov
pytest-django
pytest-env
freezegun


# Kobocat
Expand Down
3 changes: 3 additions & 0 deletions dependencies/pip/dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ fabric==3.2.2
# via -r dependencies/pip/dev_requirements.in
flower==2.0.1
# via -r dependencies/pip/requirements.in
freezegun==1.5.1
# via -r dependencies/pip/dev_requirements.in
frozenlist==1.4.1
# via
# aiohttp
Expand Down Expand Up @@ -488,6 +490,7 @@ python-dateutil==2.9.0.post0
# -r dependencies/pip/requirements.in
# botocore
# celery
# freezegun
# pandas
# python-crontab
python3-openid==3.2.0
Expand Down
71 changes: 71 additions & 0 deletions kobo/apps/long_running_migrations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Long Running Migrations

This feature allows you to execute long-running migrations using Celery. Each migration will attempt to complete within the maximum time allowed by Celery (see settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT`). If it does not complete within this time, the periodic task will retry and resume the migration from where it left off, continuing until the long-running migration either successfully completes or raises an exception.

## How to Use

1. **Create your migration**
Define your migrations in the `jobs` folder. Each migration should have a unique name, following Django's migration naming convention (e.g., `0001_description`). The migration file must contain a function called `run()`.

2. **Register the migration**
Create a `LongRunningMigration` entry by running:

```python
LongRunningMigration.objects.create(name='0001_sample')
```

You can automate this step by adding it to a Django migration with `RunPython`.

```python
from django.db import migrations


def add_long_running_migration(apps, schema_editor):
LongRunningMigration = apps.get_model('long_running_migrations', 'LongRunningMigration') # noqa
LongRunningMigration.objects.create(
name='0001_sample'
)


def noop(*args, **kwargs):
pass


class Migration(migrations.Migration):

dependencies = [
('long_running_migrations', '0001_initial'),
]

operations = [
migrations.RunPython(add_long_running_migration, noop),
]


```



3. **Execute the migration**
Wait for the periodic task `execute_long_running_migrations` to run automatically or trigger it manually (beware of the lock, it can only run one at a time).


## Writing a good long-running migration

When writing long-running migrations, ensure they are both **atomic** and **tolerant** to interruptions at any point in their execution.

```python
# 2024-10-13
from django.db import transaction

def run():
for foo in Foo.objects.filter(is_barred=False): # Checks actually needs to run still
with transaction.atomic(): # Atomic!
foo.make_it_bar() # Perhaps this does multiple things that could succeed or fail
```

* Notice that if the task is interrupted, it will simply continue in the next run.

* Because tasks are slow, your code should run regardless of when the data migration takes place.

* Add a timestamp to your migration definition to help future developers identify when it can be safely removed (if needed).
Empty file.
8 changes: 8 additions & 0 deletions kobo/apps/long_running_migrations/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.contrib import admin

from .models import LongRunningMigration


@admin.register(LongRunningMigration)
class LongRunningMigrationAdmin(admin.ModelAdmin):
readonly_fields=('date_created', 'date_modified')
8 changes: 8 additions & 0 deletions kobo/apps/long_running_migrations/jobs/0001_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Generated on 2024-12-18

def run():
"""
Describe your long-running migration
"""

pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Generated on 2024-12-18
from django.db.models import Q, OuterRef, Subquery

from kobo.apps.openrosa.apps.logger.models import XForm
from kobo.apps.openrosa.apps.main.models import MetaData
from kpi.models.asset import Asset
from kpi.models.asset_file import AssetFile
from kobo.apps.project_ownership.models import Transfer


def run():
"""
Update OpenRosa MetaData objects that were not updated when project
ownership was transferred to someone else. This fixes a bug introduced
and later addressed in KPI (issue #5365).
"""

# Step 1: Retrieve all assets that were transferred since the bug was present and
# use media files
asset_uids = Asset.objects.filter(
Q(
pk__in=AssetFile.objects.values_list('asset_id', flat=True).exclude(
file_type=AssetFile.PAIRED_DATA
)
)
& Q(
pk__in=Transfer.objects.values_list('asset_id', flat=True).filter(
invite__date_created__date__gte='2024-09-15'
)
)
).values_list('uid', flat=True)

username_subquery = XForm.objects.filter(pk=OuterRef('xform_id')).values(
'user__username'
)[:1]

# Step 2: Iterate through relevant MetaData objects and fix their data_file fields
for metadata in (
MetaData.objects.filter(
xform_id__in=XForm.objects.filter(
kpi_asset_uid__in=list(asset_uids)
),
)
.exclude(
Q(data_file__startswith=Subquery(username_subquery))
| Q(data_file__isnull=True)
| Q(data_file='')
)
.select_related('xform', 'xform__user')
.iterator()
):
data_file = str(metadata.data_file)
old_username, *other_parts = data_file.split('/')
other_parts.insert(0, metadata.xform.user.username)
metadata.data_file = '/'.join(other_parts)
metadata.save(update_fields=['data_file'])
Empty file.
29 changes: 29 additions & 0 deletions kobo/apps/long_running_migrations/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 4.2.15 on 2024-12-13 19:53
from django.db import migrations, models

import kpi.models.abstract_models


class Migration(migrations.Migration):

initial = True

dependencies = [
]

operations = [
migrations.CreateModel(
name='LongRunningMigration',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('date_created', models.DateTimeField(default=kpi.models.abstract_models._get_default_datetime)),
('date_modified', models.DateTimeField(default=kpi.models.abstract_models._get_default_datetime)),
('name', models.CharField(max_length=255, unique=True)),
('status', models.CharField(choices=[('created', 'Created'), ('in_progress', 'In Progress'), ('failed', 'Failed'), ('completed', 'Completed')], default='created', max_length=20)),
('attempts', models.PositiveSmallIntegerField(default=0)),
],
options={
'abstract': False,
},
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Generated by Django 4.2.15 on 2024-12-18 20:00

from django.db import migrations


def add_long_running_migration(apps, schema_editor):
LongRunningMigration = apps.get_model('long_running_migrations', 'LongRunningMigration') # noqa
LongRunningMigration.objects.create(
name='0002_fix_project_ownership_transfer_with_media_files'
)


def noop(*args, **kwargs):
pass


class Migration(migrations.Migration):

dependencies = [
('long_running_migrations', '0001_initial'),
]

operations = [
migrations.RunPython(add_long_running_migration, noop),
]
Empty file.
108 changes: 108 additions & 0 deletions kobo/apps/long_running_migrations/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import os
from importlib.util import module_from_spec, spec_from_file_location

from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from django.db import models

from kpi.models.abstract_models import AbstractTimeStampedModel
from kpi.utils.log import logging


class LongRunningMigrationStatus(models.TextChoices):
CREATED = 'created'
IN_PROGRESS = 'in_progress'
FAILED = 'failed'
COMPLETED = 'completed'


class LongRunningMigration(AbstractTimeStampedModel):

LONG_RUNNING_MIGRATIONS_DIR = os.path.join(
'kobo',
'apps',
'long_running_migrations',
'jobs'
)

name = models.CharField(max_length=255, unique=True)
status = models.CharField(
default=LongRunningMigrationStatus.CREATED,
choices=LongRunningMigrationStatus.choices,
max_length=20,
)
attempts = models.PositiveSmallIntegerField(default=0)

def clean(self):
super().clean()
if '..' in self.name or '/' in self.name or '\\' in self.name:
raise SuspiciousOperation(
f"Invalid migration name '{self.name}'. "
f"Migration names cannot contain directory traversal characters "
f"such as '..', '/', or '\\'."
)

def execute(self):
# Skip execution if the migration is already completed
if self.status == LongRunningMigrationStatus.COMPLETED:
return

if not (module := self._load_module()):
return

self.status = LongRunningMigrationStatus.IN_PROGRESS
self.attempts += 1
self.save(update_fields=['status', 'attempts'])

try:
module.run()
except Exception as e:
# Log the error and update the status to 'failed'
logging.error(f'LongRunningMigration.execute(): {str(e)}')
self.status = LongRunningMigrationStatus.FAILED
self.save(update_fields=['status'])
return

self.status = LongRunningMigrationStatus.COMPLETED
self.save(update_fields=['status'])

def save(self, **kwargs):

self.clean()

if self._state.adding:
file_path = os.path.join(
Guitlle marked this conversation as resolved.
Show resolved Hide resolved
settings.BASE_DIR, self.LONG_RUNNING_MIGRATIONS_DIR, f'{self.name}.py'
)
if not os.path.exists(file_path):
raise ValueError('Task does not exist in tasks directory')
super().save(**kwargs)

def _load_module(self):
"""
This function allows you to load a Python module from a file path even if
the module's name does not follow Python's standard naming conventions
(e.g., starting with numbers or containing special characters). Normally,
Python identifiers must adhere to specific rules, but this method bypasses
those restrictions by dynamically creating a module from its file.
"""
module_path = f'{self.LONG_RUNNING_MIGRATIONS_DIR}/{self.name}.py'
if not os.path.exists(f'{settings.BASE_DIR}/{module_path}'):
logging.error(
f'LongRunningMigration._load_module():'
f'File not found `{module_path}`'
)
return

spec = spec_from_file_location(self.name, module_path)
try:
module = module_from_spec(spec)
except (ModuleNotFoundError, AttributeError):
logging.error(
f'LongRunningMigration._load_module():'
f'Failed to import migration module `{self.name}`'
)
return

spec.loader.exec_module(module)
return module
41 changes: 41 additions & 0 deletions kobo/apps/long_running_migrations/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from kobo.celery import celery_app

from django.conf import settings
from django.core.cache import cache
from django.db.models import Q
from django.utils import timezone
from dateutil.relativedelta import relativedelta

from .models import LongRunningMigration, LongRunningMigrationStatus


@celery_app.task(
queue='kpi_low_priority_queue',
soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT,
)
def execute_long_running_migrations():
lock_key = 'execute_long_running_migrations'

if cache.add(
lock_key, 'true', timeout=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT
):
try:
# Adding an offset to account for potential delays in task execution and
# clock drift between the Celery workers and the database, ensuring tasks
# are not prematurely skipped.
offset = 5 * 60
task_expiry_time = timezone.now() - relativedelta(
seconds=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT + offset
)
# Run tasks that were just created or are in progress but have exceeded
# the maximum runtime allowed for a Celery task, causing Celery to terminate
# them and raise a SoftTimeLimitExceeded exception.
for migration in LongRunningMigration.objects.filter(
Guitlle marked this conversation as resolved.
Show resolved Hide resolved
Q(status=LongRunningMigrationStatus.CREATED)
| Q(status=LongRunningMigrationStatus.IN_PROGRESS)
& Q(date_modified__lte=task_expiry_time)
).order_by('date_created'):
migration.execute()
finally:
cache.delete(lock_key)
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def run():
raise Exception
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def run():
print('hello from long running migration')
Loading