44import argparse
55import logging
66import struct
7- from typing import Any , Optional , Generator
7+ from typing import Any , Optional
88
99import dpkt
1010from hexdump import hexdump
@@ -25,6 +25,7 @@ def __init__(self, srtp_key: Optional[str]):
2525 if srtp_key :
2626 self .outgoing_crypto = srtp_crypto .SrtpContext .from_base64 (srtp_key )
2727 self .incoming_crypto = srtp_crypto .SrtpContext .from_base64 (srtp_key )
28+ self .xbox_mac : Optional [bytes ] = None
2829
2930 @property
3031 def PACKET_TYPES (self ):
@@ -48,11 +49,14 @@ def get_info_rtp(self, rtp: rtp.RtpPacket, is_client: bool) -> None:
4849 if self .incoming_crypto and self .outgoing_crypto :
4950 rtp_packet = rtp .serialize ()
5051 try :
51- if is_client :
52- rtp_decrypted = self .outgoing_crypto .decrypt_packet (rtp_packet )
52+ if isinstance (is_client , bool ):
53+ if is_client :
54+ rtp_decrypted = self .outgoing_crypto .decrypt_packet (rtp_packet )
55+ else :
56+ rtp_decrypted = self .incoming_crypto .decrypt_packet (rtp_packet )
57+ info_str += "\n " + hexdump (rtp_decrypted .payload , result = 'return' ) + "\n "
5358 else :
54- rtp_decrypted = self .incoming_crypto .decrypt_packet (rtp_packet )
55- info_str += "\n " + hexdump (rtp_decrypted .payload , result = 'return' ) + "\n "
59+ info_str += "\n UNKNOWN DIRECTION \n "
5660 except Exception :
5761 info_str += "\n DECRYPTION FAILED \n "
5862 return info_str
@@ -67,13 +71,12 @@ def get_info_teredo(self, teredo: teredo.TeredoPacket, is_client: bool) -> None:
6771 info += f'\n -> TEREDO-WRAPPED: { subpacket_info } '
6872 return info
6973
70- def get_info_general (self , packet : Any ) -> Optional [str ]:
74+ def get_info_general (self , packet : Any , is_client : bool ) -> Optional [str ]:
7175 if isinstance (packet , dpkt .udp .UDP ):
7276 data = bytes (packet .data )
7377 for cls , info_func in self .PACKET_TYPES :
7478 try :
7579 instance = cls (data )
76- is_client = (packet .dport == 54881 )
7780 info = info_func (instance , is_client )
7881 return info
7982 except :
@@ -96,12 +99,21 @@ def packet_filter(self, filepath):
9699 if not isinstance (ip .data , dpkt .udp .UDP ):
97100 continue
98101
99- yield (ip , ts )
102+ # Check packet direction (client/host)
103+ if not self .xbox_mac and ip .data .sport == 3074 :
104+ self .xbox_mac = eth .src
105+
106+ if self .xbox_mac :
107+ is_client = (eth .src == self .xbox_mac )
108+ else :
109+ is_client = None
110+
111+ yield (ip , ts , is_client )
100112
101113
102114 def parse_file (self , pcap_filepath : str ) -> None :
103- for packet , timestamp in self .packet_filter (pcap_filepath ):
104- info = self .get_info_general (packet .data )
115+ for packet , timestamp , is_client in self .packet_filter (pcap_filepath ):
116+ info = self .get_info_general (packet .data , is_client )
105117 if info :
106118 print (info )
107119
0 commit comments