Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion example/batches/batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,13 @@ def main():

articles_found = []

def article_callback(article_json):
def article_callback(article_json) -> bool:
"""A simple callback to process one article from the batch."""
if 'identifier' in article_json:
articles_found.append(article_json['identifier'])

return True

api_client.read_all(buffer, article_callback)

logger.info("Successfully processed %s articles from the batch.", len(articles_found))
Expand Down
39 changes: 39 additions & 0 deletions example/callback/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Real-Time Stream Callback Demo

This script demonstrates the ability to programmatically stop a real-time stream from within the processing callback.

It connects to the `/v2/articles` real-time stream, processes articles one by one, and automatically stops and disconnects after receiving a predefined number of articles (5, by default).

## Key Features
- Connects to the real-time article stream (`api_client.stream_articles`).

- Implements a custom callback function (`stream_callback`).

- Uses the callback's bool return value to control the stream (returning `False` stops the stream).

- Handles authentication and graceful token revocation using `AuthClient` and `Helper`.

## How it Works
The core logic of this demo is inside the `stream_callback` function. The `api_client`'s internal streaming loop checks the boolean value returned by this callback after every article it processes.

- A list, `articles_received_tracker`, is used as a counter (it's a list so it can be mutated from within the callback).

- The callback receives an `article` (a `dict`) from the stream.

- It logs the article's details and appends it to the tracker.

- It checks the count: `if len(articles_received_tracker) >= STOP_AFTER_N_ARTICLES:`

- If the count is met, it logs a warning and returns `False`.

- If the count is not met, it returns `True`.

A `True` value tells the `api_client` to "keep processing." A `False` value tells the `api_client` to "stop immediately," at which point it closes the stream connection and the `api_client.stream_articles()` function returns.

## Running the Application

To run the application, use the following command from the project's root:

```sh
python -m example.callback.callback
```
129 changes: 129 additions & 0 deletions example/callback/callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# pylint: disable=W0718, R0914, R0801, W0612

"""
Demonstrates the ability to stop a client callback midway through processing.

This script connects to the real-time article stream and uses the callback's
boolean return value to stop the stream after receiving 5 articles.
"""

import logging
import time

# --- Import custom modules ---
from modules.auth.auth_client import AuthClient
from modules.auth.helper import Helper
from modules.api.api_client import Client, Request
from modules.api.exceptions import APIRequestError, APIStatusError, APIDataError

# --- Setup Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Configuration ---
STOP_AFTER_N_ARTICLES = 5

def main():
"""Runs the callback stop demo"""
helper = None
auth_client = None

articles_received_tracker = []

def stream_callback(article: dict) -> bool:
"""
Callback function to process streamed articles.

This callback will log the received article and check if it's
time to stop the stream.

Args:
article (dict): The JSON object for the article.

Returns:
bool: True to continue processing, False to stop the stream.
"""
try:
article_name = article.get('name', article.get('identifier', 'Unknown'))
event_id = article.get('event', {}).get('identifier', 'unknown_event')

logger.info(
"[%s] Received article (event: %s): %s",
len(articles_received_tracker) + 1,
event_id,
article_name
)

articles_received_tracker.append(article)

# --- The Stop Logic ---
if len(articles_received_tracker) >= STOP_AFTER_N_ARTICLES:
logger.warning(
"Reached stop limit of %s articles. Returning False to stop stream.",
STOP_AFTER_N_ARTICLES
)
return False

except Exception as e:
logger.error("Error within callback function: %s", e)
return False
return True

try:
# --- Authentication Setup ---
logger.info("Setting up authentication...")
auth_client = AuthClient()
helper = Helper(auth_client)

api_client = Client(timeout=3600.0)

token = helper.get_access_token()
api_client.set_access_token(token)
logger.info("Succesfully authenticated!")

# --- Stream Demonstration ---
logger.info("\nStarting real-time stream callback demo...")

stream_req = Request(
fields=["name", "abstract", "event.*"]
)

logger.info(
"Connecting to real-time article stream (will stop after %s articles)...",
STOP_AFTER_N_ARTICLES
)

start_time = time.time()

api_client.stream_articles(stream_req, stream_callback)

end_time = time.time()

logger.info(
"Stream processing finished. Total articles received: %s",
len(articles_received_tracker)
)
logger.info("Stream was active for %.2f seconds.", end_time - start_time)
logger.info("\n--- Callback stop demo complete ---")

except (APIRequestError, APIStatusError, APIDataError) as e:
logger.fatal("API Error encountered: %s", e)
if isinstance(e, APIStatusError) and e.response and e.response.status_code == 401:
logger.error("Got 401 Unauthorized. Check your token permissions for the real-time stream.")
except ValueError as e:
logger.fatal("Configuration Error (check .env): %s", e)
except KeyboardInterrupt:
logger.info("\nUser interrupted stream. Shutting down.")
except Exception as e:
logger.fatal("An unexpected error ocurred: %s", e, exc_info=True)
finally:
# --- Graceful Shutdown ---
if helper:
logger.info("Shutting down helper and revoking tokens...")
helper.stop()
elif auth_client:
auth_client.close()
logger.info("Exiting!")

if __name__ == "__main__":
main()
4 changes: 3 additions & 1 deletion example/snapshots/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,15 @@ def main():
buffer.seek(0)

articles_found_in_snapshot = []
def snapshot_article_callback(article_json):
def snapshot_article_callback(article_json) -> bool:
"""Simple callback to collect article names"""
if 'name' in article_json:
articles_found_in_snapshot.append(article_json['name'])
elif 'identifier' in article_json:
articles_found_in_snapshot.append(article_json['identifier'])

return True

try:
api_client.read_all(buffer, snapshot_article_callback)
logger.info("Succesfully processed %s articles from snapshot.", len(articles_found_in_snapshot))
Expand Down
2 changes: 2 additions & 0 deletions example/streaming/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def article_callback(article):
logger.info("event.identifiers: %s", article.get('event', {}).get('identifier'))
logger.info("-----------END------------\n\n\n")

return True


def main():
"""Main execution function to initiate the article stream.
Expand Down
Loading