Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions qunetsim/components/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
from inspect import signature
from queue import Queue
from threading import Thread

import matplotlib.pyplot as plt
import networkx as nx
Expand Down Expand Up @@ -38,8 +39,7 @@ def __init__(self):
self._quantum_routing_algo = nx.shortest_path
self._classical_routing_algo = nx.shortest_path
self._use_hop_by_hop = True
self._packet_queue = Queue()
self._stop_thread = False
self._packet_queues = {}
self._use_ent_swap = False
self._queue_processor_thread = None
self._delay = 0.1
Expand Down Expand Up @@ -178,6 +178,12 @@ def packet_drop_rate(self, drop_rate):
def arp(self):
return self.ARP

def add_queue(self, host_id):
self._packet_queues[host_id] = Queue()

def remove_queue(self, host_id):
del self._packet_queues[host_id]

@property
def num_hosts(self):
return len(self.arp.keys())
Expand All @@ -191,6 +197,7 @@ def add_host(self, host):
"""

Logger.get_instance().debug('host added: ' + host.host_id)
self.add_queue(host.host_id)
self.ARP[host.host_id] = host
self._update_network_graph(host)

Expand All @@ -214,6 +221,7 @@ def remove_host(self, host):

if host.host_id in self.ARP:
del self.ARP[host.host_id]
self.remove_queue(host.host_id)
if self.quantum_network.has_node(host.host_id):
self.quantum_network.remove_node(host.host_id)
if self.classical_network.has_node(host.host_id):
Expand Down Expand Up @@ -498,19 +506,17 @@ def transfer_qubits(r, s, original_sender=None):
i += 1
return True

def _process_queue(self):
def _process_queues(self):
"""
Runs a thread for processing the packets in the packet queue.
Runs multiple threads for processing the packets in the packet queues.
"""

while True:

packet = self._packet_queue.get()
def process_queue(packet_queue):
packet = packet_queue.get()

if not packet:
# Stop the network
self._stop_thread = True
break
# If None packet is received, then stop thread
if not packet.payload:
return

# Artificially delay the network
if self.delay > 0:
Expand All @@ -522,14 +528,14 @@ def _process_queue(self):
Logger.get_instance().log("PACKET DROPPED")
if packet.payload_type == Constants.QUANTUM:
packet.payload.release()
continue
return

sender, receiver = packet.sender, packet.receiver

if packet.payload_type == Constants.QUANTUM:
if not self._route_quantum_info(sender, receiver,
[packet.payload]):
continue
return

try:
if packet.protocol == Constants.RELAY and not self.use_hop_by_hop:
Expand Down Expand Up @@ -584,17 +590,30 @@ def _process_queue(self):
Logger.get_instance().error(
"route couldn't be calculated, value error")
except Exception as e:
import traceback
traceback.format_exc()
Logger.get_instance().error('Error in network: ' + str(e))

while True:
Comment thread
WingCode marked this conversation as resolved.
threads = []
for queue in self._packet_queues.values():
if not queue.empty():
threads.append(Thread(target=process_queue, args=[queue], daemon=False))

for t in threads:
t.start()

for t in threads:
t.join()

def send(self, packet):
"""
Puts the packet to the packet queue of the network.

Args:
packet (Packet): Packet to be sent
"""

self._packet_queue.put(packet)
self._packet_queues.get(packet.sender).put(packet)

def stop(self, stop_hosts=False):
"""
Expand All @@ -606,8 +625,10 @@ def stop(self, stop_hosts=False):
if stop_hosts:
for host in self.ARP:
self.ARP[host].stop(release_qubits=True)
# Send None to queue to stop the queue
self.send(RoutingPacket(sender=host, receiver=None, protocol=None, payload_type=None, payload=None,
ttl=None, route=None))

self.send(None) # Send None to queue to stop the queue
if self._backend is not None:
self._backend.stop()
except Exception as e:
Expand All @@ -624,7 +645,7 @@ def start(self, nodes=None, backend=None):
self._backend = backend
if nodes is not None:
self._backend.start(nodes=nodes)
self._queue_processor_thread = DaemonThread(target=self._process_queue)
self._queue_processor_thread = DaemonThread(target=self._process_queues)

def draw_classical_network(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion qunetsim/objects/packets/routing_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self, sender, receiver, protocol, payload_type, payload, ttl, route
ttl(int): Time-to-Live parameter
route (List): Route the packet takes to its target host.
"""
if not isinstance(payload, Packet):
if not isinstance(payload, Packet) and payload is not None:
raise ValueError("For the routing packet the payload has to be a packet.")

self._ttl = ttl
Expand Down