-
Notifications
You must be signed in to change notification settings - Fork 321
Expand file tree
/
Copy pathasync_request_socket.cpp
More file actions
125 lines (110 loc) · 3.16 KB
/
async_request_socket.cpp
File metadata and controls
125 lines (110 loc) · 3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/**
* Copyright (C) 2015 Dato, Inc.
* All rights reserved.
*
* This software may be modified and distributed under the terms
* of the BSD license. See the LICENSE file for details.
*/
#include <cassert>
#include <parallel/atomic.hpp>
#include <logger/logger.hpp>
#include <nanosockets/print_zmq_error.hpp>
#include <nanosockets/socket_errors.hpp>
#include <nanosockets/socket_config.hpp>
#include <nanosockets/async_request_socket.hpp>
extern "C" {
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
}
static graphlab::atomic<size_t> ASYNC_SOCKET_CTR;
namespace graphlab {
namespace nanosockets {
async_request_socket::async_request_socket(std::string target_connection,
size_t num_connections) {
server = normalize_address(target_connection);
sockets.resize(num_connections);
for (size_t i = 0;i < sockets.size(); ++i) {
available.push_back(i);
}
}
void async_request_socket::close() {
// closes all sockets
for (size_t i = 0;i < sockets.size(); ++i) {
if (sockets[i].z_socket != -1) {
nn_close(sockets[i].z_socket);
sockets[i].z_socket = -1;
}
}
}
async_request_socket::~async_request_socket() {
std::unique_lock<mutex> lock(global_lock);
sockets.clear();
available.clear();
cvar.signal();
lock.unlock();
close();
}
void async_request_socket::set_receive_poller(boost::function<bool()> fn) {
receive_poller = fn;
}
int async_request_socket::request_master(zmq_msg_vector& msgs,
zmq_msg_vector& ret,
size_t timeout) {
// find a free target to lock
std::unique_lock<mutex> lock(global_lock);
while(available.size() == 0 && sockets.size() > 0) {
cvar.wait(lock);
}
if (sockets.size() == 0) {
// no sockets available!
return -1;
}
size_t wait_socket = available[available.size() - 1];
available.pop_back();
lock.unlock();
create_socket(wait_socket);
int rc = 0;
// retry a few times
for (size_t retry = 0; retry < 3; ++retry) {
do {
rc = msgs.send(sockets[wait_socket].z_socket, SEND_TIMEOUT);
} while(rc == EAGAIN);
if (rc == 0) break;
}
if (rc == 0) {
timer ti;
do {
rc = ret.recv(sockets[wait_socket].z_socket, 1000);
if (rc != 0 && receive_poller && receive_poller() == false) break;
if (rc != 0 && timeout > 0 && ti.current_time() > timeout) break;
} while(rc == EAGAIN);
}
// restore available socket
lock.lock();
available.push_back(wait_socket);
cvar.signal();
lock.unlock();
// return
if (rc != 0) {
return rc;
} else {
return 0;
}
}
int async_request_socket::create_socket(size_t i) {
if (sockets[i].z_socket == -1) {
sockets[i].z_socket = nn_socket(AF_SP, NN_REQ);
int resendintl = 2147483647;
int rc = nn_setsockopt(sockets[i].z_socket, NN_REQ, NN_REQ_RESEND_IVL , &(resendintl), sizeof(resendintl));
assert(rc == 0);
set_conservative_socket_parameters(sockets[i].z_socket);
rc = nn_connect(sockets[i].z_socket, server.c_str());
if (rc == -1) {
print_zmq_error("Unexpected error on connection");
return rc;
}
}
return 0;
}
} // namespace nanosockets
}