Skip to content

Commit 534e06c

Browse files
authored
json_dump: draft of reconnect feature (#217)
json_dump reconnection improvements
1 parent 38a5f15 commit 534e06c

1 file changed

Lines changed: 149 additions & 84 deletions

File tree

json_dump/json_dump.py

Lines changed: 149 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,91 +1,156 @@
11
#!/usr/bin/python3
22

3-
import sys
4-
import os.path
5-
import socket
3+
import argparse
64
import io
7-
import pytrap
85
import json
9-
import optparse
10-
11-
from optparse import OptionParser
12-
parser = OptionParser(add_help_option=True)
13-
parser.add_option("-i", "--ifcspec", dest="ifcspec",
14-
help="See https://nemea.liberouter.org/trap-ifcspec/", metavar="IFCSPEC")
15-
parser.add_option("-w", dest="filename",
16-
help="Write dump to FILE instead of stdout (overwrite file)", metavar="FILE")
17-
parser.add_option("-a", dest="filename_append",
18-
help="Write dump to FILE instead of stdout (append to file)", metavar="FILE")
19-
parser.add_option("-s", dest="networktarget",
20-
help="Stream data over TCP", metavar="HOST:PORT")
21-
parser.add_option("-I", "--indent", metavar="N", type=int,
22-
help="Pretty-print JSON with indentation set to N spaces. Note that such format can't be read by json_replay module.")
23-
parser.add_option("-v", "--verbose", action="store_true",
24-
help="Set verbose mode - print messages.")
25-
parser.add_option("--noflush", action="store_true",
26-
help="Disable automatic flush of output buffer after writing a record (may improve performance).")
27-
28-
29-
# Parse remaining command-line arguments
30-
(options, args) = parser.parse_args()
31-
32-
# Initialize module
33-
trap = pytrap.TrapCtx()
34-
trap.init(["-i", options.ifcspec])
35-
36-
# Open output file
37-
if options.filename and options.filename_append:
38-
sys.stderr.write("Error: -w and -a are mutually exclusive.")
39-
sys.exit(1)
40-
if options.filename:
41-
file = io.FileIO(options.filename, "w")
42-
elif options.filename_append:
43-
file = io.FileIO(options.filename_append, "a")
44-
elif options.networktarget:
45-
addr = options.networktarget.split(":")
46-
if len(addr) != 2:
47-
raise AttributeError("Malformed argument of -s host:port")
48-
s = socket.create_connection((addr[0], int(addr[1])))
49-
file = socket.SocketIO(s, "w")
50-
else:
51-
file = sys.stdout
52-
53-
# Set JSON as required data type on input
54-
trap.setRequiredFmt(0, pytrap.FMT_JSON, "")
55-
56-
stop = False
57-
# Main loop (trap.stop is set to True when SIGINT or SIGTERM is received)
58-
while not stop:
59-
# Read data from input interface
60-
try:
61-
data = trap.recv()
62-
except pytrap.FormatMismatch:
63-
sys.stderr.write("Error: output and input interfaces data type or format mismatch\n")
64-
break
65-
except pytrap.FormatChanged as e:
66-
if options.verbose:
67-
print(trap.getDataFmt(0))
68-
data = e.data
69-
del(e)
70-
pass
71-
except (pytrap.Terminated, KeyboardInterrupt):
72-
break
73-
74-
# Check for "end-of-stream" record
75-
if len(data) <= 1:
76-
if options.verbose:
77-
print('Received "end-of-stream" message, going to quit.')
78-
break
6+
import socket
7+
import sys
8+
import time
9+
from typing import Optional, TextIO
10+
11+
import pytrap
12+
13+
14+
def get_parser() -> argparse.ArgumentParser:
15+
"""Prepare the argument parser.
16+
17+
Returns:
18+
An instance of ArgumentParser with ready arguments.
19+
"""
20+
parser = argparse.ArgumentParser(description="Print received JSON messages to stdout or a file.")
21+
parser.add_argument("-i", "--ifcspec", dest="ifcspec", metavar="IFCSPEC",
22+
required=True,
23+
help="See https://nemea.liberouter.org/trap-ifcspec/")
24+
parser.add_argument("-I", "--indent", metavar="N", type=int,
25+
help="Pretty-print JSON with indentation set to N spaces. "
26+
"Note that such format can't be read by json_replay module.")
27+
parser.add_argument("-v", "--verbose", action="store_true",
28+
help="Set verbose mode (print messages).")
29+
parser.add_argument("--noflush", action="store_true",
30+
help="Disable automatic flush of output buffer after writing a "
31+
"record (may improve performance).")
32+
33+
group = parser.add_mutually_exclusive_group()
34+
group.add_argument("-w", dest="filename", metavar="FILE",
35+
help="Write data to FILE instead of stdout (overwrite file)")
36+
group.add_argument("-a", dest="filename_append", metavar="FILE",
37+
help="Write data to FILE instead of stdout (append to file)")
38+
group.add_argument("-s", dest="networktarget", metavar="HOST:PORT",
39+
help="Send data using a TCP network stream to HOST:PORT")
40+
return parser
41+
42+
43+
def connect_socket(address: str, port: int, wait_interval: int = 5) -> TextIO:
44+
"""Create a connection to a socket given an address and a port.
45+
46+
The connection is tried repeatedly until successful. If the connection to the socket
47+
fails, wait 5 seconds and retry.
48+
49+
Args:
50+
address (str): The address of the destination socket.
51+
port (int): The port of the destination socket.
52+
wait_interval (int): Number of seconds to wait before retrying connection if it fails (default: 5 sec)
53+
54+
Returns:
55+
TextIO object providing access to opened socket.
56+
"""
57+
last_error = None
58+
while True:
59+
if last_error is None:
60+
print(f"{time.strftime('%F-%T')} Connecting to {address}:{port} ...", file=sys.stderr)
61+
try:
62+
s = socket.create_connection((address, port))
63+
print(f"{time.strftime('%F-%T')} Connection established.", file=sys.stderr)
64+
last_error = None
65+
return s.makefile("w", encoding="utf-8")
66+
except OSError as e:
67+
# sleep for a while and then reconnect (print error only once or when the error message changes)
68+
if last_error is None or str(e) != last_error:
69+
print(f"{time.strftime('%F-%T')} Connection failed ({e}), retrying every {wait_interval} seconds ...", file=sys.stderr)
70+
last_error = str(e)
71+
time.sleep(wait_interval)
72+
73+
74+
def main():
75+
parser = get_parser()
76+
parsed_args = parser.parse_args()
77+
78+
address: Optional[str] = None
79+
port: Optional[int] = None
80+
81+
# Parsing the arguments
82+
if parsed_args.filename:
83+
file = open(parsed_args.filename, "w", encoding="utf-8")
84+
elif parsed_args.filename_append:
85+
file = open(parsed_args.filename_append, "a", encoding="utf-8")
86+
elif parsed_args.networktarget:
87+
try:
88+
address, port = parsed_args.networktarget.split(":")
89+
port = int(port)
90+
except (TypeError, ValueError):
91+
print("Error: malformed argument of -s host:port", file=sys.stderr)
92+
sys.exit(1)
93+
file = connect_socket(address, port)
94+
else:
95+
file = sys.stdout
96+
97+
# Initialize the PyTrap module
98+
trap = pytrap.TrapCtx()
99+
trap.init(["-i", parsed_args.ifcspec])
100+
101+
# Set JSON as required data type on input
102+
trap.setRequiredFmt(0, pytrap.FMT_JSON, "")
103+
104+
# Main loop
105+
while True:
106+
# Read data from input interface
107+
try:
108+
data = trap.recv()
109+
except pytrap.FormatMismatch:
110+
print("Error: output and input interfaces data type or format mismatch", file=sys.stderr)
111+
break
112+
except pytrap.FormatChanged as e:
113+
if parsed_args.verbose:
114+
print(trap.getDataFmt(0))
115+
data = e.data
116+
except (pytrap.Terminated, KeyboardInterrupt):
117+
break
118+
119+
# Check for "end-of-stream" record
120+
if len(data) <= 1:
121+
if parsed_args.verbose:
122+
print('Received "end-of-stream" message, going to quit.')
123+
break
79124

80-
try:
81125
# Decode data (and check it's valid JSON)
82-
rec = json.loads(data.decode("utf-8"))
83-
if options.verbose:
84-
print("Message: {0}".format(rec))
85-
# Print it to file or stdout
86-
file.write(bytes(json.dumps(rec, indent=options.indent) + '\n', "utf-8"))
87-
if not options.noflush:
88-
file.flush()
89-
except ValueError as e:
90-
sys.stderr.write(str(e) + '\n')
126+
try:
127+
rec = json.loads(data.decode("utf-8"))
128+
except ValueError as e:
129+
print(f"ERROR: Received invalid JSON (message skipped): {e}", file=sys.stderr)
130+
continue
131+
132+
if parsed_args.verbose:
133+
print(f"Message: {format(rec)}")
91134

135+
# Print it to file, stdout, or send to socket
136+
try:
137+
file.write(json.dumps(rec, indent=parsed_args.indent) + '\n')
138+
if not parsed_args.noflush:
139+
file.flush()
140+
except IOError as e:
141+
if parsed_args.networktarget:
142+
print(f"{time.strftime('%F-%T')} Connection error: {e}", file=sys.stderr)
143+
# connection error, try to reconnect and send the message again
144+
file = connect_socket(address, port)
145+
file.write(json.dumps(rec, indent=parsed_args.indent) + '\n')
146+
if not parsed_args.noflush:
147+
file.flush()
148+
else:
149+
raise
150+
151+
152+
if __name__ == "__main__":
153+
try:
154+
main()
155+
except KeyboardInterrupt:
156+
pass # quietly exit program without traceback

0 commit comments

Comments
 (0)