Skip to content

Commit a7ecbfc

Browse files
authored
Merge pull request #5 from tqsd/process_pool
Process pool
2 parents bfb0687 + abf54dd commit a7ecbfc

8 files changed

Lines changed: 257 additions & 26 deletions

eqsn/gates.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import numpy as np
44
from eqsn.qubit_thread import SINGLE_GATE, MERGE_SEND, MERGE_ACCEPT, MEASURE,\
55
MEASURE_NON_DESTRUCTIVE, GIVE_QUBITS_AND_TERMINATE, \
6-
CONTROLLED_GATE, QubitThread
6+
CONTROLLED_GATE, NEW_QUBIT, ADD_MERGED_QUBITS_TO_DICT
77
from eqsn.shared_dict import SharedDict
8+
from eqsn.worker_process import WorkerProcess
9+
from eqsn.process_picker import ProcessPicker
810

911

1012
class EQSN(object):
@@ -17,19 +19,28 @@ class EQSN(object):
1719
def __init__(self):
1820
self.manager = multiprocessing.Manager()
1921
self.shared_dict = SharedDict.get_instance()
22+
# cpu_count = multiprocessing.cpu_count()
23+
cpu_count = 1
24+
process_queue_list = []
25+
for _ in range(cpu_count):
26+
q = multiprocessing.Queue()
27+
new_worker = WorkerProcess(q)
28+
p = multiprocessing.Process(target=new_worker.run, args=())
29+
p.start()
30+
process_queue_list.append((p, q))
31+
self.process_picker = ProcessPicker.get_instance(
32+
cpu_count, process_queue_list)
2033

2134
def new_qubit(self, q_id):
2235
"""
2336
Creates a new qubit with an id.
2437
2538
Args:
26-
id (String): Id of the new qubit.
39+
q_id (String): Id of the new qubit.
2740
"""
28-
q = multiprocessing.Queue()
29-
thread = QubitThread(q_id, q)
30-
p = multiprocessing.Process(target=thread.run, args=())
41+
p, q = self.process_picker.get_next_process_queue()
42+
q.put([NEW_QUBIT, q_id])
3143
self.shared_dict.set_thread_with_id(q_id, p, q)
32-
p.start()
3344
logging.debug("Created new qubit with id %s.", q_id)
3445

3546
def stop_all(self):
@@ -38,6 +49,8 @@ def stop_all(self):
3849
"""
3950
self.shared_dict.send_all_threads(None)
4051
self.shared_dict.stop_all_threads()
52+
self.shared_dict.stop_shared_dict()
53+
self.process_picker.stop_process_picker()
4154

4255
def X_gate(self, q_id):
4356
"""
@@ -144,12 +157,13 @@ def merge_qubits(self, q_id1, q_id2):
144157
q1 = l[0]
145158
q2 = l[1]
146159
merge_q = self.manager.Queue()
147-
q1.put([MERGE_SEND, merge_q])
148-
q2.put([MERGE_ACCEPT, merge_q])
160+
q1.put([MERGE_SEND, q_id1, merge_q])
161+
q2.put([MERGE_ACCEPT, q_id2, merge_q])
149162
qubits_q = self.manager.Queue()
150-
q1.put([GIVE_QUBITS_AND_TERMINATE, qubits_q])
163+
q1.put([GIVE_QUBITS_AND_TERMINATE, q_id1, qubits_q])
151164
qubits = qubits_q.get()
152-
self.shared_dict.change_thread_and_queue_of_ids_and_join(
165+
q2.put([ADD_MERGED_QUBITS_TO_DICT, q_id2, qubits])
166+
self.shared_dict.change_thread_and_queue_of_ids(
153167
qubits, q_id2)
154168

155169
def cnot_gate(self, q_id1, q_id2):

eqsn/process_picker.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
class ProcessPicker(object):
2+
"""
3+
Decides which process allocates new Qubits.
4+
"""
5+
__instance = None
6+
7+
@staticmethod
8+
def get_instance(cpu_count, process_queue_list):
9+
if ProcessPicker.__instance is None:
10+
return ProcessPicker(cpu_count, process_queue_list)
11+
return ProcessPicker.__instance
12+
13+
def __init__(self, cpu_count, process_queue_list):
14+
if ProcessPicker.__instance is not None:
15+
raise ValueError(
16+
"Use get instance to get the Process picker class.")
17+
ProcessPicker.__instance = self
18+
self.amount_processes = cpu_count
19+
self.pointer = 0
20+
self.process_queue_list = process_queue_list
21+
22+
def get_next_process_queue(self):
23+
"""
24+
Called in threads may override pointer, but we do not care, since
25+
the pointer value is not essential.
26+
"""
27+
res_q = self.process_queue_list[self.pointer % self.amount_processes]
28+
self.pointer += 1
29+
return res_q
30+
31+
def stop_process_picker(self):
32+
ProcessPicker.__instance = None

eqsn/qubit_thread.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
MERGE_SEND = 5
1313
GIVE_QUBITS_AND_TERMINATE = 6
1414
MEASURE_NON_DESTRUCTIVE = 7
15+
NEW_QUBIT = 8
16+
ADD_MERGED_QUBITS_TO_DICT = 9
1517

1618

1719
class QubitThread(object):
@@ -212,18 +214,15 @@ def run(self):
212214
"""
213215
Run in loop and wait to receive tasks to perform.
214216
"""
215-
amount_single_gate = 0
216217
while True:
217218
item = self.queue.get()
218219
if item is None:
219220
return
220221
elif item[0] == SINGLE_GATE:
221222
self.apply_single_gate(item[1], item[2])
222-
amount_single_gate += 1
223223
elif item[0] == CONTROLLED_GATE:
224224
self.apply_controlled_gate(item[1], item[2], item[3])
225225
elif item[0] == MEASURE:
226-
sys.stdout.flush()
227226
self.measure(item[1], item[2])
228227
# no qubit left, terminate
229228
if len(self.qubits) == 0:

eqsn/shared_dict.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ def get_instance():
7979
return SharedDict()
8080
return SharedDict.__instance
8181

82+
@staticmethod
83+
def get_new_instance():
84+
SharedDict.__instance = None
85+
return SharedDict()
86+
8287
def __init__(self):
8388
if SharedDict.__instance is not None:
8489
raise Exception("Call get instance to get this class!")
@@ -90,6 +95,17 @@ def __init__(self):
9095
self.thread_list = []
9196
self.queue_list = []
9297

98+
def get_queues_and_threads_for_ids(self, q_id_list):
99+
ret = []
100+
self.lock.acquire_read()
101+
for q_id in q_id_list:
102+
res = self.id_to_queue[q_id]
103+
res2 = self.id_to_thread[q_id]
104+
if res not in ret:
105+
ret.append((res, res2))
106+
self.lock.release_read()
107+
return ret
108+
93109
def get_queues_for_ids(self, q_id_list):
94110
ret = []
95111
self.lock.acquire_read()
@@ -150,6 +166,15 @@ def change_thread_and_queue_of_ids_and_join(self, q_ids, q_id_new_thread):
150166
self.id_to_queue[q_id] = new_queue
151167
self.lock.release_write()
152168

169+
def change_thread_and_queue_of_ids(self, q_ids, q_id_new_thread):
170+
self.lock.acquire_write()
171+
new_thread = self.id_to_thread[q_id_new_thread]
172+
new_queue = self.id_to_queue[q_id_new_thread]
173+
for q_id in q_ids:
174+
self.id_to_thread[q_id] = new_thread
175+
self.id_to_queue[q_id] = new_queue
176+
self.lock.release_write()
177+
153178
def send_all_threads(self, msg):
154179
self.lock.acquire_write()
155180
for p in self.queue_list:
@@ -161,3 +186,6 @@ def stop_all_threads(self):
161186
for p in self.thread_list:
162187
p.join()
163188
self.lock.release_write()
189+
190+
def stop_shared_dict(self):
191+
SharedDict.__instance = None

eqsn/worker_process.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import logging
2+
import threading
3+
from queue import Queue
4+
from eqsn.shared_dict import SharedDict
5+
from eqsn.qubit_thread import SINGLE_GATE, MERGE_SEND, MERGE_ACCEPT, MEASURE,\
6+
MEASURE_NON_DESTRUCTIVE, GIVE_QUBITS_AND_TERMINATE, \
7+
CONTROLLED_GATE, NEW_QUBIT, ADD_MERGED_QUBITS_TO_DICT, QubitThread
8+
9+
10+
class WorkerProcess(object):
11+
12+
def __init__(self, queue):
13+
self.queue = queue
14+
# Get a new instance, since their might be one from the old process
15+
self.shared_dict = SharedDict.get_new_instance()
16+
17+
def run(self):
18+
"""
19+
Run in loop and wait to receive tasks to perform.
20+
"""
21+
amount_single_gate = 0
22+
while True:
23+
item = self.queue.get()
24+
if item is None:
25+
self.stop_all()
26+
return
27+
elif item[0] == NEW_QUBIT:
28+
self.new_qubit(item[1])
29+
elif item[0] == SINGLE_GATE:
30+
self.apply_single_gate(item[1], item[2])
31+
amount_single_gate += 1
32+
elif item[0] == CONTROLLED_GATE:
33+
self.apply_controlled_gate(item[1], item[2], item[3])
34+
elif item[0] == MEASURE:
35+
self.measure(item[1], item[2])
36+
elif item[0] == MERGE_ACCEPT:
37+
self.merge_accept(item[1], item[2])
38+
elif item[0] == MERGE_SEND:
39+
self.merge_send(item[1], item[2])
40+
elif item[0] == GIVE_QUBITS_AND_TERMINATE:
41+
self.give_qubits_and_terminate(item[1], item[2])
42+
elif item[0] == MEASURE_NON_DESTRUCTIVE:
43+
self.measure_non_destructive(item[1], item[2])
44+
elif item[0] == ADD_MERGED_QUBITS_TO_DICT:
45+
self.add_merged_qubits_to_thread(item[1], item[2])
46+
else:
47+
raise ValueError("Command does not exist!")
48+
49+
def new_qubit(self, q_id):
50+
"""
51+
Creates a new qubit with an id.
52+
53+
Args:
54+
id (String): Id of the new qubit.
55+
"""
56+
q = Queue()
57+
thread = QubitThread(q_id, q)
58+
p = threading.Thread(target=thread.run, args=())
59+
self.shared_dict.set_thread_with_id(q_id, p, q)
60+
p.start()
61+
logging.debug("Created new qubit with id %s.", q_id)
62+
63+
def measure(self, q_id, channel):
64+
temp_queue = Queue()
65+
q = self.shared_dict.get_queues_for_ids([q_id])[0]
66+
q.put([MEASURE, q_id, temp_queue])
67+
res = temp_queue.get()
68+
channel.put(res)
69+
self.shared_dict.delete_id_and_check_to_join_thread(q_id)
70+
71+
def measure_non_destructive(self, q_id, channel):
72+
q = self.shared_dict.get_queues_for_ids([q_id])[0]
73+
q.put([MEASURE_NON_DESTRUCTIVE, q_id, channel])
74+
75+
def give_qubits_and_terminate(self, q_id, queue):
76+
temp_queue = Queue()
77+
q = self.shared_dict.get_queues_for_ids([q_id])[0]
78+
q.put([GIVE_QUBITS_AND_TERMINATE, temp_queue])
79+
qubits = temp_queue.get()
80+
# remove all qubits
81+
for c in qubits:
82+
self.shared_dict.delete_id_and_check_to_join_thread(c)
83+
# send the qubits to the main process
84+
queue.put(qubits)
85+
86+
def add_merged_qubits_to_thread(self, q_id, qubits):
87+
q, t = self.shared_dict.get_queues_and_threads_for_ids([q_id])[0]
88+
for qubit in qubits:
89+
self.shared_dict.set_thread_with_id(qubit, t, q)
90+
91+
def stop_all(self):
92+
"""
93+
Stops the simulator from running.
94+
"""
95+
self.shared_dict.send_all_threads(None)
96+
self.shared_dict.stop_all_threads()
97+
self.shared_dict.stop_shared_dict()
98+
99+
def apply_single_gate(self, gate, q_id):
100+
"""
101+
Applies a single gate to a Qubit.
102+
"""
103+
q = self.shared_dict.get_queues_for_ids([q_id])[0]
104+
q.put([SINGLE_GATE, gate, q_id])
105+
106+
def apply_controlled_gate(self, gate, q_id1, q_id2):
107+
"""
108+
Applies a controlled gate, where the gate is applied to
109+
q_id1 and controlled by q_id2.
110+
111+
Args:
112+
q_id1 (String): Id of the Qubit on which the X gate is applied.
113+
q_id2 (String): Id of the Qubit which controls the gate.
114+
"""
115+
self.merge_qubits(q_id1, q_id2)
116+
q = self.shared_dict.get_queues_for_ids([q_id1])[0]
117+
q.put([CONTROLLED_GATE, gate, q_id1, q_id2])
118+
119+
def merge_send(self, q_id, queue):
120+
q = self.shared_dict.get_queues_for_ids([q_id])[0]
121+
q.put([MERGE_SEND, queue])
122+
123+
def merge_accept(self, q_id, queue):
124+
q = self.shared_dict.get_queues_for_ids([q_id])[0]
125+
q.put([MERGE_ACCEPT, queue])
126+
127+
def merge_qubits(self, q_id1, q_id2):
128+
"""
129+
Merges two qubits to one process, if they are not already
130+
running in the same process.
131+
132+
Args:
133+
q_id1 (String): Id of the Qubit merged into q_id2.
134+
q_id2 (String): Id of the Qubit merged with q_id1.
135+
"""
136+
l = self.shared_dict.get_queues_for_ids([q_id1, q_id2])
137+
if len(l) == 1:
138+
return # Already merged
139+
else:
140+
logging.debug("Merge Qubits %s and %s.", q_id1, q_id2)
141+
q1 = l[0]
142+
q2 = l[1]
143+
merge_q = Queue()
144+
q1.put([MERGE_SEND, merge_q])
145+
q2.put([MERGE_ACCEPT, merge_q])
146+
qubits_q = Queue()
147+
q1.put([GIVE_QUBITS_AND_TERMINATE, qubits_q])
148+
qubits = qubits_q.get()
149+
self.shared_dict.change_thread_and_queue_of_ids_and_join(
150+
qubits, q_id2)

tests/test_measuring_from_threads.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@ def test_measure_from_threads():
1111
def measure_or_hadamard(id):
1212
n = random.randrange(10, 100, 1)
1313
for _ in range(n):
14-
time.sleep(0.1)
14+
time.sleep(0.05)
1515
q_sim.H_gate(id)
1616
print("Finished Hadamard, measure qubit %s!" % id)
1717
print(q_sim.measure(id))
1818
print("Finished with Measure!")
1919

20-
nr_threads = 10
20+
nr_threads = 5
2121
ids = [str(x) for x in range(nr_threads)]
2222
for id in ids:
2323
q_sim.new_qubit(id)
@@ -27,7 +27,8 @@ def measure_or_hadamard(id):
2727
q_sim.cnot_gate(id1, c)
2828
thread_list = []
2929
for id in ids:
30-
t = threading.Thread(target=measure_or_hadamard, args=(id))
30+
print(id)
31+
t = threading.Thread(target=measure_or_hadamard, args=(id,))
3132
t.start()
3233
thread_list.append(t)
3334
for t in thread_list:

tests/test_multi_gate.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from eqsn import EQSN
2+
import time
23

34

45
def test_epr_creation():
@@ -36,4 +37,5 @@ def test_5_qubits_gate():
3637

3738
if __name__ == "__main__":
3839
test_epr_creation()
40+
time.sleep(0.1)
3941
test_5_qubits_gate()

0 commit comments

Comments
 (0)