Skip to content

Commit 5aa195f

Browse files
committed
Scripts: Add PCAP Parser (incl. Nonce brute-forcer)
1 parent 9a05029 commit 5aa195f

2 files changed

Lines changed: 97 additions & 68 deletions

File tree

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ httpx
55
aiortc
66
construct
77
dpkt
8+
hexdump
89

910
wheel
1011
flake8

xcloud/scripts/pcap_reader.py

Lines changed: 96 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -3,95 +3,123 @@
33
"""
44
import argparse
55
import logging
6-
from typing import Any, Optional
6+
import struct
7+
from typing import Any, Optional, Generator
78

89
import dpkt
10+
from hexdump import hexdump
911
from aiortc import rtp
1012
from aioice import stun
1113
from construct.lib import containers
1214

13-
from ..protocol import packets, teredo, ipv6
15+
from ..protocol import packets, teredo, ipv6, srtp_crypto
1416

1517

1618
logging.basicConfig(level=logging.DEBUG)
1719
containers.setGlobalPrintFullStrings(True)
1820
LOG = logging.getLogger(__name__)
1921

20-
def get_info_stun(stun: stun.Message) -> None:
21-
return f'STUN: {stun}'
22-
23-
def get_info_rtp(rtp: rtp.RtpPacket) -> None:
24-
try:
25-
payload_name = packets.PayloadType(rtp.payload_type)
26-
except:
27-
payload_name = '<UNKNOWN>'
28-
29-
return f'RTP: {payload_name.name} {rtp} SSRC={rtp.ssrc}'
30-
31-
def get_info_teredo(teredo: teredo.TeredoPacket) -> None:
32-
info = f'TEREDO: {teredo}'
33-
if teredo.ipv6.next_header != ipv6.NO_NEXT_HEADER:
34-
data = teredo.ipv6.data
35-
if type(data) == bytes:
36-
raise ValueError(f'TEREDO contains unparsed-subpacket: {data}')
37-
subpacket_info = get_info_general(data)
38-
info += f'\n -> TEREDO-WRAPPED: {subpacket_info}'
39-
return info
40-
41-
42-
PACKET_TYPES = [
43-
(stun.parse_message, get_info_stun),
44-
(rtp.RtpPacket.parse, get_info_rtp),
45-
(teredo.TeredoPacket.parse, get_info_teredo)
46-
]
47-
48-
def get_info_general(packet: Any) -> Optional[str]:
49-
if isinstance(packet, dpkt.udp.UDP):
50-
data = bytes(packet.data)
51-
for cls, info_func in PACKET_TYPES:
52-
try:
53-
instance = cls(data)
54-
info = info_func(instance)
55-
return info
56-
except:
57-
pass
58-
elif isinstance(packet, bytes):
59-
return '<RAW BYTES>'
60-
else:
61-
return '<UNHANDLED>'
62-
63-
def packet_filter(filepath):
64-
with open(filepath, 'rb') as fh:
65-
for ts, buf in dpkt.pcap.Reader(fh):
66-
eth = dpkt.ethernet.Ethernet(buf)
67-
68-
# Make sure the Ethernet data contains an IP packet
69-
if not isinstance(eth.data, dpkt.ip.IP):
70-
continue
71-
72-
ip = eth.data
73-
subpacket = ip.data
74-
if not isinstance(subpacket, dpkt.udp.UDP):
75-
continue
76-
77-
yield(subpacket, ts)
78-
79-
80-
def parse_file(pcap_filepath: str) -> None:
81-
for packet, timestamp in packet_filter(pcap_filepath):
82-
info = get_info_general(packet)
83-
if info:
84-
print(info)
22+
class XcloudPcapParser:
23+
def __init__(self, srtp_key: Optional[str]):
24+
self.crypto: Optional[srtp_crypto.SrtpContext] = None
25+
if srtp_key:
26+
self.crypto = srtp_crypto.SrtpContext.from_base64(srtp_key)
27+
28+
@property
29+
def PACKET_TYPES(self):
30+
return [
31+
(stun.parse_message, self.get_info_stun),
32+
(rtp.RtpPacket.parse, self.get_info_rtp),
33+
(teredo.TeredoPacket.parse, self.get_info_teredo)
34+
]
35+
36+
def get_info_stun(self, stun: stun.Message) -> None:
37+
return f'STUN: {stun}'
38+
39+
def brute_force_nonce(self, nonce_orig: bytes) -> Generator:
40+
for byte1 in range(0, 0xFF):
41+
for byte2 in range(0, 0xFF):
42+
nonce_transform = b''.join([nonce_orig[:5], struct.pack('!B', byte1), nonce_orig[6:11], struct.pack('!B', byte2)])
43+
yield nonce_transform
44+
45+
def get_info_rtp(self, rtp: rtp.RtpPacket) -> None:
46+
try:
47+
payload_name = packets.PayloadType(rtp.payload_type)
48+
except:
49+
payload_name = '<UNKNOWN>'
50+
51+
info_str = f'RTP: {payload_name.name} {rtp} SSRC={rtp.ssrc}'
52+
if self.crypto:
53+
rtp_packet_serialized = rtp.serialize()
54+
rtp_header, rtp_data = rtp_packet_serialized[:12], rtp_packet_serialized[12:]
55+
nonce_orig = self.crypto.session_keys.nonce_key[2:]
56+
for nonce_transformed in self.brute_force_nonce(nonce_orig):
57+
try:
58+
decrypted = self.crypto._decrypt(self.crypto.decryptor_ctx, nonce_transformed, rtp_data, rtp_header)
59+
info_str += "\n" + hexdump(decrypted, result='return') + "\n"
60+
except Exception:
61+
pass
62+
return info_str
63+
64+
def get_info_teredo(self, teredo: teredo.TeredoPacket) -> None:
65+
info = f'TEREDO: {teredo}'
66+
if teredo.ipv6.next_header != ipv6.NO_NEXT_HEADER:
67+
data = teredo.ipv6.data
68+
if type(data) == bytes:
69+
raise ValueError(f'TEREDO contains unparsed-subpacket: {data}')
70+
subpacket_info = self.get_info_general(data)
71+
info += f'\n -> TEREDO-WRAPPED: {subpacket_info}'
72+
return info
73+
74+
def get_info_general(self, packet: Any) -> Optional[str]:
75+
if isinstance(packet, dpkt.udp.UDP):
76+
data = bytes(packet.data)
77+
for cls, info_func in self.PACKET_TYPES:
78+
try:
79+
instance = cls(data)
80+
info = info_func(instance)
81+
return info
82+
except:
83+
pass
84+
elif isinstance(packet, bytes):
85+
return '<RAW BYTES>'
86+
else:
87+
return '<UNHANDLED>'
88+
89+
def packet_filter(self, filepath):
90+
with open(filepath, 'rb') as fh:
91+
for ts, buf in dpkt.pcap.Reader(fh):
92+
eth = dpkt.ethernet.Ethernet(buf)
93+
94+
# Make sure the Ethernet data contains an IP packet
95+
if not isinstance(eth.data, dpkt.ip.IP):
96+
continue
97+
98+
ip = eth.data
99+
subpacket = ip.data
100+
if not isinstance(subpacket, dpkt.udp.UDP):
101+
continue
102+
103+
yield(subpacket, ts)
104+
105+
106+
def parse_file(self, pcap_filepath: str) -> None:
107+
for packet, timestamp in self.packet_filter(pcap_filepath):
108+
info = self.get_info_general(packet)
109+
if info:
110+
print(info)
85111

86112
def main():
87113
parser = argparse.ArgumentParser(
88114
"XCloud PCAP parser",
89115
description="PCAP Parser for XCloud network traffic"
90116
)
91117
parser.add_argument("filepath", help="Path to PCAP/NG file")
118+
parser.add_argument("--key", "-k", help="SRTP key")
92119
args = parser.parse_args()
93120

94-
parse_file(args.filepath)
121+
pcap_parser = XcloudPcapParser(args.key)
122+
pcap_parser.parse_file(args.filepath)
95123

96124

97125
if __name__ == "__main__":

0 commit comments

Comments
 (0)