Source code for ld_eventsource.config.async_connect_strategy

from __future__ import annotations

from logging import Logger
from typing import AsyncIterator, Callable, Optional

from ld_eventsource.errors import Headers


[docs] class AsyncConnectStrategy: """ An abstraction for how :class:`.AsyncSSEClient` should obtain an input stream. The default implementation is :meth:`http()`, which makes HTTP requests with ``aiohttp``. Or, if you want to consume an input stream from some other source, you can create your own subclass of :class:`AsyncConnectStrategy`. Instances of this class should be immutable and should not contain any state that is specific to one active stream. The :class:`AsyncConnectionClient` that they produce is stateful and belongs to a single :class:`.AsyncSSEClient`. """
[docs] def create_client(self, logger: Logger) -> AsyncConnectionClient: """ Creates a client instance. This is called once when an :class:`.AsyncSSEClient` is created. :param logger: the logger being used by the AsyncSSEClient """ raise NotImplementedError("AsyncConnectStrategy base class cannot be used by itself")
[docs] @staticmethod def http( url: str, headers: Optional[dict] = None, session=None, aiohttp_request_options: Optional[dict] = None, query_params=None, ) -> AsyncConnectStrategy: """ Creates the default async HTTP implementation using aiohttp. :param url: the stream URL :param headers: optional HTTP headers to add to the request :param session: optional ``aiohttp.ClientSession`` to use :param aiohttp_request_options: optional kwargs passed to the aiohttp ``get()`` call :param query_params: optional callable that returns a dict of query params per connection """ # Import here to avoid requiring aiohttp for users who don't use async HTTP from ld_eventsource.async_http import (_AsyncHttpClientImpl, _AsyncHttpConnectParams) return _AsyncHttpConnectStrategy( _AsyncHttpConnectParams(url, headers, session, aiohttp_request_options, query_params) )
[docs] class AsyncConnectionClient: """ An object provided by :class:`.AsyncConnectStrategy` that is retained by a single :class:`.AsyncSSEClient` to perform all connection attempts by that instance. """
[docs] async def connect(self, last_event_id: Optional[str]) -> AsyncConnectionResult: """ Attempts to connect to a stream. Raises an exception if unsuccessful. :param last_event_id: the current value of last_event_id (sent to server for resuming) :return: an :class:`AsyncConnectionResult` representing the stream """ raise NotImplementedError("AsyncConnectionClient base class cannot be used by itself")
[docs] async def close(self): """ Does whatever is necessary to release resources when the AsyncSSEClient is closed. """ pass
async def __aenter__(self): return self async def __aexit__(self, type, value, traceback): await self.close()
[docs] class AsyncConnectionResult: """ The return type of :meth:`AsyncConnectionClient.connect()`. """
[docs] def __init__( self, stream: AsyncIterator[bytes], closer: Optional[Callable], headers: Optional[Headers] = None, ): self.__stream = stream self.__closer = closer self.__headers = headers
@property def stream(self) -> AsyncIterator[bytes]: """ An async iterator that returns chunks of data. """ return self.__stream @property def headers(self) -> Optional[Headers]: """ The HTTP response headers, if available. Header name lookups are case-insensitive per RFC 7230. """ return self.__headers
[docs] async def close(self): """ Does whatever is necessary to release the connection. """ if self.__closer: await self.__closer() self.__closer = None
async def __aenter__(self): return self async def __aexit__(self, type, value, traceback): await self.close()
class _AsyncHttpConnectStrategy(AsyncConnectStrategy): def __init__(self, params): self.__params = params def create_client(self, logger: Logger) -> AsyncConnectionClient: from ld_eventsource.async_http import _AsyncHttpClientImpl return _AsyncHttpConnectionClient(self.__params, logger) class _AsyncHttpConnectionClient(AsyncConnectionClient): def __init__(self, params, logger: Logger): from ld_eventsource.async_http import _AsyncHttpClientImpl self.__impl = _AsyncHttpClientImpl(params, logger) async def connect(self, last_event_id: Optional[str]) -> AsyncConnectionResult: stream, closer, headers = await self.__impl.connect(last_event_id) return AsyncConnectionResult(stream, closer, headers) async def close(self): await self.__impl.close() __all__ = ['AsyncConnectStrategy', 'AsyncConnectionClient', 'AsyncConnectionResult']