|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from opengsq.binary_reader import BinaryReader |
| 4 | +from opengsq.exceptions import InvalidPacketException |
| 5 | +from opengsq.protocol_base import ProtocolBase |
| 6 | +from opengsq.protocol_socket import UdpClient |
| 7 | +from opengsq.responses.cod5 import Info, Status, Cod5Status |
| 8 | + |
| 9 | + |
| 10 | +class CoD5(ProtocolBase): |
| 11 | + """ |
| 12 | + This class represents the Call of Duty 5: World at War Protocol. It provides methods to interact with CoD5 servers. |
| 13 | + """ |
| 14 | + |
| 15 | + full_name = "Call of Duty 5: World at War Protocol" |
| 16 | + |
| 17 | + def __init__(self, host: str, port: int = 28960, timeout: float = 5.0): |
| 18 | + """ |
| 19 | + Initializes the CoD5 object with the given parameters. |
| 20 | +
|
| 21 | + :param host: The host of the server. |
| 22 | + :param port: The port of the server (default: 28960). |
| 23 | + :param timeout: The timeout for the server connection. |
| 24 | + """ |
| 25 | + super().__init__(host, port, timeout) |
| 26 | + self._source_port = 28960 # CoD5 requires source port 28960 |
| 27 | + |
| 28 | + async def get_info(self, challenge: str = "xxx") -> Info: |
| 29 | + """ |
| 30 | + Asynchronously retrieves the server information. |
| 31 | +
|
| 32 | + :param challenge: The challenge string to send (default: "xxx"). |
| 33 | + :return: An Info object containing the server information. |
| 34 | + """ |
| 35 | + # Construct the getinfo payload: ffffffff676574696e666f20787878 |
| 36 | + payload = b"\xFF\xFF\xFF\xFF" + b"getinfo " + challenge.encode('ascii') |
| 37 | + |
| 38 | + response_data = await UdpClient.communicate(self, payload, source_port=self._source_port) |
| 39 | + |
| 40 | + # Parse the response |
| 41 | + br = BinaryReader(response_data) |
| 42 | + |
| 43 | + # Skip the header (4 bytes of 0xFF) |
| 44 | + header = br.read_bytes(4) |
| 45 | + if header != b"\xFF\xFF\xFF\xFF": |
| 46 | + raise InvalidPacketException( |
| 47 | + f"Invalid packet header. Expected: \\xFF\\xFF\\xFF\\xFF. Received: {header.hex()}" |
| 48 | + ) |
| 49 | + |
| 50 | + # Read the response type |
| 51 | + response_type = br.read_string([b'\n']) |
| 52 | + if response_type != "infoResponse": |
| 53 | + raise InvalidPacketException( |
| 54 | + f"Unexpected response type. Expected: infoResponse. Received: {response_type}" |
| 55 | + ) |
| 56 | + |
| 57 | + # Parse the key-value pairs |
| 58 | + info_data = self._parse_key_value_pairs(br) |
| 59 | + |
| 60 | + return Info(info_data) |
| 61 | + |
| 62 | + async def get_status(self) -> Status: |
| 63 | + """ |
| 64 | + Asynchronously retrieves the server status. |
| 65 | +
|
| 66 | + :return: A Status object containing the server status. |
| 67 | + """ |
| 68 | + # Construct the getstatus payload: ffffffff676574737461747573 |
| 69 | + payload = b"\xFF\xFF\xFF\xFF" + b"getstatus" |
| 70 | + |
| 71 | + response_data = await UdpClient.communicate(self, payload, source_port=self._source_port) |
| 72 | + |
| 73 | + # Parse the response |
| 74 | + br = BinaryReader(response_data) |
| 75 | + |
| 76 | + # Skip the header (4 bytes of 0xFF) |
| 77 | + header = br.read_bytes(4) |
| 78 | + if header != b"\xFF\xFF\xFF\xFF": |
| 79 | + raise InvalidPacketException( |
| 80 | + f"Invalid packet header. Expected: \\xFF\\xFF\\xFF\\xFF. Received: {header.hex()}" |
| 81 | + ) |
| 82 | + |
| 83 | + # Read the response type |
| 84 | + response_type = br.read_string([b'\n']) |
| 85 | + if response_type != "statusResponse": |
| 86 | + raise InvalidPacketException( |
| 87 | + f"Unexpected response type. Expected: statusResponse. Received: {response_type}" |
| 88 | + ) |
| 89 | + |
| 90 | + # Parse the key-value pairs |
| 91 | + status_data = self._parse_key_value_pairs(br) |
| 92 | + |
| 93 | + return Status(status_data) |
| 94 | + |
| 95 | + async def get_full_status(self, challenge: str = "xxx") -> Cod5Status: |
| 96 | + """ |
| 97 | + Asynchronously retrieves both server info and status. |
| 98 | +
|
| 99 | + :param challenge: The challenge string to send (default: "xxx"). |
| 100 | + :return: A Cod5Status object containing both info and status. |
| 101 | + """ |
| 102 | + import asyncio |
| 103 | + |
| 104 | + # Add a small delay between requests to avoid socket conflicts |
| 105 | + info = await self.get_info(challenge) |
| 106 | + await asyncio.sleep(0.1) # 100ms delay |
| 107 | + status = await self.get_status() |
| 108 | + |
| 109 | + return Cod5Status(info=info, status=status) |
| 110 | + |
| 111 | + def _parse_key_value_pairs(self, br: BinaryReader) -> dict[str, str]: |
| 112 | + """ |
| 113 | + Parses key-value pairs from the binary reader. |
| 114 | + CoD5 uses backslash (\) as delimiter between keys and values. |
| 115 | +
|
| 116 | + :param br: The BinaryReader object to parse from. |
| 117 | + :return: A dictionary containing the parsed key-value pairs. |
| 118 | + """ |
| 119 | + data = {} |
| 120 | + |
| 121 | + # Read the remaining data as string |
| 122 | + remaining_data = br.read().decode('ascii', errors='ignore') |
| 123 | + |
| 124 | + # Split by backslash and process pairs |
| 125 | + parts = remaining_data.split('\\') |
| 126 | + |
| 127 | + # Remove empty first element if it exists (starts with \) |
| 128 | + if parts and parts[0] == '': |
| 129 | + parts = parts[1:] |
| 130 | + |
| 131 | + # Process pairs (key, value, key, value, ...) |
| 132 | + for i in range(0, len(parts) - 1, 2): |
| 133 | + if i + 1 < len(parts): |
| 134 | + key = parts[i].strip() |
| 135 | + value = parts[i + 1].strip() |
| 136 | + if key: # Only add non-empty keys |
| 137 | + data[key] = value |
| 138 | + |
| 139 | + return data |
| 140 | + |
| 141 | + |
| 142 | +if __name__ == "__main__": |
| 143 | + import asyncio |
| 144 | + |
| 145 | + async def main_async(): |
| 146 | + # Test with the provided server |
| 147 | + cod5 = CoD5(host="172.29.100.29", port=28960, timeout=5.0) |
| 148 | + |
| 149 | + try: |
| 150 | + print("Getting server info...") |
| 151 | + info = await cod5.get_info() |
| 152 | + print(f"Info: {info}") |
| 153 | + print(f"Hostname: {info.hostname}") |
| 154 | + print(f"Map: {info.mapname}") |
| 155 | + print(f"Gametype: {info.gametype}") |
| 156 | + print(f"Players: {info.clients}/{info.sv_maxclients}") |
| 157 | + |
| 158 | + print("\n" + "="*50) |
| 159 | + print("Getting server status...") |
| 160 | + await asyncio.sleep(0.2) # Wait a bit before next request |
| 161 | + status = await cod5.get_status() |
| 162 | + print(f"Status: {status}") |
| 163 | + print(f"Server Name: {status.sv_hostname}") |
| 164 | + print(f"Game: {status.gamename}") |
| 165 | + print(f"Map: {status.mapname}") |
| 166 | + |
| 167 | + except Exception as e: |
| 168 | + print(f"Error: {e}") |
| 169 | + import traceback |
| 170 | + traceback.print_exc() |
| 171 | + |
| 172 | + asyncio.run(main_async()) |
0 commit comments