# Source code for django_celery_results.backends.database
from __future__ import absolute_import, unicode_literals
from celery.backends.base import BaseDictBackend
from celery.utils.serialization import b64encode, b64decode
from ..models import TaskResult
class DatabaseBackend(BaseDictBackend):
    """The Django database backend, using models to store task state."""

    # Model whose default manager is used to persist and look up results.
    TaskModel = TaskResult

    # NOTE(review): overrides a base-backend setting; presumably the sleep
    # (in seconds) between result polls -- confirm in celery.backends.base.
    subpolling_interval = 0.5

    def _store_result(self, task_id, result, status,
                      traceback=None, request=None):
        """Store return value and status of an executed task.

        ``result`` is encoded with :meth:`encode_content` and passed to the
        model manager's ``store_result`` together with the encoded
        ``children`` metadata and, when a ``request`` is supplied, its
        ``task``, ``args`` and ``kwargs`` attributes.

        Returns the *encoded* result, not the object that was passed in.
        """
        content_type, content_encoding, result = self.encode_content(result)
        # Only the encoded payload of the metadata is stored; its content
        # type/encoding are discarded.
        _, _, meta = self.encode_content({
            'children': self.current_task_children(request),
        })

        # A missing (or falsy) request yields None for every task field.
        task_name = getattr(request, 'task', None) if request else None
        task_args = getattr(request, 'args', None) if request else None
        task_kwargs = getattr(request, 'kwargs', None) if request else None

        self.TaskModel._default_manager.store_result(
            content_type, content_encoding,
            task_id, result, status,
            traceback=traceback,
            meta=meta,
            task_name=task_name,
            task_args=task_args,
            task_kwargs=task_kwargs,
        )
        return result

    def _get_task_meta_for(self, task_id):
        """Get task metadata for a task by id.

        The stored ``meta`` and ``result`` fields are decoded with
        :meth:`decode_content`; decoded ``meta`` entries are merged into the
        top-level dict before it is passed to ``meta_from_decoded``.
        """
        obj = self.TaskModel._default_manager.get_task(task_id)
        res = obj.as_dict()
        # ``meta`` is removed from the dict and replaced by its decoded keys.
        meta = self.decode_content(obj, res.pop('meta', None)) or {}
        res.update(meta,
                   result=self.decode_content(obj, res.get('result')))
        return self.meta_from_decoded(res)

    def encode_content(self, data):
        """Encode ``data`` and return ``(content_type, content_encoding, content)``.

        Encoding is delegated to ``self._encode``; when the reported content
        encoding is ``'binary'`` the payload is additionally base64-encoded.
        """
        content_type, content_encoding, content = self._encode(data)
        if content_encoding == 'binary':
            content = b64encode(content)
        return content_type, content_encoding, content

    def decode_content(self, obj, content):
        """Decode ``content`` that was stored alongside ``obj``.

        Returns ``None`` when ``content`` is empty or ``None``.  Otherwise the
        payload is base64-decoded first if ``obj.content_encoding`` is
        ``'binary'`` (mirroring :meth:`encode_content`) and then passed to
        ``self.decode``.
        """
        if not content:
            return None
        if obj.content_encoding == 'binary':
            content = b64decode(content)
        return self.decode(content)

    def _forget(self, task_id):
        """Delete the stored result for ``task_id``, if there is one."""
        try:
            self.TaskModel._default_manager.get(task_id=task_id).delete()
        except self.TaskModel.DoesNotExist:
            # Nothing stored for this id: forgetting is a deliberate no-op.
            pass

    def cleanup(self):
        """Delete expired metadata."""
        self.TaskModel._default_manager.delete_expired(self.expires)