1"""The `NtfyClient` class used for handling interactions with the ntfy
2API.
3
4:copyright: (c) 2024 Tanner Corcoran
5:license: Apache 2.0, see LICENSE for more details.
6
7"""
8
9import dataclasses
10import json
11import sys
12from collections.abc import Generator, Iterable
13from types import MappingProxyType, TracebackType
14from typing import Literal, Union
15
16if sys.version_info >= (3, 11): # pragma: no cover
17 from typing import Self
18else: # pragma: no cover
19 from typing_extensions import Self
20
21import httpx
22
23from .__version__ import * # noqa: F401,F403
24from ._internals import URL, StrTuple
25from .creds import Credentials
26from .errors import APIError
27from .filter import Filter
28from .message import Message, ReceivedMessage, _ReceivedMessage
29from .subscription import NtfySubscription
30
31__all__ = ("NtfyClient",)
32
33
@dataclasses.dataclass(eq=False, frozen=True)
class NtfyClient:
    """The class that handles publishing :class:`.Message` instances and
    creation of :class:`.NtfySubscription` instances.

    The dataclass is frozen, so all internal state (the parsed URL, the
    authentication header and the optional HTTP connection pool) is
    written with :py:meth:`object.__setattr__`.

    :param base_url: The base URL of a ntfy server.
    :type base_url: str
    :param default_topic: If provided, it will be used as a fallback
        topic in cases where no topic is provided in the
        :class:`.Message` instance being published, or as an argument
        (:paramref:`.publish.topic`) to the :attr:`.publish` method.
    :type default_topic: str, optional
    :param credentials: The user credentials, if any.
    :type credentials: Credentials, optional

    """

    base_url: str
    """See the :paramref:`~NtfyClient.base_url` parameter."""

    default_topic: Union[str, None] = None
    """See the :paramref:`~NtfyClient.default_topic` parameter."""

    credentials: Union[Credentials, None] = None
    """See the :paramref:`~NtfyClient.credentials` parameter."""

    # Derived state; excluded from ``__init__`` and filled in by
    # ``__post_init__`` / ``connect``.
    _url: URL = dataclasses.field(init=False)
    _auth_header: MappingProxyType[str, str] = dataclasses.field(init=False)
    _http_client: Union[httpx.Client, None] = dataclasses.field(
        default=None, init=False
    )

    def __post_init__(self) -> None:
        """Initialize the :class:`URL` instance, and set the
        authentication headers.

        """
        # url
        object.__setattr__(self, "_url", URL.parse(self.base_url))

        # credentials: fall back to a default ``Credentials()`` instance
        # when none were supplied, and use whatever header it produces
        creds = self.credentials or Credentials()
        object.__setattr__(self, "_auth_header", creds.get_header())

    def __enter__(self) -> Self:
        """Enter the context manager protocol.

        A connection pool is started only if one does not already
        exist, so a pool set up through :meth:`connect` is reused.

        :return: This :class:`NtfyClient` instance.
        :rtype: NtfyClient

        """
        if not self._http_client:
            self.connect()
        return self

    def __exit__(
        self,
        exc_type: Union[type[BaseException], None],
        exc_val: Union[BaseException, None],
        exc_tb: Union[TracebackType, None],
    ) -> Literal[False]:
        """Exit the context manager protocol.

        This ensures the client is closed.

        :returns: Always :py:obj:`False`. See :meth:`object.__exit__`
            for more information on what this return value means.
        :rtype: typing.Literal[False]

        """
        self.close()
        return False

    def connect(self, client: Union[httpx.Client, None] = None) -> Self:
        """Start a new connection pool.

        If this instance already holds a different connection pool, that
        pool is closed before being replaced so it is not leaked.

        :param client: If defined, uses the provided client. Otherwise,
            a new one is created.
        :type client: httpx.Client | None, optional

        :return: This :class:`NtfyClient` instance.
        :rtype: NtfyClient

        """
        new_client = client if client is not None else httpx.Client()

        # Release the pool we are about to drop; skip this when the
        # caller hands back the very client we already hold.
        previous = self._http_client
        if (
            previous is not None
            and previous is not new_client
            and not previous.is_closed
        ):
            previous.close()

        object.__setattr__(self, "_http_client", new_client)
        return self

    def close(self) -> None:
        """Close the HTTP connection pool, if it exists.

        The stored reference is always cleared afterwards, so later
        requests fall back to single-use connections instead of reusing
        a closed client.

        """
        pool = self._http_client
        if pool is not None and not pool.is_closed:
            pool.close()
        object.__setattr__(self, "_http_client", None)

    def publish(
        self,
        msg: Message,
        topic: Union[str, None] = None,
        topic_override: bool = False,
    ) -> httpx.Response:
        """Publish a :class:`.Message` instance.

        The topic is resolved in this order: the
        :paramref:`topic` argument when :paramref:`topic_override` is
        set, the topic carried by the message, the :paramref:`topic`
        argument, and finally :attr:`~NtfyClient.default_topic`.

        .. note::
            If no connection pool exists (i.e. :meth:`.connect` wasn't
            called and this client is not being used as a context
            manager), a single-use connection will be used.

        :param msg: The :class:`.Message` instance containing the
            content to publish.
        :type msg: Message
        :param topic: A topic that will be used if no
            topic was provided in the given Message object.
        :type topic: str | None, optional
        :param topic_override: A flag that, if set to ``True``, will
            cause the given :paramref:`topic` (if provided) to be used
            regardless of whether or not a topic is present in the given
            :class:`.Message` instance.
        :type topic_override: bool, optional

        :return: The httpx.Response instance.
        :rtype: httpx.Response

        :raises ValueError: If no topic was provided as an argument, no
            topic was found in the provided :class:`.Message` instance,
            and no :attr:`~NtfyClient.default_topic` was given when
            creating this :class:`NtfyClient` instance.
        :raises APIError: If the server responds with a status code
            other than 200.

        """
        msg_topic, headers, kwargs = msg.get_args()

        if topic and (topic_override or not msg_topic):
            resolved_topic = topic
        elif msg_topic:
            resolved_topic = msg_topic
        elif self.default_topic:
            resolved_topic = self.default_topic
        else:
            raise ValueError("No topic could be resolved")

        # Use the pooled client when connected, otherwise a one-off request
        post = self._http_client.post if self._http_client else httpx.post
        resp = post(
            url=self._url.unparse(endpoint=resolved_topic),
            # message headers take precedence over the auth header
            headers={**self._auth_header, **headers},
            **kwargs,
        )
        if resp.status_code != 200:
            # NOTE(review): the second argument presumably marks whether
            # the response is streamed -- confirm against APIError.
            raise APIError(resp, False)
        return resp

    def poll(
        self,
        topic: Union[str, None] = None,
        filter: Union[Filter, None] = None,
    ) -> Generator[ReceivedMessage, None, None]:
        """Poll for messages using an HTTP connection.

        .. note::
            If no connection pool exists (i.e. :meth:`.connect` wasn't
            called and this client is not being used as a context
            manager), a single-use connection will be used.

        .. note::
            This method is a generator function: nothing is executed
            (including topic validation and the HTTP request) until the
            returned generator is first iterated.

        :param topic: The topic to poll. If not provided, this
            instance's default topic will be used.
        :type topic: str, optional
        :param filter: An optional :class:`.Filter` instance used to
            filter responses.
        :type filter: Filter, optional

        :return: A generator of
            :class:`~ntfy_api.message.ReceivedMessage` instances.
        :rtype: typing.Iterator[ReceivedMessage]

        :raises ValueError: If no topic was provided as an argument, and
            no :attr:`~NtfyClient.default_topic` was given when creating
            this :class:`NtfyClient` instance.
        :raises APIError: If the server responds with a status code
            other than 200.

        """
        if not topic:
            if not self.default_topic:
                raise ValueError("No topic could be resolved")
            topic = self.default_topic

        # Later entries win, so filter headers may override the others
        request_headers = {
            "X-Poll": "1",
            **self._auth_header,
            **(filter.serialize() if filter else {}),
        }

        # Use the pooled client when connected, otherwise a one-off request
        open_stream = (
            self._http_client.stream if self._http_client else httpx.stream
        )
        with open_stream(
            method="GET",
            url=self._url.unparse(endpoint=(topic, "json")),
            headers=request_headers,
        ) as response:
            if response.status_code != 200:
                # NOTE(review): the second argument presumably marks the
                # response as streamed -- confirm against APIError.
                raise APIError(response, True)
            # One JSON document per non-empty line
            for line in response.iter_lines():
                if line:
                    yield _ReceivedMessage.from_json(json.loads(line))

    def subscribe(
        self,
        topics: Union[str, Iterable[str], None] = None,
        filter: Union[Filter, None] = None,
        max_queue_size: int = 0,
    ) -> NtfySubscription:
        """A factory for :class:`.NtfySubscription` instances, which uses
        the same base URL and credentials as this :class:`NtfyClient`
        instance.

        .. note::
            The :class:`.NtfySubscription` instance is created but the
            connection is not yet started. Use its
            :meth:`.NtfySubscription.connect` method or the context
            manager protocol to start receiving messages.

        :param topics: Zero or more topics to subscribe to. If none are
            given (``None``, an empty string, or an empty iterable),
            this instance's default topic will be used.
        :type topics: str | typing.Iterable[str] | None
        :param filter: An optional :class:`.Filter`
            instance used to filter responses.
        :type filter: Filter | None
        :param max_queue_size: The maximum size of the message queue. If
            `<=0`, the queue is unbounded. If the queue is filled,
            all new messages are discarded. Only when the queue has
            room for another message, will messages start being
            added again. This means that, if bounded, some messages
            may be dropped if the frequency of received messages is
            greater than your program's ability to handle those
            messages. Defaults to ``0``.
        :type max_queue_size: int, optional

        :return: The created :class:`.NtfySubscription`
            instance.
        :rtype: NtfySubscription

        :raises ValueError: If no topics are given through the
            :paramref:`.subscribe.topics` argument and this instance's
            :attr:`~NtfyClient.default_topic` is not set.

        """
        # Normalize to a tuple up front; this also consumes one-shot
        # iterables exactly once.
        requested: StrTuple
        if topics is None:
            requested = ()
        elif isinstance(topics, str):
            requested = (topics,) if topics else ()
        else:
            requested = tuple(topics)

        _topics: StrTuple
        if requested:
            _topics = requested
        elif self.default_topic:
            _topics = (self.default_topic,)
        else:
            raise ValueError(
                "the 'topics' argument must be provided if the NtfyClient"
                " instance's 'default_topic' is also not defined"
            )

        return NtfySubscription(
            base_url=self.base_url,
            topics=_topics,
            credentials=self.credentials,
            filter=filter,
            max_queue_size=max_queue_size,
        )