Source code for ntfy_api.client

  1"""The `NtfyClient` class used for handling interactions with the ntfy
  2API.
  3
  4:copyright: (c) 2024 Tanner Corcoran
  5:license: Apache 2.0, see LICENSE for more details.
  6
  7"""
  8
  9import dataclasses
 10import json
 11import sys
 12from collections.abc import Generator, Iterable
 13from types import MappingProxyType, TracebackType
 14from typing import Literal, Union
 15
 16if sys.version_info >= (3, 11):  # pragma: no cover
 17    from typing import Self
 18else:  # pragma: no cover
 19    from typing_extensions import Self
 20
 21import httpx
 22
 23from .__version__ import *  # noqa: F401,F403
 24from ._internals import URL, StrTuple
 25from .creds import Credentials
 26from .errors import APIError
 27from .filter import Filter
 28from .message import Message, ReceivedMessage, _ReceivedMessage
 29from .subscription import NtfySubscription
 30
 31__all__ = ("NtfyClient",)
 32
 33
[docs] 34@dataclasses.dataclass(eq=False, frozen=True) 35class NtfyClient: 36 """The class that handles publishing :class:`.Message` instances and 37 creation of :class:`.NtfySubscription` instances. 38 39 :param base_url: The base URL of a ntfy server. 40 :type base_url: str 41 :param default_topic: If provided, it will be used as a fallback 42 topic in cases where no topic is provided in the 43 :class:`.Message` instance being published, or as an argument 44 (:paramref:`.publish.topic`) to the :attr:`.publish` method. 45 :type default_topic: str, optional 46 :param credentials: The user credentials, if any. 47 :type credentials: Credentials, optional 48 49 """ 50 51 base_url: str 52 """See the :paramref:`~NtfyClient.base_url` parameter.""" 53 54 default_topic: Union[str, None] = None 55 """See the :paramref:`~NtfyClient.default_topic` parameter.""" 56 57 credentials: Union[Credentials, None] = None 58 """See the :paramref:`~NtfyClient.credentials` parameter.""" 59 60 _url: URL = dataclasses.field(init=False) 61 _auth_header: MappingProxyType[str, str] = dataclasses.field(init=False) 62 _http_client: Union[httpx.Client, None] = dataclasses.field( 63 default=None, init=False 64 ) 65 66 def __post_init__(self) -> None: 67 """Initialize the :class:`URL` instance, and set the 68 authentication headers. 69 70 """ 71 # url 72 object.__setattr__(self, "_url", URL.parse(self.base_url)) 73 74 # credentials 75 object.__setattr__( 76 self, 77 "_auth_header", 78 (self.credentials or Credentials()).get_header(), 79 ) 80
[docs] 81 def __enter__(self) -> Self: 82 """Enter the context manager protocol. 83 84 :return: This :class:`NtfyClient` instance. 85 :rtype: NtfyClient 86 87 """ 88 if not self._http_client: 89 self.connect() 90 return self
91
[docs] 92 def __exit__( 93 self, 94 exc_type: Union[type[BaseException], None], 95 exc_val: Union[BaseException, None], 96 exc_tb: Union[TracebackType, None], 97 ) -> Literal[False]: 98 """Exit the context manager protocol. 99 100 This ensures the client is closed. 101 102 :returns: Always :py:obj:`False`. See :meth:`object.__exit__` 103 for more information on what this return value means. 104 :rtype: typing.Literal[False] 105 106 """ 107 self.close() 108 return False
109
[docs] 110 def connect(self, client: Union[httpx.Client, None] = None) -> Self: 111 """Start a new connection pool. 112 113 :param client: If defined, uses the provided client. Otherwise, 114 a new one is created. 115 :type client: httpx.Client | None, optional 116 117 :return: This :class:`NtfyClient` instance. 118 :rtype: NtfyClient 119 120 """ 121 object.__setattr__(self, "_http_client", client or httpx.Client()) 122 return self
123
[docs] 124 def close(self) -> None: 125 """Close the HTTP connection pool, if it exists.""" 126 if self._http_client and not self._http_client.is_closed: 127 self._http_client.close() 128 object.__setattr__(self, "_http_client", None)
129
[docs] 130 def publish( 131 self, 132 msg: Message, 133 topic: Union[str, None] = None, 134 topic_override: bool = False, 135 ) -> httpx.Response: 136 """Publish a :class:`.Message` instance. 137 138 .. note:: 139 If no connection pool exists (i.e. :meth:`.connect` wasn't 140 called and this client is not being used as a context 141 manager), a single-use connection will be used. 142 143 :param msg: The :class:`.Message` instance containing the 144 content to publish. 145 :type msg: Message 146 :param topic: A topic that will be used if no 147 topic was provided in the given Message object. 148 :type topic: str | None, optional 149 :param topic_override: A flag that, if set to ``True``, will 150 cause the given :paramref:`topic` (if provided) to be used 151 regardless of whether or not a topic is present in the given 152 :class:`.Message` instance. 153 :type topic_override: bool, optional 154 155 :return: The httpx.Response instance. 156 :rtype: httpx.Response 157 158 :raises ValueError: If no topic was provided as an argument, no 159 topic was found in the provided :class:`.Message` instance, 160 and no :attr:`~NtfyClient.default_topic` was given when 161 creating this :class:`NtfyClient` instance. 162 163 """ 164 msg_topic, headers, kwargs = msg.get_args() 165 if msg_topic: 166 if topic and topic_override: 167 msg_topic = topic 168 else: 169 if topic: 170 msg_topic = topic 171 else: 172 if not self.default_topic: 173 raise ValueError("No topic could be resolved") 174 msg_topic = self.default_topic 175 176 resp = (self._http_client.post if self._http_client else httpx.post)( 177 url=self._url.unparse(endpoint=msg_topic), 178 headers={**self._auth_header, **headers}, 179 **kwargs, 180 ) 181 if resp.status_code != 200: 182 raise APIError(resp, False) 183 return resp
184
[docs] 185 def poll( 186 self, 187 topic: Union[str, None] = None, 188 filter: Union[Filter, None] = None, 189 ) -> Generator[ReceivedMessage, None, None]: 190 """Poll for messages using an HTTP connection. 191 192 .. note:: 193 If no connection pool exists (i.e. :meth:`.connect` wasn't 194 called and this client is not being used as a context 195 manager), a single-use connection will be used. 196 197 :param topic: The topic to poll. If not provide, this instance's 198 default topic will be used. 199 :type topic: str, optional 200 :param filter: An optional :class:`.Filter` instance used to 201 filter responses. 202 :type filter: Filter, optional 203 204 :return: A generator of 205 :class:`~ntfy_api.message.ReceivedMessage` instances. 206 :rtype: typing.Iterator[ReceivedMessage] 207 208 :raises ValueError: If no topic was provided as an argument, and 209 no :attr:`~NtfyClient.default_topic` was given when creating 210 this :class:`NtfyClient` instance. 211 212 """ 213 if not topic: 214 if not self.default_topic: 215 raise ValueError("No topic could be resolved") 216 topic = self.default_topic 217 218 with (self._http_client.stream if self._http_client else httpx.stream)( 219 method="GET", 220 url=self._url.unparse( 221 endpoint=( 222 topic, 223 "json", 224 ) 225 ), 226 headers={ 227 "X-Poll": "1", 228 **self._auth_header, 229 **(filter.serialize() if filter else {}), 230 }, 231 ) as response: 232 if response.status_code != 200: 233 raise APIError(response, True) 234 for line in response.iter_lines(): 235 if line: 236 data = json.loads(line) 237 yield _ReceivedMessage.from_json(data)
238
[docs] 239 def subscribe( 240 self, 241 topics: Union[str, Iterable[str], None] = None, 242 filter: Union[Filter, None] = None, 243 max_queue_size: int = 0, 244 ) -> NtfySubscription: 245 """A factory for :class:`.NtfySubscription` instances, which uses 246 the same base URL and credentials as this :class:`NtfyClient` 247 instance. 248 249 .. note:: 250 The :class:`.NtfySubscription` instance is created but the 251 connection is not yet started. Use its 252 :meth:`.NtfySubscription.connect` method or the context 253 manager protocol to start receiving messages. 254 255 :param topics: Zero or more topics to subscribe to. If none are 256 given, this instance's default topic will be used. 257 :type topics: str | typing.Iterable[str] | None 258 :param filter: An optional :class:`.Filter` 259 instance used to filter responses. 260 :type filter: Filter | None 261 :param max_queue_size: The maximum size of the message queue. If 262 `<=0`, the queue is unbounded. If the queue is filled, 263 all new messages are discarded. Only when the queue has 264 room for another message, will messages start being 265 added again. This means that, if bounded, some messages 266 may be dropped if the frequency of received messages is 267 greater than your program's ability to handle those 268 messages. Defaults to ``0``. 269 :type max_queue_size: int, optional 270 271 :return: The created :class:`.NtfySubscription` 272 instance. 273 :rtype: NtfySubscription 274 275 :raises ValueError: If both the :paramref:`.subscribe.topics` 276 argument and this instance's 277 :attr:`~NtfyClient.default_topic` are :py:obj:`None`. 278 279 """ 280 _topics: StrTuple 281 if topics is None: 282 if self.default_topic is None: 283 raise ValueError( 284 "the 'topics' argument must be provided if the NtfyClient" 285 " instance's 'default_topic' is also not defined" 286 ) 287 _topics = (self.default_topic,) 288 else: 289 _topics = (topics,) if isinstance(topics, str) else tuple(topics) 290 291 return NtfySubscription( 292 base_url=self.base_url, 293 topics=_topics, 294 credentials=self.credentials, 295 filter=filter, 296 max_queue_size=max_queue_size, 297 )