Skip to content

Commit 8994a23

Browse files
authored
Merge pull request #27 from SidoShiro/network-read-1
Network read 1
2 parents ef271e8 + c30f79c commit 8994a23

4 files changed

Lines changed: 67 additions & 23 deletions

File tree

include/globals.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#ifndef DISTRIBUTEDMALLOC_GLOBALS_H
22
#define DISTRIBUTEDMALLOC_GLOBALS_H
33

4-
#define DEF_NODE_SIZE 32
4+
#define DEF_NODE_SIZE 8
55
#define DEF_NODE_USER 0
66
#define DEF_NODE_LEADER 1
77

src/cli/user.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
#include "message.h"
44
#include "globals.h"
55

6-
#include <mpi.h>
76
#include <debug.h>
8-
#include <stdint-gcc.h>
9-
#include <event.h>
7+
#include <mpi.h>
8+
#include <stdint.h>
9+
#include <sys/types.h>
10+
#include <unistd.h>
1011

1112
void send_write(void *data, unsigned short leader) {
1213
MPI_Request r;

src/network/leader.c

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <mpi.h>
1111
#include <stdint.h>
1212
#include <communication.h>
13+
#include <string.h>
1314

1415
size_t get_message_size() {
1516
return (sizeof(unsigned short) * 3 + 2 * sizeof(size_t) + sizeof(enum operation));
@@ -262,37 +263,54 @@ void execute_read(struct leader_resources *l_r) {
262263
return;
263264
}
264265

265-
// Readed bytes
266-
char *read_buff = NULL;
266+
// Buffer of all READS bytes
267+
char *read_buff = malloc(sizeof(char) * (d_r->size + 1));
267268

268269
// 2 Get the block to write to (Warning to multiple parts allocation)
269270
// (Warning to size bigger than block)
270271
size_t to_write_address_v = d_r->address;
271272
size_t nb_read = 0;
272-
for (size_t i = part_s; i < c_a->number_parts; i++) {
273+
size_t nb_read_size = 0;
274+
size_t x = d_r->size;
275+
size_t offset = 0;
276+
for (size_t i = part_s; x > 0 && i < c_a->number_parts; i++) {
273277
// TODO handle size overflow
274-
275-
276278
struct block *b = c_a->parts[i];
277-
// compute size to write for this block
278-
size_t to_read_size = d_r->size - b->size;
279-
d_r->size -= to_read_size;
279+
// compute size to read for this block
280+
size_t to_read_size = 0;
281+
if (x <= b->size) {
282+
to_read_size = x;
283+
x = 0;
284+
} else {
285+
x -= b->size;
286+
to_read_size = b->size;
287+
}
288+
280289
// compute local address to write
281-
size_t local_address = b->virtual_address - to_write_address_v;
282-
to_write_address_v += local_address;
290+
size_t local_address = 0;
291+
if (b->virtual_address == to_write_address_v) {
292+
local_address = 0;
293+
to_write_address_v += to_read_size;
294+
} else {
295+
local_address += to_write_address_v - b->virtual_address;
296+
to_write_address_v += to_read_size;
297+
}
283298

299+
nb_read_size += to_read_size;
284300
// 3 Send READ OP to each node (Warning to the local address of the node, not the virtual)
285301
struct message *m = generate_message(l_r->id, b->id, b->id, local_address, to_read_size, OP_READ);
286-
MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, 5, MPI_COMM_WORLD);
302+
debug("Send OP Read", l_r->id);
303+
MPI_Send(m, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD);
304+
void *buff = malloc(sizeof(char) * (to_read_size + 1));
305+
MPI_Status st;
306+
MPI_Recv(buff, to_read_size, MPI_BYTE, b->id, 4, MPI_COMM_WORLD, &st);
307+
memcpy((void*)(read_buff + (offset * sizeof(char))), buff, to_read_size);
308+
offset += to_read_size;
287309
nb_read++;
310+
free(m);
288311
}
289-
290-
// 4 Receive bytes, append data from all nodes
291-
for (size_t i = 0; i < nb_read; i++) {
292-
// MPI_Irecv()
293-
}
294-
(void) read_buff;
295-
312+
debug("READ AND ASSEMBLED", l_r->id);
313+
debug_n(read_buff, l_r->id, d_r->size);
296314
}
297315

298316
void execute_write(struct leader_resources *l_r) {
@@ -316,6 +334,7 @@ void execute_write(struct leader_resources *l_r) {
316334
// (Warning to size bigger than block)
317335
size_t to_write_address_v = d_w->address;
318336
size_t x = d_w->size;
337+
size_t offset = 0;
319338
for (size_t i = part_s; x > 0 && i < c_a->number_parts; i++) {
320339
// TODO handle size overflow
321340
struct block *b = c_a->parts[i];
@@ -350,7 +369,8 @@ void execute_write(struct leader_resources *l_r) {
350369
MPI_Recv(&m2, sizeof(struct message), MPI_BYTE, b->id, 3, MPI_COMM_WORLD, &st);
351370
debug("Send Data", l_r->id);
352371
// debug_n(d_w->data, l_r->id, d_w->size);
353-
MPI_Send(d_w->data, to_write_size, MPI_BYTE, b->id, 4, MPI_COMM_WORLD);
372+
MPI_Send((void*)((char*)d_w->data + offset), to_write_size, MPI_BYTE, b->id, 4, MPI_COMM_WORLD);
373+
offset += to_write_size;
354374
}
355375

356376
if (x > 0) {

src/network/node.c

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@ void write_on_node(struct node *n, size_t address, char *data, size_t size) {
3232
}
3333
}
3434

35+
/**
36+
*
37+
* @param n
38+
* @param address
39+
* @param data
40+
* @param size
41+
*/
42+
void read_on_node(struct node *n, size_t address, char *data, size_t size) {
43+
if (address + size <= n->size) {
44+
void *mem_op_ptr = (n->memory + address);
45+
memcpy((void *) data, (void *) mem_op_ptr, size);
46+
}
47+
else {
48+
debug("OP READ FAILED", n->id);
49+
}
50+
}
51+
3552
void node_cycle(struct node *n) {
3653
while (1) {
3754
// cycle of node
@@ -57,6 +74,12 @@ void node_cycle(struct node *n) {
5774
}
5875
break;
5976
case OP_READ:
77+
debug("Read OP", n->id);
78+
char *data = malloc(m->size * sizeof(char));
79+
read_on_node(n, m->address, data, m->size);
80+
debug("Send Read: Data", n->id);
81+
MPI_Send(data, m->size, MPI_BYTE, m->id_s, 4, MPI_COMM_WORLD);
82+
free(data);
6083
break;
6184
default:
6285
break;

0 commit comments

Comments
 (0)