import asyncio
import logging
import threading
from enum import Enum
from feeluown.utils import aio
logger = logging.getLogger(__name__)
def is_in_loop_thread():
    """Return True when the caller is running on the main thread.

    The check is purely "is this the main thread?"; no event loop is
    inspected. Code in this module uses the answer to decide whether the
    loop may be touched directly, i.e. the main thread is assumed to be
    the thread that runs the event loop.
    """
    caller = threading.current_thread()
    main = threading.main_thread()
    return caller is main
class TaskKind(Enum):
    """Kinds of task that a task spec can be declared as."""

    #: preemptive task
    preemptive = 'preemptive'
    #: cooperative task
    cooperative = 'cooperative'
class PreemptiveTaskSpec:
    """Preemptive task specification (threadsafe)

    A spec is a named slot holding at most one bound task. Binding new
    work first requests cancellation of the previously bound task if it
    has not finished yet.
    """

    def __init__(self, mgr, name):
        """
        :param mgr: :class:`TaskManager`
        :param name: task unique name
        """
        self._mgr = mgr
        self.name = name
        self.kind = TaskKind.preemptive
        # The task/future bound most recently, or None when nothing is bound.
        self._task = None
        # When True, ``_cb`` is attached to every newly bound task.
        self._use_default_cb = True

    def _before_bind(self):
        """Request cancellation of the bound task (if unfinished) and unbind it."""
        if self._task is None:
            return
        if not self._task.done():
            logger.info('preemptive-task(%s): try to cancel previous', self.name)
            if is_in_loop_thread():
                self._task.cancel()
            else:
                # Off the loop thread: hand the cancel over to the loop
                # instead of calling it from this thread.
                self._mgr.loop.call_soon_threadsafe(self._task.cancel)
        self._task = None

    def bind_coro(self, coro):
        """run the coroutine and bind the task

        it will cancel the previous task if exists

        :param coro: coroutine object to run on the manager's loop
        :return: the result of ``aio.create_task`` when called from the loop
            thread; otherwise the :class:`concurrent.futures.Future`
            returned by :func:`asyncio.run_coroutine_threadsafe`
        """
        self._before_bind()
        if is_in_loop_thread():
            self._task = aio.create_task(coro)
        else:
            self._task = asyncio.run_coroutine_threadsafe(coro, loop=self._mgr.loop)
        if self._use_default_cb:
            self._task.add_done_callback(self._cb)
        return self._task

    def bind_blocking_io(self, func, *args):
        """run blocking io func in a thread executor, and bind the task

        it will cancel the previous task if exists

        :param func: blocking callable, run in the loop's default executor
        :param args: positional arguments passed to ``func``
        :return: the future returned by ``loop.run_in_executor``
        """
        self._before_bind()
        self._task = self._mgr.loop.run_in_executor(None, func, *args)
        if self._use_default_cb:
            self._task.add_done_callback(self._cb)
        return self._task

    def disable_default_cb(self):
        """Stop attaching the default logging callback to newly bound tasks."""
        self._use_default_cb = False

    def _cb(self, future):
        """Default done-callback: log cancellation or failure of ``future``.

        :param future: the finished task/future; it may be an asyncio future
            or a :class:`concurrent.futures.Future` (see :meth:`bind_coro`)
        """
        # Imported here because the module's import block does not pull in
        # concurrent.futures.
        from concurrent.futures import CancelledError as ThreadCancelledError

        try:
            future.result()
        except (asyncio.CancelledError, ThreadCancelledError):
            # Since Python 3.8 the two CancelledError classes are distinct:
            # asyncio futures raise asyncio.CancelledError, while futures
            # from run_coroutine_threadsafe raise the concurrent.futures one.
            # Both mean "preempted", not "failed".
            logger.warning('Task %s is cancelled', self.name)
        except Exception:
            logger.exception('Task %s failed', self.name)
class TaskManager:
    """Registry of task specs, looked up by unique name.

    Usage::

        async def fetch_song():
            pass

        task_name = 'unique-name'
        task_spec = task_mgr.get_or_create(task_name, TaskKind.preemptive)
        task = task_spec.bind_coro(fetch_song())
    """

    def __init__(self, app):
        """Capture the running event loop and start with an empty registry.

        Must be called while an event loop is running, because the loop is
        obtained with :func:`asyncio.get_running_loop`.

        :param app: feeluown app instance
        """
        # name -> task spec
        self._store = {}
        # Read by task specs to schedule and cancel work on the loop.
        self.loop = asyncio.get_running_loop()
        self._app = app

    def get_or_create(self, name, kind=TaskKind.preemptive):
        """Return the spec registered under ``name``, creating it on first use.

        :param name: task identifier(name)
        :param kind: :class:`TaskKind`; only consulted when a new spec is created

        TODO: client should register first, then get by name
        """
        spec = self._store.get(name)
        if spec is None:
            spec = self._create(name, kind)
        return spec

    def _create(self, name, kind):
        """Build a spec for ``name``, register it and return it.

        ``kind`` is only validated against :class:`TaskKind`; the spec that
        gets created is always a :class:`PreemptiveTaskSpec`.
        """
        TaskKind(kind)  # raises ValueError for a value that is not a TaskKind
        spec = self._store[name] = PreemptiveTaskSpec(self, name)
        return spec