1212use libproxy \protocol \LoginPacket ;
1313use libproxy \protocol \ProxyPacket ;
1414use libproxy \protocol \ProxyPacketPool ;
15- use libproxy \protocol \ProxyPacketSerializer ;
1615use NetherGames \Quiche \io \QueueWriter ;
1716use NetherGames \Quiche \QuicheConnection ;
1817use NetherGames \Quiche \socket \QuicheServerSocket ;
1918use NetherGames \Quiche \SocketAddress ;
2019use NetherGames \Quiche \stream \BiDirectionalQuicheStream ;
2120use NetherGames \Quiche \stream \QuicheStream ;
21+ use pmmp \encoding \BE ;
22+ use pmmp \encoding \ByteBufferReader ;
23+ use pmmp \encoding \ByteBufferWriter ;
24+ use pmmp \encoding \DataDecodeException ;
25+ use pmmp \encoding \LE ;
2226use pmmp \thread \ThreadSafeArray ;
2327use pocketmine \network \mcpe \compression \DecompressionException ;
2428use pocketmine \network \mcpe \compression \ZlibCompressor ;
3135use pocketmine \network \mcpe \protocol \ProtocolInfo ;
3236use pocketmine \network \mcpe \protocol \RequestNetworkSettingsPacket ;
3337use pocketmine \network \mcpe \protocol \serializer \PacketBatch ;
34- use pocketmine \network \mcpe \protocol \serializer \PacketSerializer ;
3538use pocketmine \network \mcpe \protocol \types \CompressionAlgorithm ;
3639use pocketmine \network \mcpe \raklib \PthreadsChannelReader ;
3740use pocketmine \network \mcpe \raklib \SnoozeAwarePthreadsChannelWriter ;
3841use pocketmine \network \PacketHandlingException ;
3942use pocketmine \snooze \SleeperHandler ;
4043use pocketmine \snooze \SleeperHandlerEntry ;
4144use pocketmine \thread \log \AttachableThreadSafeLogger ;
42- use pocketmine \utils \Binary ;
43- use pocketmine \utils \BinaryDataException ;
44- use pocketmine \utils \BinaryStream ;
4545use Socket ;
4646use function array_keys ;
4747use function base64_encode ;
5656
5757class ProxyServer
5858{
59+ private const INCOMING_PACKET_BATCH_HARD_LIMIT = 300 ;
60+
5961 /** @var PthreadsChannelReader */
6062 private PthreadsChannelReader $ mainToThreadReader ;
6163 /** @var SnoozeAwarePthreadsChannelWriter */
@@ -221,12 +223,12 @@ private function shutdownStream(int $streamIdentifier, string $reason, bool $fro
221223
222224 private function sendToMainBuffer (int $ streamIdentifier , ProxyPacket $ pk ): void
223225 {
224- $ serializer = new ProxyPacketSerializer ();
225- $ serializer-> putLInt ( $ streamIdentifier );
226+ $ serializer = new ByteBufferWriter ();
227+ LE :: writeUnsignedInt ( $ serializer, $ streamIdentifier );
226228
227229 $ pk ->encode ($ serializer );
228230
229- $ this ->threadToMainWriter ->write ($ serializer ->getBuffer ());
231+ $ this ->threadToMainWriter ->write ($ serializer ->getData ());
230232 }
231233
232234 public function tickProcessor (): void
@@ -237,16 +239,18 @@ public function tickProcessor(): void
237239 private function pushSockets (): void
238240 {
239241 while (($ payload = $ this ->mainToThreadReader ->read ()) !== null ) {
240- $ stream = new ProxyPacketSerializer ($ payload );
241- $ streamIdentifier = $ stream-> getLInt ( );
242+ $ stream = new ByteBufferReader ($ payload );
243+ $ streamIdentifier = LE :: readUnsignedInt ( $ stream );
242244
243- if (($ pk = ProxyPacketPool::getInstance ()->getPacket ($ payload , $ stream ->getOffset ())) === null ) {
245+ $ offset = $ stream ->getOffset ();
246+ if (($ pk = ProxyPacketPool::getInstance ()->getPacket ($ stream )) === null ) {
244247 throw new PacketHandlingException ('Packet does not exist ' );
245248 }
249+ $ stream ->setOffset ($ offset );
246250
247251 try {
248252 $ pk ->decode ($ stream );
249- } catch (BinaryDataException $ e ) {
253+ } catch (DataDecodeException $ e ) {
250254 $ this ->logger ->debug ('Closed stream with id( ' . $ streamIdentifier . ') because server sent invalid packet ' );
251255 $ this ->shutdownStream ($ streamIdentifier , 'invalid packet ' , false );
252256 return ;
@@ -278,7 +282,7 @@ private function sendPayloadWithReceipt(int $streamIdentifier, string $payload,
278282 return ;
279283 }
280284
281- $ writer ->writeWithPromise (Binary:: writeInt (strlen ($ payload )) . $ payload )->onResult (function () use ($ streamIdentifier , $ receiptId ): void {
285+ $ writer ->writeWithPromise (BE :: packSignedInt (strlen ($ payload )) . $ payload )->onResult (function () use ($ streamIdentifier , $ receiptId ): void {
282286 $ pk = new AckPacket ();
283287 $ pk ->receiptId = $ receiptId ;
284288
@@ -296,7 +300,7 @@ private function sendPayload(int $streamIdentifier, string $payload): void
296300 return ;
297301 }
298302
299- $ writer ->write (Binary:: writeInt (strlen ($ payload )) . $ payload );
303+ $ writer ->write (BE :: packSignedInt (strlen ($ payload )) . $ payload );
300304 }
301305
302306 /**
@@ -323,26 +327,26 @@ private function getProtocolId(int $streamIdentifier): int
323327 */
324328 private function sendDataPacket (int $ streamIdentifier , BedrockPacket $ packet ): void
325329 {
326- $ packetSerializer = PacketSerializer:: encoder ( $ protocolId = $ this -> getProtocolId ( $ streamIdentifier ) );
327- $ packet ->encode ($ packetSerializer );
330+ $ packetSerializer = new ByteBufferWriter ( );
331+ $ packet ->encode ($ packetSerializer, $ protocolId = $ this -> getProtocolId ( $ streamIdentifier ) );
328332
329- $ stream = new BinaryStream ();
330- PacketBatch::encodeRaw ($ stream , [$ packetSerializer ->getBuffer ()]);
331- $ payload = ($ protocolId >= ProtocolInfo::PROTOCOL_1_20_60 ? chr (CompressionAlgorithm::ZLIB ) : '' ) . ZlibCompressor::getInstance ()->compress ($ stream ->getBuffer ());
333+ $ stream = new ByteBufferWriter ();
334+ PacketBatch::encodeRaw ($ stream , [$ packetSerializer ->getData ()]);
335+ $ payload = ($ protocolId >= ProtocolInfo::PROTOCOL_1_20_60 ? chr (CompressionAlgorithm::ZLIB ) : '' ) . ZlibCompressor::getInstance ()->compress ($ stream ->getData ());
332336
333337 $ this ->sendPayload ($ streamIdentifier , $ payload );
334338 }
335339
336340 private function decodePacket (int $ streamIdentifier , BedrockPacket $ packet , string $ buffer ): void
337341 {
338- $ stream = PacketSerializer:: decoder ( $ this -> protocolId [ $ streamIdentifier ] ?? ProtocolInfo:: CURRENT_PROTOCOL , $ buffer, 0 );
342+ $ stream = new ByteBufferReader ( $ buffer );
339343 try {
340- $ packet ->decode ($ stream );
344+ $ packet ->decode ($ stream, $ this -> protocolId [ $ streamIdentifier ] ?? ProtocolInfo:: CURRENT_PROTOCOL );
341345 } catch (PacketDecodeException $ e ) {
342346 throw PacketHandlingException::wrap ($ e );
343347 }
344- if (! $ stream ->feof () ) {
345- $ remains = substr ($ stream ->getBuffer (), $ stream ->getOffset ());
348+ if ($ stream ->getUnreadLength () > 0 ) {
349+ $ remains = substr ($ stream ->getData (), $ stream ->getOffset ());
346350 $ this ->logger ->debug ("Still " . strlen ($ remains ) . " bytes unread in " . $ packet ->getName () . ": " . bin2hex ($ remains ));
347351 }
348352 }
@@ -407,14 +411,18 @@ private function onFullDataReceive(int $streamIdentifier, string $payload): void
407411 throw PacketHandlingException::wrap ($ e , "Compressed packet batch decode error " );
408412 }
409413
414+ $ count = 0 ;
410415 try {
411- $ stream = new BinaryStream ($ decompressed );
412- $ count = 0 ;
416+ $ stream = new ByteBufferReader ($ decompressed );
413417 foreach (PacketBatch::decodeRaw ($ stream ) as $ buffer ) {
414- $ this ->getGamePacketLimiter ($ streamIdentifier )->decrement ();
415- if (++$ count > 100 ) {
416- throw new PacketHandlingException ("Too many packets in batch " );
418+ if (++$ count >= self ::INCOMING_PACKET_BATCH_HARD_LIMIT ){
419+ //this should be well more than enough; under normal conditions the game packet rate limiter
420+ //will kick in well before this. This is only here to make sure we can't get huge batches of
421+ //noisy packets to bog down the server, since those aren't counted by the regular limiter.
422+ throw new PacketHandlingException ("Reached hard limit of " . self ::INCOMING_PACKET_BATCH_HARD_LIMIT . " per batch packet " );
417423 }
424+
425+ $ this ->getGamePacketLimiter ($ streamIdentifier )->decrement ();
418426 $ packet = PacketPool::getInstance ()->getPacket ($ buffer );
419427 if ($ packet === null ) {
420428 $ this ->logger ->debug ("Unknown packet: " . base64_encode ($ buffer ));
@@ -429,7 +437,7 @@ private function onFullDataReceive(int $streamIdentifier, string $payload): void
429437 throw PacketHandlingException::wrap ($ e , "Error processing " . $ packet ->getName ());
430438 }
431439 }
432- } catch (PacketDecodeException |BinaryDataException $ e ) {
440+ } catch (PacketDecodeException |DataDecodeException $ e ) {
433441 $ this ->logger ->logException ($ e );
434442 throw PacketHandlingException::wrap ($ e , "Packet batch decode error " );
435443 }
@@ -457,8 +465,8 @@ private function onDataReceive(int $streamIdentifier, string $data): void
457465 return ; // wait for more data
458466 } else {
459467 try {
460- $ packetLength = Binary:: readInt (substr ($ buffer , 0 , 4 ));
461- } catch (BinaryDataException $ exception ) {
468+ $ packetLength = BE :: unpackSignedInt (substr ($ buffer , 0 , 4 ));
469+ } catch (DataDecodeException $ exception ) {
462470 $ this ->shutdownStream ($ streamIdentifier , 'invalid packet ' , false );
463471 return ;
464472 }
0 commit comments