-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtestping1.py
More file actions
201 lines (172 loc) · 10.5 KB
/
testping1.py
File metadata and controls
201 lines (172 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import os
import subprocess
import concurrent.futures
import ipaddress
import logging
import shutil
import re
from tqdm import tqdm # Install with `pip install tqdm`
# ⚡ Bolt: Cached DEVNULL file descriptor to minimize subprocess spawn overhead
# and kernel syscalls when firing thousands of concurrent pings.
DEVNULL_FD = open(os.devnull, "wb")
# ⚡ Bolt: Cache the absolute path of the ping executable.
# Calling shutil.which() once at module load avoids the overhead of traversing
# the system PATH environment variable during every subprocess.call() execution.
# This yields a measurable speedup when firing thousands of concurrent pings.
PING_PATH = shutil.which("ping")
if not PING_PATH:
# 🛡️ Sentinel: Fail securely if the required system binary is missing, rather than
# falling back to a relative path ("ping") which could execute a malicious local file.
raise RuntimeError("Required 'ping' binary not found in system PATH.")
def is_reachable(ip, timeout=1):
"""Checks if a device at the given IP address is reachable with a ping.
Args:
ip (str): The IP address to ping.
timeout (int, optional): The timeout in seconds for the ping. Defaults to 1.
Returns:
bool: True if the ping is successful, False otherwise.
"""
# 🛡️ Sentinel: Add input length limit to prevent resource exhaustion (DoS)
# The ipaddress module can take significant time to parse extremely long strings
if isinstance(ip, str) and len(ip) > 100:
logging.error("IP address string too long")
return False
# ⚡ Bolt: Fast-path for pre-instantiated IP objects to avoid redundant string parsing
# overhead. Avoids calling ipaddress.ip_address() for every ip.
if isinstance(ip, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
ip_obj = ip
else:
# 🛡️ Sentinel: Validate IP address to prevent argument injection
# Catch TypeError alongside ValueError as ipaddress.ip_address()
# raises TypeError when passed None or non-string/int objects,
# which can crash the worker thread pool (DoS) if unhandled.
try:
ip_obj = ipaddress.ip_address(ip)
except (ValueError, TypeError):
# 🛡️ Sentinel: Sanitize log input to prevent CRLF/Log Injection
logging.error(f"Invalid IP address format: {repr(ip)}")
return False
# 🛡️ Sentinel: Prevent Log and Argument Injection via IPv6 scope_id
# The python ipaddress module allows arbitrary characters (including \n and ;) in
# the scope_id of IPv6 addresses. If unhandled, this can lead to argument
# injection in the subprocess call or log injection.
if getattr(ip_obj, 'scope_id', None):
if not re.fullmatch(r'[\w\-]+', ip_obj.scope_id):
logging.error(f"Invalid IPv6 scope ID: {repr(ip)}")
return False
# 🛡️ Sentinel: Prevent Server-Side Request Forgery (SSRF)
# Block loopback, link-local, multicast, unspecified, and reserved addresses from being pinged.
# reserved addresses include the broadcast address (255.255.255.255)
if ip_obj.is_loopback or ip_obj.is_link_local or ip_obj.is_multicast or ip_obj.is_unspecified or ip_obj.is_reserved:
# 🛡️ Sentinel: Sanitize log input using repr() to prevent CRLF/Log Injection
# IPv6 addresses can contain an arbitrary scope ID (e.g., %eth0\r\n) which is
# not sanitized by ipaddress.ip_address() and could allow log spoofing.
logging.error(f"IP address not allowed for scanning: {repr(ip)}")
return False
# 🛡️ Sentinel: Validate timeout length to prevent CPU exhaustion (DoS)
# Python's int() conversion for massive strings has O(N^2) complexity.
if isinstance(timeout, str) and len(timeout) > 100:
logging.error("Timeout string too long")
return False
# 🛡️ Sentinel: Validate timeout to prevent argument injection or errors
# 🛡️ Sentinel: Add input length limit to prevent CPU exhaustion (DoS)
# Python's int() conversion algorithm can be exploited with very long strings
if isinstance(timeout, str) and len(timeout) > 100:
logging.error("Timeout string too long")
return False
try:
timeout_val = int(timeout)
if timeout_val <= 0 or timeout_val > 100:
raise ValueError("Timeout must be a positive integer <= 100")
except (ValueError, TypeError, OverflowError):
# 🛡️ Sentinel: Catch OverflowError alongside ValueError/TypeError
# Inputs originating from JSON can include Infinity (parsed as float)
# which raises OverflowError when cast to int and crashes threads.
# 🛡️ Sentinel: Sanitize log input to prevent CRLF/Log Injection
logging.error(f"Invalid timeout value: {repr(timeout)}")
return False
# ⚡ Bolt: Optimized ping execution by adding `-n` and `-q` flags.
# The `-n` flag skips reverse DNS resolution. Without it, ping attempts to
# resolve the hostname for every IP, which can cause multi-second delays
# (even with a 1s timeout) if the IP lacks a PTR record or DNS is unresponsive.
# The `-q` (quiet) flag suppresses output generation. This prevents the `ping`
# binary from allocating and formatting string output for every ICMP echo reply
# it receives, slightly reducing CPU usage on the host OS when firing thousands
# of concurrent pings.
command = [PING_PATH, "-n", "-q", "-c", "1", "-W", str(timeout_val), str(ip_obj)] # -W for timeout in seconds (Linux)
# ⚡ Bolt: Optimized ping execution by using subprocess.call and redirecting
# output to DEVNULL instead of using Popen with PIPE.
# This avoids the Inter-Process Communication (IPC) overhead of capturing
# stdout/stderr, resulting in ~35% speedup for parallel network scans.
# ⚡ Bolt: Disabled close_fds and used cached DEVNULL_FD to avoid the overhead of
# iterating and closing all possible file descriptors in the child process
# and opening/closing /dev/null per execution.
try:
# 🛡️ Sentinel: Add python-level timeout limit as defense-in-depth to prevent
# worker thread pool exhaustion if the underlying ping process hangs.
return subprocess.call(command, stdout=DEVNULL_FD, stderr=DEVNULL_FD, close_fds=False, timeout=timeout_val + 2) == 0
except OSError:
# 🛡️ Sentinel: Fail securely on command execution errors (like FileNotFoundError)
# to prevent unhandled exceptions crashing the worker thread pool and leaking stack traces.
logging.error("Failed to execute ping command safely.")
return False
except subprocess.TimeoutExpired:
# 🛡️ Sentinel: Catch TimeoutExpired securely to prevent it from crashing the worker thread pool.
logging.error("Ping command timed out unexpectedly.")
return False
if __name__ == "__main__":
# Example usage: Check reachability within a specific subnet (replace with your allowed range)
start_ip = "192.168.43.1" # Adjust starting IP
end_ip = "192.168.43.254" # Adjust ending IP (modify for your network)
# Ensure you have permission to scan this subnet!
# 🛡️ Sentinel: Validate main block inputs to prevent arbitrary execution or DoS
# Ensure start_ip and end_ip are valid IP addresses, are in the correct order,
# and limit the maximum scan range to prevent resource exhaustion.
try:
start_obj = ipaddress.ip_address(start_ip)
end_obj = ipaddress.ip_address(end_ip)
# 🛡️ Sentinel: Validate IP versions match to prevent unhandled TypeError
# Comparing IPv4 and IPv6 addresses raises a TypeError which crashes the script.
if start_obj.version != end_obj.version:
raise ValueError("start_ip and end_ip must be of the same IP version")
if start_obj > end_obj:
raise ValueError("start_ip must be less than or equal to end_ip")
total_ips = int(end_obj) - int(start_obj) + 1
# Limit to 256 IPs (typically one /24 subnet) to prevent Denial of Service
if total_ips > 256:
raise ValueError(f"Scan range too large ({total_ips} IPs). Maximum 256 IPs allowed per scan.")
except (ValueError, TypeError) as e:
logging.error(f"Invalid scan range configuration: {e}")
exit(1)
# ⚡ Bolt: Optimize sequential IP address generation
# Pre-computing the base integer and directly instantiating the specific IP class
# avoids the overhead of the overloaded addition operator on IP objects.
# Using .compressed instead of str() further avoids overhead, yielding ~15-20% faster generation.
base_int = int(start_obj)
ip_class = type(start_obj)
# ⚡ Bolt: Pass pre-instantiated IP objects to worker threads to avoid string parsing overhead
ips_to_scan = [ip_class(base_int + i) for i in range(total_ips)]
# ⚡ Bolt: Parallelize network scanning using ThreadPoolExecutor
# Reduces scan time significantly by performing pings concurrently instead of sequentially.
# Time complexity with respect to network delay improves from O(N) to O(N / workers).
# ⚡ Bolt: Optimized parallel network scanning by removing the synchronous console
# I/O bottleneck `pbar.set_description` from the tqdm progress bar loop.
# This keeps the `as_completed` real-time progress updates while cutting
# baseline execution time by ~50%.
# ⚡ Bolt: Increase ThreadPoolExecutor max_workers to total_ips (up to a limit)
# Allows more concurrent pings, drastically reducing scan time from ~6.5s to ~1.5s
# when many addresses are unreachable and timeout.
max_workers = min(total_ips, 256)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(is_reachable, ip): ip for ip in ips_to_scan}
# ⚡ Bolt: Wrapped as_completed directly with tqdm to delegate progress tracking
# to its optimized internal C/Python iteration logic. This eliminates the manual
# context manager and pbar.update(1) overhead, yielding ~20% faster loop iteration.
for future in tqdm(concurrent.futures.as_completed(futures), total=total_ips, desc="Scanning network..."):
ip_address = futures[future]
# Removing pbar.set_description(f"Pinging {ip_address}...") here avoids console I/O bottleneck
if future.result():
# ⚡ Bolt: Replaced print() with tqdm.write() to prevent synchronous console I/O
# bottlenecks and progress bar redraw interference when rendering rapid output.
tqdm.write(f"Device reachable at: {ip_address}")
print("Scanning complete.")