Skip to content

Commit b6ada53

Browse files
authored
Merge pull request #7 from tqsd/making_merge_atomar
Making merge atomar
2 parents a7ecbfc + a29832b commit b6ada53

9 files changed

Lines changed: 61 additions & 50 deletions

eqsn/gates.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
import numpy as np
44
from eqsn.qubit_thread import SINGLE_GATE, MERGE_SEND, MERGE_ACCEPT, MEASURE,\
5-
MEASURE_NON_DESTRUCTIVE, GIVE_QUBITS_AND_TERMINATE, \
5+
MEASURE_NON_DESTRUCTIVE, \
66
CONTROLLED_GATE, NEW_QUBIT, ADD_MERGED_QUBITS_TO_DICT
77
from eqsn.shared_dict import SharedDict
88
from eqsn.worker_process import WorkerProcess
@@ -19,17 +19,16 @@ class EQSN(object):
1919
def __init__(self):
2020
self.manager = multiprocessing.Manager()
2121
self.shared_dict = SharedDict.get_instance()
22-
# cpu_count = multiprocessing.cpu_count()
23-
cpu_count = 1
24-
process_queue_list = []
22+
cpu_count = multiprocessing.cpu_count()
23+
self.process_queue_list = []
2524
for _ in range(cpu_count):
2625
q = multiprocessing.Queue()
2726
new_worker = WorkerProcess(q)
2827
p = multiprocessing.Process(target=new_worker.run, args=())
2928
p.start()
30-
process_queue_list.append((p, q))
29+
self.process_queue_list.append((p, q))
3130
self.process_picker = ProcessPicker.get_instance(
32-
cpu_count, process_queue_list)
31+
cpu_count, self.process_queue_list)
3332

3433
def new_qubit(self, q_id):
3534
"""
@@ -47,8 +46,9 @@ def stop_all(self):
4746
"""
4847
Stops the simulator from running.
4948
"""
50-
self.shared_dict.send_all_threads(None)
51-
self.shared_dict.stop_all_threads()
49+
for p, q in self.process_queue_list:
50+
q.put(None)
51+
p.join()
5252
self.shared_dict.stop_shared_dict()
5353
self.process_picker.stop_process_picker()
5454

@@ -153,18 +153,20 @@ def merge_qubits(self, q_id1, q_id2):
153153
if len(l) == 1:
154154
return # Already merged
155155
else:
156+
# Block the dictionary, that noone can send commands to the qubits,
156157
logging.debug("Merge Qubits %s and %s.", q_id1, q_id2)
158+
self.shared_dict.block_shared_dict()
157159
q1 = l[0]
158160
q2 = l[1]
159161
merge_q = self.manager.Queue()
160-
q1.put([MERGE_SEND, q_id1, merge_q])
161-
q2.put([MERGE_ACCEPT, q_id2, merge_q])
162162
qubits_q = self.manager.Queue()
163-
q1.put([GIVE_QUBITS_AND_TERMINATE, q_id1, qubits_q])
163+
q1.put([MERGE_SEND, q_id1, merge_q, qubits_q])
164+
q2.put([MERGE_ACCEPT, q_id2, merge_q])
164165
qubits = qubits_q.get()
165166
q2.put([ADD_MERGED_QUBITS_TO_DICT, q_id2, qubits])
166-
self.shared_dict.change_thread_and_queue_of_ids(
167+
self.shared_dict.change_thread_and_queue_of_ids_nonblocking(
167168
qubits, q_id2)
169+
self.shared_dict.release_shared_dict()
168170

169171
def cnot_gate(self, q_id1, q_id2):
170172
"""

eqsn/qubit_thread.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
MEASURE = 3
1111
MERGE_ACCEPT = 4
1212
MERGE_SEND = 5
13-
GIVE_QUBITS_AND_TERMINATE = 6
1413
MEASURE_NON_DESTRUCTIVE = 7
1514
NEW_QUBIT = 8
1615
ADD_MERGED_QUBITS_TO_DICT = 9
@@ -115,19 +114,13 @@ def merge_accept(self, channel):
115114
self.qubit = np.kron(self.qubit, vector)
116115
logging.debug("Qubit Thread merged, new qubits are %r", self.qubits)
117116

118-
def merge_send(self, channel):
117+
def merge_send(self, channel, chanel2):
119118
"""
120119
Send own process data to another process and suicide.
121120
"""
122121
channel.put(dp(self.qubits))
123122
channel.put(dp(self.qubit))
124-
return
125-
126-
def send_qubits(self, channel):
127-
"""
128-
Send which qubits are in this process over a channel.
129-
"""
130-
channel.put(dp(self.qubits))
123+
chanel2.put(dp(self.qubits))
131124
return
132125

133126
def measure_non_destructive(self, q_id, ret_channel):
@@ -145,7 +138,11 @@ def measure_non_destructive(self, q_id, ret_channel):
145138
if after > 0:
146139
measure_vec = np.kron(measure_vec, np.ones(2 ** after))
147140
pr_0 = np.multiply(measure_vec, self.qubit)
148-
pr_0 = abs(np.dot(pr_0, pr_0).real)
141+
pr_0 = abs(np.dot(pr_0, pr_0))
142+
if pr_0 > 1.0:
143+
pr_0 = 1.0
144+
elif pr_0 < 0.0:
145+
pr_0 = 0.0
149146
meas_res = np.random.binomial(1, 1.0 - pr_0)
150147
reduction_mat = None
151148
if meas_res == 0:
@@ -183,7 +180,11 @@ def measure(self, q_id, ret_channel):
183180
if after > 0:
184181
measure_vec = np.kron(measure_vec, np.ones(2 ** after))
185182
pr_0 = np.multiply(measure_vec, self.qubit)
186-
pr_0 = abs(np.dot(pr_0, pr_0).real)
183+
pr_0 = abs(np.dot(pr_0, pr_0))
184+
if pr_0 > 1.0:
185+
pr_0 = 1.0
186+
elif pr_0 < 0.0:
187+
pr_0 = 0.0
187188
meas_res = np.random.binomial(1, 1.0 - pr_0)
188189
reduction_mat = None
189190
if meas_res == 0:
@@ -230,9 +231,7 @@ def run(self):
230231
elif item[0] == MERGE_ACCEPT:
231232
self.merge_accept(item[1])
232233
elif item[0] == MERGE_SEND:
233-
self.merge_send(item[1])
234-
elif item[0] == GIVE_QUBITS_AND_TERMINATE:
235-
self.send_qubits(item[1])
234+
self.merge_send(item[1], item[2])
236235
return
237236
elif item[0] == MEASURE_NON_DESTRUCTIVE:
238237
self.measure_non_destructive(item[1], item[2])

eqsn/shared_dict.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,12 @@ def __init__(self):
9595
self.thread_list = []
9696
self.queue_list = []
9797

98+
def block_shared_dict(self):
99+
self.lock.acquire_write()
100+
101+
def release_shared_dict(self):
102+
self.lock.release_write()
103+
98104
def get_queues_and_threads_for_ids(self, q_id_list):
99105
ret = []
100106
self.lock.acquire_read()
@@ -175,6 +181,13 @@ def change_thread_and_queue_of_ids(self, q_ids, q_id_new_thread):
175181
self.id_to_queue[q_id] = new_queue
176182
self.lock.release_write()
177183

184+
def change_thread_and_queue_of_ids_nonblocking(self, q_ids, q_id_new_thread):
185+
new_thread = self.id_to_thread[q_id_new_thread]
186+
new_queue = self.id_to_queue[q_id_new_thread]
187+
for q_id in q_ids:
188+
self.id_to_thread[q_id] = new_thread
189+
self.id_to_queue[q_id] = new_queue
190+
178191
def send_all_threads(self, msg):
179192
self.lock.acquire_write()
180193
for p in self.queue_list:

eqsn/worker_process.py

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import logging
22
import threading
3+
import os
34
from queue import Queue
45
from eqsn.shared_dict import SharedDict
56
from eqsn.qubit_thread import SINGLE_GATE, MERGE_SEND, MERGE_ACCEPT, MEASURE,\
6-
MEASURE_NON_DESTRUCTIVE, GIVE_QUBITS_AND_TERMINATE, \
7+
MEASURE_NON_DESTRUCTIVE, \
78
CONTROLLED_GATE, NEW_QUBIT, ADD_MERGED_QUBITS_TO_DICT, QubitThread
89

910

@@ -36,9 +37,7 @@ def run(self):
3637
elif item[0] == MERGE_ACCEPT:
3738
self.merge_accept(item[1], item[2])
3839
elif item[0] == MERGE_SEND:
39-
self.merge_send(item[1], item[2])
40-
elif item[0] == GIVE_QUBITS_AND_TERMINATE:
41-
self.give_qubits_and_terminate(item[1], item[2])
40+
self.merge_send(item[1], item[2], item[3])
4241
elif item[0] == MEASURE_NON_DESTRUCTIVE:
4342
self.measure_non_destructive(item[1], item[2])
4443
elif item[0] == ADD_MERGED_QUBITS_TO_DICT:
@@ -72,17 +71,6 @@ def measure_non_destructive(self, q_id, channel):
7271
q = self.shared_dict.get_queues_for_ids([q_id])[0]
7372
q.put([MEASURE_NON_DESTRUCTIVE, q_id, channel])
7473

75-
def give_qubits_and_terminate(self, q_id, queue):
76-
temp_queue = Queue()
77-
q = self.shared_dict.get_queues_for_ids([q_id])[0]
78-
q.put([GIVE_QUBITS_AND_TERMINATE, temp_queue])
79-
qubits = temp_queue.get()
80-
# remove all qubits
81-
for c in qubits:
82-
self.shared_dict.delete_id_and_check_to_join_thread(c)
83-
# send the qubits to the main process
84-
queue.put(qubits)
85-
8674
def add_merged_qubits_to_thread(self, q_id, qubits):
8775
q, t = self.shared_dict.get_queues_and_threads_for_ids([q_id])[0]
8876
for qubit in qubits:
@@ -116,9 +104,16 @@ def apply_controlled_gate(self, gate, q_id1, q_id2):
116104
q = self.shared_dict.get_queues_for_ids([q_id1])[0]
117105
q.put([CONTROLLED_GATE, gate, q_id1, q_id2])
118106

119-
def merge_send(self, q_id, queue):
107+
def merge_send(self, q_id, queue, queue2):
108+
temp_queue = Queue()
120109
q = self.shared_dict.get_queues_for_ids([q_id])[0]
121-
q.put([MERGE_SEND, queue])
110+
q.put([MERGE_SEND, queue, temp_queue])
111+
qubits = temp_queue.get()
112+
# remove all qubits
113+
for c in qubits:
114+
self.shared_dict.delete_id_and_check_to_join_thread(c)
115+
# send the qubits to the main process
116+
queue2.put(qubits)
122117

123118
def merge_accept(self, q_id, queue):
124119
q = self.shared_dict.get_queues_for_ids([q_id])[0]
@@ -141,10 +136,9 @@ def merge_qubits(self, q_id1, q_id2):
141136
q1 = l[0]
142137
q2 = l[1]
143138
merge_q = Queue()
144-
q1.put([MERGE_SEND, merge_q])
145-
q2.put([MERGE_ACCEPT, merge_q])
146139
qubits_q = Queue()
147-
q1.put([GIVE_QUBITS_AND_TERMINATE, qubits_q])
140+
q1.put([MERGE_SEND, merge_q, qubits_q])
141+
q2.put([MERGE_ACCEPT, merge_q])
148142
qubits = qubits_q.get()
149143
self.shared_dict.change_thread_and_queue_of_ids_and_join(
150144
qubits, q_id2)

tests/test_measuring_from_threads.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@ def test_measure_from_threads():
1111
def measure_or_hadamard(id):
1212
n = random.randrange(10, 100, 1)
1313
for _ in range(n):
14-
time.sleep(0.05)
14+
# time.sleep(0.05)
1515
q_sim.H_gate(id)
1616
print("Finished Hadamard, measure qubit %s!" % id)
1717
print(q_sim.measure(id))
1818
print("Finished with Measure!")
1919

20-
nr_threads = 5
20+
nr_threads = 10
2121
ids = [str(x) for x in range(nr_threads)]
2222
for id in ids:
2323
q_sim.new_qubit(id)

tests/test_merging_qubits.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ def test_merge():
99
q_sim.new_qubit(id2)
1010
q_sim.merge_qubits(id1, id2)
1111
q_sim.stop_all()
12+
print("merging was succesfull")
1213

1314

1415
if __name__ == "__main__":

tests/test_multi_gate.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,5 @@ def test_5_qubits_gate():
3737

3838
if __name__ == "__main__":
3939
test_epr_creation()
40-
time.sleep(0.1)
40+
time.sleep(0.05)
4141
test_5_qubits_gate()

tests/test_non_destructive_measurement.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ def test_non_destructive_measurement():
1212
assert m == m2
1313
print("Test was successfull!")
1414
q_sim.stop_all()
15+
print("Stopped succesfully!")
16+
exit(0)
1517

1618

1719
if __name__ == "__main__":

tests/test_single_gate.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,4 @@ def test_measure():
118118
test_measure]
119119
for func in test_list:
120120
func()
121-
time.sleep(0.01)
121+
time.sleep(0.05)

0 commit comments

Comments
 (0)