|
1 | 1 | The Python client library for the Opsqueue lightweight batch processing queue system. |
2 | 2 |
|
3 | | -## Installation instructions |
| 3 | +Find the full README with examples at https://github.com/channable/opsqueue |
| 4 | + |
| 5 | +## Getting Started
| 6 | + |
| 7 | +### 1. Grab the `opsqueue` binary and the Python client library |
| 8 | + |
| 9 | +1. Install the Opsqueue binary using `cargo install opsqueue`. If you do not have Cargo/Rust installed yet, follow the instructions at https://rustup.rs/ first.
| 10 | +2. Install the Python client using `pip install opsqueue`, `uv pip install opsqueue` or similar.
| 11 | + |
| 12 | +### 2. Create a `Producer` |
| 13 | + |
| 14 | +```python |
| 15 | +import logging |
| 16 | +from opsqueue.producer import ProducerClient |
| 17 | +from collections.abc import Iterable |
| 18 | + |
| 19 | +logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.DEBUG) |
| 20 | + |
| 21 | +def file_to_words(filename: str) -> Iterable[str]:
| 22 | +    """
| 23 | +    Iterates over each whitespace-separated word in a file
| 24 | +    while keeping at most one line in memory at a time.
| 25 | +    """
| 26 | +    with open(filename) as input_file:
| 27 | +        for line in input_file:
| 28 | +            for word in line.split():
| 29 | +                yield word
| 30 | +
| 31 | +def print_words(words: Iterable[str]) -> None:
| 32 | +    """
| 33 | +    Prints all words back to back (no separator is added)
| 34 | +    without first loading the full string into memory.
| 35 | +    """
| 36 | +    for word in words:
| 37 | +        print(word, end="")
| 38 | +
| 39 | +def main() -> None:
| 40 | +    client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/capitalize_text/")
| 41 | +    stream_of_words = file_to_words("lipsum.txt")
| 42 | +    stream_of_capitalized_words = client.run_submission(stream_of_words, chunk_size=4000)
| 43 | +    print_words(stream_of_capitalized_words)
| 44 | +
| 45 | +if __name__ == "__main__":
| 46 | +    main()
| 47 | +``` |
| 48 | + |
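The `chunk_size=4000` argument above suggests that the stream of words is cut into chunks before it is handed to the queue. As a rough illustration only (this is not Opsqueue's implementation, and `chunked` is a hypothetical helper), lazily grouping an iterator into fixed-size chunks could be sketched like this:

```python
from collections.abc import Iterable, Iterator
from itertools import islice


def chunked(items: Iterable[str], chunk_size: int) -> Iterator[list[str]]:
    """Lazily group `items` into lists of at most `chunk_size` elements."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(items)
    while True:
        # islice pulls at most `chunk_size` items, so only one chunk is in memory.
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


if __name__ == "__main__":
    words = ["lorem", "ipsum", "dolor", "sit", "amet"]
    for chunk in chunked(words, chunk_size=2):
        print(chunk)
```

The last chunk may be shorter than `chunk_size`, which is why the sketch stops only when `islice` returns nothing at all.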
| 49 | +### 3. Create a `Consumer` |
| 50 | + |
| 51 | +```python |
| 52 | +import logging |
| 53 | +from opsqueue.consumer import ConsumerClient, Strategy |
| 54 | + |
| 55 | +logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO) |
| 56 | + |
| 57 | +def capitalize_word(word: str) -> str:
| 58 | +    output = word.capitalize()
| 59 | +    # print(f"Capitalized word: {word} -> {output}")
| 60 | +    return output
| 61 | +
| 62 | +def main() -> None:
| 63 | +    client = ConsumerClient("localhost:3999", "file:///tmp/opsqueue/capitalize_text/")
| 64 | +    client.run_each_op(capitalize_word, strategy=Strategy.Random())
| 65 | +
| 66 | +if __name__ == "__main__":
| 67 | +    main()
| 68 | +``` |
| 69 | + |
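As the name `run_each_op` suggests, the consumer applies a plain function to each operation, so that function can be checked locally before any queue is running. A minimal sketch using only the standard library; note that `str.capitalize` also lowercases the rest of the word:

```python
def capitalize_word(word: str) -> str:
    # Same operation as in the consumer above, without the queue around it.
    return word.capitalize()


if __name__ == "__main__":
    for word in ["lorem", "IPSUM", "dolor"]:
        print(word, "->", capitalize_word(word))
```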
| 70 | + |
| 71 | +### 4. Run the Producer, queue and Consumer
| 72 | + |
| 73 | +- Run `opsqueue`. |
| 74 | +- Run `python3 capitalize_text_consumer.py` to run a consumer. Feel free to start multiple instances of this program to try out consumer concurrency. |
| 75 | +- Run `python3 capitalize_text_producer.py` to run a producer. |
| 76 | + |
| 77 | +The order you start these in does not matter; systems will reconnect and continue after any kind of failure or disconnect. |
| 78 | + |
| 79 | +By default the queue will listen on `http://localhost:3999`. The exact port can of course be changed. |
| 80 | +Producer and Consumer need to share the same object store location to store the content of their submission chunks. |
| 81 | +In development, this can be a local folder as shown in the code above. |
| 82 | +In production, you probably want to use Google's GCS, Amazon's S3 or Microsoft's Azure buckets. |
| 83 | + |
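One way to keep the producer and consumer pointed at the same object store is to read the location from a single setting. The sketch below is an illustration under stated assumptions: the environment variable name `OPSQUEUE_OBJECT_STORE_URL` is hypothetical (it is not something Opsqueue reads itself), and any bucket URL you put in it should follow whatever format your deployment expects.

```python
import os
from collections.abc import Mapping

# Local folder from the examples above; suitable for development only.
DEFAULT_OBJECT_STORE_URL = "file:///tmp/opsqueue/capitalize_text/"


def object_store_url(env: Mapping[str, str] = os.environ) -> str:
    """Return the shared object store location, falling back to the local dev folder."""
    # OPSQUEUE_OBJECT_STORE_URL is a hypothetical name chosen for this sketch.
    # `or` also covers the case where the variable is set but empty.
    return env.get("OPSQUEUE_OBJECT_STORE_URL") or DEFAULT_OBJECT_STORE_URL


if __name__ == "__main__":
    print(object_store_url())
```

Both scripts could then pass `object_store_url()` as the second argument to `ProducerClient` and `ConsumerClient` in place of the hard-coded string.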
| 84 | +Please tinker with the code above!
| 85 | +If you want more logging to look under the hood, run `RUST_LOG=debug opsqueue` to enable extra logging for the queue. |
| 86 | +The Producer/Consumer will use whatever log level is configured in Python. |
| 87 | + |
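Since the clients follow Python's logging configuration, the verbosity can be switched without editing the scripts. A small sketch, assuming a hypothetical `LOG_LEVEL` environment variable (the name is made up for this example):

```python
import logging
import os

# Level names accepted by this sketch, mapped to the standard logging constants.
_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}


def resolve_log_level(name: str, default: int = logging.INFO) -> int:
    """Map a level name such as "debug" to its numeric level, or return `default`."""
    return _LEVELS.get(name.strip().upper(), default)


if __name__ == "__main__":
    logging.basicConfig(
        format="%(levelname)s: %(message)s",
        # LOG_LEVEL is a hypothetical variable name, not an Opsqueue setting.
        level=resolve_log_level(os.environ.get("LOG_LEVEL", "INFO")),
    )
    logging.getLogger(__name__).info("logging configured")
```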
| 88 | +More examples can be found in `./libs/opsqueue_python/examples/` |
4 | 89 |
|
5 | | -1. Install the Python client using `pip`, `uv` or similar. |
6 | | -2. Install the Opsqueue binary, using `cargo install opsqueue` (if you do not have Cargo/Rust installed yet, follow the instructions at https://rustup.rs/ first) |
7 | | -3. Enjoy! |
8 | 90 |
|
9 | 91 | ## More Info |
10 | 92 |
|
|