Source code for django_celery_beat.clockedschedule

"""Clocked schedule Implementation."""

from celery import schedules
from celery.utils.time import maybe_make_aware

from .utils import NEVER_CHECK_TIMEOUT


[docs]class clocked(schedules.BaseSchedule): """clocked schedule. Depends on PeriodicTask one_off=True """ def __init__(self, clocked_time, nowfun=None, app=None): """Initialize clocked.""" self.clocked_time = maybe_make_aware(clocked_time) super().__init__(nowfun=nowfun, app=app)
[docs] def remaining_estimate(self, last_run_at): return self.clocked_time - self.now()
[docs] def is_due(self, last_run_at): rem_delta = self.remaining_estimate(None) remaining_s = max(rem_delta.total_seconds(), 0) if remaining_s == 0: return schedules.schedstate(is_due=True, next=NEVER_CHECK_TIMEOUT) return schedules.schedstate(is_due=False, next=remaining_s)
def __repr__(self): return f'<clocked: {self.clocked_time}>' def __eq__(self, other): if isinstance(other, clocked): return self.clocked_time == other.clocked_time return False def __ne__(self, other): return not self.__eq__(other) def __reduce__(self): return self.__class__, (self.clocked_time, self.nowfun)