Skip to content

Commit 6c240dc

Browse files
committed
feat: switch StreamOut node to UDP unicast; update synapse-api (#18)
BREAKING CHANGE: support UDP unicast in StreamOut node, remove UDP multicast
1 parent a0e69c7 commit 6c240dc

9 files changed

Lines changed: 427 additions & 78 deletions

File tree

examples/stats/main.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ auto configure_stream(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr
5656

5757
NodeConfig stream_out_config;
5858
auto* stream_out_proto = stream_out_config.mutable_stream_out();
59-
stream_out_proto->set_multicast_group("224.0.0.115");
59+
auto* udp_config = stream_out_proto->mutable_udp_unicast();
60+
udp_config->set_destination_port(StreamOut::DEFAULT_STREAM_OUT_PORT);
6061

6162
std::shared_ptr<Node> stream_out_node;
6263
s = StreamOut::from_proto(stream_out_config, &stream_out_node);

examples/stream_out/main.cpp

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include <memory>
2+
#include <chrono>
23

34
#include "science/scipp/status.h"
45
#include "science/synapse/channel.h"
@@ -18,14 +19,16 @@ using synapse::NodeType;
1819
using synapse::Signal;
1920
using synapse::StreamOut;
2021
using synapse::SynapseData;
22+
using synapse::NodeConfig;
23+
using synapse::Node;
2124

2225

2326
auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> science::Status {
27+
const uint32_t N_CHANNELS = 10;
2428
if (stream_out_ptr == nullptr) {
2529
return { science::StatusCode::kInvalidArgument, "stream out pointer is null" };
2630
}
2731

28-
std::string group = "224.0.0.10";
2932
science::Status s;
3033
DeviceInfo info;
3134
s = device.info(&info);
@@ -39,8 +42,8 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
3942
}
4043
};
4144
auto& electrodes = std::get<Electrodes>(signal.signal);
42-
electrodes.channels.reserve(19);
43-
for (unsigned int i = 0; i < 19; i++) {
45+
electrodes.channels.reserve(N_CHANNELS);
46+
for (unsigned int i = 0; i < N_CHANNELS; i++) {
4447
electrodes.channels.push_back(Ch{
4548
.id = i,
4649
.electrode_id = i * 2,
@@ -50,7 +53,22 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
5053

5154
Config config;
5255
auto broadband_source = std::make_shared<synapse::BroadbandSource>(100, 16, 30000, 20.0, signal);
53-
*stream_out_ptr = std::make_shared<synapse::StreamOut>("out", group);
56+
57+
// Create StreamOut with explicit configuration
58+
NodeConfig stream_out_config;
59+
auto* stream_out_proto = stream_out_config.mutable_stream_out();
60+
auto* udp_config = stream_out_proto->mutable_udp_unicast();
61+
udp_config->set_destination_port(StreamOut::DEFAULT_STREAM_OUT_PORT);
62+
stream_out_proto->set_label("Broadband Stream");
63+
64+
std::shared_ptr<Node> stream_out_node;
65+
s = StreamOut::from_proto(stream_out_config, &stream_out_node);
66+
if (!s.ok()) return s;
67+
68+
*stream_out_ptr = std::dynamic_pointer_cast<StreamOut>(stream_out_node);
69+
if (!*stream_out_ptr) {
70+
return { science::StatusCode::kInternal, "failed to cast stream out node" };
71+
}
5472

5573
s = config.add_node(broadband_source);
5674
if (!s.ok()) return s;
@@ -64,7 +82,10 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
6482
s = device.configure(&config);
6583
if (!s.ok()) return s;
6684

85+
std::cout << "Configured device" << std::endl;
86+
6787
s = device.start();
88+
std::cout << "Started device" << std::endl;
6889

6990
return s;
7091
}
@@ -75,18 +96,17 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
7596
}
7697

7798
science::Status s;
78-
7999
DeviceInfo info;
80100
s = device.info(&info);
81101
if (!s.ok()) return s;
82102

83-
uint32_t stream_out_id = 0; // default id
84-
std::string group;
103+
uint32_t stream_out_id = 0;
104+
NodeConfig stream_out_config;
85105
const auto& nodes = info.configuration().nodes();
86106
for (const auto& node : nodes) {
87107
if (node.type() == NodeType::kStreamOut) {
88108
stream_out_id = node.id();
89-
group = node.stream_out().multicast_group();
109+
stream_out_config = node;
90110
break;
91111
}
92112
}
@@ -95,9 +115,16 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
95115
return { science::StatusCode::kNotFound, "no stream out node found" };
96116
}
97117

98-
std::cout << "found stream out node with id " << stream_out_id << " and group " << group << std::endl;
118+
std::shared_ptr<Node> stream_out_node;
119+
s = StreamOut::from_proto(stream_out_config, &stream_out_node);
120+
if (!s.ok()) return s;
121+
122+
*stream_out_ptr = std::dynamic_pointer_cast<StreamOut>(stream_out_node);
123+
if (!*stream_out_ptr) {
124+
return { science::StatusCode::kInternal, "failed to cast stream out node" };
125+
}
99126

100-
*stream_out_ptr = std::make_shared<synapse::StreamOut>("out", group);
127+
std::cout << "found stream out node with id " << stream_out_id << std::endl;
101128

102129
Config config;
103130
s = config.add_node(*stream_out_ptr, stream_out_id);
@@ -107,7 +134,7 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
107134
if (!s.ok()) return s;
108135

109136
return s;
110-
}
137+
}
111138

112139
auto stream(const std::string& uri, bool configure) -> int {
113140
synapse::Device device(uri);
@@ -121,7 +148,6 @@ auto stream(const std::string& uri, bool configure) -> int {
121148
<< static_cast<int>(s.code()) << ") " << s.message() << std::endl;
122149
return 1;
123150
}
124-
125151
} else {
126152
s = stream_existing(device, &stream_out);
127153
if (!s.ok()) {
@@ -136,6 +162,7 @@ auto stream(const std::string& uri, bool configure) -> int {
136162
return 1;
137163
}
138164

165+
std::cout << "Reading..." << std::endl;
139166
while (true) {
140167
SynapseData out;
141168
s = stream_out->read(&out);
@@ -185,16 +212,24 @@ auto stream(const std::string& uri, bool configure) -> int {
185212
std::cout << ss.str() << std::endl;
186213
}
187214
} else {
188-
std::cout << "data type unknown" << std::endl;
215+
std::cout << "received data of unknown type" << std::endl;
189216
}
190217
}
191218

192-
return 1;
219+
return 0;
193220
}
194221

195222
int main(int argc, char** argv) {
196-
std::string uri = "192.168.0.1:647";
197-
stream(uri, false);
223+
if (argc != 2 && argc != 3) {
224+
std::cout << "Usage: " << argv[0] << " <uri> [--config]" << std::endl;
225+
std::cout << " uri: device URI (e.g., 192.168.0.1:647)" << std::endl;
226+
std::cout << " --config: optional flag to configure a new stream" << std::endl;
227+
std::cout << " if omitted, uses existing stream" << std::endl;
228+
return 1;
229+
}
198230

199-
return 0;
231+
std::string uri = argv[1];
232+
bool configure = (argc == 3 && std::string(argv[2]) == "--config");
233+
234+
return stream(uri, configure);
200235
}

include/science/synapse/nodes/stream_out.h

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,19 @@
88
#include "science/libndtp/types.h"
99
#include "science/scipp/status.h"
1010
#include "science/synapse/api/nodes/stream_out.pb.h"
11-
#include "science/synapse/nodes/udp_node.h"
11+
#include "science/synapse/node.h"
1212

1313
namespace synapse {
1414

15-
class StreamOut : public UdpNode {
15+
class StreamOut : public Node {
1616
public:
17-
StreamOut(const std::string& label, const std::string& multicast_group);
17+
static constexpr uint16_t DEFAULT_STREAM_OUT_PORT = 50038;
18+
StreamOut(const std::string& destination_address = "",
19+
uint16_t destination_port = DEFAULT_STREAM_OUT_PORT,
20+
const std::string& label = "");
21+
~StreamOut();
1822

23+
auto init() -> science::Status;
1924
auto read(science::libndtp::SynapseData* out, science::libndtp::NDTPHeader* header = nullptr, size_t* bytes_read = nullptr) -> science::Status;
2025

2126
[[nodiscard]] static auto from_proto(
@@ -27,11 +32,13 @@ class StreamOut : public UdpNode {
2732
auto p_to_proto(synapse::NodeConfig* proto) -> science::Status override;
2833

2934
private:
30-
const std::string label_;
31-
const std::string multicast_group_;
35+
std::string destination_address_;
36+
uint16_t destination_port_;
37+
std::string label_;
38+
int socket_ = 0;
39+
std::optional<sockaddr_in> addr_;
3240

33-
auto init() -> science::Status;
34-
auto get_host(std::string* host) -> science::Status override;
41+
static constexpr uint32_t SOCKET_BUFSIZE_BYTES = 5 * 1024 * 1024; // 5MB
3542
};
3643

3744
} // namespace synapse

0 commit comments

Comments
 (0)