-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathbase.py
More file actions
64 lines (51 loc) · 1.72 KB
/
base.py
File metadata and controls
64 lines (51 loc) · 1.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import asyncio
import logging
from typing import List, Optional, Set, Tuple

from synapse.api.node_pb2 import NodeConfig, NodeType
from synapse.server.status import Status
from synapse.utils.types import SynapseData
class BaseNode(object):
    """Base class for nodes that queue incoming data and fan data out downstream.

    Incoming data is placed on ``data_queue`` by ``on_data_received``;
    ``emit_data`` hands data to every node registered through
    ``add_downstream_node``.

    Subclasses are expected to override ``configure`` (it raises
    ``NotImplementedError`` here) and to provide a ``run`` coroutine method:
    ``start`` schedules ``self.run()``, which this class does not define.
    """

    def __init__(self, id, type) -> None:
        """Set up the node's identity, logger, queue and bookkeeping.

        Args:
            id: Node identifier; stored as ``self.id`` and embedded in the
                logger name.
            type: Node type; stored as ``self.type`` and reported by
                ``config``.
        """
        self.id: int = id
        self.type: NodeType = type
        # Starts as None; nothing in this class assigns a socket.
        self.socket: Optional[Tuple[str, int]] = None
        self.logger = logging.getLogger(f"[{self.__class__.__name__} id: {self.id}]")
        self.data_queue = asyncio.Queue()
        self.downstream_nodes = []
        self.running = False
        # Tasks created by ``start``; cancelled and cleared by ``stop``.
        self.tasks: List[asyncio.Task] = []
        # Strong references to in-flight ``emit_data`` deliveries. The event
        # loop only keeps weak references to tasks, so an unreferenced task
        # may be garbage-collected before it finishes.
        self._emit_tasks: Set[asyncio.Task] = set()

    def config(self) -> NodeConfig:
        """Return a ``NodeConfig`` built from this node's ``id`` and ``type``."""
        return NodeConfig(
            id=self.id,
            type=self.type,
        )

    def configure(self, config) -> Status:
        """Apply ``config`` to the node; must be overridden by subclasses.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def add_downstream_node(self, node) -> None:
        """Register ``node`` to receive data passed to ``emit_data``."""
        self.downstream_nodes.append(node)

    def start(self) -> Status:
        """Schedule ``self.run()`` as an asyncio task and mark the node running.

        Calling ``start`` on a node that is already running does nothing
        beyond logging.

        Returns:
            A default-constructed ``Status``.

        Raises:
            Exception: Whatever ``self.run()`` or ``asyncio.create_task``
                raises (for example ``RuntimeError`` when no event loop is
                running). ``self.running`` is reset to ``False`` first.
        """
        self.logger.info("starting...")
        if self.running:
            return Status()

        # The flag is set before the task is created (as in the original
        # ordering) and rolled back on failure, so a failed start does not
        # leave the node marked as running with no task behind it.
        self.running = True
        try:
            task = asyncio.create_task(self.run())
        except Exception:
            self.running = False
            raise
        self.tasks.append(task)

        self.logger.info("started")
        return Status()

    def stop(self) -> Status:
        """Cancel every task created by ``start`` and mark the node stopped.

        Calling ``stop`` on a node that is not running does nothing beyond
        logging.

        Returns:
            A default-constructed ``Status``.
        """
        self.logger.info("stopping...")
        if not self.running:
            return Status()

        self.running = False
        for task in self.tasks:
            task.cancel()
        self.tasks = []

        self.logger.info("Stopped")
        return Status()

    async def on_data_received(self, data: SynapseData):
        """Put ``data`` on this node's ``data_queue``."""
        await self.data_queue.put(data)

    async def emit_data(self, data):
        """Hand ``data`` to every downstream node without awaiting delivery.

        One task per downstream node is scheduled to call that node's
        ``on_data_received``. Each task is held in ``self._emit_tasks`` until
        it completes so it cannot be garbage-collected while pending.
        """
        for node in self.downstream_nodes:
            task = asyncio.create_task(node.on_data_received(data))
            self._emit_tasks.add(task)
            # Drop the reference once the delivery finishes.
            task.add_done_callback(self._emit_tasks.discard)

    def tap_connections(self) -> list:
        """Return an empty list in this base implementation.

        NOTE(review): presumably subclasses override this to report their
        tap connections -- confirm against the subclasses.
        """
        return []