Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(migrations): introduce long-running migration system using Celery TASK-1394 #5379

Merged
merged 13 commits into from
Dec 18, 2024
Merged
1 change: 1 addition & 0 deletions dependencies/pip/dev_requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pytest
pytest-cov
pytest-django
pytest-env
freezegun


# Kobocat
Expand Down
3 changes: 3 additions & 0 deletions dependencies/pip/dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ fabric==3.2.2
# via -r dependencies/pip/dev_requirements.in
flower==2.0.1
# via -r dependencies/pip/requirements.in
freezegun==1.5.1
# via -r dependencies/pip/dev_requirements.in
frozenlist==1.4.1
# via
# aiohttp
Expand Down Expand Up @@ -488,6 +490,7 @@ python-dateutil==2.9.0.post0
# -r dependencies/pip/requirements.in
# botocore
# celery
# freezegun
# pandas
# python-crontab
python3-openid==3.2.0
Expand Down
42 changes: 42 additions & 0 deletions kobo/apps/long_running_migrations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Long Running Migrations

This feature allows you to execute long-running migrations using Celery. Each migration will attempt to complete within the maximum time allowed by Celery (see `settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT`). If it does not complete within this time, the periodic task will retry and resume the migration from where it left off, continuing until the long-running migration either successfully completes or raises an exception.

## How to Use

1. **Create your migration**
Define your migrations in the `jobs` folder. Each migration should have a unique name, following Django's migration naming convention (e.g., `0001_description`). The migration file must contain a function called `run()`.

2. **Register the migration**
Create a `LongRunningMigration` entry by running:

```python
LongRunningMigration.objects.create(name='0001_fix_transfer')
```

You can automate this step by adding it to a Django migration with `RunPython`


3. **Execute the migration**
Wait for the periodic task `execute_long_running_migrations` to run automatically, or trigger it manually (beware of the lock: only one run can be in progress at a time).


## Writing a good long-running migration

When writing long-running migrations, ensure they are both **atomic** and **tolerant** to interruptions at any point in their execution.

```python
# 2024-10-13
from django.db import transaction

def run():
    for foo in Foo.objects.filter(is_barred=False):  # Only rows that still need to be migrated
        with transaction.atomic():  # Atomic!
            foo.make_it_bar()  # Perhaps this does multiple things that could succeed or fail
```

* Notice that if the task is interrupted, it will simply continue in the next run.

* Because tasks are slow, your code should run regardless of when the data migration takes place.

* Add a timestamp to your migration definition to help future developers identify when it can be safely removed (if needed).
Empty file.
8 changes: 8 additions & 0 deletions kobo/apps/long_running_migrations/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.contrib import admin

from .models import LongRunningMigration


@admin.register(LongRunningMigration)
class LongRunningMigrationAdmin(admin.ModelAdmin):
    """Django admin configuration for `LongRunningMigration` records."""

    # Timestamps are shown in the admin form but are not editable there.
    readonly_fields = ('date_created', 'date_modified')
Empty file.
29 changes: 29 additions & 0 deletions kobo/apps/long_running_migrations/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 4.2.15 on 2024-12-13 19:53
from django.db import migrations, models

import kpi.models.abstract_models


class Migration(migrations.Migration):
    """Initial schema migration: create the `LongRunningMigration` table."""

    initial = True

    dependencies = []

    operations = [
        migrations.CreateModel(
            name='LongRunningMigration',
            fields=[
                (
                    'id',
                    models.AutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name='ID',
                    ),
                ),
                (
                    'date_created',
                    models.DateTimeField(
                        default=kpi.models.abstract_models._get_default_datetime
                    ),
                ),
                (
                    'date_modified',
                    models.DateTimeField(
                        default=kpi.models.abstract_models._get_default_datetime
                    ),
                ),
                # The name doubles as the job module name, hence `unique`.
                ('name', models.CharField(max_length=255, unique=True)),
                (
                    'status',
                    models.CharField(
                        choices=[
                            ('created', 'Created'),
                            ('in_progress', 'In Progress'),
                            ('failed', 'Failed'),
                            ('completed', 'Completed'),
                        ],
                        default='created',
                        max_length=20,
                    ),
                ),
                ('attempts', models.PositiveSmallIntegerField(default=0)),
            ],
            options={
                'abstract': False,
            },
        ),
    ]
Empty file.
86 changes: 86 additions & 0 deletions kobo/apps/long_running_migrations/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import importlib
import os

from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from django.db import models

from kpi.models.abstract_models import AbstractTimeStampedModel
from kpi.utils.log import logging


class LongRunningMigrationStatus(models.TextChoices):
    """Lifecycle states stored in `LongRunningMigration.status`."""

    # Each member is (stored value, human-readable label).
    CREATED = 'created', 'Created'
    IN_PROGRESS = 'in_progress', 'In Progress'
    FAILED = 'failed', 'Failed'
    COMPLETED = 'completed', 'Completed'


class LongRunningMigration(AbstractTimeStampedModel):
    """
    Track the state of one long-running migration job.

    `name` must match a Python module (file name without `.py`) located in
    `LONG_RUNNING_MIGRATIONS_DIR`; `execute()` imports that module and calls
    its `run()` function.
    """

    # Location of the job modules, relative to `settings.BASE_DIR`.
    LONG_RUNNING_MIGRATIONS_DIR = os.path.join(
        'kobo',
        'apps',
        'long_running_migrations',
        'jobs'
    )

    name = models.CharField(max_length=255, unique=True)
    status = models.CharField(
        default=LongRunningMigrationStatus.CREATED,
        choices=LongRunningMigrationStatus.choices,
        max_length=20,
    )
    # Number of times `execute()` has started running the job module.
    attempts = models.PositiveSmallIntegerField(default=0)

    def clean(self):
        """
        Reject names that could point outside the jobs directory.

        Raises:
            SuspiciousOperation: if `name` contains '..', a slash or a
                backslash.
        """
        super().clean()
        if '..' in self.name or '/' in self.name or '\\' in self.name:
            raise SuspiciousOperation(
                f"Invalid migration name '{self.name}'. "
                f"Migration names cannot contain directory traversal characters "
                f"such as '..', '/', or '\\'."
            )

    def execute(self):
        """
        Import the job module named by `name` and call its `run()` function.

        Nothing happens when the migration is already completed. When the job
        module cannot be imported, the error is logged and the record is left
        untouched. Otherwise the record is saved as in progress with one more
        attempt, then saved as completed, or as failed if `run()` raised.
        """
        # Skip execution if the migration is already completed
        if self.status == LongRunningMigrationStatus.COMPLETED:
            return

        # The directory is built with `os.path.join()`, so convert the
        # platform separator (as well as '/') to obtain the dotted path.
        base_import = self.LONG_RUNNING_MIGRATIONS_DIR.replace(
            os.sep, '.'
        ).replace('/', '.')
        try:
            module = importlib.import_module('.'.join([base_import, self.name]))
        except ModuleNotFoundError as e:
            logging.error(
                f'LongRunningMigration.execute(), '
                f'failed to import task module: {str(e)}'
            )
            return

        self.status = LongRunningMigrationStatus.IN_PROGRESS
        # Every run that gets this far counts as exactly one attempt.
        self.attempts += 1
        self.save(update_fields=['status', 'attempts'])

        try:
            module.run()
        except Exception as e:
            # NOTE(review): Celery's SoftTimeLimitExceeded subclasses
            # Exception, so a soft timeout raised inside `run()` would be
            # recorded here as a failure instead of leaving the migration in
            # progress for a later retry - confirm this is intended.
            # Log the error and update the status to 'failed'
            logging.error(f'LongRunningMigration.execute(): {str(e)}')
            self.status = LongRunningMigrationStatus.FAILED
            self.save(update_fields=['status'])
            return

        self.status = LongRunningMigrationStatus.COMPLETED
        self.save(update_fields=['status'])

    def save(self, *args, **kwargs):
        """
        Validate the name, then save the record.

        On creation, the job file `<name>.py` must already exist in
        `LONG_RUNNING_MIGRATIONS_DIR` under `settings.BASE_DIR`.

        Raises:
            SuspiciousOperation: if `clean()` rejects the name.
            ValueError: if the record is being created and no matching job
                file exists.
        """
        self.clean()

        if self._state.adding:
            file_path = os.path.join(
                settings.BASE_DIR, self.LONG_RUNNING_MIGRATIONS_DIR, f'{self.name}.py'
            )
            if not os.path.exists(file_path):
                raise ValueError('Task does not exist in tasks directory')
        super().save(*args, **kwargs)
41 changes: 41 additions & 0 deletions kobo/apps/long_running_migrations/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from kobo.celery import celery_app

from django.conf import settings
from django.core.cache import cache
from django.db.models import Q
from django.utils import timezone
from dateutil.relativedelta import relativedelta

from .models import LongRunningMigration, LongRunningMigrationStatus


@celery_app.task(
    queue='kpi_low_priority_queue',
    soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
    time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT,
)
def execute_long_running_migrations():
    """
    Execute every pending long-running migration, oldest first.

    A cache entry is used as a lock so that only one invocation works at a
    time; an invocation that cannot take the lock returns immediately.
    """
    lock_key = 'execute_long_running_migrations'
    time_limit = settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT

    # `cache.add()` returns a falsy value when the key is already set, i.e.
    # when another invocation currently holds the lock.
    if not cache.add(lock_key, 'true', timeout=time_limit):
        return

    try:
        # Adding an offset to account for potential delays in task execution and
        # clock drift between the Celery workers and the database, ensuring tasks
        # are not prematurely skipped.
        offset = 5 * 60
        task_expiry_time = timezone.now() - relativedelta(
            seconds=time_limit + offset
        )

        # Run tasks that were just created or are in progress but have exceeded
        # the maximum runtime allowed for a Celery task, causing Celery to terminate
        # them and raise a SoftTimeLimitExceeded exception.
        just_created = Q(status=LongRunningMigrationStatus.CREATED)
        stalled = Q(status=LongRunningMigrationStatus.IN_PROGRESS) & Q(
            date_modified__lte=task_expiry_time
        )
        pending = LongRunningMigration.objects.filter(
            just_created | stalled
        ).order_by('date_created')

        for migration in pending:
            migration.execute()
    finally:
        cache.delete(lock_key)
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def run():
    """Test fixture job that always fails by raising a bare `Exception`."""
    raise Exception()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def run():
    """Test fixture job that succeeds after printing a greeting."""
    greeting = 'hello from long running migration'
    print(greeting)
102 changes: 102 additions & 0 deletions kobo/apps/long_running_migrations/tests/test_case.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import os
from unittest.mock import patch

from django.core.exceptions import SuspiciousOperation
from django.test import TestCase, override_settings
from freezegun import freeze_time

from ..models import LongRunningMigration, LongRunningMigrationStatus
from ..tasks import execute_long_running_migrations


@override_settings(
    CACHES={
        'default': {'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}
    }
)
@patch.object(
    LongRunningMigration,
    'LONG_RUNNING_MIGRATIONS_DIR',
    os.path.join('kobo', 'apps', 'long_running_migrations', 'tests', 'fixtures'),
)
class LongRunningMigrationTestCase(TestCase):
    """
    Tests for `LongRunningMigration` itself, with the jobs directory pointed
    at the test fixtures.
    """

    def test_sample_task(self):
        """A job whose `run()` returns normally is marked completed."""
        record = LongRunningMigration.objects.create(name='sample_task')
        record.execute()
        record.refresh_from_db()
        self.assertEqual(record.status, LongRunningMigrationStatus.COMPLETED)

    def test_invalid_task_name(self):
        """Creating a record without a matching job file raises ValueError."""
        self.assertRaises(
            ValueError, LongRunningMigration.objects.create, name='foo'
        )

    def test_traversal_characters(self):
        """Names containing path traversal characters are rejected."""
        self.assertRaises(
            SuspiciousOperation,
            LongRunningMigration.objects.create,
            name='../fixtures/sample_task',
        )

    def test_sample_failure(self):
        """A job whose `run()` raises is marked failed."""
        record = LongRunningMigration.objects.create(name='sample_failure')
        record.execute()
        record.refresh_from_db()
        self.assertEqual(record.status, LongRunningMigrationStatus.FAILED)

    def test_not_updated_worker(self):
        """A record whose job module cannot be imported stays untouched."""
        # `bulk_create()` is used so the file-existence check done on
        # creation by `save()` is not triggered for the missing 'foo' job.
        [record] = LongRunningMigration.objects.bulk_create(
            [LongRunningMigration(name='foo')]
        )
        record.execute()
        record.refresh_from_db()
        self.assertEqual(record.status, LongRunningMigrationStatus.CREATED)


@override_settings(
    CACHES={
        'default': {'BACKEND': 'django.core.cache.backends.dummy.DummyCache'}
    }
)
class LongRunningMigrationPeriodicTaskTestCase(TestCase):
    """
    Tests for the `execute_long_running_migrations` periodic task, with the
    jobs directory pointed at the test fixtures.
    """

    def setUp(self):
        """Patch the jobs directory and create the `sample_task` record."""
        self.patcher = patch.object(
            LongRunningMigration,
            'LONG_RUNNING_MIGRATIONS_DIR',
            os.path.join('kobo', 'apps', 'long_running_migrations', 'tests', 'fixtures')
        )
        self.patcher.start()
        # Registered right after `start()` so the patch is undone even when
        # the rest of `setUp()` raises; `tearDown()` would not run then.
        self.addCleanup(self.patcher.stop)
        self.migration = LongRunningMigration.objects.create(name='sample_task')

    def test_migration_as_completed(self):
        """A freshly created migration is run and marked completed."""
        execute_long_running_migrations()
        self.migration.refresh_from_db()
        self.assertEqual(self.migration.status, LongRunningMigrationStatus.COMPLETED)

    def test_failed_migration_is_ignored(self):
        """A failed migration is not picked up again by the task."""
        # Ignore failed task
        self.migration.status = LongRunningMigrationStatus.FAILED
        self.migration.save(update_fields=['status'])
        execute_long_running_migrations()
        self.migration.refresh_from_db()
        self.assertEqual(self.migration.status, LongRunningMigrationStatus.FAILED)

    def test_ignore_recent_started_migration(self):
        """An in-progress migration that was just saved is left alone."""
        # Ignore recent started tasks
        self.migration.status = LongRunningMigrationStatus.IN_PROGRESS
        self.migration.save()
        execute_long_running_migrations()
        self.migration.refresh_from_db()
        self.assertEqual(self.migration.status, LongRunningMigrationStatus.IN_PROGRESS)

    def test_resume_stuck_migration(self):
        """An in-progress migration saved long ago is run again."""
        # Run an old in-progress task
        with freeze_time('2024-12-10'):
            self.migration.status = LongRunningMigrationStatus.IN_PROGRESS
            self.migration.save()
        execute_long_running_migrations()
        self.migration.refresh_from_db()
        self.assertEqual(self.migration.status, LongRunningMigrationStatus.COMPLETED)
7 changes: 7 additions & 0 deletions kobo/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
'guardian',
'kobo.apps.openrosa.libs',
'kobo.apps.project_ownership.ProjectOwnershipAppConfig',
'kobo.apps.long_running_migrations',
)

MIDDLEWARE = [
Expand Down Expand Up @@ -1201,6 +1202,12 @@ def dj_stripe_request_callback_method():
'schedule': crontab(minute=0, hour=0),
'options': {'queue': 'kpi_low_priority_queue'}
},
# Schedule every hour, every day
'long-running-migrations': {
'task': 'kobo.apps.long_running_migrations.tasks.execute_long_running_migrations', # noqa
'schedule': crontab(minute=0),
'options': {'queue': 'kpi_low_priority_queue'}
},
}


Expand Down
3 changes: 2 additions & 1 deletion kpi/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ def enketo_flush_cached_preview(server_url, form_id):
@celery_app.task(time_limit=LIMIT_HOURS_23, soft_time_limit=LIMIT_HOURS_23)
def perform_maintenance():
"""
Run daily maintenance tasks
Run daily maintenance tasks. Ensure it cannot run multiple times.
"""

remove_unused_markdown_files()
remove_old_import_tasks()
remove_old_asset_snapshots()