1"""The :class:`NtfySubscription` class used for handling subscriptions.
2
3:copyright: (c) 2024 Tanner Corcoran
4:license: Apache 2.0, see LICENSE for more details.
5
6"""
7
8import dataclasses
9import json
10import logging
11import sys
12import threading
13from types import MappingProxyType, TracebackType
14from typing import Literal, Union
15
16if sys.version_info >= (3, 11): # pragma: no cover
17 from typing import Self
18else: # pragma: no cover
19 from typing_extensions import Self
20
21from websockets import exceptions as ws_exc
22from websockets.sync import client as ws_client
23
24from .__version__ import * # noqa: F401,F403
25from ._internals import URL, ClearableQueue, StrTuple
26from .creds import Credentials
27from .filter import Filter
28from .message import ReceivedMessage, _ReceivedMessage
29
30__all__ = ("NtfySubscription",)
31logger = logging.Logger(__name__)
32
33
[docs]
34@dataclasses.dataclass(eq=False, frozen=True)
35class NtfySubscription:
36 """The class that handles subscriptions.
37
38 :param base_url: The base URL of a ntfy server.
39 :param topics: The topics to subscribe to.
40 :param credentials: The user credentials, if any.
41 :param filter: Optional response filters.
42 :param max_queue_size: The maximum size of the message queue. If
43 ``<=0``, the queue is unbounded. If the queue is filled, all new
44 messages are discarded. Only when the queue has room for
45 another message, will messages start being added again. This
46 means that, if bounded, some messages may be dropped if the
47 frequency of received messages is greater than your
48 program's ability to handle those messages.
49
50 """
51
52 base_url: str
53 """See the :paramref:`~NtfySubscription.base_url` parameter."""
54
55 topics: StrTuple
56 """See the :paramref:`~NtfySubscription.topics` parameter."""
57
58 credentials: Union[Credentials, None] = None
59 """See the :paramref:`~NtfySubscription.credentials` parameter."""
60
61 filter: Union[Filter, None] = None
62 """See the :paramref:`~NtfySubscription.filter` parameter."""
63
64 max_queue_size: int = 0
65 """See the :paramref:`~NtfySubscription.max_queue_size` parameter.
66
67 """
68
69 messages: ClearableQueue[ReceivedMessage] = dataclasses.field(init=False)
70 """The message queue.
71
72 This attribute stores received messages. See :class:`queue.Queue`
73 for details on how to interact with this attribute.
74
75 """
76
77 _url: URL = dataclasses.field(init=False)
78 _auth_header: MappingProxyType[str, str] = dataclasses.field(init=False)
79 _ws_conn: Union[ws_client.ClientConnection, None] = dataclasses.field(
80 default=None, init=False
81 )
82 _thread: Union[threading.Thread, None] = dataclasses.field(
83 default=None, init=False
84 )
85
86 def __post_init__(self) -> None:
87 """Create message queue, and set URL and credentials."""
88 # message queue
89 object.__setattr__(
90 self, "messages", ClearableQueue(self.max_queue_size)
91 )
92
93 # url
94 object.__setattr__(self, "_url", URL.parse(self.base_url))
95
96 # credentials
97 object.__setattr__(
98 self,
99 "_auth_header",
100 (self.credentials or Credentials()).get_header(),
101 )
102
[docs]
103 def __enter__(self) -> Self:
104 """Enter the context manager protocol.
105
106 :returns: The `NtfySubscription` instance.
107 :rtype: NtfySubscription
108
109 """
110 if not self._ws_conn:
111 self.connect()
112 return self
113
[docs]
114 def __exit__(
115 self,
116 exc_type: Union[type[BaseException], None],
117 exc_val: Union[BaseException, None],
118 exc_tb: Union[TracebackType, None],
119 ) -> Literal[False]:
120 """Exit the context manager protocol.
121
122 This ensures the client is closed.
123
124 :returns: Always :py:obj:`False`. See :meth:`object.__exit__`
125 for more information on what this return value means.
126 :rtype: typing.Literal[False]
127
128 """
129 self.close()
130 return False
131
[docs]
132 def connect(
133 self, connection: Union[ws_client.ClientConnection, None] = None
134 ) -> Self:
135 """Initiate the websocket connection.
136
137 .. note::
138 This also clears :attr:`~NtfySubscription.messages`.
139
140 :param connection: The websocket connection to use. If not
141 provided, one will be created.
142 :type connection: websockets.sync.client.ClientConnection |
143 None, optional
144
145 :returns: This :class:`NtfySubscription` instance.
146 :rtype: NtfySubscription
147
148 """
149 object.__setattr__(
150 self,
151 "_ws_conn",
152 connection
153 or ws_client.connect(
154 uri=self._url.unparse(
155 endpoint=(",".join(self.topics), "ws"),
156 scheme=("ws", "wss"),
157 ),
158 additional_headers={
159 **self._auth_header,
160 **(self.filter.serialize() if self.filter else {}),
161 },
162 ),
163 )
164 self.messages.clear()
165 object.__setattr__(
166 self, "_thread", threading.Thread(target=self._thread_fn)
167 )
168 if self._thread:
169 self._thread.start()
170
171 # this if/else is mostly here for type safety, as self._thread
172 # can be None, hence the pragma below
173 else: # pragma: no cover
174 raise ValueError(
175 "Attempted to start consumer thread, but the thread was not"
176 " successfully created"
177 )
178
179 return self
180
181 def _thread_fn(self) -> None:
182 while True:
183 if self._ws_conn is None:
184 return
185 try:
186 raw = self._ws_conn.recv()
187 data = json.loads(raw)
188 self.messages.put(
189 _ReceivedMessage.from_json(data), block=False
190 )
191 print(self.messages)
192 except json.JSONDecodeError as e:
193 logger.warning(
194 f"Failed to process JSON input ('{e}'): {raw!r}"
195 )
196 continue
197 except (AttributeError, TypeError, ValueError) as e:
198 logger.warning(
199 "Failed to instantiated _ReceivedMessage instance"
200 f" ('{e}'): {raw!r}"
201 )
202 continue
203 except ws_exc.ConnectionClosed:
204 return
205
[docs]
206 def close(self) -> None:
207 """Close the websocket connection, if it exists."""
208 if self._ws_conn: # pragma: no branch
209 self._ws_conn.close()
210 object.__setattr__(self, "_ws_conn", None)
211 if self._thread and self._thread.is_alive(): # pragma: no branch
212 self._thread.join()