Skip to content

Commit 5065cef

Browse files
committed
refactor: make engine singleton and restricted pipeline instantiation
1 parent 02f1305 commit 5065cef

3 files changed

Lines changed: 75 additions & 11 deletions

File tree

python/phaeton/_internal.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ACCESS_TOKEN = object()

python/phaeton/engine.py

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,44 @@
1+
import threading
12
from typing import List, Union, Dict
23
from .pipeline import Pipeline
4+
from ._internal import ACCESS_TOKEN
35

46
class EngineResult:
57
"""
68
Encapsulates the statistical results of a pipeline execution.
9+
This class is immutable (Read-Only).
710
811
Attributes:
912
processed (int): Total number of rows read from the source.
1013
saved (int): Total number of rows successfully cleaned and saved.
1114
quarantined (int): Total number of rows rejected and sent to quarantine.
1215
duration (int): Execution time in milliseconds.
1316
"""
14-
def __init__(self, stats: Dict[str, int]):
15-
self.processed = stats.get("processed_rows", 0)
16-
self.saved = stats.get("saved_rows", 0)
17-
self.quarantined = stats.get("quarantined_rows", 0)
18-
self.duration = stats.get("duration_ms", 0)
1917

18+
__slots__ = ('_processed', '_saved', '_quarantined', '_duration')
19+
20+
def __init__(self, stats: Dict[str, int]):
21+
self._processed = stats.get("processed_rows", 0)
22+
self._saved = stats.get("saved_rows", 0)
23+
self._quarantined = stats.get("quarantined_rows", 0)
24+
self._duration = stats.get("duration_ms", 0)
25+
26+
@property
27+
def processed(self) -> int:
28+
return self._processed
29+
30+
@property
31+
def saved(self) -> int:
32+
return self._saved
33+
34+
@property
35+
def quarantined(self) -> int:
36+
return self._quarantined
37+
38+
@property
39+
def duration(self) -> int:
40+
return self._duration
41+
2042
def __repr__(self):
2143
return (f"<EngineResult | Processed: {self.processed}, "
2244
f"Saved: {self.saved}, Quarantined: {self.quarantined} "
@@ -30,6 +52,17 @@ class Engine:
3052
of one or multiple pipelines simultaneously.
3153
"""
3254

55+
_instance = None
56+
_lock = threading.Lock()
57+
_initialized = False
58+
59+
def __new__(cls, *args, **kwargs):
60+
if cls._instance is None:
61+
with cls._lock:
62+
if cls._instance is None:
63+
cls._instance = super(Engine, cls).__new__(cls)
64+
return cls._instance
65+
3366
def __init__(self, workers: int = 0, batch_size: int = 10000):
3467
"""
3568
Initialize the Engine.
@@ -39,10 +72,15 @@ def __init__(self, workers: int = 0, batch_size: int = 10000):
3972
Set to 0 to automatically use all available cores. Defaults to 0.
4073
batch_size (int, optional): Number of rows to process in each batch.
4174
Defaults to 10000.
42-
43-
4475
"""
45-
self.config = {"workers": workers, "batch_size": batch_size}
76+
77+
if self._initialized:
78+
return
79+
80+
with self._lock:
81+
if not self._initialized:
82+
self.config = {"workers": workers, "batch_size": batch_size}
83+
self._initialized = True
4684

4785
def ingest(self, source: str) -> Pipeline:
4886
"""
@@ -54,7 +92,7 @@ def ingest(self, source: str) -> Pipeline:
5492
Returns:
5593
Pipeline: A new pipeline builder instance.
5694
"""
57-
return Pipeline(source, self.config)
95+
return Pipeline(source, self.config, token=ACCESS_TOKEN)
5896

5997
def validate(self, pipelines: Union[Pipeline, List[Pipeline]]) -> bool:
6098
"""

python/phaeton/pipeline.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from typing import List, Dict, Optional, Literal, Union
1+
from typing import List, Dict, Optional, Literal, Union, Any
2+
from ._internal import ACCESS_TOKEN
3+
import copy
24

35
# --- Type Definitions ---
46
HeaderCase = Literal["snake", "camel", "pascal", "kebab", "constant"]
@@ -16,7 +18,14 @@ class Pipeline:
1618
All methods in this class are lazy; they record instructions ("steps")
1719
that are executed only when `.run()` or `Engine.exec()` is called.
1820
"""
19-
def __init__(self, source: str, config: dict):
21+
def __init__(self, source: str, config: dict, token: Any = None):
22+
23+
if token is not ACCESS_TOKEN:
24+
raise PermissionError(
25+
"Illegal Access: Pipeline cannot be instantiated directly. "
26+
"You must use 'eng.ingest()' to create a pipeline."
27+
)
28+
2029
self.source = source
2130
self.config = config
2231
self.steps: List[Dict] = []
@@ -299,6 +308,22 @@ def dump(self, path: str, format: ExportFormat = "parquet") -> "Pipeline":
299308
self.steps.append({"action": "dump", "path": path, "format": format})
300309
return self
301310

311+
def fork(self) -> "Pipeline":
312+
"""
313+
Creates a new independent branch (fork) of the pipeline logic.
314+
Inherits all previous steps but RESETS output targets.
315+
"""
316+
317+
318+
new_obj = copy.copy(self)
319+
320+
new_obj.steps = copy.deepcopy(self.steps)
321+
322+
new_obj.output_target = None
323+
new_obj.quarantine_path = None
324+
325+
return new_obj
326+
302327
def run(self):
303328
"""
304329
Triggers a single pipeline execution immediately.

0 commit comments

Comments
 (0)