Skip to content

Commit 64c4e6e

Browse files
committed
feat: add support to ondemand workers and ff_send_out
1 parent 1291d63 commit 64c4e6e

11 files changed

Lines changed: 206 additions & 56 deletions

fastflow_module.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,17 @@ fastflow_exec(PyObject *module)
6868
return 0;
6969
}
7070

71-
PyDoc_STRVAR(ff_send_out_to_doc, "Send out to a node");
71+
PyDoc_STRVAR(ff_send_out_doc, "Send out to a node");
7272

73-
PyObject* empty_ff_send_out_to(PyObject *self, PyObject *args) {
73+
PyObject* empty_ff_send_out(PyObject *self, PyObject *args) {
7474
assert(self);
7575

7676
PyErr_SetString(PyExc_Exception, "Operation not available. This is not a multi output node");
7777
return NULL;
7878
}
7979

8080
static PyMethodDef module_methods[] = {
81-
{ "ff_send_out_to", (PyCFunction) empty_ff_send_out_to, METH_VARARGS, ff_send_out_to_doc },
81+
{ "ff_send_out", (PyCFunction) empty_ff_send_out, METH_VARARGS, ff_send_out_doc },
8282
{NULL, NULL} /* Sentinel */
8383
};
8484

include/process/base_process.hpp

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,32 +41,27 @@ void process_body(std::string &node_ser, int read_fd, int send_fd, bool isMultiO
4141
py_ff_callback_object* callback = (py_ff_callback_object*) PyObject_CallObject(
4242
(PyObject *) &py_ff_callback_type, NULL
4343
);
44-
callback->ff_send_out_to_callback = [isMultiOutput, &pickl, &messaging](PyObject* pydata, int index) {
44+
callback->ff_send_out_callback = [isMultiOutput, &pickl, &messaging](PyObject* pydata, int index) {
4545
if (!isMultiOutput) {
4646
PyErr_SetString(PyExc_Exception, "Operation not available. This is not a multi output node");
4747
return (PyObject*) NULL;
4848
}
4949

50-
if (index < 0) {
51-
PyErr_SetString(PyExc_Exception, "Index cannot be negative");
52-
return (PyObject*) NULL;
53-
}
54-
5550
Message response;
5651
if (PyObject_TypeCheck(pydata, &py_ff_constant_type) != 0) {
5752
// we may have a fastflow constant as data to send out to index
5853
py_ff_constant_object* _const_result = reinterpret_cast<py_ff_constant_object*>(pydata);
59-
int err = messaging.call_remote(response, "ff_send_out_to", index, _const_result->ff_const);
54+
int err = messaging.call_remote(response, "ff_send_out", _const_result->ff_const, index);
6055
if (err <= 0) {
61-
PyErr_SetString(PyExc_Exception, "Error occurred sending ff_send_out_to request");
56+
PyErr_SetString(PyExc_Exception, "Error occurred sending ff_send_out request");
6257
return (PyObject*) NULL;
6358
}
6459
} else {
6560
auto data = pickl.pickle(pydata);
66-
int err = messaging.call_remote(response, "ff_send_out_to", index, data);
61+
int err = messaging.call_remote(response, "ff_send_out", data, index);
6762
if (PyErr_Occurred()) return (PyObject*) NULL;
6863
if (err <= 0) {
69-
PyErr_SetString(PyExc_Exception, "Error occurred sending ff_send_out_to request");
64+
PyErr_SetString(PyExc_Exception, "Error occurred sending ff_send_out request");
7065
return (PyObject*) NULL;
7166
}
7267
}
@@ -81,7 +76,7 @@ void process_body(std::string &node_ser, int read_fd, int send_fd, bool isMultiO
8176
CHECK_ERROR_THEN("PyDict_SetItemString failure: ", cleanup_exit();)
8277
}
8378
// if you access the methods by importing them from the module, replace each method with the delegate's one
84-
if (PyDict_SetItemString(globals, "ff_send_out_to", PyObject_GetAttrString((PyObject*) callback, "ff_send_out_to")) == -1) {
79+
if (PyDict_SetItemString(globals, "ff_send_out", PyObject_GetAttrString((PyObject*) callback, "ff_send_out")) == -1) {
8580
CHECK_ERROR_THEN("PyDict_SetItemString failure: ", cleanup_exit();)
8681
}
8782

@@ -248,23 +243,27 @@ class base_process {
248243

249244
while(response.type == MESSAGE_TYPE_REMOTE_PROCEDURE_CALL) {
250245
// the only supported remote procedure call from the child process
251-
// if the call of ff_send_out_to (as of today...)
252-
if (response.f_name.compare("ff_send_out_to") != 0) {
246+
// if the call of ff_send_out (as of today...)
247+
if (response.f_name.compare("ff_send_out") != 0) {
253248
handleError("got invalid f_name", );
254249
return NULL;
255250
}
256251

257-
// parse received ff_send_out_to request
258-
std::tuple<int, std::string> args = messaging.parse_data<int, std::string>(response.data);
252+
// parse received ff_send_out request
253+
std::tuple<std::string, int> args = messaging.parse_data<std::string, int>(response.data);
259254
// try to deserialize to constant. If it results into NULL, then it is NOT a FastFlow's constant
260-
void* constant = deserialize<void*>(std::get<1>(args));
261-
// finally perform ff_send_out_to
262-
bool result = registered_callback->ff_send_out_to(constant == NULL ? (void*) new std::string(std::get<1>(args)):constant, std::get<0>(args));
255+
void* constant = deserialize<void*>(std::get<0>(args));
256+
// finally perform ff_send_out
257+
int index = std::get<1>(args);
258+
auto data = constant == NULL ? (void*) new std::string(std::get<0>(args)):constant;
259+
bool result = index >= 0 ?
260+
registered_callback->ff_send_out_to(data, index):
261+
registered_callback->ff_send_out(data);
263262

264-
// send ff_send_out_to result
263+
// send ff_send_out result
265264
err = messaging.send_response(result);
266265
if (err <= 0) {
267-
handleError("error sending ff_send_out_to response", );
266+
handleError("error sending ff_send_out response", );
268267
return NULL;
269268
}
270269

include/py_ff_a2a.hpp

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,30 @@ PyObject* py_ff_a2a_add_firstset(PyObject *self, PyObject *args, PyObject* kwds)
124124
py_ff_a2a_object* _self = reinterpret_cast<py_ff_a2a_object*>(self);
125125

126126
PyObject *py_nodes = NULL;
127-
bool use_main_thread = false;
128-
if (parse_args(args, kwds, &py_nodes, &use_main_thread) == -1) return NULL;
127+
128+
PyObject *py_use_main_thread = Py_False;
129+
PyObject *py_ondemand = Py_False;
130+
static const char *kwlist[] = { "", "ondemand", "use_main_thread", NULL };
131+
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|$OO", const_cast<char**>(kwlist), &py_nodes, &py_ondemand, &py_use_main_thread)) {
132+
assert(PyErr_Occurred());
133+
PyErr_SetString(PyExc_RuntimeError, "Tuple and keywords parsing failed");
134+
return NULL;
135+
}
136+
137+
if (py_use_main_thread != nullptr && !PyBool_Check(py_use_main_thread)) {
138+
PyErr_Format(PyExc_TypeError, "A bool is required for 'use_main_thread' keyword (got type %s)",
139+
Py_TYPE(py_use_main_thread)->tp_name);
140+
return NULL;
141+
}
142+
143+
if (py_ondemand != nullptr && !PyBool_Check(py_ondemand)) {
144+
PyErr_Format(PyExc_TypeError, "A bool is required for 'ondemand' keyword (got type %s)",
145+
Py_TYPE(py_ondemand)->tp_name);
146+
return NULL;
147+
}
148+
149+
bool use_main_thread = PyObject_IsTrue(py_use_main_thread) == 1;
150+
bool ondemand = PyObject_IsTrue(py_ondemand) == 1;
129151

130152
std::vector<ff::ff_node*> set;
131153
// Iterate over all the arguments
@@ -163,8 +185,7 @@ PyObject* py_ff_a2a_add_firstset(PyObject *self, PyObject *args, PyObject* kwds)
163185
}
164186
Py_DECREF(iterator);
165187

166-
int ondemand = 0;
167-
int val = _self->a2a->add_firstset(set, ondemand, false);
188+
int val = _self->a2a->add_firstset(set, ondemand ? 1:0, false);
168189
return PyLong_FromLong(val);
169190
}
170191

include/py_ff_callback.hpp

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,48 @@
77

88
typedef struct {
99
PyObject_HEAD
10-
std::function<PyObject*(PyObject*, int)> ff_send_out_to_callback;
10+
std::function<PyObject*(PyObject*, int)> ff_send_out_callback;
1111
} py_ff_callback_object;
1212

13-
PyDoc_STRVAR(py_ff_callback_ff_send_out_to_doc, "FF send out to");
13+
PyDoc_STRVAR(py_ff_callback_ff_send_out_doc, "FF send out to");
1414

15-
PyObject* py_ff_callback_ff_send_out_to(PyObject *self, PyObject *args)
15+
PyObject* py_ff_callback_ff_send_out(PyObject *self, PyObject *args)
1616
{
1717
assert(self);
1818
py_ff_callback_object* _self = reinterpret_cast<py_ff_callback_object*>(self);
1919

2020
PyObject *pydata = nullptr;
21-
unsigned int index = 0;
22-
if (!PyArg_ParseTuple(args, "OI", &pydata, &index)) return NULL;
21+
PyObject *pyindex = nullptr;
22+
// doc: http://web.mit.edu/people/amliu/vrut/python/ext/parseTuple.html
23+
if (!PyArg_ParseTuple(args, "O|O", &pydata, &pyindex)) return NULL;
2324

2425
if (pydata == nullptr) {
2526
PyErr_SetString(PyExc_TypeError, "Please provide valid data object");
2627
return NULL;
2728
}
2829

29-
return _self->ff_send_out_to_callback(pydata, index);
30+
int index = -1;
31+
if (pyindex != nullptr) {
32+
if (PyLong_Check(pyindex) == 0) {
33+
PyErr_Format(PyExc_TypeError, "Please provide the index as an integer object (got type %s)",
34+
Py_TYPE(pyindex)->tp_name);
35+
return NULL;
36+
}
37+
38+
index = PyLong_AsLong(pyindex);
39+
40+
if (index < 0) {
41+
PyErr_SetString(PyExc_Exception, "Index cannot be negative");
42+
return (PyObject*) NULL;
43+
}
44+
}
45+
46+
return _self->ff_send_out_callback(pydata, index);
3047
}
3148

3249
static PyMethodDef py_ff_callback_methods[] = {
33-
{ "ff_send_out_to", (PyCFunction) py_ff_callback_ff_send_out_to,
34-
METH_VARARGS, py_ff_callback_ff_send_out_to_doc },
50+
{ "ff_send_out", (PyCFunction) py_ff_callback_ff_send_out,
51+
METH_VARARGS, py_ff_callback_ff_send_out_doc },
3552
{NULL, NULL} /* Sentinel */
3653
};
3754

include/subint/base_subint.hpp

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ class base_subint {
8484
py_ff_callback_object* callback = (py_ff_callback_object*) PyObject_CallObject(
8585
(PyObject *) &py_ff_callback_type, NULL
8686
);
87-
callback->ff_send_out_to_callback = [this](PyObject* pydata, int index) {
88-
return this->py_ff_send_out_to(pydata, index);
87+
callback->ff_send_out_callback = [this](PyObject* pydata, int index) {
88+
return this->py_ff_send_out(pydata, index);
8989
};
9090

9191
int returnValue = 0;
@@ -96,7 +96,7 @@ class base_subint {
9696
CHECK_ERROR_THEN("PyDict_SetItemString failure: ", returnValue = -1;)
9797
}
9898
// if you access the methods by importing them from the module, replace each method with the callback's one
99-
if (PyDict_SetItemString(globals, "ff_send_out_to", PyObject_GetAttrString((PyObject*) callback, "ff_send_out_to")) == -1) {
99+
if (PyDict_SetItemString(globals, "ff_send_out", PyObject_GetAttrString((PyObject*) callback, "ff_send_out")) == -1) {
100100
CHECK_ERROR_THEN("PyDict_SetItemString failure: ", returnValue = -1;)
101101
}
102102

@@ -205,27 +205,25 @@ class base_subint {
205205
LOGELAPSED("svc_end time ", svc_end_start_time);
206206
}
207207

208-
PyObject* py_ff_send_out_to(PyObject *py_data, int index) {
209-
if (registered_callback == NULL) {
208+
PyObject* py_ff_send_out(PyObject *py_data, int index) {
209+
if (registered_callback == NULL && index != -1) {
210210
PyErr_SetString(PyExc_Exception, "Operation not available. This is not a multi output node");
211211
return NULL;
212212
}
213213

214-
if (index < 0) {
215-
PyErr_SetString(PyExc_Exception, "Index cannot be negative");
216-
return (PyObject*) NULL;
217-
}
218-
219214
// we may have a fastflow constant as data to send out to index
220215
if (PyObject_TypeCheck(py_data, &py_ff_constant_type) != 0) {
221216
py_ff_constant_object* _const_result = reinterpret_cast<py_ff_constant_object*>(py_data);
222-
return registered_callback->ff_send_out_to(_const_result->ff_const, index) ? Py_True:Py_False;
217+
auto res = index >= 0 ?
218+
registered_callback->ff_send_out_to(_const_result->ff_const, index):
219+
registered_callback->ff_send_out(_const_result->ff_const);
220+
return res ? Py_True:Py_False;
223221
}
224222

225223
auto pickled_data_bytes = pickl->pickle_bytes(py_data);
226224
CHECK_ERROR_THEN("pickle send out data failure: ", return NULL;)
227225

228-
return registered_callback->ff_send_out_to(pickled_data_bytes, index) ? Py_True:Py_False;
226+
return registered_callback->ff_send_out(pickled_data_bytes, index) ? Py_True:Py_False;
229227
}
230228

231229
void register_callback(ff::ff_monode* cb_node) {

tests/python/a2a-ff-send-to.test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from fastflow import FFAllToAll, EOS, GO_ON, ff_send_out_to
1+
from fastflow import FFAllToAll, EOS, GO_ON, ff_send_out
22
import sys
33

44
"""
@@ -21,7 +21,7 @@ def svc(self, *arg):
2121
return EOS
2222
self.counter += 1
2323

24-
ff_send_out_to(list([f"source{self.id}-to-sink1"]), 0)
24+
ff_send_out(list([f"source{self.id}-to-sink1"]), 0)
2525

2626
class sink():
2727
def __init__(self, id):

tests/python/a2a-ff-send.test.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from fastflow import FFAllToAll, EOS, GO_ON, ff_send_out
2+
import sys
3+
4+
"""
5+
source _ _ sink
6+
| |
7+
source _|_|_ sink
8+
| |
9+
source _|_|_ sink
10+
|
11+
source _|
12+
"""
13+
14+
class source():
15+
def __init__(self, id):
16+
self.counter = 1
17+
self.id = id
18+
19+
def svc(self, *arg):
20+
while self.counter < 5:
21+
ff_send_out(list([f"source{self.id}-to-any"]))
22+
self.counter += 1
23+
return EOS
24+
25+
class sink():
26+
def __init__(self, id):
27+
self.id = id
28+
29+
def svc(self, lis: list):
30+
lis.append(f"sink{self.id}")
31+
print(lis)
32+
33+
def run_test(use_subinterpreters = True):
34+
a2a = FFAllToAll(use_subinterpreters)
35+
first_stage_size = 4
36+
second_stage_size = 3
37+
# build first stages
38+
first_lis = [source(i+1) for i in range(first_stage_size)]
39+
# build second stages
40+
second_lis = [sink(i+1) for i in range(second_stage_size)]
41+
# add first stages
42+
a2a.add_firstset(first_lis)
43+
# add second stages
44+
a2a.add_secondset(second_lis)
45+
a2a.run_and_wait_end()
46+
47+
if __name__ == "__main__":
48+
if sys.version_info[1] >= 12:
49+
print("Subinterpreters")
50+
run_test(use_subinterpreters = True)
51+
else:
52+
print("Skip subinterpreters test because python version is < 3.12")
53+
print()
54+
print("Processes")
55+
run_test(use_subinterpreters = False)

tests/python/a2a-ondemand.test.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from fastflow import FFAllToAll, EOS, GO_ON, ff_send_out
2+
import sys
3+
import time
4+
5+
"""
6+
source _ _ slowestsink
7+
| |
8+
source _|_|_ sink
9+
| |
10+
source _|_|_ sink
11+
|
12+
source _|
13+
"""
14+
15+
class source():
16+
def __init__(self, id):
17+
self.counter = 1
18+
self.id = id
19+
20+
def svc(self, *arg):
21+
while self.counter < 8:
22+
ff_send_out(list([f"source{self.id}-to-any"]))
23+
self.counter += 1
24+
return EOS
25+
26+
class sink():
27+
def __init__(self, id):
28+
self.id = id
29+
30+
def svc(self, lis: list):
31+
if self.id == 1:
32+
time.sleep(0.1)
33+
lis.append(f"slowestsink{self.id}")
34+
else:
35+
lis.append(f"sink{self.id}")
36+
print(lis)
37+
38+
def run_test(use_subinterpreters = True):
39+
a2a = FFAllToAll(use_subinterpreters)
40+
first_stage_size = 4
41+
second_stage_size = 3
42+
# build first stages
43+
first_lis = [source(i+1) for i in range(first_stage_size)]
44+
# build second stages
45+
second_lis = [sink(i+1) for i in range(second_stage_size)]
46+
# add first stages
47+
a2a.add_firstset(first_lis, ondemand=True)
48+
# add second stages
49+
a2a.add_secondset(second_lis)
50+
a2a.run_and_wait_end()
51+
52+
if __name__ == "__main__":
53+
if sys.version_info[1] >= 12:
54+
print("Subinterpreters")
55+
run_test(use_subinterpreters = True)
56+
else:
57+
print("Skip subinterpreters test because python version is < 3.12")
58+
print()
59+
print("Processes")
60+
run_test(use_subinterpreters = False)

0 commit comments

Comments
 (0)