"""Periodic Task Admin interface."""
from celery import current_app
from celery.utils import cached_property
from django import forms
from django.conf import settings
from django.contrib import admin, messages
from django.db.models import Case, Value, When
from django.forms.widgets import Select
from django.template.defaultfilters import pluralize
from django.utils.translation import gettext_lazy as _
from kombu.utils.json import loads
from .models import (ClockedSchedule, CrontabSchedule, IntervalSchedule,
PeriodicTask, PeriodicTasks, SolarSchedule)
from .utils import is_database_scheduler
[docs]class TaskChoiceField(forms.ChoiceField):
"""Field that lets you choose between task names."""
widget = TaskSelectWidget
[docs] def valid_value(self, value):
return True
[docs]class PeriodicTaskAdmin(admin.ModelAdmin):
"""Admin-interface for periodic tasks."""
form = PeriodicTaskForm
model = PeriodicTask
celery_app = current_app
date_hierarchy = 'start_time'
list_display = ('name', 'enabled', 'scheduler', 'interval', 'start_time',
'last_run_at', 'one_off')
list_filter = ['enabled', 'one_off', 'task', 'start_time', 'last_run_at']
actions = ('enable_tasks', 'disable_tasks', 'toggle_tasks', 'run_tasks')
search_fields = ('name',)
fieldsets = (
(None, {
'fields': ('name', 'regtask', 'task', 'enabled', 'description',),
'classes': ('extrapretty', 'wide'),
}),
(_('Schedule'), {
'fields': ('interval', 'crontab', 'crontab_translation', 'solar',
'clocked', 'start_time', 'last_run_at', 'one_off'),
'classes': ('extrapretty', 'wide'),
}),
(_('Arguments'), {
'fields': ('args', 'kwargs'),
'classes': ('extrapretty', 'wide', 'collapse', 'in'),
}),
(_('Execution Options'), {
'fields': ('expires', 'expire_seconds', 'queue', 'exchange',
'routing_key', 'priority', 'headers'),
'classes': ('extrapretty', 'wide', 'collapse', 'in'),
}),
)
readonly_fields = (
'last_run_at', 'crontab_translation',
)
[docs] def crontab_translation(self, obj):
return obj.crontab.human_readable
change_form_template = 'admin/djcelery/change_periodictask_form.html'
[docs] def changelist_view(self, request, extra_context=None):
extra_context = extra_context or {}
scheduler = getattr(settings, 'CELERY_BEAT_SCHEDULER', None)
extra_context['wrong_scheduler'] = not is_database_scheduler(scheduler)
return super().changelist_view(
request, extra_context)
[docs] def get_queryset(self, request):
qs = super().get_queryset(request)
return qs.select_related('interval', 'crontab', 'solar', 'clocked')
def _message_user_about_update(self, request, rows_updated, verb):
"""Send message about action to user.
`verb` should shortly describe what have changed (e.g. 'enabled').
"""
self.message_user(
request,
_('{0} task{1} {2} successfully {3}').format(
rows_updated,
pluralize(rows_updated),
pluralize(rows_updated, _('was,were')),
verb,
),
)
[docs] def enable_tasks(self, request, queryset):
rows_updated = queryset.update(enabled=True)
PeriodicTasks.update_changed()
self._message_user_about_update(request, rows_updated, 'enabled')
enable_tasks.short_description = _('Enable selected tasks')
[docs] def disable_tasks(self, request, queryset):
rows_updated = queryset.update(enabled=False, last_run_at=None)
PeriodicTasks.update_changed()
self._message_user_about_update(request, rows_updated, 'disabled')
disable_tasks.short_description = _('Disable selected tasks')
def _toggle_tasks_activity(self, queryset):
return queryset.update(enabled=Case(
When(enabled=True, then=Value(False)),
default=Value(True),
))
[docs] def toggle_tasks(self, request, queryset):
rows_updated = self._toggle_tasks_activity(queryset)
PeriodicTasks.update_changed()
self._message_user_about_update(request, rows_updated, 'toggled')
toggle_tasks.short_description = _('Toggle activity of selected tasks')
[docs] def run_tasks(self, request, queryset):
self.celery_app.loader.import_default_modules()
tasks = [(self.celery_app.tasks.get(task.task),
loads(task.args),
loads(task.kwargs),
task.queue,
task.name)
for task in queryset]
if any(t[0] is None for t in tasks):
for i, t in enumerate(tasks):
if t[0] is None:
break
# variable "i" will be set because list "tasks" is not empty
not_found_task_name = queryset[i].task
self.message_user(
request,
_(f'task "{not_found_task_name}" not found'),
level=messages.ERROR,
)
return
task_ids = [
task.apply_async(args=args, kwargs=kwargs, queue=queue,
periodic_task_name=periodic_task_name)
if queue and len(queue)
else task.apply_async(args=args, kwargs=kwargs,
periodic_task_name=periodic_task_name)
for task, args, kwargs, queue, periodic_task_name in tasks
]
tasks_run = len(task_ids)
self.message_user(
request,
_('{0} task{1} {2} successfully run').format(
tasks_run,
pluralize(tasks_run),
pluralize(tasks_run, _('was,were')),
),
)
run_tasks.short_description = _('Run selected tasks')
[docs]class ClockedScheduleAdmin(admin.ModelAdmin):
"""Admin-interface for clocked schedules."""
fields = (
'clocked_time',
)
list_display = (
'clocked_time',
)
[docs]class CrontabScheduleAdmin(admin.ModelAdmin):
"""Admin class for CrontabSchedule."""
list_display = ('__str__', 'human_readable')
admin.site.register(IntervalSchedule)
admin.site.register(CrontabSchedule, CrontabScheduleAdmin)
admin.site.register(SolarSchedule)
admin.site.register(ClockedSchedule, ClockedScheduleAdmin)
admin.site.register(PeriodicTask, PeriodicTaskAdmin)