@@ -65,15 +65,15 @@ void filter(const data_t *in, data_t *out, const filter_params_t param)
6565}
6666
6767
68- bool window (data_t * out , const window_params_t params )
68+ bool window (data_t * * out , const window_params_t params )
6969{
7070 data_t * data = params .source -> get_next (params .source , params .size , params .step );
7171
7272 if (data == NULL )
7373 return false;
7474
75- * out = * data ;
76- free ( data ) ;
75+ free ( * out ) ;
76+ * out = data ;
7777 return true;
7878}
7979
@@ -146,7 +146,7 @@ void execute_step(step_t *step)
146146 atomic_store (& step -> quit , true);
147147 free (output );
148148 //step->quit = true;
149- return ;;
149+ return ;
150150 }
151151 spsc_dequeue (right_queue , & right_input );
152152 assert (right_input );
@@ -173,7 +173,7 @@ void execute_step(step_t *step)
173173 break ;
174174
175175 case WINDOW :
176- if (!window (output , op -> params .window )) {
176+ if (!window (& output , op -> params .window )) {
177177 atomic_store (& step -> quit , true);
178178 free (output );
179179 return ;
@@ -195,13 +195,19 @@ void execute_step(step_t *step)
195195 free (left_input -> data ); left_input -> data = NULL ;
196196 free (left_input ); left_input = NULL ;
197197 }
198+ else {
199+ free (left_input ); left_input = NULL ;
200+ }
198201 }
199202 if (right_step ) {
200203 assert (right_input );
201204 if (op -> right -> type != WINDOW ) {
202205 free (right_input -> data ); right_input -> data = NULL ;
203206 free (right_input ); right_input = NULL ;
204207 }
208+ else {
209+ free (right_input ); right_input = NULL ;
210+ }
205211 }
206212}
207213
0 commit comments