Skip to content

Commit f73795a

Browse files
author
anders-wartoft
committed
Buggfix RELP
1 parent 10f036e commit f73795a

3 files changed

Lines changed: 310 additions & 291 deletions

File tree

src/main/java/nu/sitia/loggenerator/outputitems/RelpOutputItem.java

Lines changed: 177 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import nu.sitia.loggenerator.Configuration;
2121
import java.io.*;
2222
import java.net.Socket;
23+
import java.net.SocketTimeoutException;
2324
import java.util.List;
2425
import java.util.logging.Logger;
2526

@@ -121,14 +122,138 @@ public void send(List<String> toSend) {
121122
}
122123
}
123124

125+
/**
126+
* Reconnect to the RELP server after disconnection
127+
*/
128+
private void reconnect() throws IOException {
129+
logger.info("Reconnecting to RELP server...");
130+
try {
131+
if (socket != null && !socket.isClosed()) {
132+
socket.close();
133+
}
134+
} catch (IOException e) {
135+
logger.warning("Error closing old socket: " + e.getMessage());
136+
}
137+
138+
socket = new Socket(hostname, port);
139+
socket.setSoTimeout(5000);
140+
outputStream = socket.getOutputStream();
141+
inputReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
142+
143+
// Send RELP OPEN command with proper offers
144+
String openData = "relp_version=0\nrelp_software=LogGenerator\ncommands=syslog";
145+
String openFrame = "1 open " + openData.length() + " " + openData + "\n";
146+
logger.fine("Sending OPEN frame (reconnect): " + openFrame);
147+
outputStream.write(openFrame.getBytes());
148+
outputStream.flush();
149+
150+
String response = readRelpResponse();
151+
logger.fine("Received OPEN response (reconnect): " + response);
152+
153+
if (response == null || !response.contains("rsp")) {
154+
throw new RuntimeException("Failed to reconnect to RELP server. Response: " + response);
155+
}
156+
157+
transactionNr = 2;
158+
connected = true;
159+
logger.info("Reconnected to RELP server");
160+
}
161+
124162
/**
125163
* Send a single message via RELP protocol
126164
* @param message The message to send
127165
*/
128166
private void sendRelpMessage(String message) {
167+
sendRelpMessage(message, 0);
168+
}
169+
170+
/**
171+
* Read a RELP response from the server
172+
* @return The complete RELP response
173+
* @throws IOException if reading fails
174+
*/
175+
private String readRelpResponse() throws IOException {
176+
// Read the response line: txnr command len [data]
177+
String line = inputReader.readLine();
178+
if (line == null) {
179+
return null;
180+
}
181+
182+
logger.finest("Read RELP response line: " + line);
183+
184+
// Parse the header: txnr command len [data]
185+
String[] parts = line.split(" ", 4);
186+
if (parts.length < 3) {
187+
return line; // Return as-is if not proper RELP format
188+
}
189+
190+
try {
191+
int dataLen = Integer.parseInt(parts[2]);
192+
193+
if (dataLen == 0) {
194+
// No data
195+
return line;
196+
}
197+
198+
// Check if data is already on the same line
199+
if (parts.length == 4 && parts[3].length() >= dataLen) {
200+
return line;
201+
}
202+
203+
// Data is on subsequent lines or partially on this line
204+
// Read exactly dataLen bytes
205+
char[] dataBuffer = new char[dataLen];
206+
int totalRead = 0;
207+
208+
// If there's partial data on the header line, copy it first
209+
if (parts.length == 4 && parts[3].length() > 0) {
210+
int partialLen = Math.min(parts[3].length(), dataLen);
211+
parts[3].getChars(0, partialLen, dataBuffer, 0);
212+
totalRead = partialLen;
213+
}
214+
215+
// Read the remaining data
216+
while (totalRead < dataLen) {
217+
int read = inputReader.read(dataBuffer, totalRead, dataLen - totalRead);
218+
if (read == -1) {
219+
break;
220+
}
221+
totalRead += read;
222+
}
223+
224+
// Reconstruct the full response
225+
String data = new String(dataBuffer, 0, totalRead);
226+
String result = parts[0] + " " + parts[1] + " " + parts[2] + " " + data;
227+
logger.finest("Reconstructed RELP response: " + result);
228+
return result;
229+
230+
} catch (NumberFormatException e) {
231+
return line; // Return as-is if len is not a number
232+
}
233+
}
234+
235+
/**
236+
* Send a single message via RELP protocol with retry logic
237+
* @param message The message to send
238+
* @param retryCount Current retry count to prevent infinite recursion
239+
*/
240+
private void sendRelpMessage(String message, int retryCount) {
241+
if (retryCount > 3) {
242+
throw new RuntimeException("Failed to send message after " + retryCount + " retries. Check rsyslog RELP configuration.");
243+
}
244+
129245
try {
130246
if (!connected) {
131-
throw new RuntimeException("Not connected to RELP server");
247+
logger.fine("Not connected, attempting reconnect (retry " + retryCount + ")");
248+
try {
249+
Thread.sleep(200);
250+
reconnect();
251+
} catch (IOException e) {
252+
throw new RuntimeException("Failed to reconnect: " + e.getMessage(), e);
253+
} catch (InterruptedException e) {
254+
Thread.currentThread().interrupt();
255+
throw new RuntimeException("Interrupted during reconnect: " + e.getMessage(), e);
256+
}
132257
}
133258

134259
String relpFrame = createRelpFrame("syslog", message);
@@ -137,20 +262,45 @@ private void sendRelpMessage(String message) {
137262
outputStream.flush();
138263

139264
// Read response with timeout
140-
String response = inputReader.readLine();
265+
String response = readRelpResponse();
141266
if (response == null) {
142267
connected = false;
143-
throw new RuntimeException("Connection closed by server");
268+
logger.fine("Connection closed by server, retrying (attempt " + (retryCount + 1) + ")");
269+
sendRelpMessage(message, retryCount + 1);
270+
return;
144271
}
145-
if (!response.contains("rsp")) {
272+
273+
logger.finer("Received RELP response: " + response);
274+
275+
// Check if server is closing
276+
if (response.contains("serverclose")) {
277+
connected = false;
278+
logger.fine("Server sent serverclose, retrying (attempt " + (retryCount + 1) + ")");
279+
sendRelpMessage(message, retryCount + 1);
280+
return;
281+
}
282+
283+
// Parse RELP response: txnr command len data
284+
// Example: "2 rsp 2 OK" or "2 rsp 0 "
285+
String[] parts = response.split(" ", 4);
286+
if (parts.length < 2) {
146287
throw new RuntimeException("Invalid response from RELP server: " + response);
147288
}
148-
logger.finer("Received RELP response: " + response);
289+
290+
String command = parts[1];
291+
if (!"rsp".equalsIgnoreCase(command)) {
292+
throw new RuntimeException("Invalid response from RELP server (expected 'rsp', got '" + command + "'): " + response);
293+
}
294+
295+
// Success - message sent
296+
logger.finest("Message sent successfully");
297+
149298
} catch (IOException e) {
150299
connected = false;
151300
throw new RuntimeException("Failed to send RELP message: " + e.getMessage(), e);
152301
}
153302
}
303+
154304

155305
/**
156306
* Create a RELP frame
@@ -166,28 +316,43 @@ private String createRelpFrame(String command, String data) {
166316
return frame;
167317
}
168318

169-
@Override
319+
@Override
170320
public void setup() throws RuntimeException {
171321
super.setup();
172322
try {
173323
socket = new Socket(hostname, port);
324+
socket.setSoTimeout(5000); // 5 second timeout for reads
174325
outputStream = socket.getOutputStream();
175326
inputReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
176327

177-
// Send RELP OPEN command
178-
String openFrame = "1 open 0 \n";
328+
// Send RELP OPEN command with proper offers
329+
// Format: txnr command len data\n
330+
// Data format: relp_version=0\nrelp_software=LogGenerator\ncommands=syslog
331+
String openData = "relp_version=0\nrelp_software=LogGenerator\ncommands=syslog";
332+
String openFrame = "1 open " + openData.length() + " " + openData + "\n";
333+
logger.fine("Sending OPEN frame: " + openFrame);
179334
outputStream.write(openFrame.getBytes());
180335
outputStream.flush();
181336

182-
// Read open response
183-
String response = inputReader.readLine();
184-
if (response == null || !response.contains("rsp")) {
185-
throw new RuntimeException("Failed to open RELP connection");
337+
// Read open response with timeout
338+
String response = readRelpResponse();
339+
logger.fine("Received OPEN response: " + response);
340+
341+
if (response == null) {
342+
throw new RuntimeException("No response from RELP server");
343+
}
344+
345+
// Check for valid response
346+
if (!response.contains("rsp")) {
347+
throw new RuntimeException("Failed to open RELP connection. Response: " + response);
186348
}
187349

188350
connected = true;
189351
transactionNr = 2; // Next transaction number after OPEN
190352
logger.info("Connected to RELP server " + hostname + ":" + port);
353+
} catch (SocketTimeoutException e) {
354+
connected = false;
355+
throw new RuntimeException("Timeout connecting to RELP server: " + e.getMessage(), e);
191356
} catch (IOException e) {
192357
connected = false;
193358
throw new RuntimeException("Failed to connect to RELP server: " + e.getMessage(), e);

0 commit comments

Comments
 (0)