Skip to content

Commit b99ed91

Browse files
committed
ingestion: implement ingestion class helper
1 parent eb434f8 commit b99ed91

4 files changed

Lines changed: 149 additions & 2 deletions

File tree

.genignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
.zed/
2+
src/polar_sdk/ingestion/
23
src/polar_sdk/webhooks/
34
src/tests/
45
ruff.toml

ruff.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
target-version = "py39"
22

3-
include = ["src/polar_sdk/webhooks/**/*.py"]
3+
include = [
4+
"src/polar_sdk/ingestion/**/*.py",
5+
"src/polar_sdk/webhooks/**/*.py"
6+
]
47

58
[lint]
6-
extend-select = ["I", "UP", "T20"]
9+
extend-select = ["I", "UP", "T20", "G"]
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from ._base import Ingestion
2+
3+
__all__ = ["Ingestion"]

src/polar_sdk/ingestion/_base.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import atexit
2+
import contextlib
3+
import logging
4+
import queue
5+
import threading
6+
import time
7+
from collections.abc import Callable
8+
from typing import TYPE_CHECKING, Union
9+
10+
from polar_sdk import Polar
11+
12+
if TYPE_CHECKING:
13+
from polar_sdk import EventsModelTypedDict
14+
15+
logger = logging.getLogger("polar_sdk.ingestion")
16+
17+
18+
class Ingestion:
    """
    Event ingestion client for Polar.

    This class handles the ingestion of events into the Polar API without blocking
    the main thread, by using background thread sending them in batches.

    :param access_token: The access_token required for authentication
    :param server: The server by name to use for all methods
    :param server_url: The server URL to use for all methods
    :param max_batch_size: The maximum number of events to send in a single batch.
    :param flush_interval: The interval in seconds to wait before flushing events.
    :param max_queue_size: The maximum number of events to keep in the queue.
    """

    def __init__(
        self,
        access_token: Union[str, Callable[[], Union[str, None]], None] = None,
        server: Union[str, None] = None,
        server_url: Union[str, None] = None,
        *,
        max_batch_size: int = 50,
        flush_interval: float = 5.0,
        max_queue_size: int = 10000,
    ) -> None:
        self.max_batch_size = max_batch_size
        self.flush_interval = flush_interval
        self.max_queue_size = max_queue_size

        # The SDK client is entered through an ExitStack so its resources are
        # released exactly once, in close().
        self._stack = contextlib.ExitStack()
        self._client = self._stack.enter_context(
            Polar(access_token, server, server_url)
        )

        self._queue = queue.Queue["EventsModelTypedDict"](maxsize=max_queue_size)

        # Set when shutdown begins; the worker waits on this event instead of
        # sleeping, so close() wakes it immediately rather than waiting out
        # the remainder of flush_interval.
        self._stop_event = threading.Event()
        # Guards against double shutdown: close() may be called manually AND
        # by the atexit hook registered below.
        self._closed = False
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

        atexit.register(self.close)

    def ingest(self, event: "EventsModelTypedDict") -> None:
        """
        Send an event to the ingestion queue.

        :param event: The event to send.
        :raises queue.Full: If the queue already holds `max_queue_size` events.
            The put is non-blocking so a slow or failing API surfaces as
            backpressure here instead of blocking the caller.
        """
        self._queue.put(event, block=False)

    def flush(self, max_batch_size: Union[int, None] = None) -> None:
        """
        Flush events from the queue to the API

        :param max_batch_size: The maximum number of events to send in the batch.
            If `None`, there is no limit.
        """
        if self._queue.empty():
            return

        # Collect events up to max_batch_size
        events: list["EventsModelTypedDict"] = []
        try:
            while not self._queue.empty() and (
                max_batch_size is None or len(events) < max_batch_size
            ):
                events.append(self._queue.get_nowait())
        except queue.Empty:
            # Another consumer drained the queue between empty() and
            # get_nowait(); proceed with whatever was collected.
            pass

        if not events:
            return

        # Send the batch to the API
        try:
            self._send_batch(events)
            # Mark tasks as done only after successful sending
            for _ in events:
                self._queue.task_done()
        except Exception as e:
            # On failure, put events back in the queue for retry. Retried
            # events are re-appended, so strict ordering is not guaranteed
            # across a failure.
            logger.error("Failed to send events: %s", e)
            for event in events:
                try:
                    self._queue.put(event, block=False)
                except queue.Full:
                    logger.error("Queue full, dropping event during retry")

    def close(self) -> None:
        """
        Flush remaining events and close the background thread.

        It's called automatically on program exit. Safe to call multiple
        times: subsequent calls are no-ops.
        """
        if self._closed:
            return
        self._closed = True

        logger.debug("Shutting down, flushing remaining events...")
        # Wake the worker immediately so it exits its wait() instead of
        # sleeping out the current interval.
        self._stop_event.set()

        # Join before the final flush so we do not race the worker's
        # in-flight batch.
        if self._thread.is_alive():
            self._thread.join(timeout=5.0)

        # Try to flush remaining events
        try:
            self.flush()
        except Exception as e:
            logger.error("Error during shutdown flush: %s", e)

        self._stack.close()
        logger.debug("Shutdown complete")

    def _worker(self) -> None:
        # Background loop: flush a bounded batch, then wait until the next
        # interval elapses or close() signals shutdown (whichever is first).
        logger.debug("Worker thread started")
        while not self._stop_event.is_set():
            try:
                self.flush(self.max_batch_size)
            except Exception as e:
                logger.error("Error in worker thread: %s", e)

            # Event.wait() returns early when the stop event is set, making
            # shutdown responsive (time.sleep() would not be interruptible).
            self._stop_event.wait(self.flush_interval)

    def _send_batch(self, events: list["EventsModelTypedDict"]) -> None:
        # Single API call for the whole batch.
        response = self._client.events.ingest(request={"events": events})
        logger.debug("Ingested %d events", response.inserted)

0 commit comments

Comments
 (0)