import json
from typing import Optional
from ld_eventsource.errors import ExceptionWithHeaders, Headers
[docs]
class Action:
    """
    Common base type for every object that :attr:`.SSEClient.all` can return.

    The class carries no state or behavior of its own; it exists so that the
    different kinds of returned objects share a single parent type.
    """
[docs]
class Event(Action):
    """
    An event received by :class:`.SSEClient`.

    Instances of this class are returned by both :attr:`.SSEClient.events` and
    :attr:`.SSEClient.all`.

    Two events compare equal when their ``event``, ``data``, ``id`` and
    ``last_event_id`` values are all equal, and equal events hash equally, so
    instances can be placed in sets or used as dictionary keys.
    """

    def __init__(
        self,
        event: str = 'message',
        data: str = '',
        id: Optional[str] = None,
        last_event_id: Optional[str] = None,
    ):
        """
        :param event: the event type; defaults to ``"message"``
        :param data: the event data; defaults to an empty string
        :param id: the value of the ``id:`` field for this event, or ``None`` if omitted
        :param last_event_id: the value of the most recent ``id:`` field of an event
            seen in this stream so far, or ``None``
        """
        self._event = event
        self._data = data
        self._id = id
        self._last_event_id = last_event_id

    @property
    def event(self) -> str:
        """
        The event type, or "message" if not specified.
        """
        return self._event

    @property
    def data(self) -> str:
        """
        The event data.
        """
        return self._data

    @property
    def id(self) -> Optional[str]:
        """
        The value of the ``id:`` field for this event, or `None` if omitted.
        """
        return self._id

    @property
    def last_event_id(self) -> Optional[str]:
        """
        The value of the most recent ``id:`` field of an event seen in this stream so far.
        """
        return self._last_event_id

    def _key(self):
        # Single source of truth for the fields that define identity, so that
        # __eq__ and __hash__ can never disagree with each other.
        return (self._event, self._data, self._id, self._last_event_id)

    def __eq__(self, other):
        if not isinstance(other, Event):
            # Let Python try the reflected comparison on the other operand;
            # if that is also unsupported, ``==`` still evaluates to False.
            return NotImplemented
        return self._key() == other._key()

    def __hash__(self):
        # Defining __eq__ alone would implicitly set __hash__ to None and make
        # instances unhashable; hash the same fields that equality compares.
        return hash(self._key())

    def __repr__(self):
        def _optional(value):
            # Optional fields render as the bare word None, otherwise as a
            # JSON-quoted string.
            return "None" if value is None else json.dumps(value)

        return "Event(event=\"%s\", data=%s, id=%s, last_event_id=%s)" % (
            self._event,
            json.dumps(self._data),
            _optional(self._id),
            _optional(self._last_event_id),
        )
[docs]
class Start(Action):
    """
    Signals that :class:`.SSEClient` has successfully connected to a stream.

    ``Start`` instances can only be obtained from :attr:`.SSEClient.all`.

    One ``Start`` is returned for the first successful connection. When the client
    reconnects after a failure, a :class:`.Fault` is returned first and then another
    ``Start``.

    A ``Start`` may carry the HTTP response headers of the connection, exposed through
    the :attr:`headers` property. Each reconnection produces a new ``Start`` holding the
    headers of the new connection, which may differ from those of the previous one.
    """

    def __init__(self, headers: Optional[Headers] = None):
        """
        :param headers: the HTTP response headers from the stream connection, or
            ``None`` if not available
        """
        self._headers = headers

    @property
    def headers(self) -> Optional[Headers]:
        """
        The HTTP response headers from the stream connection, if available.

        Header name lookups are case-insensitive per RFC 7230.

        :return: the response headers, or ``None`` if not available
        """
        return self._headers
[docs]
class Fault(Action):
    """
    Signals that :class:`.SSEClient` encountered an error or reached the end of the stream.

    ``Fault`` instances can only be obtained from :attr:`.SSEClient.all`.

    Receiving a ``Fault`` means the SSEClient is now inactive: either a connection
    attempt failed or an existing connection was closed. The SSEClient will try to
    reconnect if you call :meth:`.SSEClient.start()` or simply keep reading events
    after this point.

    If the error carries HTTP response headers (as :class:`.HTTPStatusError` and
    :class:`.HTTPContentTypeError` do), they can be read from the :attr:`headers`
    property.
    """

    def __init__(self, error: Optional[Exception]):
        """
        :param error: the exception that caused the stream to fail, or ``None`` if the
            stream simply ran out of data
        """
        self.__error = error

    @property
    def error(self) -> Optional[Exception]:
        """
        The exception that caused the stream to fail, if any.

        A value of ``None`` means that the stream simply ran out of data, i.e. the
        server shut down the connection in an orderly way after sending an EOF chunk
        as defined by chunked transfer encoding.
        """
        return self.__error

    @property
    def headers(self) -> Optional[Headers]:
        """
        The HTTP response headers from the failed connection, if available.

        Headers are returned only when the error is an exception that includes them,
        such as :class:`.HTTPStatusError` or :class:`.HTTPContentTypeError`. For any
        other error type, or when the stream ended normally, the result is ``None``.

        Header name lookups are case-insensitive per RFC 7230.

        :return: the response headers, or ``None`` if not available
        """
        cause = self.__error
        if not isinstance(cause, ExceptionWithHeaders):
            # Covers both a missing error (normal end of stream) and errors
            # that do not carry response headers.
            return None
        return cause.headers