Source code for ld_eventsource.async_client

import asyncio
import logging
import time
from typing import AsyncIterable, Optional, Union

from ld_eventsource.actions import Action, Event, Fault, Start
from ld_eventsource.async_reader import (_AsyncBufferedLineReader,
                                         _AsyncSSEReader)
from ld_eventsource.config.async_connect_strategy import (
    AsyncConnectionClient, AsyncConnectionResult, AsyncConnectStrategy)
from ld_eventsource.config.error_strategy import ErrorStrategy
from ld_eventsource.config.retry_delay_strategy import RetryDelayStrategy


class AsyncSSEClient:
    """
    An async client for reading a Server-Sent Events stream.

    This is an async/await implementation. The expected usage is to create an
    ``AsyncSSEClient`` instance (either as an async context manager or directly),
    then read from it using the async iterator properties :attr:`events` or
    :attr:`all`.

    By default, ``AsyncSSEClient`` uses ``aiohttp`` to make HTTP requests to an SSE
    endpoint. You can customize this behavior using :class:`.AsyncConnectStrategy`.

    Connection failures and error responses can be handled in various ways depending
    on the constructor parameters. The default behavior is the same as
    :class:`.SSEClient`.

    Example::

        async with AsyncSSEClient("https://my-server/events") as client:
            async for event in client.events:
                print(event.data)
    """

    def __init__(
        self,
        connect: Union[str, AsyncConnectStrategy],
        initial_retry_delay: float = 1,
        retry_delay_strategy: Optional[RetryDelayStrategy] = None,
        retry_delay_reset_threshold: float = 60,
        error_strategy: Optional[ErrorStrategy] = None,
        last_event_id: Optional[str] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Creates an async client instance.

        :param connect: either an :class:`.AsyncConnectStrategy` instance or a URL string
        :param initial_retry_delay: the initial delay before reconnecting after a failure,
            in seconds
        :param retry_delay_strategy: allows customization of the delay behavior for retries
        :param retry_delay_reset_threshold: minimum connection time before resetting retry
            delay
        :param error_strategy: allows customization of the behavior after a stream failure
        :param last_event_id: if provided, the ``Last-Event-Id`` value will be preset to this
        :param logger: if provided, log messages will be written here
        :raises TypeError: if ``connect`` is neither a string nor an
            :class:`.AsyncConnectStrategy`
        """
        if isinstance(connect, str):
            connect = AsyncConnectStrategy.http(connect)
        elif not isinstance(connect, AsyncConnectStrategy):
            raise TypeError("connect must be either a string or AsyncConnectStrategy")

        self.__base_retry_delay = initial_retry_delay
        self.__base_retry_delay_strategy = (
            retry_delay_strategy or RetryDelayStrategy.default()
        )
        self.__retry_delay_reset_threshold = retry_delay_reset_threshold
        self.__current_retry_delay_strategy = self.__base_retry_delay_strategy
        self.__next_retry_delay: float = 0

        self.__base_error_strategy = error_strategy or ErrorStrategy.always_fail()
        self.__current_error_strategy = self.__base_error_strategy

        self.__last_event_id = last_event_id

        if logger is None:
            logger = logging.getLogger('launchdarkly-eventsource-async.null')
            # This named logger is shared by every client created without an
            # explicit logger; attach the NullHandler only once so handlers do
            # not accumulate with each new instance.
            if not logger.handlers:
                logger.addHandler(logging.NullHandler())
            logger.propagate = False
        self.__logger = logger

        self.__connection_client: AsyncConnectionClient = connect.create_client(logger)
        self.__connection_result: Optional[AsyncConnectionResult] = None

        # Time at which the current connection was established (0 = never connected).
        self._retry_reset_baseline: float = 0
        # Time of the most recent disconnect or failed connect attempt (0 = none yet).
        self.__disconnected_time: float = 0

        self.__closed = False
        self.__interrupted = False

    async def start(self):
        """
        Attempts to start the stream if it is not already active.
        """
        await self._try_start(False)

    async def close(self):
        """
        Permanently shuts down this client instance and closes any active connection.
        """
        self.__closed = True
        await self.interrupt()
        await self.__connection_client.close()

    async def interrupt(self):
        """
        Stops the stream connection if it is currently active, without permanently closing.
        """
        if self.__connection_result:
            self.__interrupted = True
            await self.__connection_result.close()
            self.__connection_result = None
            # Record when the connection went away so that the next connection
            # attempt measures its retry delay from this moment rather than
            # from some older failure.
            self.__disconnected_time = time.time()
            self._compute_next_retry_delay()

    @property
    def all(self) -> AsyncIterable[Action]:
        """
        An async iterable series of notifications from the stream.

        Each can be any subclass of :class:`.Action`: :class:`.Event`, :class:`.Comment`,
        :class:`.Start`, or :class:`.Fault`.
        """
        return self._all_generator()

    @property
    def events(self) -> AsyncIterable[Event]:
        """
        An async iterable series of :class:`.Event` objects received from the stream.
        """
        return self._events_generator()

    async def _all_generator(self):
        """
        Async generator behind :attr:`all`.

        Connects (or reconnects) as needed, yields whatever ``_try_start`` returns,
        then yields each item read from the stream. When the stream ends or fails,
        the current error strategy decides whether to stop or to yield a
        :class:`.Fault` and reconnect.
        """
        while True:
            # Reading implies starting the stream if it is not already started.
            while self.__connection_result is None:
                result = await self._try_start(True)
                if result is not None:
                    yield result

            # Keep a local reference: interrupt() may clear the attribute while
            # this generator is suspended, and the connection must still be closed.
            current_result = self.__connection_result
            lines = _AsyncBufferedLineReader.lines_from(current_result.stream)
            reader = _AsyncSSEReader(lines, self.__last_event_id, None)
            error: Optional[Exception] = None
            try:
                async for ec in reader.events_and_comments():
                    self.__last_event_id = reader.last_event_id
                    yield ec
                    if self.__interrupted:
                        break
            except Exception as e:
                if self.__closed:
                    # The client was closed; an exception from the read in
                    # progress is not reported to the caller.
                    return
                error = e
            finally:
                self.__last_event_id = reader.last_event_id
                await current_result.close()
                self.__connection_result = None

            # The stream has ended (cleanly, by interruption, or with an error).
            # Record the time so that _try_start subtracts only the time elapsed
            # since *this* disconnect from the retry delay.
            self.__disconnected_time = time.time()
            self._compute_next_retry_delay()
            fail_or_continue, self.__current_error_strategy = (
                self.__current_error_strategy.apply(error)
            )
            if fail_or_continue == ErrorStrategy.FAIL:
                if error is None:
                    yield Fault(None)
                    return
                raise error
            yield Fault(error)

    async def _events_generator(self):
        """
        Async generator behind :attr:`events`: yields only the :class:`.Event`
        items produced by ``_all_generator``.
        """
        async for item in self._all_generator():
            if isinstance(item, Event):
                yield item

    @property
    def next_retry_delay(self) -> float:
        """
        The retry delay that will be used for the next reconnection, in seconds.
        """
        return self.__next_retry_delay

    def _compute_next_retry_delay(self):
        """
        Updates the next retry delay from the current retry delay strategy.

        If a reset threshold is configured and the time since the last successful
        connection is at least that threshold, the strategy is first reset to the
        base strategy.
        """
        if self.__retry_delay_reset_threshold > 0 and self._retry_reset_baseline != 0:
            now = time.time()
            connection_duration = now - self._retry_reset_baseline
            if connection_duration >= self.__retry_delay_reset_threshold:
                self.__current_retry_delay_strategy = self.__base_retry_delay_strategy
                self._retry_reset_baseline = now
        self.__next_retry_delay, self.__current_retry_delay_strategy = (
            self.__current_retry_delay_strategy.apply(self.__base_retry_delay)
        )

    async def _try_start(self, can_return_fault: bool):
        """
        Attempts to connect, waiting out any pending retry delay first.

        :param can_return_fault: if True, a failed attempt that the error strategy
            allows to continue is returned as a :class:`.Fault`; if False, the
            attempt is retried within this call
        :return: ``None`` if a connection already exists, a :class:`.Start` on
            success, or a :class:`.Fault` (see ``can_return_fault``)
        :raises Exception: the connection error, if the error strategy says to fail
        """
        if self.__connection_result is not None:
            return None
        while True:
            if self.__next_retry_delay > 0:
                # Only wait for whatever part of the delay has not already
                # elapsed since the last disconnect.
                delay = (
                    self.__next_retry_delay
                    if self.__disconnected_time == 0
                    else self.__next_retry_delay
                    - (time.time() - self.__disconnected_time)
                )
                if delay > 0:
                    self.__logger.info("Will reconnect after delay of %fs", delay)
                    await asyncio.sleep(delay)
            try:
                self.__connection_result = await self.__connection_client.connect(
                    self.__last_event_id
                )
            except Exception as e:
                self.__disconnected_time = time.time()
                self._compute_next_retry_delay()
                fail_or_continue, self.__current_error_strategy = (
                    self.__current_error_strategy.apply(e)
                )
                if fail_or_continue == ErrorStrategy.FAIL:
                    raise
                if can_return_fault:
                    return Fault(e)
                continue
            self._retry_reset_baseline = time.time()
            # A successful connection resets the error strategy to its base state.
            self.__current_error_strategy = self.__base_error_strategy
            self.__interrupted = False
            return Start(self.__connection_result.headers)

    @property
    def last_event_id(self) -> Optional[str]:
        """
        The ID value, if any, of the last known event.
        """
        return self.__last_event_id

    async def __aenter__(self):
        return self

    async def __aexit__(self, type, value, traceback):
        await self.close()
# Names exported by ``from <this module> import *``.
__all__ = ['AsyncSSEClient']