11# -*- coding: utf-8 -*-
22
3- import pymysql
43import struct
54from distutils .version import LooseVersion
65
6+ import pymysql
77from pymysql .constants .COMMAND import COM_BINLOG_DUMP , COM_REGISTER_SLAVE
88from pymysql .cursors import DictCursor
99
10- from .packet import BinLogPacketWrapper
1110from .constants .BINLOG import TABLE_MAP_EVENT , ROTATE_EVENT , FORMAT_DESCRIPTION_EVENT
12- from .gtid import GtidSet
1311from .event import (
1412 QueryEvent , RotateEvent , FormatDescriptionEvent ,
1513 XidEvent , GtidEvent , StopEvent , XAPrepareEvent ,
1614 BeginLoadQueryEvent , ExecuteLoadQueryEvent ,
1715 HeartbeatLogEvent , NotImplementedEvent , MariadbGtidEvent ,
18- MariadbAnnotateRowsEvent , RandEvent , MariadbStartEncryptionEvent )
16+ MariadbAnnotateRowsEvent , RandEvent , MariadbStartEncryptionEvent , RowsQueryLogEvent )
1917from .exceptions import BinLogNotEnabled
18+ from .gtid import GtidSet
19+ from .packet import BinLogPacketWrapper
2020from .row_event import (
2121 UpdateRowsEvent , WriteRowsEvent , DeleteRowsEvent , TableMapEvent )
2222
3333
3434
3535class ReportSlave (object ):
36-
3736 """Represent the values that you may report when connecting as a slave
3837 to a master. SHOW SLAVE HOSTS related"""
3938
@@ -68,7 +67,7 @@ def __init__(self, value):
6867 self .hostname = value
6968
7069 def __repr__ (self ):
71- return '<ReportSlave hostname=%s username=%s password=%s port=%d>' % \
70+ return '<ReportSlave hostname=%s username=%s password=%s port=%d>' % \
7271 (self .hostname , self .username , self .password , self .port )
7372
7473 def encoded (self , server_id , master_id = 0 ):
@@ -123,7 +122,6 @@ def encoded(self, server_id, master_id=0):
123122
124123
125124class BinLogStreamReader (object ):
126-
127125 """Connect to replication stream and read event
128126 """
129127 report_slave = None
@@ -317,7 +315,7 @@ def __connect_to_stream(self):
317315 4294967 ))
318316 # If heartbeat is too low, the connection will disconnect before,
319317 # this is also the behavior in mysql
320- heartbeat = float (min (net_timeout / 2. , self .slave_heartbeat ))
318+ heartbeat = float (min (net_timeout / 2. , self .slave_heartbeat ))
321319 if heartbeat > 4294967 :
322320 heartbeat = 4294967
323321
@@ -353,7 +351,7 @@ def __connect_to_stream(self):
353351 cur .close ()
354352
355353 prelude = struct .pack ('<i' , len (self .log_file ) + 11 ) \
356- + bytes (bytearray ([COM_BINLOG_DUMP ]))
354+ + bytes (bytearray ([COM_BINLOG_DUMP ]))
357355
358356 if self .__resume_stream :
359357 prelude += struct .pack ('<I' , self .log_pos )
@@ -370,7 +368,7 @@ def __connect_to_stream(self):
370368 prelude += self .log_file .encode ()
371369 else :
372370 if self .is_mariadb :
373- prelude = self .__set_mariadb_settings ()
371+ prelude = self .__set_mariadb_settings ()
374372 else :
375373 # Format for mysql packet master_auto_position
376374 #
@@ -417,8 +415,8 @@ def __connect_to_stream(self):
417415 8 + # binlog_pos_info_size
418416 4 ) # encoded_data_size
419417
420- prelude = b'' + struct .pack ('<i' , header_size + encoded_data_size )\
421- + bytes (bytearray ([COM_BINLOG_DUMP_GTID ]))
418+ prelude = b'' + struct .pack ('<i' , header_size + encoded_data_size ) \
419+ + bytes (bytearray ([COM_BINLOG_DUMP_GTID ]))
422420
423421 flags = 0
424422 if not self .__blocking :
@@ -455,7 +453,7 @@ def __connect_to_stream(self):
455453 def __set_mariadb_settings (self ):
456454 # https://mariadb.com/kb/en/5-slave-registration/
457455 cur = self ._stream_connection .cursor ()
458- if self .auto_position != None :
456+ if self .auto_position != None :
459457 cur .execute ("SET @slave_connect_state='%s'" % self .auto_position )
460458 cur .execute ("SET @slave_gtid_strict_mode=1" )
461459 cur .execute ("SET @slave_gtid_ignore_duplicates=0" )
@@ -466,7 +464,7 @@ def __set_mariadb_settings(self):
466464 4 + # binlog pos
467465 2 + # binlog flags
468466 4 + # slave server_id,
469- 4 # requested binlog file name , set it to empty
467+ 4 # requested binlog file name , set it to empty
470468 )
471469
472470 prelude = struct .pack ('<i' , header_size ) + bytes (bytearray ([COM_BINLOG_DUMP ]))
@@ -478,11 +476,11 @@ def __set_mariadb_settings(self):
478476
479477 # Enable annotate rows event
480478 if self .__annotate_rows_event :
481- flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT
479+ flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT
482480
483481 if not self .__blocking :
484482 flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
485-
483+
486484 # binlog flags
487485 prelude += struct .pack ('<H' , flags )
488486
@@ -622,10 +620,11 @@ def _allowed_event_list(self, only_events, ignored_events,
622620 HeartbeatLogEvent ,
623621 NotImplementedEvent ,
624622 MariadbGtidEvent ,
623+ RowsQueryLogEvent ,
625624 MariadbAnnotateRowsEvent ,
626625 RandEvent ,
627- MariadbStartEncryptionEvent
628- ))
626+ MariadbStartEncryptionEvent ,
627+ ))
629628 if ignored_events is not None :
630629 for e in ignored_events :
631630 events .remove (e )
0 commit comments