-
-
Notifications
You must be signed in to change notification settings - Fork 184
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
21 changed files
with
529 additions
and
87 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
# Long Running Migrations | ||
|
||
This feature allows you to execute long-running migrations using Celery. Each migration will attempt to complete within the maximum time allowed by Celery (see settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT`). If it does not complete within this time, the periodic task will retry and resume the migration from where it left off, continuing until the long-running migration either successfully completes or raises an exception. | ||
|
||
## How to Use | ||
|
||
1. **Create your migration** | ||
Define your migrations in the `jobs` folder. Each migration should have a unique name, following Django's migration naming convention (e.g., `0001_description`). The migration file must contain a function called `run()`. | ||
|
||
2. **Register the migration** | ||
Create a `LongRunningMigration` entry by running: | ||
|
||
```python | ||
LongRunningMigration.objects.create(name='0001_sample') | ||
``` | ||
|
||
You can automate this step by adding it to a Django migration with `RunPython`. | ||
|
||
```python | ||
from django.db import migrations | ||
|
||
|
||
def add_long_running_migration(apps, schema_editor): | ||
LongRunningMigration = apps.get_model('long_running_migrations', 'LongRunningMigration') # noqa | ||
LongRunningMigration.objects.create( | ||
name='0001_sample' | ||
) | ||
|
||
|
||
def noop(*args, **kwargs): | ||
pass | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
('long_running_migrations', '0001_initial'), | ||
] | ||
|
||
operations = [ | ||
migrations.RunPython(add_long_running_migration, noop), | ||
] | ||
|
||
|
||
``` | ||
|
||
|
||
|
||
3. **Execute the migration** | ||
Wait for the periodic task `execute_long_running_migrations` to run automatically or trigger it manually (beware of the lock, it can only run one at a time). | ||
|
||
|
||
## Writing a good long-running migration | ||
|
||
When writing long-running migrations, ensure they are both **atomic** and **tolerant** to interruptions at any point in their execution. | ||
|
||
```python | ||
# 2024-10-13 | ||
from django.db import transaction | ||
|
||
def run(): | ||
for foo in Foo.objects.filter(is_barred=False): # Checks actually needs to run still | ||
with transaction.atomic(): # Atomic! | ||
foo.make_it_bar() # Perhaps this does multiple things that could succeed or fail | ||
``` | ||
|
||
* Notice that if the task is interrupted, it will simply continue in the next run. | ||
|
||
* Because tasks are slow, your code should run regardless of when the data migration takes place. | ||
|
||
* Add a timestamp to your migration definition to help future developers identify when it can be safely removed (if needed). |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from django.contrib import admin | ||
|
||
from .models import LongRunningMigration | ||
|
||
|
||
@admin.register(LongRunningMigration) | ||
class LongRunningMigrationAdmin(admin.ModelAdmin): | ||
readonly_fields=('date_created', 'date_modified') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# Generated on 2024-12-18 | ||
|
||
def run(): | ||
""" | ||
Describe your long-running migration | ||
""" | ||
|
||
pass |
56 changes: 56 additions & 0 deletions
56
...apps/long_running_migrations/jobs/0002_fix_project_ownership_transfer_with_media_files.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
# Generated on 2024-12-18 | ||
from django.db.models import Q, OuterRef, Subquery | ||
|
||
from kobo.apps.openrosa.apps.logger.models import XForm | ||
from kobo.apps.openrosa.apps.main.models import MetaData | ||
from kpi.models.asset import Asset | ||
from kpi.models.asset_file import AssetFile | ||
from kobo.apps.project_ownership.models import Transfer | ||
|
||
|
||
def run(): | ||
""" | ||
Update OpenRosa MetaData objects that were not updated when project | ||
ownership was transferred to someone else. This fixes a bug introduced | ||
and later addressed in KPI (issue #5365). | ||
""" | ||
|
||
# Step 1: Retrieve all assets that were transferred since the bug was present and | ||
# use media files | ||
asset_uids = Asset.objects.filter( | ||
Q( | ||
pk__in=AssetFile.objects.values_list('asset_id', flat=True).exclude( | ||
file_type=AssetFile.PAIRED_DATA | ||
) | ||
) | ||
& Q( | ||
pk__in=Transfer.objects.values_list('asset_id', flat=True).filter( | ||
invite__date_created__date__gte='2024-09-15' | ||
) | ||
) | ||
).values_list('uid', flat=True) | ||
|
||
username_subquery = XForm.objects.filter(pk=OuterRef('xform_id')).values( | ||
'user__username' | ||
)[:1] | ||
|
||
# Step 2: Iterate through relevant MetaData objects and fix their data_file fields | ||
for metadata in ( | ||
MetaData.objects.filter( | ||
xform_id__in=XForm.objects.filter( | ||
kpi_asset_uid__in=list(asset_uids) | ||
), | ||
) | ||
.exclude( | ||
Q(data_file__startswith=Subquery(username_subquery)) | ||
| Q(data_file__isnull=True) | ||
| Q(data_file='') | ||
) | ||
.select_related('xform', 'xform__user') | ||
.iterator() | ||
): | ||
data_file = str(metadata.data_file) | ||
old_username, *other_parts = data_file.split('/') | ||
other_parts.insert(0, metadata.xform.user.username) | ||
metadata.data_file = '/'.join(other_parts) | ||
metadata.save(update_fields=['data_file']) |
Empty file.
29 changes: 29 additions & 0 deletions
29
kobo/apps/long_running_migrations/migrations/0001_initial.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
# Generated by Django 4.2.15 on 2024-12-13 19:53 | ||
from django.db import migrations, models | ||
|
||
import kpi.models.abstract_models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
initial = True | ||
|
||
dependencies = [ | ||
] | ||
|
||
operations = [ | ||
migrations.CreateModel( | ||
name='LongRunningMigration', | ||
fields=[ | ||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), | ||
('date_created', models.DateTimeField(default=kpi.models.abstract_models._get_default_datetime)), | ||
('date_modified', models.DateTimeField(default=kpi.models.abstract_models._get_default_datetime)), | ||
('name', models.CharField(max_length=255, unique=True)), | ||
('status', models.CharField(choices=[('created', 'Created'), ('in_progress', 'In Progress'), ('failed', 'Failed'), ('completed', 'Completed')], default='created', max_length=20)), | ||
('attempts', models.PositiveSmallIntegerField(default=0)), | ||
], | ||
options={ | ||
'abstract': False, | ||
}, | ||
), | ||
] |
25 changes: 25 additions & 0 deletions
25
...ning_migrations/migrations/0002_fix_failed_project_ownership_transfer_with_media_files.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
# Generated by Django 4.2.15 on 2024-12-18 20:00 | ||
|
||
from django.db import migrations | ||
|
||
|
||
def add_long_running_migration(apps, schema_editor): | ||
LongRunningMigration = apps.get_model('long_running_migrations', 'LongRunningMigration') # noqa | ||
LongRunningMigration.objects.create( | ||
name='0002_fix_project_ownership_transfer_with_media_files' | ||
) | ||
|
||
|
||
def noop(*args, **kwargs): | ||
pass | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
('long_running_migrations', '0001_initial'), | ||
] | ||
|
||
operations = [ | ||
migrations.RunPython(add_long_running_migration, noop), | ||
] |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
import os | ||
from importlib.util import module_from_spec, spec_from_file_location | ||
|
||
from django.conf import settings | ||
from django.core.exceptions import SuspiciousOperation | ||
from django.db import models | ||
|
||
from kpi.models.abstract_models import AbstractTimeStampedModel | ||
from kpi.utils.log import logging | ||
|
||
|
||
class LongRunningMigrationStatus(models.TextChoices): | ||
CREATED = 'created' | ||
IN_PROGRESS = 'in_progress' | ||
FAILED = 'failed' | ||
COMPLETED = 'completed' | ||
|
||
|
||
class LongRunningMigration(AbstractTimeStampedModel): | ||
|
||
LONG_RUNNING_MIGRATIONS_DIR = os.path.join( | ||
'kobo', | ||
'apps', | ||
'long_running_migrations', | ||
'jobs' | ||
) | ||
|
||
name = models.CharField(max_length=255, unique=True) | ||
status = models.CharField( | ||
default=LongRunningMigrationStatus.CREATED, | ||
choices=LongRunningMigrationStatus.choices, | ||
max_length=20, | ||
) | ||
attempts = models.PositiveSmallIntegerField(default=0) | ||
|
||
def clean(self): | ||
super().clean() | ||
if '..' in self.name or '/' in self.name or '\\' in self.name: | ||
raise SuspiciousOperation( | ||
f"Invalid migration name '{self.name}'. " | ||
f"Migration names cannot contain directory traversal characters " | ||
f"such as '..', '/', or '\\'." | ||
) | ||
|
||
def execute(self): | ||
# Skip execution if the migration is already completed | ||
if self.status == LongRunningMigrationStatus.COMPLETED: | ||
return | ||
|
||
if not (module := self._load_module()): | ||
return | ||
|
||
self.status = LongRunningMigrationStatus.IN_PROGRESS | ||
self.attempts += 1 | ||
self.save(update_fields=['status', 'attempts']) | ||
|
||
try: | ||
module.run() | ||
except Exception as e: | ||
# Log the error and update the status to 'failed' | ||
logging.error(f'LongRunningMigration.execute(): {str(e)}') | ||
self.status = LongRunningMigrationStatus.FAILED | ||
self.save(update_fields=['status']) | ||
return | ||
|
||
self.status = LongRunningMigrationStatus.COMPLETED | ||
self.save(update_fields=['status']) | ||
|
||
def save(self, **kwargs): | ||
|
||
self.clean() | ||
|
||
if self._state.adding: | ||
file_path = os.path.join( | ||
settings.BASE_DIR, self.LONG_RUNNING_MIGRATIONS_DIR, f'{self.name}.py' | ||
) | ||
if not os.path.exists(file_path): | ||
raise ValueError('Task does not exist in tasks directory') | ||
super().save(**kwargs) | ||
|
||
def _load_module(self): | ||
""" | ||
This function allows you to load a Python module from a file path even if | ||
the module's name does not follow Python's standard naming conventions | ||
(e.g., starting with numbers or containing special characters). Normally, | ||
Python identifiers must adhere to specific rules, but this method bypasses | ||
those restrictions by dynamically creating a module from its file. | ||
""" | ||
module_path = f'{self.LONG_RUNNING_MIGRATIONS_DIR}/{self.name}.py' | ||
if not os.path.exists(f'{settings.BASE_DIR}/{module_path}'): | ||
logging.error( | ||
f'LongRunningMigration._load_module():' | ||
f'File not found `{module_path}`' | ||
) | ||
return | ||
|
||
spec = spec_from_file_location(self.name, module_path) | ||
try: | ||
module = module_from_spec(spec) | ||
except (ModuleNotFoundError, AttributeError): | ||
logging.error( | ||
f'LongRunningMigration._load_module():' | ||
f'Failed to import migration module `{self.name}`' | ||
) | ||
return | ||
|
||
spec.loader.exec_module(module) | ||
return module |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
from kobo.celery import celery_app | ||
|
||
from django.conf import settings | ||
from django.core.cache import cache | ||
from django.db.models import Q | ||
from django.utils import timezone | ||
from dateutil.relativedelta import relativedelta | ||
|
||
from .models import LongRunningMigration, LongRunningMigrationStatus | ||
|
||
|
||
@celery_app.task( | ||
queue='kpi_low_priority_queue', | ||
soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT, | ||
time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT, | ||
) | ||
def execute_long_running_migrations(): | ||
lock_key = 'execute_long_running_migrations' | ||
|
||
if cache.add( | ||
lock_key, 'true', timeout=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT | ||
): | ||
try: | ||
# Adding an offset to account for potential delays in task execution and | ||
# clock drift between the Celery workers and the database, ensuring tasks | ||
# are not prematurely skipped. | ||
offset = 5 * 60 | ||
task_expiry_time = timezone.now() - relativedelta( | ||
seconds=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT + offset | ||
) | ||
# Run tasks that were just created or are in progress but have exceeded | ||
# the maximum runtime allowed for a Celery task, causing Celery to terminate | ||
# them and raise a SoftTimeLimitExceeded exception. | ||
for migration in LongRunningMigration.objects.filter( | ||
Q(status=LongRunningMigrationStatus.CREATED) | ||
| Q(status=LongRunningMigrationStatus.IN_PROGRESS) | ||
& Q(date_modified__lte=task_expiry_time) | ||
).order_by('date_created'): | ||
migration.execute() | ||
finally: | ||
cache.delete(lock_key) |
Empty file.
Empty file.
2 changes: 2 additions & 0 deletions
2
kobo/apps/long_running_migrations/tests/fixtures/sample_failure.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
def run(): | ||
raise Exception |
2 changes: 2 additions & 0 deletions
2
kobo/apps/long_running_migrations/tests/fixtures/sample_task.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
def run(): | ||
print('hello from long running migration') |
Oops, something went wrong.