diff --git a/django_celery_monitor/admin.py b/django_celery_monitor/admin.py index da4465e8..be6fcb60 100644 --- a/django_celery_monitor/admin.py +++ b/django_celery_monitor/admin.py @@ -6,7 +6,7 @@ from django.contrib import admin from django.contrib.admin import helpers from django.contrib.admin.views import main as main_views -from django.shortcuts import render_to_response +from django.shortcuts import render from django.template import RequestContext from django.utils.encoding import force_text from django.utils.html import escape @@ -14,7 +14,8 @@ from celery import current_app from celery import states -from celery.task.control import broadcast, revoke, rate_limit +# from celery.task.control import broadcast, revoke, rate_limit +from celery import Celery from celery.utils.text import abbrtask from .models import TaskState, WorkerState @@ -31,6 +32,7 @@ NODE_STATE_COLORS = {'ONLINE': 'green', 'OFFLINE': 'gray'} +app = Celery('django_celery_monitor', broker='redis://localhost:6379/0') class MonitorList(main_views.ChangeList): """A custom changelist to set the page title automatically.""" @@ -175,19 +177,19 @@ class Media: def revoke_tasks(self, request, queryset): with current_app.default_connection() as connection: for state in queryset: - revoke(state.task_id, connection=connection) + app.control.revoke(state.task_id, connection=connection) @action(_('Terminate selected tasks')) def terminate_tasks(self, request, queryset): with current_app.default_connection() as connection: for state in queryset: - revoke(state.task_id, connection=connection, terminate=True) + app.control.revoke(state.task_id, connection=connection, terminate=True) @action(_('Kill selected tasks')) def kill_tasks(self, request, queryset): with current_app.default_connection() as connection: for state in queryset: - revoke(state.task_id, connection=connection, + app.control.revoke(state.task_id, connection=connection, terminate=True, signal='KILL') @action(_('Rate limit selected tasks')) @@ -199,7 +201,7 @@ def rate_limit_tasks(self, request, queryset): rate = request.POST['rate_limit'] with current_app.default_connection() as connection: for task_name in tasks: - rate_limit(task_name, rate, connection=connection) + app.control.rate_limit(task_name, rate, connection=connection) return None context = { @@ -211,7 +213,7 @@ def rate_limit_tasks(self, request, queryset): 'app_label': app_label, } - return render_to_response( + return render( self.rate_limit_confirmation_template, context, context_instance=RequestContext(request), ) @@ -241,16 +243,16 @@ class WorkerMonitor(ModelMonitor): @action(_('Shutdown selected worker nodes')) def shutdown_nodes(self, request, queryset): - broadcast('shutdown', destination=[n.hostname for n in queryset]) + app.control.broadcast('shutdown', destination=[n.hostname for n in queryset]) @action(_('Enable event mode for selected nodes.')) def enable_events(self, request, queryset): - broadcast('enable_events', + app.control.broadcast('enable_events', destination=[n.hostname for n in queryset]) @action(_('Disable event mode for selected nodes.')) def disable_events(self, request, queryset): - broadcast('disable_events', + app.control.broadcast('disable_events', destination=[n.hostname for n in queryset]) def get_actions(self, request): diff --git a/django_celery_monitor/managers.py b/django_celery_monitor/managers.py index 2d21d5ae..50511f23 100644 --- a/django_celery_monitor/managers.py +++ b/django_celery_monitor/managers.py @@ -14,31 +14,7 @@ class ExtendedQuerySet(models.QuerySet): """A custom model queryset that implements a few helpful methods.""" def select_for_update_or_create(self, defaults=None, **kwargs): - """Extend update_or_create with select_for_update. - - Look up an object with the given kwargs, updating one with defaults - if it exists, otherwise create a new one. - Return a tuple (object, created), where created is a boolean - specifying whether an object was created. - - This is a backport from Django 1.11 - (https://code.djangoproject.com/ticket/26804) to support - select_for_update when getting the object. - """ - defaults = defaults or {} - lookup, params = self._extract_model_params(defaults, **kwargs) - self._for_write = True - with transaction.atomic(using=self.db): - try: - obj = self.select_for_update().get(**lookup) - except self.model.DoesNotExist: - obj, created = self._create_object_from_params(lookup, params) - if created: - return obj, created - for k, v in defaults.items(): - setattr(obj, k, v() if callable(v) else v) - obj.save(using=self.db) - return obj, False + return self.update_or_create(defaults, **kwargs) class WorkerStateQuerySet(ExtendedQuerySet): diff --git a/django_celery_monitor/models.py b/django_celery_monitor/models.py index b5605cb5..30ba2ef7 100644 --- a/django_celery_monitor/models.py +++ b/django_celery_monitor/models.py @@ -9,7 +9,7 @@ from celery import states from celery.events.state import heartbeat_expires -from celery.five import python_2_unicode_compatible +from six import python_2_unicode_compatible from . import managers diff --git a/requirements/default.txt b/requirements/default.txt index 01be9daa..0d452e9e 100644 --- a/requirements/default.txt +++ b/requirements/default.txt @@ -1 +1 @@ -celery>=4.0,<5.0 +celery>=5.0 diff --git a/setup.py b/setup.py index 1de64b7a..552d7140 100644 --- a/setup.py +++ b/setup.py @@ -45,6 +45,7 @@ def _pyimp(): Programming Language :: Python :: 3.4 Programming Language :: Python :: 3.5 Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 Programming Language :: Python :: Implementation :: CPython Programming Language :: Python :: Implementation :: PyPy Framework :: Django @@ -52,6 +53,7 @@ def _pyimp(): Framework :: Django :: 1.9 Framework :: Django :: 1.10 Framework :: Django :: 1.11 + Framework :: Django :: 2.2 Operating System :: OS Independent Topic :: Communications Topic :: System :: Distributed Computing @@ -147,7 +149,12 @@ def run_tests(self): license='BSD', classifiers=classifiers, install_requires=reqs('default.txt'), - tests_require=reqs('test.txt'), + test_require=[ + 'case>=1.3.1', + 'pytest>=3.0', + 'pytest-django', + 'pytz>dev', + ], cmdclass={'test': pytest}, zip_safe=False, include_package_data=True,