Skip to content

Commit ef271e8

Browse files
authored
Merge pull request #26 from SidoShiro/network-write-2
Network write 2, nice
2 parents 9979873 + 87b9241 commit ef271e8

4 files changed

Lines changed: 112 additions & 80 deletions

File tree

include/leader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ struct leader_resources {
1313
struct allocation_register *leader_reg;
1414
struct command_queue *leader_command_queue;
1515
unsigned short id;
16+
size_t availaible_memory;
17+
size_t max_memory;
1618
};
1719

1820
struct address_search {

src/cli/user.c

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
#include <mpi.h>
77
#include <debug.h>
8+
#include <stdint-gcc.h>
9+
#include <event.h>
810

911
void send_write(void *data, unsigned short leader) {
1012
MPI_Request r;
@@ -45,9 +47,16 @@ void send_malloc(void *data, unsigned short leader) {
4547
*/
4648
if (0 == MPI_Wait(&r2, &st)) {
4749
struct message *m2 = buff;
48-
printf("user: request malloc of size %zu, is at address %zu on the network\n",
49-
m2->size,
50-
m2->address);
50+
if (m2->address != SIZE_MAX) {
51+
printf("user: request malloc of size %zu, is at address %zu on the network\n",
52+
m2->size,
53+
m2->address);
54+
}
55+
else if (m2->size == 0) {
56+
debug("Network: Out of memory", 0);
57+
} else {
58+
debug("Network: Fatal error in leader", 0);
59+
}
5160
}
5261
free(m);
5362
free(buff);

src/network/leader.c

Lines changed: 74 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@ size_t get_message_size() {
1515
return (sizeof(unsigned short) * 3 + 2 * sizeof(size_t) + sizeof(enum operation));
1616
}
1717

18+
19+
size_t size_of_allocation(struct allocation *a) {
20+
size_t size = 0;
21+
if (!a)
22+
return size;
23+
for (size_t i = 0; i < a->number_parts; i++) {
24+
size += a->parts[i]->size;
25+
}
26+
return size;
27+
}
28+
29+
1830
struct allocation *give_for_v_address(struct leader_resources *l_r, size_t v_address, size_t *part) {
1931
if (!l_r->leader_reg)
2032
return NULL;
@@ -35,10 +47,12 @@ struct allocation *give_for_v_address(struct leader_resources *l_r, size_t v_add
3547

3648
struct leader_resources *generate_leader_resources(size_t nb_nodes, size_t id) {
3749
struct leader_resources *l_r = malloc(128);
38-
l_r->leader_blks = init_nodes_same_size(nb_nodes, 8);
50+
l_r->leader_blks = init_nodes_same_size(nb_nodes, DEF_NODE_SIZE);
3951
l_r->leader_command_queue = NULL;
4052
l_r->leader_reg = generate_allocs(4);
4153
l_r->id = id;
54+
l_r->max_memory = nb_nodes * DEF_NODE_SIZE - (DEF_NODE_SIZE); // minus leader
55+
l_r->availaible_memory = l_r->max_memory;
4256
return l_r;
4357
}
4458

@@ -50,6 +64,8 @@ struct leader_resources *generate_leader_resources(size_t nb_nodes, size_t id) {
5064
* @return 1 if okay, 0 if no blocks possible for this size
5165
*/
5266
size_t alloc_memory(size_t size, struct leader_resources *l_r) {
67+
if (size >= l_r->max_memory || size >= l_r->availaible_memory)
68+
return SIZE_MAX;
5369
struct block_register *blks = l_r->leader_blks;
5470
if (!blks && blks->nb_blocks > 0) {
5571
debug("ERROR not malloc blockS !!!", 0); //l_r->id);
@@ -68,6 +84,7 @@ size_t alloc_memory(size_t size, struct leader_resources *l_r) {
6884
a->parts = malloc(34 + (a->number_parts * sizeof(struct allocation *)));
6985
a->parts[0] = b;
7086
add_allocation(l_r->leader_reg, a);
87+
l_r->availaible_memory -= size_of_allocation(a);
7188
return b->virtual_address;
7289
} else if (size < b->size) {
7390
b->free = 1;
@@ -79,6 +96,7 @@ size_t alloc_memory(size_t size, struct leader_resources *l_r) {
7996
a->parts = malloc(34 + (a->number_parts * sizeof(struct allocation *)));
8097
a->parts[0] = b;
8198
add_allocation(l_r->leader_reg, a);
99+
l_r->availaible_memory -= size_of_allocation(a);
82100
return b->virtual_address;
83101
} else {
84102
debug("Fatal Error, split invalid or memory error", 1);
@@ -96,7 +114,7 @@ size_t alloc_memory(size_t size, struct leader_resources *l_r) {
96114
ssize_t m_size = size;
97115
for (size_t i = 0; i < blks->nb_blocks; i++) {
98116
struct block *b = blks->blks[i];
99-
while (b && m_size > 0) {
117+
while (b && b->id != l_r->id && m_size > 0) {
100118
if (b->free == 0) {
101119
b->free = 1;
102120
m_size -= b->size;
@@ -113,6 +131,7 @@ size_t alloc_memory(size_t size, struct leader_resources *l_r) {
113131
}
114132
if (m_size <= 0) {
115133
add_allocation(l_r->leader_reg, a);
134+
l_r->availaible_memory -= size_of_allocation(a);
116135
return a->v_address_start;
117136
}
118137
}
@@ -162,7 +181,7 @@ void get_command(struct leader_resources *l_r, unsigned short user) {
162181
n_command->command = m->op;
163182
n_command->data = NULL;
164183
struct data_write *d_w = generate_data_write(m->address, m->size, NULL);
165-
void *wbuff = malloc(sizeof(char) * (m->size + 1));
184+
void *wbuff = malloc(sizeof(char) * (m->size + 2));
166185
debug("Leader wait DATA from User for OP WRITE", l_r->id);
167186
MPI_Irecv(wbuff, m->size * sizeof(char), MPI_BYTE, user, 0, MPI_COMM_WORLD, &r);
168187
if (0 == MPI_Wait(&r, &st)) {
@@ -201,14 +220,27 @@ void execute_malloc(struct leader_resources *l_r) {
201220
struct data_size *d_s = peek_command(l_r->leader_command_queue);
202221
if (!d_s) {
203222
debug("ERROR allocation data_size for OP MALLOC execution [LEADER]", l_r->id);
223+
return;
204224
}
205225
size_t v_addr = alloc_memory(d_s->size, l_r);
206226
size_t sss = d_s->size;
207-
if (v_addr == 999 || v_addr == 1999) {
227+
if (v_addr == 999 || v_addr == 1999 || v_addr == SIZE_MAX) {
228+
if (v_addr == SIZE_MAX) {
229+
debug("Out of memory", l_r->id);
230+
struct message *m = generate_message(l_r->id, DEF_NODE_USER, DEF_NODE_USER, SIZE_MAX, 0, OP_MALLOC);
231+
MPI_Request r;
232+
MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r);
233+
free(m);
234+
return;
235+
}
208236
debug("FATAL ERROR", l_r->id);
209237
if (v_addr == 1999) {
210238
debug("SEC", l_r->id);
211239
}
240+
struct message *m = generate_message(l_r->id, DEF_NODE_USER, DEF_NODE_USER, SIZE_MAX, SIZE_MAX, OP_MALLOC);
241+
MPI_Request r;
242+
MPI_Isend((void *) m, sizeof(struct message), MPI_BYTE, m->id_t, 0, MPI_COMM_WORLD, &r);
243+
free(m);
212244
return;
213245
}
214246
debug("Allocation at ", l_r->id);
@@ -283,29 +315,46 @@ void execute_write(struct leader_resources *l_r) {
283315
// 2 Get the block to write to (Warning to multiple parts allocation)
284316
// (Warning to size bigger than block)
285317
size_t to_write_address_v = d_w->address;
286-
ssize_t x = d_w->size;
287-
for (size_t i = part_s; i < c_a->number_parts; i++) {
318+
size_t x = d_w->size;
319+
for (size_t i = part_s; x > 0 && i < c_a->number_parts; i++) {
288320
// TODO handle size overflow
289-
while (x > 0) {
290-
struct block *b = c_a->parts[i];
291-
// compute size to write for this block
292-
ssize_t to_write_size = b->size - x;
293-
x -= to_write_size;
294-
if (to_write_size < 0) {
295-
debug("Fatal error in write operation, size to write negative", l_r->id);
296-
}
297-
// compute local address to write
298-
size_t local_address = b->virtual_address - to_write_address_v;
299-
to_write_address_v += local_address;
300-
301-
struct queue *q = queue_init();
302-
printf("Size to send for Write %ld\n\n", to_write_size);
303-
// 3 Send Write OP to each node (Warning to the local address of the node, not the virtual)
304-
struct message *m = generate_message(l_r->id, b->id, b->id, local_address, to_write_size, OP_WRITE);
305-
send_safe_message(m, q);
306-
debug_n(d_w->data, l_r->id, d_w->size);
307-
MPI_Send(d_w->data, to_write_size, MPI_BYTE, b->id, 4, MPI_COMM_WORLD);
321+
struct block *b = c_a->parts[i];
322+
// compute size to write for this block
323+
size_t to_write_size = 0;
324+
if (x <= b->size) {
325+
to_write_size = x;
326+
x = 0;
327+
} else {
328+
x -= b->size;
329+
to_write_size = b->size;
308330
}
331+
332+
// compute local address to write
333+
size_t local_address = 0;
334+
if (b->virtual_address == to_write_address_v) {
335+
local_address = 0;
336+
to_write_address_v += to_write_size;
337+
} else {
338+
local_address += to_write_address_v - b->virtual_address;
339+
to_write_address_v += to_write_size;
340+
}
341+
342+
// struct queue *q = queue_init();
343+
// printf("Size to send for Write %zu\n\n", to_write_size);
344+
// 3 Send Write OP to each node (Warning to the local address of the node, not the virtual)
345+
struct message *m = generate_message(l_r->id, b->id, b->id, local_address, to_write_size, OP_WRITE);
346+
debug("Send Write OP", l_r->id);
347+
MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD);
348+
struct message m2;
349+
MPI_Status st;
350+
MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD, &st);
351+
debug("Send Data", l_r->id);
352+
// debug_n(d_w->data, l_r->id, d_w->size);
353+
MPI_Send(d_w->data, to_write_size, MPI_BYTE, b->id, 4, MPI_COMM_WORLD);
354+
}
355+
356+
if (x > 0) {
357+
debug("Write asked is overflowing the allocated block", l_r->id);
309358
}
310359

311360
// TODO Confirmation ?
@@ -356,26 +405,9 @@ void leader_loop(struct node *n, unsigned short terminal_id, unsigned short nb_n
356405
// Init leader resource
357406
printf("NB NODES :%d,", nb_nodes);
358407

359-
printf("NB NODES :%d,", nb_nodes);
360-
void *k = malloc(sizeof(struct message));
361-
(void) k;
362408
debug("START LEADER LOOP", n->id);
363409
struct leader_resources *l_r = generate_leader_resources(nb_nodes, n->id);
364410

365-
debug("PASS LEADER_RESOURCES", n->id);
366-
for (size_t o = 0; o < 1; o++) {
367-
// void *g =generate_message(0, DEF_NODE_USER, DEF_NODE_USER, 0, 0, OP_MALLOC);
368-
// (void)g;
369-
void *jj = malloc(48); //
370-
// sizeof(struct message));
371-
(void) jj;
372-
}
373-
374-
for (size_t i = 0; i < l_r->leader_blks->nb_blocks; i++) {
375-
struct block *b = l_r->leader_blks->blks[i];
376-
printf("%u\n", b->id);
377-
}
378-
379411
/*
380412
// MPI_Isend();
381413
int MPI_Isend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag,

src/network/node.c

Lines changed: 24 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,66 +10,55 @@ struct node *generate_node(unsigned short id, size_t size) {
1010
return NULL;
1111
n->id = id;
1212
n->size = size;
13-
n->memory = malloc(size * sizeof(char) + 1);
13+
n->memory = malloc((1 + size) * sizeof(char));
14+
for (size_t i = 0; i < size; i++) {
15+
n->memory[i] = '#';
16+
}
17+
n->memory[size] = '\0';
1418
n->isleader = 0;
1519
return n;
1620
}
1721

1822
void write_on_node(struct node *n, size_t address, char *data, size_t size) {
19-
if (address < n->size && address + size < n->size) {
20-
n->memory = (char *) memcpy((void *) (n->memory + address), (void *) data, size);
23+
if (address + size <= n->size) {
24+
void *mem_op_ptr = (n->memory + address);
25+
memcpy((void *) mem_op_ptr, (void *) data, size);
2126
debug("Write done of :", n->id);
22-
debug_n((char *)n->memory, n->id, n->size);
27+
debug_n((char *) n->memory, n->id, n->size);
28+
}
29+
else {
30+
// printf("aske_addr %zu, ask_mem %zu, size_mem %zu\n\n", address, size, n->size);
31+
debug("OP WRITE FAILED", n->id);
2332
}
2433
}
2534

2635
void node_cycle(struct node *n) {
2736
while (1) {
2837
// cycle of node
29-
struct queue *q = queue_init();
30-
struct message *m;
31-
m = receive_message(q);
32-
38+
// OLD FIXME struct queue *q = queue_init();
39+
struct message *m = generate_message(0, 0, 0, 0, 0, OP_NONE);
40+
MPI_Status st;
41+
MPI_Recv(m, sizeof(struct message), MPI_BYTE, MPI_ANY_SOURCE, 3, MPI_COMM_WORLD, &st);
42+
debug("Recv OP", n->id);
3343
switch (m->op) {
3444
case OP_OK:
3545
break;
36-
case OP_MALLOC:
37-
break;
38-
case OP_FREE:
39-
break;
4046
case OP_WRITE: {
47+
debug("Write OP : send OK", n->id);
48+
struct message *mW = generate_message(n->id, m->id_s, n->id, 0, 0, OP_OK);
49+
MPI_Send(mW, sizeof(struct message), MPI_BYTE, mW->id_t, 3, MPI_COMM_WORLD);
4150
size_t addr = m->address;
4251
size_t size = m->size;
4352
char *data = malloc(size * sizeof(char));
44-
MPI_Status st;
45-
MPI_Recv(data, size, MPI_BYTE, m->id_s, 4, MPI_COMM_WORLD, &st);
53+
MPI_Status st3;
54+
MPI_Recv(data, size, MPI_BYTE, m->id_s, 4, MPI_COMM_WORLD, &st3);
4655
write_on_node(n, addr, data, size);
4756
free(data);
4857
}
4958
break;
5059
case OP_READ:
5160
break;
52-
case OP_SNAP:
53-
break;
54-
case OP_LEADER:
55-
break;
56-
case OP_WHOISLEADER:
57-
break;
58-
case OP_REVIVE:
59-
break;
60-
case OP_KILL:
61-
break;
62-
case OP_TEST:
63-
break;
64-
case OP_NONE:
65-
break;
66-
case OP_DUMP:
67-
break;
68-
case OP_LEADER_OK:
69-
break;
70-
case OP_ALIVE:
71-
break;
72-
case OP_LEADER_AGAIN:
61+
default:
7362
break;
7463
}
7564
if (m->op == OP_KILL)

0 commit comments

Comments
 (0)