
Commit a0d784e: Merge pull request patr1ck-m#31 from patr1ck-m/documentation/readme

Add README and ARCHITECTURE documentation

2 parents: 9ef7383 + 578ca97

2 files changed: 465 additions & 85 deletions

ARCHITECTURE.md: 186 additions & 0 deletions
# Architecture & Developer Documentation

This document provides an in-depth look at the architecture of the Real-Time Stream Processing Library, its implementation details, and guidelines for extending the library with custom components.

## Table of Contents

- [Architecture Overview](#architecture-overview)
  - [Effects and Handlers](#effects-and-handlers)
  - [Handler Exchangeability](#handler-exchangeability)
  - [Stream Model](#stream-model)
  - [Module Structure](#module-structure)
- [Implementation Details](#implementation-details)
  - [The Counter Record](#the-counter-record)
  - [Sliding Window Implementation](#sliding-window-implementation)
  - [File I/O](#file-io)
  - [FFI Usage](#ffi-usage)
- [Extending the Library](#extending-the-library)
  - [Writing a Custom Anomaly Detector](#writing-a-custom-anomaly-detector)
  - [Writing a Custom Input Source](#writing-a-custom-input-source)
  - [Writing a Custom Logger](#writing-a-custom-logger)
- [Acknowledgments](#acknowledgments)

---
## Architecture Overview

### Effects and Handlers

The entire library is built around Effekt's effect system. Instead of connecting components through direct function calls that return values, components communicate through effects that are handled by outer handlers provided by other components.

The main effects used are:

- **`read[Event]`**: Pull-based stream input. When a component needs the next event, it performs `do read[Event]()`. A handler somewhere up the call stack provides the actual value.
- **`emit[Event]`**: Push-based stream output. Components emit results using `do emit(event)`. Handlers decide what to do with each value.
- **`AnomalyDetection`**: Interface with `anomaly()` and `noAnomaly()` operations. Anomaly detectors call these, and handlers decide how to react (log, alert, store, etc.).
- **`stop`**: Used to signal the end of the stream.

The benefit of this design is the composability and exchangeability of handlers: you stack handlers from the different components to build pipelines:

```js
with anomaly_logger::logToConsole()    // 4. Handle anomaly results
with minMaxAnomalyDetector(0.0, 0.25)  // 3. Detect anomalies
with cpuUsagePullStream(100)           // 0. Provide events
with pullStreamDelayer(100)            // 1. Add delay between reads
aggregateMean()                        // 2. Aggregate events
```

Each layer shares the same handler "interface", making components independent and handlers swappable. New handlers can also be added without changing the rest of the pipeline (see [Handler Exchangeability](#handler-exchangeability)).

The downside of this design is that the logical flow can be harder to follow, as the stream input sits in the middle of the handler stack (see the example above).
### Handler Exchangeability

A key design principle is that **handlers within each component category share the same interface**. This means you can swap out any handler for another without changing the rest of your pipeline.

For example, all anomaly loggers handle the `AnomalyDetection` effect the same way:

- `logAnomaliesToConsole()` - prints only anomalies
- `logToConsole()` - prints all events
- `logToFileCsv(path)` - writes to CSV
- `logToFileCsvRethrow(path)` - writes to CSV AND forwards to the next handler

Switching from console to file logging is just a one-line change; the rest of the pipeline stays exactly the same.

Similarly for the rest of the library: whether you use `csvFeedPull()`, `normalDistributedPull()`, or `cpuUsagePullStream()`, they all provide events via `read[Event]`, so any aggregator works with any input source and any anomaly detector works with any aggregator.
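As a concrete sketch based on the pipeline from the overview section, switching the logger only touches the top handler (the file path `"anomalies.csv"` is a hypothetical placeholder):

```js
// before: print anomaly results to the console
with anomaly_logger::logToConsole()
with minMaxAnomalyDetector(0.0, 0.25)
with cpuUsagePullStream(100)
aggregateMean()

// after: write anomaly results to a CSV file; everything else is unchanged
with anomaly_logger::logToFileCsv("anomalies.csv")
with minMaxAnomalyDetector(0.0, 0.25)
with cpuUsagePullStream(100)
aggregateMean()
```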
### Stream Model

The library uses two stream models:

1. **Pull streams**: The consumer requests data by performing `read[Event]`. The handler decides where the data comes from. This is lazy: no work happens until someone reads.

2. **Push streams**: The producer emits data by performing `emit[Event]`. The handler decides what to do with each value.

You can convert between them using `pushToPullStream`:

```js
pushToPullStream[Event]() {
  // push stream producer (emits events)
} {
  // pull stream consumer (reads events)
}
```

Since we do not work with real parallelism or concurrency here, it is not possible to run the data input and the aggregation/anomaly detection in separate threads and let them communicate via a buffer. This could be an interesting extension for future work, since it could then handle bursts of incoming data. With the current implementation, the aggregator pulls data from the input source as fast as it can process it. The input source therefore does not run independently and has to wait until the aggregator requests the next event.
### Module Structure

```
src/lib/
├── event.effekt                 # Event record definition and utilities
├── counter.effekt               # Running statistics (sum, mean, variance, stddev)
├── aggregation.effekt           # Aggregation functions (min, max, mean, median)
├── anomaly_detection.effekt     # Anomaly detection strategies
├── stream_input.effekt          # CSV and simulated data sources
├── cpu_utilization_input.effekt # Real CPU usage (Node.js FFI)
├── event_logger.effekt          # Logging handlers for events
├── anomaly_logger.effekt        # Logging handlers for anomaly results
├── csv.effekt                   # CSV parsing utilities (from @jiribenes, adapted as described in the file)
├── timestamp.effekt             # Timestamp utilities
└── typeconversion.effekt        # String-to-number conversions
```
102+
---
103+
104+
## Implementation Details
105+
106+
### The Counter Record
107+
108+
The `Counter` record is used internally by aggregators and detectors to compute running statistics in O(1) space and time complexity per event provided:
109+
110+
```js
111+
record Counter(sum: Double, sumSq: Double, count: Int)
112+
```
113+
114+
It tracks the sum and sum of squares, which allows computing mean, variance, and standard deviation without storing all values. This is based on the mathematical identity:
115+
116+
$$\text{Var}(X) = \frac{1}{n-1}\left(\sum x_i^2 - \frac{(\sum x_i)^2}{n}\right)$$
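As a sketch of how such a record can be used (the helper names `observe`, `mean`, and `variance` are illustrative, not necessarily the library's actual API):

```js
// fold one new value into the running statistics
def observe(c: Counter, x: Double): Counter =
  Counter(c.sum + x, c.sumSq + x * x, c.count + 1)

def mean(c: Counter): Double = c.sum / c.count.toDouble

// sample variance via the identity above
def variance(c: Counter): Double =
  (c.sumSq - (c.sum * c.sum) / c.count.toDouble) / (c.count - 1).toDouble
```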
### Sliding Window Implementation

Windowed aggregations use a fixed-size array as a circular buffer. For min/max, we only recompute from scratch when the outgoing value equals the current min/max. This gives O(1) amortized time in practice.
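The eviction check can be sketched as follows (hypothetical names, not the library's actual code; the max case is symmetric):

```js
// after overwriting a slot of the circular buffer `window`:
// only rescan when the evicted value `outgoing` was the current minimum
if (outgoing == currentMin) {
  var m = infinity  // `infinity` is the FFI initial value mentioned below
  window.foreach { x => if (x < m) { m = x } }
  currentMin = m
}
```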
121+
122+
### File I/O
123+
124+
To support logging to CSV files in long-running processes, the data is buffered and flushed periodically. This reduces the number of write operations and improves performance.
125+
126+
### FFI Usage
127+
128+
The library uses minimal FFI:
129+
- `infinity` and `negInfinity` for initial min/max values (JS `Infinity`)
130+
- `getCpuUsage()` for reading CPU stats via Node.js `os` module
131+
- `toDoubleUnsafe(String)`, `isNaN(String)` and `toDouble(String)` for string-to-number conversions
132+
133+
---
## Extending the Library

### Writing a Custom Anomaly Detector

To add your own detection strategy, write a function that reads events and calls the `AnomalyDetection` effect:

```js
def myCustomDetector(param: Double): Unit / { read[Event], AnomalyDetection } = {
  with boundary
  while (true) {
    val ev = do read[Event]()
    // your detection logic here: compute `isAnomalous` and `score` from `ev`
    if (isAnomalous) {
      do anomaly(AnomalyEvaluation(ev, true, score))
    } else {
      do noAnomaly(AnomalyEvaluation(ev, false, 0.0))
    }
  }
}
```
### Writing a Custom Input Source

Implement a handler for `read[Event]`:

```js
def myDataSource() { body: () => Unit / read[Event] }: Unit = {
  try body() with read[Event] {
    val nextEvent = fetchNextEvent() // placeholder: fetch from somewhere
    resume { nextEvent }
  }
}
```
168+
169+
### Writing a Custom Logger
170+
171+
Handle the `AnomalyDetection` or `emit[Event]` effects:
172+
173+
```js
174+
def myLogger() { body: () => Unit / emit[Event] }: Unit = {
175+
try body() with emit[Event] { ev =>
176+
// do something with ev
177+
resume(())
178+
}
179+
}
180+
```
---

## Acknowledgments

The library design follows the principle of keeping the FFI surface minimal and implementing as much as possible in pure Effekt.
