Source code for spyder.api.asyncdispatcher

# -*- coding: utf-8 -*-
# Copyright © 2025 Spyder Project Contributors
# Licensed under the terms of the MIT License
# (see spyder/__init__.py for details)

"""
Spyder AsyncDispatcher API.

This module provides a class decorator `AsyncDispatcher` to run coroutines on
dedicated async loops, including utilities for patching loops, managing
concurrency tasks, and executing callbacks safely within Qt applications.
"""

from __future__ import annotations

import asyncio
import asyncio.events
import atexit
import contextlib
import functools
import logging
import os
import sys
import threading
import typing
from asyncio.futures import (
    _chain_future,  # type: ignore[attr-defined] # noqa: PLC2701
)
from asyncio.tasks import (
    _current_tasks,  # type: ignore[attr-defined] # noqa: PLC2701
)
from concurrent.futures import CancelledError, Future
from heapq import heappop

if sys.version_info >= (3, 10):
    from typing import ParamSpec  # noqa: ICN003
else:
    from typing_extensions import ParamSpec

from qtpy.QtCore import QCoreApplication, QEvent, QObject

_logger = logging.getLogger(__name__)

LoopID = typing.Union[typing.Hashable, asyncio.AbstractEventLoop]

_P = ParamSpec("_P")
_T = typing.TypeVar("_T")
_RT = typing.TypeVar("_RT")


class AsyncDispatcher(typing.Generic[_RT]):
    """Decorator to run a coroutine in a specific event loop."""

    # Guards creation/registration of the loop runner threads.
    __rlock = threading.RLock()
    # Set once `close` has run so that it is a no-op afterwards.
    __closed = False
    # Loop runner threads managed by the dispatcher, keyed by loop id.
    __running_threads: typing.ClassVar[
        dict[typing.Hashable, _LoopRunner]
    ] = {}
    # Futures handed out by `early_return` calls that are not done yet.
    _running_tasks: typing.ClassVar[list[Future]] = []

    @typing.overload
    def __init__(
        self: AsyncDispatcher[DispatcherFuture[_T]],
        *,
        loop: LoopID | None = ...,
        early_return: typing.Literal[True] = ...,
        return_awaitable: typing.Literal[False] = ...,
    ): ...

    @typing.overload
    def __init__(
        self: AsyncDispatcher[typing.Awaitable[_T]],
        *,
        loop: LoopID | None = ...,
        early_return: typing.Literal[True] = ...,
        return_awaitable: typing.Literal[True] = ...,
    ): ...

    @typing.overload
    def __init__(
        self: AsyncDispatcher[_T],
        *,
        loop: LoopID | None = ...,
        early_return: typing.Literal[False] = ...,
        return_awaitable: typing.Literal[False] = ...,
    ): ...

    @typing.overload
    def __init__(
        self: AsyncDispatcher[typing.Awaitable[_T]],
        *,
        loop: LoopID | None = ...,
        early_return: typing.Literal[False] = ...,
        return_awaitable: typing.Literal[True] = ...,
    ): ...

    def __init__(
        self,
        *,
        loop: LoopID | None = None,
        early_return: bool = True,
        return_awaitable: bool = False,
    ):
        """
        Decorate a coroutine to run in a specific event loop.

        The `loop` parameter can be an existing loop or a hashable to
        identify an existing/new one (to be) created by the AsyncDispatcher.
        If the loop is not running, it will be started in a new thread and
        managed by the AsyncDispatcher.

        This instance can be called with the same arguments as the coroutine
        it wraps and will return a concurrent Future object, or an awaitable
        Future for the current running event loop or the result of the
        coroutine depending on the `early_return` and `return_awaitable`
        parameters.

        Usage
        -----
        Non-Blocking usage (returns a concurrent Future):
        ```
        @AsyncDispatcher()
        async def my_coroutine(...):
            ...

        future = my_coroutine(...)  # Non-blocking call
        result = future.result()  # Blocking call
        ```

        Blocking usage (returns the result):
        ```
        @AsyncDispatcher(early_return=False)
        async def my_coroutine(...):
            ...

        result = my_coroutine(...)  # Blocking call
        ```

        Coroutine usage (returns an awaitable Future):
        ```
        @AsyncDispatcher(return_awaitable=True)
        async def my_coroutine(...):
            ...

        result = await my_coroutine(...)  # Wait for the result to be ready
        ```

        Parameters
        ----------
        loop : LoopID, optional (default: None)
            The event loop to be used, by default get the current event loop.
        early_return : bool, optional (default: True)
            Return the coroutine as a concurrent Future before it is done.
        return_awaitable : bool, optional (default: False)
            Return the coroutine as an awaitable (asyncio) Future instead of a
            concurrent Future. Independently of the value of `early_return`.
        """
        self._loop = self.get_event_loop(loop)
        self._early_return = early_return
        self._return_awaitable = return_awaitable

    @typing.overload
    def __call__(
        self: AsyncDispatcher[typing.Awaitable[_T]],
        async_func: typing.Callable[_P, typing.Awaitable[_T]],
    ) -> typing.Callable[_P, typing.Awaitable[_T]]: ...

    @typing.overload
    def __call__(
        self: AsyncDispatcher[DispatcherFuture[_T]],
        async_func: typing.Callable[_P, typing.Awaitable[_T]],
    ) -> typing.Callable[_P, DispatcherFuture[_T]]: ...

    @typing.overload
    def __call__(
        self: AsyncDispatcher[_T],
        async_func: typing.Callable[_P, typing.Awaitable[_T]],
    ) -> typing.Callable[_P, _T]: ...

    def __call__(
        self,
        async_func: typing.Callable[_P, typing.Awaitable[_T]],
    ) -> typing.Callable[
        _P,
        typing.Union[  # noqa: UP007
            _T, DispatcherFuture[_T], typing.Awaitable[_T]
        ],
    ]:
        """
        Wrap a coroutine function so its calls run in the event loop.

        Parameters
        ----------
        async_func : Callable
            The coroutine function to be wrapped.

        Returns
        -------
        Callable
            A function accepting the same arguments as `async_func`. Calling
            it submits the coroutine to the dispatcher loop and returns an
            awaitable (asyncio) Future if `return_awaitable` is set, else a
            `DispatcherFuture` if `early_return` is set, else the result of
            the coroutine (blocking until it is available).

        Raises
        ------
        TypeError
            If the function is not a coroutine function.
        """
        if not asyncio.iscoroutinefunction(async_func):
            msg = f"{async_func} is not a coroutine function"
            raise TypeError(msg)

        @functools.wraps(async_func)
        def wrapper(
            *args: _P.args,
            **kwargs: _P.kwargs,
        ) -> typing.Union[  # noqa: UP007
            _T, DispatcherFuture[_T], typing.Awaitable[_T]
        ]:
            task = run_coroutine_threadsafe(
                async_func(*args, **kwargs),
                loop=self._loop,
            )
            # `return_awaitable` takes precedence over `early_return`. The
            # concurrent future is bridged to the loop running in the
            # calling thread so that the caller can `await` it.
            if self._return_awaitable:
                return asyncio.wrap_future(
                    task,
                    loop=asyncio.get_running_loop(),
                )

            if self._early_return:
                # Track the future so that `cancel_all` can reach it; the
                # done callback untracks it again.
                AsyncDispatcher._running_tasks.append(task)
                task.add_done_callback(self._callback_task_done)
                return task
            return task.result()

        return wrapper

    @staticmethod
    def _callback_task_done(future: Future):
        """Untrack a finished future and surface its exception, if any."""
        AsyncDispatcher._running_tasks.remove(future)
        with contextlib.suppress(asyncio.CancelledError, CancelledError):
            # Raising from a done callback makes `concurrent.futures` log the
            # error instead of it being silently dropped when nobody calls
            # `result()`. Cancellation is not treated as an error.
            if (exception := future.exception()) is not None:
                raise exception

    @classmethod
    def get_event_loop(
        cls,
        loop_id: LoopID | None = None,
    ) -> asyncio.AbstractEventLoop:
        """Get the event loop to run the coroutine.

        If the loop is not running, it will be started in a new thread and
        managed by the AsyncDispatcher.

        Parameters
        ----------
        loop_id : LoopID, optional (default: None)
            The event loop to be used, by default gets the current thread
            event loop.

        Notes
        -----
        * If a hashable is provided, it will be used to identify the loop in
          the AsyncDispatcher.
        * If an event loop is provided, it will be used as the event loop in
          the AsyncDispatcher.

        Returns
        -------
        AbstractEventLoop
            The event loop to be used.
        """
        loop, loop_id = cls._fetch_event_loop(loop_id)

        try:
            if loop.is_running():
                return loop
        except RuntimeError:
            _logger.exception(
                "Failed to check if the loop is running, defaulting to the "
                "current loop.",
            )
            return asyncio.get_event_loop()

        with cls.__rlock:
            # Re-check, perhaps it was created in the meantime...
            if loop_id not in cls.__running_threads:
                cls.__run_loop(loop_id, loop)
        if loop_id is None:
            asyncio.set_event_loop(loop)

        return loop

    @classmethod
    def _fetch_event_loop(
        cls,
        loop_id: LoopID | None = None,
    ) -> tuple[asyncio.AbstractEventLoop, typing.Hashable | None]:
        """Get the event loop and its hashable id."""
        if loop_id is None:
            # Prefer the loop already running in the calling thread; fall
            # through to the managed loops when there is none.
            with contextlib.suppress(RuntimeError):
                return asyncio.get_running_loop(), None
        elif isinstance(loop_id, asyncio.AbstractEventLoop):
            return loop_id, hash(loop_id)

        running_thread = cls.__running_threads.get(loop_id)
        if running_thread is not None:
            return running_thread.loop, loop_id

        return asyncio.new_event_loop(), loop_id

    @classmethod
    def __run_loop(
        cls,
        loop_id: typing.Hashable,
        loop: asyncio.AbstractEventLoop,
    ):
        """Start a runner thread for `loop` unless `loop_id` already has one."""
        if loop_id not in cls.__running_threads:
            with cls.__rlock:
                if loop_id not in cls.__running_threads:
                    _patch_loop_as_reentrant(loop)  # ipykernel compatibility
                    cls.__running_threads[loop_id] = _LoopRunner(loop_id, loop)
                    cls.__running_threads[loop_id].start()

    @staticmethod
    @atexit.register
    def close():
        """Cancel the tracked tasks and join the managed loop threads.

        Registered with `atexit`; calling it again after the first call is a
        no-op.
        """
        if AsyncDispatcher.__closed:
            return
        AsyncDispatcher.cancel_all()
        AsyncDispatcher.join()
        AsyncDispatcher.__closed = True

    @classmethod
    def cancel_all(cls):
        """Cancel all running tasks."""
        # Iterate over a snapshot: cancelling a pending future runs its done
        # callbacks synchronously, and `_callback_task_done` removes the
        # future from `_running_tasks`. Iterating the live list would skip
        # entries and leave some tasks uncancelled.
        for task in tuple(cls._running_tasks):
            task.cancel()

    @classmethod
    def join(cls, timeout: float | None = None):
        """Close all running loops and join the threads.

        Parameters
        ----------
        timeout : float, optional (default: None)
            Timeout passed to the join of each loop runner.
        """
        # `_stop_running_loop` pops from the dict, so iterate over a copy of
        # the keys.
        for loop_id in list(cls.__running_threads):
            cls._stop_running_loop(loop_id, timeout)

    @classmethod
    def _stop_running_loop(
        cls,
        loop_id: LoopID,
        timeout: float | None = None,
    ):
        """Unregister the runner of `loop_id`, if any, and join it."""
        runner = cls.__running_threads.pop(loop_id, None)
        if runner is None:
            return
        runner.join(timeout)

    @staticmethod
    def QtSlot(  # noqa: N802
        func: typing.Callable[_P, None],
    ) -> typing.Callable[_P, None]:
        """Mark a function to be executed inside the main qt loop.

        Set the `DispatcherFuture.QT_SLOT_ATTRIBUTE` attribute to the function
        to mark it as a slot to be executed in the main Qt loop.

        Parameters
        ----------
        func : Callable
            The function to be marked.

        Returns
        -------
        Callable
            The marked function.
        """
        setattr(func, DispatcherFuture.QT_SLOT_ATTRIBUTE, True)
        return func
class _LoopRunner(threading.Thread):
    """A task runner that runs an asyncio event loop on a background thread."""

    def __init__(
        self,
        loop_id: typing.Hashable,
        loop: asyncio.AbstractEventLoop,
    ):
        """
        Create the (not yet started) daemon thread for `loop`.

        Parameters
        ----------
        loop_id : Hashable
            Identifier of the loop, only used to name the thread.
        loop : AbstractEventLoop
            The event loop that `run` will execute.
        """
        super().__init__(daemon=True, name=f"AsyncDispatcher-{loop_id}")
        self.__loop = loop
        # Set by `run` once the loop has been closed.
        self.__loop_stopped = threading.Event()

    @property
    def loop(self):
        """The event loop handled by this runner."""
        return self.__loop

    def run(self):
        """Run the loop forever in this thread and close it afterwards."""
        asyncio.set_event_loop(self.__loop)
        try:
            self.__loop.run_forever()
        finally:
            self.__loop.close()
            self.__loop_stopped.set()

    def stop(self):
        """Ask the loop to stop; safe to call from another thread."""
        self.__loop.call_soon_threadsafe(self.__loop.stop)

    def join(self, timeout: float | None = None):
        """
        Stop the loop if it has not stopped yet and join the thread.

        Parameters
        ----------
        timeout : float, optional (default: None)
            Passed both to the wait for the loop to stop and to the thread
            join, so the two waits are bounded separately.
        """
        if not self.__loop_stopped.is_set():
            self.stop()
            self.__loop_stopped.wait(timeout)
        super().join(timeout)


# ruff: noqa: SLF001
# TODO(hlouzada): refactor this function to reduce its complexity
def _patch_loop_as_reentrant(loop):  # noqa: C901, PLR0915
    """
    Patch an event loop in order to make it reentrant.

    This is a simplified version of the 'nest_asyncio'.

    The replacement methods are assigned on the class of `loop`
    (`loop.__class__`), not on the instance. If `loop` already exposes a
    `_nest_patched` attribute the function returns without doing anything.

    Parameters
    ----------
    loop : AbstractEventLoop
        The event loop to be patched.

    Raises
    ------
    TypeError
        If the loop is not an instance of `asyncio.BaseEventLoop`.
    """
    if hasattr(loop, "_nest_patched"):
        # NOTE: the string below is a no-op expression statement used as a
        # comment in the original code.
        """Use same check as asyncio to avoid re-patching."""
        return

    def run_forever(self):
        """Run `_run_once` repeatedly until the loop is asked to stop."""
        with manage_run(self), manage_asyncgens(self):
            while True:
                self._run_once()
                if self._stopping:
                    break
        # Reset the flag so that the loop can be run again.
        self._stopping = False

    def run_until_complete(self, future):
        """
        Run the loop until `future` is done and return its result.

        Raises `RuntimeError` if the loop is stopped before the future
        completes.
        """
        with manage_run(self):
            f = asyncio.ensure_future(future, loop=self)
            if f is not future:
                # `future` was wrapped in a new task by `ensure_future`.
                f._log_destroy_pending = False
            while not f.done():
                self._run_once()
                if self._stopping:
                    break
            if not f.done():
                msg = "Event loop stopped before Future completed."
                raise RuntimeError(msg)
            return f.result()

    def _run_once(self):
        """Run handles as they become ready.

        Simplified re-implementation of asyncio's _run_once
        """
        ready = self._ready
        scheduled = self._scheduled
        # Drop cancelled timers from the top of the heap.
        while scheduled and scheduled[0]._cancelled:
            heappop(scheduled)

        # Do not block when there is work ready or a stop was requested;
        # otherwise wait until the next timer is due (capped at 86400), or
        # indefinitely when there are no timers.
        timeout = (
            0
            if ready or self._stopping
            else min(max(scheduled[0]._when - self.time(), 0), 86400)
            if scheduled
            else None
        )
        event_list = self._selector.select(timeout)
        self._process_events(event_list)

        # Move the timers that are due to the ready queue.
        end_time = self.time() + self._clock_resolution
        while scheduled and scheduled[0]._when < end_time:
            handle = heappop(scheduled)
            ready.append(handle)

        # Only run the handles that were ready when the iteration started;
        # the queue may shrink if a handle re-enters the loop.
        for __ in range(len(ready)):
            if not ready:
                break
            handle = ready.popleft()
            if not handle._cancelled:
                # preempt the current task so that the checks in
                # Task.__step do not raise
                curr_task = _current_tasks.pop(self, None)

                try:
                    handle._run()
                finally:
                    # restore the current task
                    if curr_task is not None:
                        _current_tasks[self] = curr_task

        handle = None

    @contextlib.contextmanager
    def manage_run(self):
        """Set up the loop for running."""
        self._check_closed()
        # Saved so that the previous state is restored when this (possibly
        # nested) run finishes.
        old_thread_id = self._thread_id
        old_running_loop = asyncio.events._get_running_loop()
        try:
            self._thread_id = threading.get_ident()
            asyncio.events._set_running_loop(self)
            self._num_runs_pending += 1
            if self._is_proactorloop and self._self_reading_future is None:
                self.call_soon(self._loop_self_reading)
            yield
        finally:
            self._thread_id = old_thread_id
            asyncio.events._set_running_loop(old_running_loop)
            self._num_runs_pending -= 1
            # Proactor loops only: once the outermost run has finished,
            # cancel the pending self-reading future.
            if (
                self._is_proactorloop
                and self._num_runs_pending == 0
                and self._self_reading_future is not None
            ):
                ov = self._self_reading_future._ov
                self._self_reading_future.cancel()
                if ov is not None:
                    self._proactor._unregister(ov)
                self._self_reading_future = None

    @contextlib.contextmanager
    def manage_asyncgens(self):
        """Install the loop's async generator hooks while it is running."""
        if not hasattr(sys, "get_asyncgen_hooks"):
            # Python version is too old.
            return
        old_agen_hooks = sys.get_asyncgen_hooks()
        try:
            self._set_coroutine_origin_tracking(self._debug)
            if self._asyncgens is not None:
                sys.set_asyncgen_hooks(
                    firstiter=self._asyncgen_firstiter_hook,
                    finalizer=self._asyncgen_finalizer_hook,
                )
            yield
        finally:
            self._set_coroutine_origin_tracking(False)
            if self._asyncgens is not None:
                sys.set_asyncgen_hooks(*old_agen_hooks)

    def _check_running(self):
        """Do not throw exception if loop is already running."""

    if not isinstance(loop, asyncio.BaseEventLoop):
        msg = f"Can't patch loop of type {type(loop)}"
        raise TypeError(msg)
    # The patch is applied to the class of the loop, not to the instance.
    cls = loop.__class__
    cls.run_forever = run_forever
    cls.run_until_complete = run_until_complete
    cls._run_once = _run_once
    cls._check_running = _check_running
    cls._num_runs_pending = 1 if loop.is_running() else 0
    cls._is_proactorloop = os.name == "nt" and issubclass(
        cls,
        asyncio.ProactorEventLoop,
    )
    # Marker checked at the top of this function to avoid re-patching.
    cls._nest_patched = True


class _QCallbackEvent(QEvent):
    """Event to execute a callback in the main Qt loop."""

    def __init__(self, func: typing.Callable):
        """Create a `QEvent.Type.User` event carrying `func`."""
        super().__init__(QEvent.Type.User)
        self.func = func


class _QCallbackExecutor(QObject):
    """Executor to run callbacks in the main Qt loop."""

    def customEvent(self, e: _QCallbackEvent):  # noqa: N802, PLR6301
        """Call the function carried by the received event."""
        e.func()
class DispatcherFuture(Future, typing.Generic[_T]):
    """Concurrent future whose callbacks can be delivered through Qt.

    This is a `concurrent.futures.Future` subclass with an additional
    `connect` method. Callbacks marked with `QT_SLOT_ATTRIBUTE` are not
    called directly when the future finishes: they are posted as an event
    to a shared `_QCallbackExecutor` object instead.
    """

    # Name of the attribute that marks a callback as a Qt slot.
    QT_SLOT_ATTRIBUTE = "__dispatch_qt_slot__"
    # Receiver of the callback events, shared by all the futures.
    _callback_executor = _QCallbackExecutor()

    def result(
        self,
        timeout: typing.Optional[float] = None,  # noqa: UP045
    ) -> _T:
        """
        Return the result of the call that the future represents.

        Parameters
        ----------
        timeout: float | None
            The number of seconds to wait for the result. If None, then wait
            indefinitely.

        Returns
        -------
        _T
            The result of the call that the future represents.

        Raises
        ------
        CancelledError
            If the future was cancelled.
        TimeoutError
            If the future didn't finish executing before the given timeout.
        Exception
            Exception raised by the call that the future represents.
        """  # noqa: DOC502
        outcome = super().result(timeout=timeout)
        return outcome

    def connect(self, fn: typing.Callable[[DispatcherFuture[_T]], None]):
        """Attach a callable that will be called when the future finishes.

        If `fn` is not marked with `DispatcherFuture.QT_SLOT_ATTRIBUTE` it is
        registered as a regular done callback. Otherwise, when the future
        finishes, an event carrying the call is posted to the callback
        executor and `fn` is called when that event is handled.

        If the future has already completed or been cancelled, the callback
        is triggered immediately. Callbacks are triggered in the order in
        which they were added.

        Parameters
        ----------
        fn: Callable
            A callable that will be called with this future as its only
            argument when the future completes.
        """
        is_qt_slot = getattr(fn, self.QT_SLOT_ATTRIBUTE, False)
        if not is_qt_slot:
            self.add_done_callback(fn)  # type: ignore[arg-type]
            return

        def post_to_executor(done_future: DispatcherFuture[_T]):
            # Defer the actual call to whoever handles the posted event.
            event = _QCallbackEvent(functools.partial(fn, done_future))
            QCoreApplication.postEvent(self._callback_executor, event)

        self.add_done_callback(post_to_executor)  # type: ignore[arg-type]
def run_coroutine_threadsafe(
    coro: typing.Coroutine[_T, None, _RT],
    loop: asyncio.AbstractEventLoop,
) -> DispatcherFuture[_RT]:
    """Schedule a coroutine object on `loop` from any thread.

    Same contract as `asyncio.run_coroutine_threadsafe`, except that the
    returned future is a `DispatcherFuture`.

    Parameters
    ----------
    coro: Coroutine
        The coroutine object to be submitted.
    loop: AbstractEventLoop
        The event loop to run the coroutine.

    Returns
    -------
    DispatcherFuture
        A future object representing the result of the coroutine.

    Raises
    ------
    TypeError
        If the object is not a coroutine.
    """
    if not asyncio.iscoroutine(coro):
        raise TypeError("A coroutine object is required")

    dispatched = DispatcherFuture()

    def schedule_on_loop():
        # Runs inside `loop`: wrap the coroutine in a task and mirror the
        # state of that task into the future returned to the caller.
        try:
            loop_task = asyncio.ensure_future(coro, loop=loop)
            _chain_future(loop_task, dispatched)
        except BaseException as exc:
            # Interpreter exit requests propagate untouched.
            if isinstance(exc, (SystemExit, KeyboardInterrupt)):
                raise
            # Anything else is also reported through the future, unless it
            # was already cancelled.
            if dispatched.set_running_or_notify_cancel():
                dispatched.set_exception(exc)
            raise

    loop.call_soon_threadsafe(schedule_on_loop)
    return dispatched