55
66from koi_net .exceptions import RequestError
77from koi_net .config .base import BaseNodeConfig
8+ from koi_net .infra import depends_on
89from koi_net .protocol .node import NodeProfile , NodeType
910from koi_net .protocol .edge import EdgeProfile , EdgeStatus , EdgeType , generate_edge_bundle
1011from koi_net .protocol .knowledge_object import KnowledgeObject
@@ -28,19 +29,12 @@ class NodeContactHandler(KnowledgeHandler):
2829 handler_type = HandlerType .Network
2930 rid_types = (KoiNetNode ,)
3031
31- def handle (self , kobj : KnowledgeObject ):
32- """Makes contact with providers of RID types of interest.
33-
34- When an incoming node knowledge object is identified as a provider
35- of an RID type of interest, this handler will propose a new edge
36- subscribing to future node events, and fetch existing nodes to catch
37- up to the current state.
38- """
32+ def process_node (self , node_rid : KoiNetNode , node_bundle : Bundle ):
3933 # prevents nodes from attempting to form a self loop
40- if kobj . rid == self .identity .rid :
34+ if node_rid == self .identity .rid :
4135 return
4236
43- node_profile = kobj . bundle .validate_contents (NodeProfile )
37+ node_profile = node_bundle .validate_contents (NodeProfile )
4438
4539 available_rid_types = list (
4640 set (self .config .koi_net .rid_types_of_interest ) &
@@ -51,7 +45,7 @@ def handle(self, kobj: KnowledgeObject):
5145 return
5246
5347 edge_rid = self .graph .get_edge (
54- source = kobj . rid ,
48+ source = node_rid ,
5549 target = self .identity .rid ,
5650 )
5751
@@ -74,7 +68,7 @@ def handle(self, kobj: KnowledgeObject):
7468 else :
7569 self .log .info (f"Proposing new edge with node provider { available_rid_types } " )
7670 edge_bundle = generate_edge_bundle (
77- source = kobj . rid ,
71+ source = node_rid ,
7872 target = self .identity .rid ,
7973 rid_types = available_rid_types ,
8074 edge_type = (
@@ -90,7 +84,7 @@ def handle(self, kobj: KnowledgeObject):
9084 self .log .info ("Catching up on network state" )
9185 try :
9286 payload = self .request_handler .fetch_rids (
93- node = kobj . rid ,
87+ node = node_rid ,
9488 rid_types = available_rid_types
9589 )
9690 except RequestError :
@@ -107,5 +101,25 @@ def handle(self, kobj: KnowledgeObject):
107101
108102 # marked as external since we are handling RIDs from another node
109103 # will fetch remotely instead of checking local cache
110- self .kobj_queue .push (rid = rid , source = kobj . rid )
104+ self .kobj_queue .push (rid = rid , source = node_rid )
111105 self .log .info ("Done" )
106+
107+ def handle (self , kobj : KnowledgeObject ):
108+ """Makes contact with providers of RID types of interest.
109+
110+ When an incoming node knowledge object is identified as a provider
111+ of an RID type of interest, this handler will propose a new edge
112+ subscribing to future node events, and fetch existing nodes to catch
113+ up to the current state.
114+ """
115+ self .process_node (kobj .rid , kobj .bundle )
116+
117+ @depends_on ("graph" , "kobj_worker" )
118+ def start (self ):
119+ self .log .info ("Starting node contact analysis on cached profiles..." )
120+ for rid in self .cache .list_rids (rid_types = (KoiNetNode ,)):
121+ bundle = self .cache .read (rid )
122+ if not bundle :
123+ continue
124+
125+ self .process_node (rid , bundle )
0 commit comments