@@ -152,29 +152,28 @@ class GraphSketchDriver {
152152#endif
153153
154154 while (true ) {
155- bool got_breakpoint = false ;
156155 size_t updates = stream->get_update_buffer (update_array, update_array_size);
157-
158- if (update_array[updates - 1 ].type == BREAKPOINT) {
159- --updates;
160- got_breakpoint = true ;
161- }
162- gts->process_stream_upd_batch (update_array, updates, thr_id);
163-
164156 for (size_t i = 0 ; i < updates; i++) {
165- GraphUpdate upd = {update_array[i].edge , (UpdateType) update_array[i].type };
166- sketching_alg->pre_insert (upd, thr_id);
157+ GraphUpdate upd;
158+ upd.edge = update_array[i].edge ;
159+ upd.type = static_cast <UpdateType>(update_array[i].type );
160+ if (upd.type == BREAKPOINT) {
161+ // reached the breakpoint. Update verifier if applicable and return
167162#ifdef VERIFY_SAMPLES_F
168- local_verifier.edge_update (upd.edge );
163+ std::lock_guard<std::mutex> lk (verifier_mtx);
164+ verifier->combine (local_verifier);
169165#endif
170- }
171-
172- if (got_breakpoint) {
166+ return ;
167+ }
168+ else {
169+ sketching_alg->pre_insert (upd, thr_id);
170+ Edge edge = upd.edge ;
171+ gts->insert ({edge.src , edge.dst }, thr_id);
172+ gts->insert ({edge.dst , edge.src }, thr_id);
173173#ifdef VERIFY_SAMPLES_F
174- std::lock_guard<std::mutex> lk (verifier_mtx);
175- verifier->combine (local_verifier);
174+ local_verifier.edge_update (edge);
176175#endif
177- return ;
176+ }
178177 }
179178 }
180179 };
@@ -227,4 +226,4 @@ class GraphSketchDriver {
227226 // time hooks for experiments
228227 std::chrono::steady_clock::time_point flush_start;
229228 std::chrono::steady_clock::time_point flush_end;
230- };
229+ };
0 commit comments