66namespace libproxy ;
77
88use Error ;
9+ use ErrorException ;
910use Exception ;
1011use libproxy \data \LatencyData ;
1112use libproxy \data \TickSyncPacket ;
1718use libproxy \protocol \ProxyPacketSerializer ;
1819use pmmp \thread \Thread as NativeThread ;
1920use pmmp \thread \ThreadSafeArray ;
21+ use pocketmine \network \mcpe \compression \DecompressionException ;
22+ use pocketmine \network \mcpe \compression \ZlibCompressor ;
2023use pocketmine \network \mcpe \convert \TypeConverter ;
2124use pocketmine \network \mcpe \EntityEventBroadcaster ;
2225use pocketmine \network \mcpe \NetworkSession ;
2326use pocketmine \network \mcpe \PacketBroadcaster ;
27+ use pocketmine \network \mcpe \protocol \PacketDecodeException ;
2428use pocketmine \network \mcpe \protocol \PacketPool ;
29+ use pocketmine \network \mcpe \protocol \serializer \PacketBatch ;
2530use pocketmine \network \mcpe \protocol \serializer \PacketSerializerContext ;
31+ use pocketmine \network \mcpe \protocol \types \CompressionAlgorithm ;
2632use pocketmine \network \mcpe \raklib \PthreadsChannelReader ;
2733use pocketmine \network \mcpe \raklib \PthreadsChannelWriter ;
2834use pocketmine \network \NetworkInterface ;
3238use pocketmine \Server ;
3339use pocketmine \snooze \SleeperHandlerEntry ;
3440use pocketmine \thread \ThreadCrashException ;
41+ use pocketmine \timings \Timings ;
3542use pocketmine \utils \Binary ;
3643use pocketmine \utils \BinaryDataException ;
44+ use pocketmine \utils \BinaryStream ;
3745use Socket ;
3846use ThreadedArray ;
3947use WeakMap ;
48+ use function base64_encode ;
4049use function bin2hex ;
50+ use function ord ;
4151use function socket_close ;
4252use function socket_create_pair ;
4353use function socket_last_error ;
4656use function strlen ;
4757use function substr ;
4858use function trim ;
59+ use function zstd_uncompress ;
4960use const AF_INET ;
5061use const AF_UNIX ;
5162use const SOCK_STREAM ;
@@ -214,7 +225,7 @@ private function onPacketReceive(string $buffer): void
214225 break ; // might be data arriving from the client after the server has closed the connection
215226 }
216227
217- $ session ->handleEncoded ($ pk ->payload );
228+ $ this ->handleEncoded ($ session , $ pk ->payload );
218229 $ this ->receiveBytes += strlen ($ pk ->payload );
219230 break ;
220231 }
@@ -225,6 +236,70 @@ private function onPacketReceive(string $buffer): void
225236 }
226237 }
227238
239+ /**
240+ * @throws PacketHandlingException
241+ */
242+ public function handleEncoded (NetworkSession $ session , string $ payload ): void
243+ {
244+ if (!(fn () => $ this ->connected )->call ($ session )) {
245+ return ;
246+ }
247+
248+ Timings::$ playerNetworkReceive ->startTiming ();
249+ try {
250+ (fn () => $ this ->packetBatchLimiter ->decrement ())->call ($ session );
251+
252+ if (strlen ($ payload ) < 1 ) {
253+ throw new PacketHandlingException ("No bytes in payload " );
254+ }
255+
256+ Timings::$ playerNetworkReceiveDecompress ->startTiming ();
257+ $ compressionType = ord ($ payload [0 ]);
258+ $ compressed = substr ($ payload , 1 );
259+
260+ try {
261+ $ decompressed = match ($ compressionType ) {
262+ CompressionAlgorithm::NONE => $ compressed ,
263+ CompressionAlgorithm::ZLIB => $ session ->getCompressor ()->decompress ($ compressed ),
264+ CompressionAlgorithm::NONE - 1 => ($ d = zstd_uncompress ($ compressed )) === false ? throw new DecompressionException ("Failed to decompress packet " ) : $ d ,
265+ default => throw new PacketHandlingException ("Packet compressed with unexpected compression type $ compressionType " )
266+ };
267+ } catch (ErrorException |DecompressionException $ e ) {
268+ $ session ->getLogger ()->debug ("Failed to decompress packet: " . base64_encode ($ compressed ));
269+ throw PacketHandlingException::wrap ($ e , "Compressed packet batch decode error " );
270+ } finally {
271+ Timings::$ playerNetworkReceiveDecompress ->stopTiming ();
272+ }
273+
274+ try {
275+ $ stream = new BinaryStream ($ decompressed );
276+ $ count = 0 ;
277+ foreach (PacketBatch::decodeRaw ($ stream ) as $ buffer ) {
278+ (fn () => $ this ->gamePacketLimiter ->decrement ())->call ($ session );
279+ if (++$ count > 100 ) {
280+ throw new PacketHandlingException ("Too many packets in batch " );
281+ }
282+ $ packet = PacketPool::getInstance ()->getPacket ($ buffer );
283+ if ($ packet === null ) {
284+ $ session ->getLogger ()->debug ("Unknown packet: " . base64_encode ($ buffer ));
285+ throw new PacketHandlingException ("Unknown packet received " );
286+ }
287+ try {
288+ $ session ->handleDataPacket ($ packet , $ buffer );
289+ } catch (PacketHandlingException $ e ) {
290+ $ session ->getLogger ()->debug ($ packet ->getName () . ": " . base64_encode ($ buffer ));
291+ throw PacketHandlingException::wrap ($ e , "Error processing " . $ packet ->getName ());
292+ }
293+ }
294+ } catch (PacketDecodeException |BinaryDataException $ e ) {
295+ $ session ->getLogger ()->logException ($ e );
296+ throw PacketHandlingException::wrap ($ e , "Packet batch decode error " );
297+ }
298+ } finally {
299+ Timings::$ playerNetworkReceive ->stopTiming ();
300+ }
301+ }
302+
228303 public function tick (): void
229304 {
230305 if (!$ this ->proxy ->isRunning ()) {
@@ -295,20 +370,13 @@ public function createSession(int $socketId, string $ip, int $port): NetworkSess
295370 new ProxyPacketSender ($ socketId , $ this ),
296371 $ this ->packetBroadcaster ,
297372 $ this ->entityEventBroadcaster ,
298- MultiCompressor ::getInstance (),
373+ ZlibCompressor ::getInstance (),
299374 TypeConverter::getInstance (),
300375 $ ip ,
301376 $ port
302377 );
303378
304379 $ this ->sessions [$ socketId ] = $ session ;
305-
306- // Set the LoginPacketHandler, since compression is handled by the proxy
307- (function (): void {
308- /** @noinspection PhpUndefinedFieldInspection */
309- $ this ->onSessionStartSuccess ();
310- })->call ($ session );
311-
312380 return $ session ;
313381 }
314382
0 commit comments