99#include <unistd.h>
1010#include <mpi.h>
1111#include <stdint.h>
12+ #include <communication.h>
1213
1314size_t get_message_size () {
1415 return (sizeof (unsigned short ) * 3 + 2 * sizeof (size_t ) + sizeof (enum operation ));
1516}
1617
18+ struct allocation * give_for_v_address (struct leader_resources * l_r , size_t v_address , size_t * part ) {
19+ if (!l_r -> leader_reg )
20+ return NULL ;
21+ struct allocation_register * reg = l_r -> leader_reg ;
22+ for (size_t i = 0 ; i < reg -> count_alloc ; i ++ ) {
23+ for (size_t j = 0 ; j < reg -> allocs [i ]-> number_parts ; j ++ ) {
24+ // FIXME check if its ok
25+ printf ("%zu == %zu \n\n" , v_address , reg -> allocs [i ]-> parts [j ]-> virtual_address );
26+ if (reg -> allocs [i ]-> parts [j ]-> virtual_address <= v_address
27+ && reg -> allocs [i ]-> parts [j ]-> virtual_address + reg -> allocs [i ]-> parts [j ]-> size > v_address ) {
28+ * part = j ;
29+ return reg -> allocs [i ];
30+ }
31+ }
32+ }
33+ return NULL ;
34+ }
35+
1736struct leader_resources * generate_leader_resources (size_t nb_nodes , size_t id ) {
1837 struct leader_resources * l_r = malloc (128 );
1938 l_r -> leader_blks = init_nodes_same_size (nb_nodes , 8 );
@@ -23,41 +42,6 @@ struct leader_resources *generate_leader_resources(size_t nb_nodes, size_t id) {
2342 return l_r ;
2443}
2544
26- int check_block_for_alloc (size_t * res_addr , struct block * b , size_t size , struct leader_resources * l_r ) {
27- if (0 == b -> free ) {
28- if (size == b -> size ) {
29- b -> free = 1 ;
30- struct allocation * a = malloc (sizeof (struct allocation ));
31- a -> number_parts = 1 ;
32- a -> v_address_start = b -> virtual_address ;
33- a -> parts = malloc (a -> number_parts * sizeof (struct allocation * ));
34- a -> parts [0 ] = b ;
35- add_allocation (l_r -> leader_reg , a );
36- * res_addr = b -> virtual_address ;
37- return 1 ;
38- } else if (size < b -> size ) {
39- b -> free = 1 ;
40- b = split_block_u (b , size );
41- if (b ) {
42- struct allocation * a = malloc (32 + sizeof (struct allocation ));
43- a -> number_parts = 1 ;
44- a -> v_address_start = b -> virtual_address ;
45- a -> parts = malloc (34 + (a -> number_parts * sizeof (struct allocation * )));
46- a -> parts [0 ] = b ;
47- add_allocation (l_r -> leader_reg , a );
48- * res_addr = b -> virtual_address ;
49- return 1 ;
50- } else {
51- debug ("Fatal Error, split invalid or memory error" , 1 );
52- }
53- } else {
54-
55- }
56-
57- }
58- return 0 ;
59- }
60-
6145/**
6246 * Allocated memory for the User, using free space of nodes
6347 * | X1 | | X2 | X1 |
@@ -75,7 +59,7 @@ size_t alloc_memory(size_t size, struct leader_resources *l_r) {
7559 for (size_t i = 0 ; i < blks -> nb_blocks ; i ++ ) {
7660 struct block * b = blks -> blks [i ];
7761 while (b != NULL ) {
78- if (0 == b -> free ) {
62+ if (0 == b -> free && b -> id != l_r -> id ) {
7963 if (size == b -> size ) {
8064 b -> free = 1 ;
8165 struct allocation * a = malloc (32 + sizeof (struct allocation ));
@@ -167,13 +151,17 @@ void get_command(struct leader_resources *l_r, unsigned short user) {
167151 struct command_queue * n_command = generate_command_queue (m -> op , NULL );
168152 switch (m -> op ) {
169153 case OP_OK :
170- debug ("Leader recv OP OK from User" , l_r -> id );
154+ if (m -> id_s == 0 ) {
155+ debug ("Leader recv OP OK from User" , l_r -> id );
156+ } else {
157+ debug ("Leader recv OP OK from a node" , l_r -> id );
158+ }
171159 break ;
172160 case OP_WRITE :
173161 debug ("Leader recv OP WRITE from User" , l_r -> id );
174162 n_command -> command = m -> op ;
175163 n_command -> data = NULL ;
176- struct data_write * d_w = generate_data_write (m -> size , m -> size , NULL );
164+ struct data_write * d_w = generate_data_write (m -> address , m -> size , NULL );
177165 void * wbuff = malloc (sizeof (char ) * (m -> size + 1 ));
178166 debug ("Leader wait DATA from User for OP WRITE" , l_r -> id );
179167 MPI_Irecv (wbuff , m -> size * sizeof (char ), MPI_BYTE , user , 0 , MPI_COMM_WORLD , & r );
@@ -209,68 +197,140 @@ void get_command(struct leader_resources *l_r, unsigned short user) {
209197 //free(buff);
210198}
211199
212- void execute_malloc (struct node * n , struct leader_resources * l_r ) {
200+ void execute_malloc (struct leader_resources * l_r ) {
213201 struct data_size * d_s = peek_command (l_r -> leader_command_queue );
214202 if (!d_s ) {
215- debug ("ERROR allocation data_size for OP MALLOC execution [LEADER]" , n -> id );
203+ debug ("ERROR allocation data_size for OP MALLOC execution [LEADER]" , l_r -> id );
216204 }
217205 size_t v_addr = alloc_memory (d_s -> size , l_r );
218206 size_t sss = d_s -> size ;
219207 if (v_addr == 999 || v_addr == 1999 ) {
220- debug ("FATAL ERROR" , n -> id );
208+ debug ("FATAL ERROR" , l_r -> id );
221209 if (v_addr == 1999 ) {
222- debug ("SEC" , n -> id );
210+ debug ("SEC" , l_r -> id );
223211 }
224212 return ;
225213 }
226- debug ("Allocation at " , n -> id );
214+ debug ("Allocation at " , l_r -> id );
227215 printf ("%zu of %zu bytes\n" , v_addr , sss );
228- if (n -> id != l_r -> id ) {
229- debug ("WTFFFFFFFFFFFFFFFFFFFf" , n -> id );
230- } else {
231- struct message * m = generate_message (l_r -> id , DEF_NODE_USER , DEF_NODE_USER , v_addr , sss , OP_MALLOC );
232- MPI_Request r ;
233- MPI_Isend ((void * ) m , sizeof (struct message ), MPI_BYTE , m -> id_t , 0 , MPI_COMM_WORLD , & r );
234- free (m );
235- }
216+ struct message * m = generate_message (l_r -> id , DEF_NODE_USER , DEF_NODE_USER , v_addr , sss , OP_MALLOC );
217+ MPI_Request r ;
218+ MPI_Isend ((void * ) m , sizeof (struct message ), MPI_BYTE , m -> id_t , 0 , MPI_COMM_WORLD , & r );
219+ free (m );
236220}
237221
238- void execute_read (struct node * n , struct leader_resources * l_r ) {
239- (void ) n ;
240- (void ) l_r ;
222+ void execute_read (struct leader_resources * l_r ) {
223+ struct data_read * d_r ;
224+ d_r = peek_command (l_r -> leader_command_queue );
225+
226+ size_t part_s = 0 ;
227+ struct allocation * c_a = give_for_v_address (l_r , d_r -> address , & part_s );
228+ if (c_a == NULL ) {
229+ debug ("Seg Fault: requested read to a not allocated address" , l_r -> id );
230+ return ;
231+ }
232+
233+ // Readed bytes
234+ char * read_buff = NULL ;
235+
236+ // 2 Get the block to write to (Warning to multiple parts allocation)
237+ // (Warning to size bigger than block)
238+ size_t to_write_address_v = d_r -> address ;
239+ size_t nb_read = 0 ;
240+ for (size_t i = part_s ; i < c_a -> number_parts ; i ++ ) {
241+ // TODO handle size overflow
242+
243+
244+ struct block * b = c_a -> parts [i ];
245+ // compute size to write for this block
246+ size_t to_read_size = d_r -> size - b -> size ;
247+ d_r -> size -= to_read_size ;
248+ // compute local address to write
249+ size_t local_address = b -> virtual_address - to_write_address_v ;
250+ to_write_address_v += local_address ;
251+
252+ // 3 Send READ OP to each node (Warning to the local address of the node, not the virtual)
253+ struct message * m = generate_message (l_r -> id , b -> id , b -> id , local_address , to_read_size , OP_READ );
254+ MPI_Send (m , sizeof (struct message ), MPI_BYTE , b -> id , 5 , MPI_COMM_WORLD );
255+ nb_read ++ ;
256+ }
257+
258+ // 4 Receive bytes, append data from all nodes
259+ for (size_t i = 0 ; i < nb_read ; i ++ ) {
260+ // MPI_Irecv()
261+ }
262+ (void ) read_buff ;
263+
241264}
242265
243- void execute_write (struct node * n , struct leader_resources * l_r ) {
266+ void execute_write (struct leader_resources * l_r ) {
244267 // MPI_Request r;
245268 // MPI_Status st;
246269 struct data_write * d_w ;
247270 d_w = peek_command (l_r -> leader_command_queue );
248- (void ) d_w ;
249- (void ) n ;
250- (void ) l_r ;
251- // TODO
271+ // d_w.address is the virtual address on the network
272+ // .size size in bytes of data
273+ // .data
274+
275+ // 1 Get correct allocation (Handle notFound)
276+ size_t part_s = 0 ;
277+ struct allocation * c_a = give_for_v_address (l_r , d_w -> address , & part_s );
278+ if (c_a == NULL ) {
279+ debug ("Seg Fault: requested write to a not allocated address" , l_r -> id );
280+ return ;
281+ }
282+
283+ // 2 Get the block to write to (Warning to multiple parts allocation)
284+ // (Warning to size bigger than block)
285+ size_t to_write_address_v = d_w -> address ;
286+ ssize_t x = d_w -> size ;
287+ for (size_t i = part_s ; i < c_a -> number_parts ; i ++ ) {
288+ // TODO handle size overflow
289+ while (x > 0 ) {
290+ struct block * b = c_a -> parts [i ];
291+ // compute size to write for this block
292+ ssize_t to_write_size = b -> size - x ;
293+ x -= to_write_size ;
294+ if (to_write_size < 0 ) {
295+ debug ("Fatal error in write operation, size to write negative" , l_r -> id );
296+ }
297+ // compute local address to write
298+ size_t local_address = b -> virtual_address - to_write_address_v ;
299+ to_write_address_v += local_address ;
300+
301+ struct queue * q = queue_init ();
302+ printf ("Size to send for Write %ld\n\n" , to_write_size );
303+ // 3 Send Write OP to each node (Warning to the local address of the node, not the virtual)
304+ struct message * m = generate_message (l_r -> id , b -> id , b -> id , local_address , to_write_size , OP_WRITE );
305+ send_safe_message (m , q );
306+ debug_n (d_w -> data , l_r -> id , d_w -> size );
307+ MPI_Send (d_w -> data , to_write_size , MPI_BYTE , b -> id , 4 , MPI_COMM_WORLD );
308+ }
309+ }
310+
311+ // TODO Confirmation ?
252312 // struct message *m = generate_message(n->id, )
253313 // MPI_Isend()
254314}
255315
256- void execute_command (struct node * n , struct leader_resources * l_r ) {
316+ void execute_command (struct leader_resources * l_r ) {
257317 if (peek_user_command (l_r -> leader_command_queue ) != OP_NONE ) {
258318
259319 switch (peek_user_command (l_r -> leader_command_queue )) {
260320 case OP_MALLOC :
261- debug ("EXECUTE OP MALLOC LEADER" , n -> id );
262- execute_malloc (n , l_r );
321+ debug ("EXECUTE OP MALLOC LEADER" , l_r -> id );
322+ execute_malloc (l_r );
263323 break ;
264324 case OP_FREE :
265- debug ("EXECUTE OP FREE LEADER" , n -> id );
325+ debug ("EXECUTE OP FREE LEADER" , l_r -> id );
266326 break ;
267327 case OP_WRITE :
268- debug ("EXECUTE OP WRITE LEADER" , n -> id );
269- execute_write (n , l_r );
328+ debug ("EXECUTE OP WRITE LEADER" , l_r -> id );
329+ execute_write (l_r );
270330 break ;
271331 case OP_READ :
272- debug ("EXECUTE OP READ LEADER" , n -> id );
273- execute_read (n , l_r );
332+ debug ("EXECUTE OP READ LEADER" , l_r -> id );
333+ execute_read (l_r );
274334 break ;
275335 case OP_DUMP :
276336 break ;
@@ -333,7 +393,7 @@ void leader_loop(struct node *n, unsigned short terminal_id, unsigned short nb_n
333393 get_command (l_r , terminal_id );
334394 // debug("COMMANDS LISTEN DONE", n->id);
335395 // Execute Commands
336- execute_command (n , l_r );
396+ execute_command (l_r );
337397 // debug("COMMANDS EXEC DONE", n->id);
338398 // Break on death
339399 if (die == 1 )
0 commit comments