From 029a60d602a26dcc1f9281c7544b0dc95d1ca4c9 Mon Sep 17 00:00:00 2001 From: heehehe Date: Mon, 21 Aug 2023 01:32:18 +0900 Subject: [PATCH 01/23] docs: modify BeginLoadQueryEvent comment --- pymysqlreplication/event.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 6395a3ae..ada74359 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -352,10 +352,11 @@ def _read_status_vars_value_for_key(self, key): class BeginLoadQueryEvent(BinLogEvent): """ + This event is written into the binary log file for LOAD DATA INFILE events + if the server variable binlog_mode was set to "STATEMENT". - Attributes: - file_id - block-data + :ivar file_id: the id of the file + :ivar block-data: data block about "LOAD DATA INFILE" """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(BeginLoadQueryEvent, self).__init__(from_packet, event_size, table_map, From 1e34edb7b9e29c01ef8f7cbd71e02844b25a86f8 Mon Sep 17 00:00:00 2001 From: starcat37 Date: Tue, 22 Aug 2023 18:46:03 +0900 Subject: [PATCH 02/23] docs: add NotImplementedEvent comment --- pymysqlreplication/event.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index ada74359..5c1802db 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -439,6 +439,11 @@ def _dump(self): class NotImplementedEvent(BinLogEvent): + """ + Used as a temporary class for events that have not yet been implemented. + + The event referencing this class skips parsing. + """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(NotImplementedEvent, self).__init__( from_packet, event_size, table_map, ctl_connection, **kwargs) From a0fd49c0cecdc0ff8b935fa6aeacdc393e4fb21d Mon Sep 17 00:00:00 2001 From: starcat37 Date: Tue, 22 Aug 2023 18:48:21 +0900 Subject: [PATCH 03/23] docs: modify IntvarEvent comment --- pymysqlreplication/event.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 5c1802db..20ce66ef 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -419,10 +419,12 @@ def _dump(self): class IntvarEvent(BinLogEvent): """ - - Attributes: - type - value + Stores the value of auto-increment variables. + This event will be created just before a QueryEvent. + + :ivar type: int - 1 byte identifying the type of variable stored. + Can be either LAST_INSERT_ID_EVENT (1) or INSERT_ID_EVENT (2). + :ivar value: int - The value of the variable """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(IntvarEvent, self).__init__(from_packet, event_size, table_map, From dd5b2323a22a00b6f8729e0c28c4f9238145990c Mon Sep 17 00:00:00 2001 From: ebang Date: Fri, 25 Aug 2023 14:54:17 +0900 Subject: [PATCH 04/23] docs: modify MariadbGtidEvents comments --- pymysqlreplication/event.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 20ce66ef..7dcff94e 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -92,8 +92,15 @@ def __repr__(self): class MariadbGtidEvent(BinLogEvent): """ - GTID change in binlog event in MariaDB - https://mariadb.com/kb/en/gtid_event/ + GTID(Global Transaction Identifier) change in binlog event in MariaDB + + for more information: `[see details] `_. + + :ivar server_id: int - The ID of the server where the GTID event occurred. + :ivar gtid_seq_no: int - The sequence number of the GTID event. + :ivar domain_id: int - The domain ID associated with the GTID event. + :ivar flags: int - Flags related to the GTID event. + :ivar gtid: str - The Global Transaction Identifier in the format ‘domain_id-server_id-gtid_seq_no’. """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): From 679543f555596489cd875364bef352e913387638 Mon Sep 17 00:00:00 2001 From: ebang Date: Fri, 25 Aug 2023 14:55:21 +0900 Subject: [PATCH 05/23] docs: modify GtidEvent comments --- pymysqlreplication/event.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 7dcff94e..affb6671 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -54,6 +54,16 @@ def _dump(self): class GtidEvent(BinLogEvent): """GTID change in binlog event + + For more information : `[GTID] `_ `[see also] `_ + + :ivar commit_flag: 1byte - 00000001 = Transaction may have changes logged with SBR. + In 5.6, 5.7.0-5.7.18, and 8.0.0-8.0.1, this flag is always set. Starting in 5.7.19 and 8.0.2, this flag is cleared if the transaction only contains row events. It is set if any part of the transaction is written in statement format. + :ivar sid: 16 byte sequence - UUID representing the SID + :ivar gno: int - Group number, second component of GTID. + :ivar lt_type: int(1 byte) - The type of logical timestamp used in the logical clock fields. + :ivar last_committed: Store the transaction's commit parent sequence_number + :ivar sequence_number: The transaction's logical timestamp assigned at prepare phase """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(GtidEvent, self).__init__(from_packet, event_size, table_map, From 5456e1dadd5eaddf7dc8a6035ce714b044e097aa Mon Sep 17 00:00:00 2001 From: ebang Date: Fri, 25 Aug 2023 14:56:54 +0900 Subject: [PATCH 06/23] docs: modify HeartbeatLogEvent comments --- pymysqlreplication/event.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index affb6671..c581ccf8 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -216,22 +216,26 @@ def _dump(self): class HeartbeatLogEvent(BinLogEvent): """A Heartbeat event - Heartbeats are sent by the master only if there are no unsent events in the - binary log file for a period longer than the interval defined by - MASTER_HEARTBEAT_PERIOD connection setting. - - A mysql server will also play those to the slave for each skipped - events in the log. I (baloo) believe the intention is to make the slave - bump its position so that if a disconnection occurs, the slave only - reconnects from the last skipped position (see Binlog_sender::send_events - in sql/rpl_binlog_sender.cc). That makes 106 bytes of data for skipped - event in the binlog. *this is also the case with GTID replication*. To - mitigate such behavior, you are expected to keep the binlog small (see - max_binlog_size, defaults to 1G). - In any case, the timestamp is 0 (as in 1970-01-01T00:00:00). + Heartbeats are sent by the master. + Master sends heartbeats when there are no unsent events in the binary log file after certain period of time. + The interval is defined by MASTER_HEARTBEAT_PERIOD connection setting. - Attributes: - ident: Name of the current binlog + `[see MASTER_HEARTBEAT_PERIOD] `_. + + A Mysql server also does it for each skipped events in the log. + This is because to make the slave bump its position so that + if a disconnection occurs, the slave will only reconnects from the lasted skipped position. (Baloo's idea) + + (see Binlog_sender::send_events in sql/rpl_binlog_sender.cc). + + Warning: + That makes 106 bytes of data for skipped event in the binlog. + *this is also the case with GTID replication*. + To mitigate such behavior, you are expected to keep the binlog small + (see max_binlog_size, defaults to 1G). + In any case, the timestamp is 0 (as in 1970-01-01T00:00:00). + + :ivar ident: Name of the current binlog """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): From de0cc85f5407e3d3ac162846eba1359536654964 Mon Sep 17 00:00:00 2001 From: ebang Date: Fri, 25 Aug 2023 14:57:45 +0900 Subject: [PATCH 07/23] docs: modify XAPrepareEvent comments --- pymysqlreplication/event.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index c581ccf8..1f0e6229 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -150,11 +150,13 @@ def dump(self): class XAPrepareEvent(BinLogEvent): """An XA prepare event is generated for a XA prepared transaction. - Like Xid_event it contains XID of the *prepared* transaction + Like Xid_event, it contains XID of the **prepared** transaction. - Attributes: - one_phase: current XA transaction commit method - xid: serialized XID representation of XA transaction + For more information: `[see details] `_. + + :ivar one_phase: current XA transaction commit method + :ivar xid_format_id: a number that identifies the format used by the gtrid and bqual values + :ivar xid: serialized XID representation of XA transaction (xid_gtrid + xid_bqual) """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(XAPrepareEvent, self).__init__(from_packet, event_size, table_map, From 21ffd3b344964cfc08d76f4d0a6931771de9d13c Mon Sep 17 00:00:00 2001 From: ebang Date: Fri, 25 Aug 2023 14:58:32 +0900 Subject: [PATCH 08/23] docs: modify RotateEvent comments --- pymysqlreplication/event.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 1f0e6229..89fa9098 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -130,10 +130,12 @@ def _dump(self): class RotateEvent(BinLogEvent): """Change MySQL bin log file + Represents information for the slave to know the name of the binary log it is going to receive. - Attributes: - position: Position inside next binlog - next_binlog: Name of next binlog file + For more information: `[see details] `_. + + :ivar position: int - Position inside next binlog + :ivar next_binlog: str - Name of next binlog file """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(RotateEvent, self).__init__(from_packet, event_size, table_map, From bc6fbaab215e57ebd4bd423b5b8ded2e5c0dd21a Mon Sep 17 00:00:00 2001 From: mjs Date: Sun, 27 Aug 2023 19:46:08 +0900 Subject: [PATCH 09/23] docs: add FormatDescriptionEvent comment --- pymysqlreplication/event.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 89fa9098..ec43ead7 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -184,6 +184,16 @@ def _dump(self): class FormatDescriptionEvent(BinLogEvent): + """ + Represents a Format Description Event in the MySQL binary log. + + This event is written at the start of a binary log file for binlog version 4. + It provides the necessary information to decode subsequent events in the file. + + :ivar binlog_version: int - Version of the binary log format. + :ivar mysql_version_str: str - Server's MySQL version in string format. + """ + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(FormatDescriptionEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) From c6e9bc2b2b5a531e63877977751c7a1bf66ae9bc Mon Sep 17 00:00:00 2001 From: heehehe Date: Tue, 29 Aug 2023 01:23:31 +0900 Subject: [PATCH 10/23] docs: modify ExecuteLoadQueryEvent comment --- pymysqlreplication/event.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index ec43ead7..9feab4f8 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -305,8 +305,7 @@ def _read_status_vars_value_for_key(self, key): Parsing logic from mysql-server source code edited by dongwook-chan https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/src/statement_events.cpp#L181-L336 - Args: - key: key for status variable + :ivar key: key for status variable """ if key == Q_FLAGS2_CODE: # 0x00 self.flags2 = self.packet.read_uint32() @@ -409,18 +408,23 @@ def _dump(self): class ExecuteLoadQueryEvent(BinLogEvent): """ - - Attributes: - slave_proxy_id - execution_time - schema_length - error_code - status_vars_length - - file_id - start_pos - end_pos - dup_handling_flags + This event handles "LOAD DATA INFILE" statement. + LOAD DATA INFILE statement reads data from file and insert into database's table. + Since QueryEvent cannot explain this special action, ExecuteLoadQueryEvent is needed. + So it is similar to a QUERY_EVENT except that it has extra static fields. + + :ivar slave_proxy_id: int - The id of the thread that issued this statement on the master server + :ivar execution_time: int - The number of seconds that the statement took to execute + :ivar schema_length: int - The length of the default database's name when the statement was executed. + This name appears later, in the variable data part. + It is necessary for statements such as INSERT INTO t VALUES(1) that don't specify the database + and rely on the default database previously selected by USE + :ivar error_code: int - The error code resulting from execution of the statement on the master + :ivar status_vars_length: int - The length of the status variable block + :ivar file_id: int - The id of the loaded file + :ivar start_pos: int - Offset from the start of the statement to the beginning of the filename + :ivar end_pos: int - Offset from the start of the statement to the end of the filename + :ivar dup_handling_flags: int - How LOAD DATA INFILE handles duplicated data (0x0: error, 0x1: ignore, 0x2: replace) """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(ExecuteLoadQueryEvent, self).__init__(from_packet, event_size, table_map, From 95001ca7317c665d461baf79f73639b39d0fbdef Mon Sep 17 00:00:00 2001 From: heehehe Date: Tue, 29 Aug 2023 01:28:12 +0900 Subject: [PATCH 11/23] docs: remove unnecessary comments from ExecuteLoadQueryEvent --- pymysqlreplication/event.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 9feab4f8..ff12712a 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -416,9 +416,6 @@ class ExecuteLoadQueryEvent(BinLogEvent): :ivar slave_proxy_id: int - The id of the thread that issued this statement on the master server :ivar execution_time: int - The number of seconds that the statement took to execute :ivar schema_length: int - The length of the default database's name when the statement was executed. - This name appears later, in the variable data part. - It is necessary for statements such as INSERT INTO t VALUES(1) that don't specify the database - and rely on the default database previously selected by USE :ivar error_code: int - The error code resulting from execution of the statement on the master :ivar status_vars_length: int - The length of the status variable block :ivar file_id: int - The id of the loaded file From e67bb54a389555df8400532b13d21aa0787cea08 Mon Sep 17 00:00:00 2001 From: ebang Date: Tue, 29 Aug 2023 16:12:28 +0900 Subject: [PATCH 12/23] docs: modify XidEvent comments --- pymysqlreplication/event.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index ff12712a..671c0b69 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -213,9 +213,11 @@ class StopEvent(BinLogEvent): class XidEvent(BinLogEvent): """A COMMIT event + Generated when COMMIT of a transaction that modifies one or more tables of an XA-capable storage engine occurs. - Attributes: - xid: Transaction ID for 2PC + For more information : `[see details] `_. + + :ivar xid: uint - Transaction ID for 2 Phrase Commit. """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): From 9cb581c04df80cb5b61db9b9dc3dfe17892dc6b0 Mon Sep 17 00:00:00 2001 From: will-k Date: Wed, 30 Aug 2023 03:05:57 +0900 Subject: [PATCH 13/23] Add docstring to QueryEvent --- pymysqlreplication/event.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 671c0b69..f1c84748 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -266,8 +266,19 @@ def _dump(self): class QueryEvent(BinLogEvent): - '''This event is trigger when a query is run of the database. - Only replicated queries are logged.''' + """ + QueryEvent is generated for each query that modified database. + If row-based replication is used, DML will not be logged as RowsEvent instead. + + :ivar slave_proxy_id: int - The id of the thread that issued this statement on the master server + :ivar execution_time: int - The time from when the query started to when it was logged in the binlog, in seconds. + :ivar schema_length: int - The length of the name of the currently selected database. + :ivar error_code: int - Error code generated by the master + :ivar status_vars_length: int - The length of the status variable + + :ivar schema: str - The name of the currently selected database. + :ivar query: str - The query executed. + """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(QueryEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) From 0199f1672d4efd29052027aed6488fc13714fe4d Mon Sep 17 00:00:00 2001 From: heehehe Date: Wed, 30 Aug 2023 10:56:01 +0900 Subject: [PATCH 14/23] fix: modify tab indentation to spaces --- pymysqlreplication/event.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index e42beafb..5f011710 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -59,7 +59,7 @@ class GtidEvent(BinLogEvent): :ivar commit_flag: 1byte - 00000001 = Transaction may have changes logged with SBR. In 5.6, 5.7.0-5.7.18, and 8.0.0-8.0.1, this flag is always set. Starting in 5.7.19 and 8.0.2, this flag is cleared if the transaction only contains row events. It is set if any part of the transaction is written in statement format. - :ivar sid: 16 byte sequence - UUID representing the SID + :ivar sid: 16 byte sequence - UUID representing the SID :ivar gno: int - Group number, second component of GTID. :ivar lt_type: int(1 byte) - The type of logical timestamp used in the logical clock fields. :ivar last_committed: Store the transaction's commit parent sequence_number @@ -313,7 +313,7 @@ class HeartbeatLogEvent(BinLogEvent): This is because to make the slave bump its position so that if a disconnection occurs, the slave will only reconnects from the lasted skipped position. (Baloo's idea) - (see Binlog_sender::send_events in sql/rpl_binlog_sender.cc). + (see Binlog_sender::send_events in sql/rpl_binlog_sender.cc) Warning: That makes 106 bytes of data for skipped event in the binlog. @@ -543,7 +543,7 @@ class IntvarEvent(BinLogEvent): This event will be created just before a QueryEvent. :ivar type: int - 1 byte identifying the type of variable stored. - Can be either LAST_INSERT_ID_EVENT (1) or INSERT_ID_EVENT (2). + Can be either LAST_INSERT_ID_EVENT (1) or INSERT_ID_EVENT (2). :ivar value: int - The value of the variable """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): @@ -649,7 +649,7 @@ class NotImplementedEvent(BinLogEvent): """ Used as a temporary class for events that have not yet been implemented. - The event referencing this class skips parsing. + The event referencing this class skips parsing. """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super().__init__( From b52c4d7416ce9b8e10d373e22487f40bc092cb15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9D=80=EC=A0=95=20=EB=B0=A9?= <108053947+ebang091@users.noreply.github.com> Date: Wed, 30 Aug 2023 11:28:43 +0900 Subject: [PATCH 15/23] docs: modify XidEvent comments --- pymysqlreplication/event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 5f011710..7903f8d0 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -288,7 +288,7 @@ class XidEvent(BinLogEvent): For more information : `[see details] `_. - :ivar xid: uint - Transaction ID for 2 Phrase Commit. + :ivar xid: uint - Transaction ID for 2 Phase Commit. """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): From 04974bfa8f195e8ed771265081ed4e9a8353c21f Mon Sep 17 00:00:00 2001 From: ebang Date: Wed, 30 Aug 2023 17:55:16 +0900 Subject: [PATCH 16/23] docs: add typing in BinLogEvent --- pymysqlreplication/event.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 7903f8d0..9f4862c3 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -3,8 +3,12 @@ import binascii import struct import datetime +import pymysql +from pymysql.protocol import MysqlPacket +from typing import Dict, Tuple, Type from pymysqlreplication.constants.STATUS_VAR_KEY import * from pymysqlreplication.exceptions import StatusVariableMismatch +from pymysqlreplication.table import Table class BinLogEvent(object): @@ -17,6 +21,20 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, freeze_schema=False, fail_on_table_metadata_unavailable=False, ignore_decode_errors=False): + self.packet: Type[MysqlPacket] = from_packet + self.table_map: Dict[int, Table] = table_map + self.event_type: int = self.packet.event_type + self.timestamp: int = self.packet.timestamp + self.event_size: int = event_size + self._ctl_connection: pymysql.connections.Connection = ctl_connection + self.mysql_version: Tuple[int, int ,int] = mysql_version + self._fail_on_table_metadata_unavailable: bool = fail_on_table_metadata_unavailable + self._ignore_decode_errors: bool = ignore_decode_errors + # The event have been fully processed, if processed is false + # the event will be skipped + self._processed: bool = True + self.complete: bool = True + self.packet = from_packet self.table_map = table_map self.event_type = self.packet.event_type From c57038d4814cccba3682efa7bf709978eea58e30 Mon Sep 17 00:00:00 2001 From: ebang Date: Wed, 30 Aug 2023 17:59:21 +0900 Subject: [PATCH 17/23] docs: add typing in GtidEvent --- pymysqlreplication/event.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 9f4862c3..908556d5 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -87,14 +87,14 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - self.commit_flag = struct.unpack("!B", self.packet.read(1))[0] == 1 - self.sid = self.packet.read(16) - self.gno = struct.unpack('= (5, 7): - self.last_committed = struct.unpack(' Date: Wed, 30 Aug 2023 18:00:14 +0900 Subject: [PATCH 18/23] docs: add tying in MaradbGtidEvent --- pymysqlreplication/event.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 908556d5..9d28e779 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -134,11 +134,11 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - self.server_id = self.packet.server_id - self.gtid_seq_no = self.packet.read_uint64() - self.domain_id = self.packet.read_uint32() - self.flags = self.packet.read_uint8() - self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) + self.server_id: int = self.packet.server_id + self.gtid_seq_no: int = self.packet.read_uint64() + self.domain_id: int = self.packet.read_uint32() + self.flags: int = self.packet.read_uint8() + self.gtid: str = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) def _dump(self): super()._dump() From 5a37b9ec4d2528b0d7d4ef49871105f4077cbe7b Mon Sep 17 00:00:00 2001 From: ebang Date: Wed, 30 Aug 2023 18:01:20 +0900 Subject: [PATCH 19/23] docs: add tying in MaradbBinLogCheckPointEvent --- pymysqlreplication/event.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 9d28e779..e6cdb0e8 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -159,8 +159,8 @@ class MariadbBinLogCheckPointEvent(BinLogEvent): def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(MariadbBinLogCheckPointEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - filename_length = self.packet.read_uint32() - self.filename = self.packet.read(filename_length).decode() + filename_length: int = self.packet.read_uint32() + self.filename: str = self.packet.read(filename_length).decode() def _dump(self): print('Filename:', self.filename) From 8811fcace2f169ab55c7e60d01ff1d6a83ec54af Mon Sep 17 00:00:00 2001 From: ebang Date: Wed, 30 Aug 2023 18:37:07 +0900 Subject: [PATCH 20/23] docs: add typing in MariadbGtidListEvent, MariadbAnnotateRowsEvent --- pymysqlreplication/event.py | 46 +++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index e6cdb0e8..bf05998d 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -5,7 +5,7 @@ import datetime import pymysql from pymysql.protocol import MysqlPacket -from typing import Dict, Tuple, Type +from typing import Dict, List, Tuple, Type from pymysqlreplication.constants.STATUS_VAR_KEY import * from pymysqlreplication.exceptions import StatusVariableMismatch from pymysqlreplication.table import Table @@ -73,7 +73,7 @@ def _dump(self): class GtidEvent(BinLogEvent): """GTID change in binlog event - For more information : `[GTID] `_ `[see also] `_ + For more information : `[see GtidEvent] `_ `[see also] `_ :ivar commit_flag: 1byte - 00000001 = Transaction may have changes logged with SBR. In 5.6, 5.7.0-5.7.18, and 8.0.0-8.0.1, this flag is always set. Starting in 5.7.19 and 8.0.2, this flag is cleared if the transaction only contains row events. It is set if any part of the transaction is written in statement format. @@ -122,7 +122,7 @@ class MariadbGtidEvent(BinLogEvent): """ GTID(Global Transaction Identifier) change in binlog event in MariaDB - for more information: `[see details] `_. + for more information: `[see MariadbGtidEvent] `_. :ivar server_id: int - The ID of the server where the GTID event occurred. :ivar gtid_seq_no: int - The sequence number of the GTID event. @@ -149,10 +149,9 @@ class MariadbBinLogCheckPointEvent(BinLogEvent): """ Represents a checkpoint in a binlog event in MariaDB. - More details are available in the MariaDB Knowledge Base: - https://mariadb.com/kb/en/binlog_checkpoint_event/ + for more information: `[see MaridbBinLogCheckPointEvent] `_ - :ivar filename_length: int - The length of the filename. + :ivar filename_length: int - The length of the filename. :ivar filename: str - The name of the file saved at the checkpoint. """ @@ -172,11 +171,11 @@ class MariadbAnnotateRowsEvent(BinLogEvent): https://mariadb.com/kb/en/annotate_rows_event/ Attributes: - sql_statement: The SQL statement + sql_statement: str - The SQL statement """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - self.sql_statement = self.packet.read(event_size) + self.sql_statement: str = self.packet.read(event_size) def _dump(self): super()._dump() @@ -184,18 +183,15 @@ def _dump(self): class MariadbGtidListEvent(BinLogEvent): """ - GTID List event - https://mariadb.com/kb/en/gtid_list_event/ + for more information: `[see MariadbGtidListEvent] `_ - Attributes: - gtid_length: Number of GTIDs - gtid_list: list of 'MariadbGtidObejct' - - 'MariadbGtidObejct' Attributes: - domain_id: Replication Domain ID - server_id: Server_ID - gtid_seq_no: GTID sequence - gtid: 'domain_id'+ 'server_id' + 'gtid_seq_no' + :ivar gtid_length: int - Number of GTIDs + :ivar gtid_list: List - list of 'MariadbGtidObejct' + + :ivar domain_id: int - Replication Domain ID (MariadbGtidObject) + :ivar server_id: int -Server_ID (MariadbGtidObject) + :ivar gtid_seq_no: int - GTID sequence (MariadbGtidObject) + :ivar gtid: str - 'domain_id'+ 'server_id' + 'gtid_seq_no' (MariadbGtidObject) """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): @@ -207,14 +203,14 @@ class MariadbGtidObejct(BinLogEvent): """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(MariadbGtidObejct, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - self.domain_id = self.packet.read_uint32() - self.server_id = self.packet.read_uint32() - self.gtid_seq_no = self.packet.read_uint64() - self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) + self.domain_id: int = self.packet.read_uint32() + self.server_id: int = self.packet.read_uint32() + self.gtid_seq_no: int = self.packet.read_uint64() + self.gtid: str = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) - self.gtid_length = self.packet.read_uint32() - self.gtid_list = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)] + self.gtid_length: int = self.packet.read_uint32() + self.gtid_list: List[MariadbGtidObejct] = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)] class RotateEvent(BinLogEvent): From bcc65c55374f2f18a67b92b392d85cef1bffe870 Mon Sep 17 00:00:00 2001 From: ebang Date: Wed, 30 Aug 2023 18:39:40 +0900 Subject: [PATCH 21/23] modify BinLogEvent in event.py --- pymysqlreplication/event.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index bf05998d..d7a2a34f 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -35,20 +35,6 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, self._processed: bool = True self.complete: bool = True - self.packet = from_packet - self.table_map = table_map - self.event_type = self.packet.event_type - self.timestamp = self.packet.timestamp - self.event_size = event_size - self._ctl_connection = ctl_connection - self.mysql_version = mysql_version - self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable - self._ignore_decode_errors = ignore_decode_errors - # The event have been fully processed, if processed is false - # the event will be skipped - self._processed = True - self.complete = True - def _read_table_id(self): # Table ID is 6 byte # pad little-endian number From 07b561de9017b48810942ad9cfa6b4159f2c995f Mon Sep 17 00:00:00 2001 From: jaehyeonpy Date: Thu, 31 Aug 2023 21:52:43 +0900 Subject: [PATCH 22/23] add type hints to the rest of the events, improve docstring readability in some events --- pymysqlreplication/event.py | 455 +++++++++++++++++++++++------------- 1 file changed, 289 insertions(+), 166 deletions(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index d7a2a34f..3038069b 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -5,7 +5,7 @@ import datetime import pymysql from pymysql.protocol import MysqlPacket -from typing import Dict, List, Tuple, Type +from typing import Dict, List, NoReturn, Tuple, Type, Union from pymysqlreplication.constants.STATUS_VAR_KEY import * from pymysqlreplication.exceptions import StatusVariableMismatch from pymysqlreplication.table import Table @@ -200,21 +200,30 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) class RotateEvent(BinLogEvent): - """Change MySQL bin log file - Represents information for the slave to know the name of the binary log it is going to receive. + """ + The event changes MySQL bin log file. + It represents information for the slave to know the name of the binary log it is going to receive. For more information: `[see details] `_. + In detail, the class creates the following python objects in the constructor: + :ivar position: int - Position inside next binlog :ivar next_binlog: str - Name of next binlog file """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) - self.position = struct.unpack(' None: + """Dumps the python objects for the event.""" print("=== %s ===" % (self.__class__.__name__)) print("Position: %d" % self.position) print("Next binlog file: %s" % self.next_binlog) @@ -222,33 +231,42 @@ def dump(self): class XAPrepareEvent(BinLogEvent): - """An XA prepare event is generated for a XA prepared transaction. + """ + An XA prepare event is generated for a XA prepared transaction. Like Xid_event, it contains XID of the **prepared** transaction. For more information: `[see details] `_. - - :ivar one_phase: current XA transaction commit method - :ivar xid_format_id: a number that identifies the format used by the gtrid and bqual values - :ivar xid: serialized XID representation of XA transaction (xid_gtrid + xid_bqual) + + In detail, the class creates the following python objects in the constructor: + + :ivar one_phase: str - current XA transaction commit method + :ivar xid_format_id: int - a number that identifies the format used by the gtrid and bqual values + :ivar xid: str - serialized XID representation of XA transaction (xid_gtrid + xid_bqual) """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # one_phase is True: XA COMMIT ... ONE PHASE # one_phase is False: XA PREPARE - self.one_phase = (self.packet.read(1) != b'\x00') - self.xid_format_id = struct.unpack(' str: + """Gets the xid python objects.""" return self.xid_gtrid.decode() + self.xid_bqual.decode() - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" print("One phase: %s" % self.one_phase) print("XID formatID: %d" % self.xid_format_id) print("XID: %s" % self.xid) @@ -256,53 +274,68 @@ def _dump(self): class FormatDescriptionEvent(BinLogEvent): """ - Represents a Format Description Event in the MySQL binary log. + The event represents a Format Description Event in the MySQL binary log. This event is written at the start of a binary log file for binlog version 4. It provides the necessary information to decode subsequent events in the file. - :ivar binlog_version: int - Version of the binary log format. - :ivar mysql_version_str: str - Server's MySQL version in string format. + In detail, the class creates the following python objects in the constructor: + + :ivar binlog_version: str - Version of the binary log format + :ivar mysql_version_str: str - Server's MySQL version in string format """ - - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) - self.binlog_version = struct.unpack(' None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.binlog_version: str = struct.unpack(' None: + """Dumps the python objects for the event.""" print("Binlog version: %s" % self.binlog_version) print("MySQL version: %s" % self.mysql_version_str) class StopEvent(BinLogEvent): + """Does nothing.""" pass class XidEvent(BinLogEvent): - """A COMMIT event - Generated when COMMIT of a transaction that modifies one or more tables of an XA-capable storage engine occurs. + """ + A COMMIT event is generated when COMMIT of a transaction that modifies + one or more tables of an XA-capable storage engine occurs. - For more information : `[see details] `_. + For more information : `[see details] `_. - :ivar xid: uint - Transaction ID for 2 Phase Commit. + In detail, the class creates the following python objects in the constructor: + + :ivar xid: int - Transaction ID for 2 Phase Commit """ + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.xid: int = struct.unpack(' None: + """Dumps the python objects for the event.""" super()._dump() print("Transaction ID: %d" % (self.xid)) class HeartbeatLogEvent(BinLogEvent): - """A Heartbeat event + """ + A Heartbeat event Heartbeats are sent by the master. Master sends heartbeats when there are no unsent events in the binary log file after certain period of time. The interval is defined by MASTER_HEARTBEAT_PERIOD connection setting. @@ -320,18 +353,23 @@ class HeartbeatLogEvent(BinLogEvent): *this is also the case with GTID replication*. To mitigate such behavior, you are expected to keep the binlog small (see max_binlog_size, defaults to 1G). - In any case, the timestamp is 0 (as in 1970-01-01T00:00:00). + In any case, the timestamp is 0 (as in 1970-01-01T00:00:00). - :ivar ident: Name of the current binlog - """ + In detail, the class creates the following python objects in the constructor: - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, - table_map, ctl_connection, - **kwargs) - self.ident = self.packet.read(event_size).decode() + :ivar ident: str - Name of the current binlog + """ + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.ident: str = self.packet.read(event_size).decode() - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" super()._dump() print("Current binlog: %s" % (self.ident)) @@ -341,25 +379,31 @@ class QueryEvent(BinLogEvent): QueryEvent is generated for each query that modified database. If row-based replication is used, DML will not be logged as RowsEvent instead. + In detail, the class creates the following python objects in the constructor: + :ivar slave_proxy_id: int - The id of the thread that issued this statement on the master server - :ivar execution_time: int - The time from when the query started to when it was logged in the binlog, in seconds. - :ivar schema_length: int - The length of the name of the currently selected database. + :ivar execution_time: int - The time from when the query started to when it was logged in the binlog, in seconds + :ivar schema_length: int - The length of the name of the currently selected database :ivar error_code: int - Error code generated by the master :ivar status_vars_length: int - The length of the status variable - :ivar schema: str - The name of the currently selected database. - :ivar query: str - The query executed. + :ivar schema: str - The name of the currently selected database + :ivar query: str - The query executed """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # Post-header - self.slave_proxy_id = self.packet.read_uint32() - self.execution_time = self.packet.read_uint32() - self.schema_length = struct.unpack("!B", self.packet.read(1))[0] - self.error_code = self.packet.read_uint16() - self.status_vars_length = self.packet.read_uint16() + self.slave_proxy_id: int = self.packet.read_uint32() + self.execution_time: int = self.packet.read_uint32() + self.schema_length: int = struct.unpack("!B", self.packet.read(1))[0] + self.error_code: int = self.packet.read_uint16() + self.status_vars_length: int = self.packet.read_uint16() # Payload status_vars_end_pos = self.packet.read_bytes + self.status_vars_length @@ -369,64 +413,96 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) # read VALUE for status variable self._read_status_vars_value_for_key(status_vars_key) - self.schema = self.packet.read(self.schema_length) + self.schema: str = self.packet.read(self.schema_length) self.packet.advance(1) - self.query = self.packet.read(event_size - 13 - self.status_vars_length - - self.schema_length - 1).decode("utf-8") + self.query: str = self.packet.read(event_size - 13 - self.status_vars_length + - self.schema_length - 1).decode("utf-8") #string[EOF] query - def _dump(self): + def _dump(self) -> None: + """Dump the python objects for the event.""" super()._dump() print("Schema: %s" % (self.schema)) print("Execution time: %d" % (self.execution_time)) print("Query: %s" % (self.query)) - def _read_status_vars_value_for_key(self, key): - """parse status variable VALUE for given KEY + def _read_status_vars_value_for_key(self, key: int) -> Union[None, NoReturn]: + """ + CAUTION: this function returns nothing(not None) or raises StatusVariableMismatch! + + It parses status variable VALUE for given KEY. A status variable in query events is a sequence of status KEY-VALUE pairs. Parsing logic from mysql-server source code edited by dongwook-chan https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/src/statement_events.cpp#L181-L336 - :ivar key: key for status variable + See details about status variable at: + https://mariadb.com/kb/en/query_event/ + https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Query__event.html + + In detail, the function creates python objects depending on the given key: + + :ivar flags2: int + :ivar sql_mode: int + :ivar auto_increment_increment: int + :ivar auto_increment_offset: int + :ivar character_set_client: int + :ivar collation_connection: int + :ivar collation_server: int + :ivar time_zone: str + :ivar catalog_nz_code: str + :ivar lc_time_names_number: int + :ivar charset_database_number: int + :ivar table_map_for_update: int + :ivar user: str + :ivar host: str + :ivar mts_accessed_db_names: [str, ...] + :ivar microseconds: int + :ivar explicit_defaults_ts: bool + :ivar ddl_xid: int + :ivar default_collation_for_utf8mb4_number: int + :ivar sql_require_primary_key: int + :ivar default_table_encryption: int + :ivar hrnow: int + :ivar xid: int """ if key == Q_FLAGS2_CODE: # 0x00 - self.flags2 = self.packet.read_uint32() + self.flags2: int = self.packet.read_uint32() elif key == Q_SQL_MODE_CODE: # 0x01 - self.sql_mode = self.packet.read_uint64() + self.sql_mode: int = self.packet.read_uint64() elif key == Q_CATALOG_CODE: # 0x02 for MySQL 5.0.x pass elif key == Q_AUTO_INCREMENT: # 0x03 - self.auto_increment_increment = self.packet.read_uint16() - self.auto_increment_offset = self.packet.read_uint16() + self.auto_increment_increment: int = self.packet.read_uint16() + self.auto_increment_offset: int = self.packet.read_uint16() elif key == Q_CHARSET_CODE: # 0x04 - self.character_set_client = self.packet.read_uint16() - self.collation_connection = self.packet.read_uint16() - self.collation_server = self.packet.read_uint16() + self.character_set_client: int = self.packet.read_uint16() + self.collation_connection: int = self.packet.read_uint16() + self.collation_server: int = self.packet.read_uint16() elif key == Q_TIME_ZONE_CODE: # 0x05 time_zone_len = self.packet.read_uint8() if time_zone_len: - self.time_zone = self.packet.read(time_zone_len) + self.time_zone: str = self.packet.read(time_zone_len) elif key == Q_CATALOG_NZ_CODE: # 0x06 catalog_len = self.packet.read_uint8() if catalog_len: - self.catalog_nz_code = self.packet.read(catalog_len) + self.catalog_nz_code: str = self.packet.read(catalog_len) elif key == Q_LC_TIME_NAMES_CODE: # 0x07 - self.lc_time_names_number = self.packet.read_uint16() + self.lc_time_names_number: int = self.packet.read_uint16() elif key == Q_CHARSET_DATABASE_CODE: # 0x08 - self.charset_database_number = self.packet.read_uint16() + self.charset_database_number: int = self.packet.read_uint16() elif key == Q_TABLE_MAP_FOR_UPDATE_CODE: # 0x09 - self.table_map_for_update = self.packet.read_uint64() + self.table_map_for_update: int = self.packet.read_uint64() elif key == Q_MASTER_DATA_WRITTEN_CODE: # 0x0A pass elif key == Q_INVOKER: # 0x0B user_len = self.packet.read_uint8() if user_len: - self.user = self.packet.read(user_len) + self.user: str = self.packet.read(user_len) host_len = self.packet.read_uint8() if host_len: - self.host = self.packet.read(host_len) + self.host: str = self.packet.read(host_len) elif key == Q_UPDATED_DB_NAMES: # 0x0C mts_accessed_dbs = self.packet.read_uint8() """ @@ -444,47 +520,55 @@ def _read_status_vars_value_for_key(self, key): for i in range(mts_accessed_dbs): db = self.packet.read_string() dbs.append(db) - self.mts_accessed_db_names = dbs + self.mts_accessed_db_names: [str, ...] = dbs elif key == Q_MICROSECONDS: # 0x0D - self.microseconds = self.packet.read_uint24() + self.microseconds: int = self.packet.read_uint24() elif key == Q_COMMIT_TS: # 0x0E pass elif key == Q_COMMIT_TS2: # 0x0F pass elif key == Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP:# 0x10 - self.explicit_defaults_ts = self.packet.read_uint8() + self.explicit_defaults_ts: bool = self.packet.read_uint8() elif key == Q_DDL_LOGGED_WITH_XID: # 0x11 - self.ddl_xid = self.packet.read_uint64() + self.ddl_xid: int = self.packet.read_uint64() elif key == Q_DEFAULT_COLLATION_FOR_UTF8MB4: # 0x12 - self.default_collation_for_utf8mb4_number = self.packet.read_uint16() + self.default_collation_for_utf8mb4_number: int = self.packet.read_uint16() elif key == Q_SQL_REQUIRE_PRIMARY_KEY: # 0x13 - self.sql_require_primary_key = self.packet.read_uint8() + self.sql_require_primary_key: int = self.packet.read_uint8() elif key == Q_DEFAULT_TABLE_ENCRYPTION: # 0x14 - self.default_table_encryption = self.packet.read_uint8() + self.default_table_encryption: int = self.packet.read_uint8() elif key == Q_HRNOW: - self.hrnow = self.packet.read_uint24() + self.hrnow: int = self.packet.read_uint24() elif key == Q_XID: - self.xid = self.packet.read_uint64() + self.xid: int = self.packet.read_uint64() else: raise StatusVariableMismatch + class BeginLoadQueryEvent(BinLogEvent): """ This event is written into the binary log file for LOAD DATA INFILE events if the server variable binlog_mode was set to "STATEMENT". - :ivar file_id: the id of the file - :ivar block-data: data block about "LOAD DATA INFILE" + In detail, the class creates the following python objects in the constructor: + + :ivar file_id: int - the id of the file + :ivar block-data: str - data block about "LOAD DATA INFILE" """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # Payload - self.file_id = self.packet.read_uint32() - self.block_data = self.packet.read(event_size - 4) + self.file_id: int = self.packet.read_uint32() + self.block_data: str = self.packet.read(event_size - 4) - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" super()._dump() print("File id: %d" % (self.file_id)) print("Block data: %s" % (self.block_data)) @@ -497,9 +581,11 @@ class ExecuteLoadQueryEvent(BinLogEvent): Since QueryEvent cannot explain this special action, ExecuteLoadQueryEvent is needed. So it is similar to a QUERY_EVENT except that it has extra static fields. + In detail, the class creates the following python objects in the constructor: + :ivar slave_proxy_id: int - The id of the thread that issued this statement on the master server :ivar execution_time: int - The number of seconds that the statement took to execute - :ivar schema_length: int - The length of the default database's name when the statement was executed. + :ivar schema_length: int - The length of the default database's name when the statement was executed :ivar error_code: int - The error code resulting from execution of the statement on the master :ivar status_vars_length: int - The length of the status variable block :ivar file_id: int - The id of the loaded file @@ -507,24 +593,29 @@ class ExecuteLoadQueryEvent(BinLogEvent): :ivar end_pos: int - Offset from the start of the statement to the end of the filename :ivar dup_handling_flags: int - How LOAD DATA INFILE handles duplicated data (0x0: error, 0x1: ignore, 0x2: replace) """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # Post-header - self.slave_proxy_id = self.packet.read_uint32() - self.execution_time = self.packet.read_uint32() - self.schema_length = self.packet.read_uint8() - self.error_code = self.packet.read_uint16() - self.status_vars_length = self.packet.read_uint16() + self.slave_proxy_id: int = self.packet.read_uint32() + self.execution_time: int = self.packet.read_uint32() + self.schema_length: int = self.packet.read_uint8() + self.error_code: int = self.packet.read_uint16() + self.status_vars_length: int = self.packet.read_uint16() # Payload - self.file_id = self.packet.read_uint32() - self.start_pos = self.packet.read_uint32() - self.end_pos = self.packet.read_uint32() - self.dup_handling_flags = self.packet.read_uint8() + self.file_id: int = self.packet.read_uint32() + self.start_pos: int = self.packet.read_uint32() + self.end_pos: int = self.packet.read_uint32() + self.dup_handling_flags: int = self.packet.read_uint8() - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" super(ExecuteLoadQueryEvent, self)._dump() print("Slave proxy id: %d" % (self.slave_proxy_id)) print("Execution time: %d" % (self.execution_time)) @@ -539,22 +630,29 @@ def _dump(self): class IntvarEvent(BinLogEvent): """ - Stores the value of auto-increment variables. - This event will be created just before a QueryEvent. - - :ivar type: int - 1 byte identifying the type of variable stored. + The event stores the value of auto-increment variables. + It will be created just before a QueryEvent. + + In detail, the class creates the following python objects in the constructor: + + :ivar type: int - 1 byte identifying the type of variable stored. \ Can be either LAST_INSERT_ID_EVENT (1) or INSERT_ID_EVENT (2). :ivar value: int - The value of the variable """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # Payload - self.type = self.packet.read_uint8() - self.value = self.packet.read_uint32() + self.type: int = self.packet.read_uint8() + self.value: int = self.packet.read_uint32() - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" super()._dump() print("type: %d" % (self.type)) print("Value: %d" % (self.value)) @@ -563,33 +661,40 @@ def _dump(self): class RandEvent(BinLogEvent): """ RandEvent is generated every time a statement uses the RAND() function. - Indicates the seed values to use for generating a random number with RAND() in the next statement. + It indicates the seed values to use for generating a random number with RAND() in the next statement. RandEvent only works in statement-based logging (need to set binlog_format as 'STATEMENT') and only works when the seed number is not specified. + + In detail, the class creates the following python objects in the constructor: :ivar seed1: int - value for the first seed - :ivar seed2: int - value for the second seed + :ivar seed2: int - value for the second seed """ - - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str): + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + # Payload - self._seed1 = self.packet.read_uint64() - self._seed2 = self.packet.read_uint64() + self._seed1: int = self.packet.read_uint64() + self._seed2: int = self.packet.read_uint64() @property - def seed1(self): - """Get the first seed value""" + def seed1(self) -> int: + """Gets the first seed value.""" return self._seed1 @property - def seed2(self): - """Get the second seed value""" + def seed2(self) -> int: + """Gets the second seed value.""" return self._seed2 - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" super()._dump() print("seed1: %d" % (self.seed1)) print("seed2: %d" % (self.seed2)) @@ -603,22 +708,28 @@ class MariadbStartEncryptionEvent(BinLogEvent): additional configuration steps are required in MariaDB. (Link: https://mariadb.com/kb/en/encrypting-binary-logs/) - This event is written just once, after the Format Description event + This event is written just once, after the Format Description event. - Attributes: - schema: The Encryption scheme, always set to 1 for system files. - key_version: The Encryption key version. - nonce: Nonce (12 random bytes) of current binlog file. - """ + In detail, the class creates the following python objects in the constructor: - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + :ivar schema: int - The Encryption scheme, always set to 1 for system files + :ivar key_version: int - The Encryption key version + :ivar nonce: bytes - Nonce (12 random bytes) of current binlog file + """ + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - self.schema = self.packet.read_uint8() - self.key_version = self.packet.read_uint32() - self.nonce = self.packet.read(12) + self.schema: int = self.packet.read_uint8() + self.key_version: int = self.packet.read_uint32() + self.nonce: bytes = self.packet.read(12) - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" print("Schema: %d" % self.schema) print("Key version: %d" % self.key_version) print(f"Nonce: {self.nonce}") @@ -626,20 +737,28 @@ def _dump(self): class RowsQueryLogEvent(BinLogEvent): """ - Record original query for the row events in Row-Based Replication + This class records original query for the row events in Row-Based Replication. - More details are available in the MySQL Knowledge Base: - https://dev.mysql.com/doc/dev/mysql-server/latest/classRows__query__log__event.html + In detail, it creates the following python objects in the constructor: - :ivar query_length: uint - Length of the SQL statement + :ivar query_length: int - Length of the SQL statement :ivar query: str - The executed SQL statement + + More details are available in the MySQL Knowledge Base: + https://dev.mysql.com/doc/dev/mysql-server/latest/classRows__query__log__event.html """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super(RowsQueryLogEvent, self).__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) - self.query_length = self.packet.read_uint8() - self.query = self.packet.read(self.query_length).decode('utf-8') - def dump(self): + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super(RowsQueryLogEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.query_length: int = self.packet.read_uint8() + self.query: str = self.packet.read(self.query_length).decode('utf-8') + + def dump(self) -> None: + """Dumps the python objects for the event.""" print("=== %s ===" % (self.__class__.__name__)) print("Query length: %d" % self.query_length) print("Query: %s" % self.query) @@ -647,11 +766,15 @@ def dump(self): class NotImplementedEvent(BinLogEvent): """ - Used as a temporary class for events that have not yet been implemented. + This class is used as a temporary class for events that have not yet been implemented. - The event referencing this class skips parsing. + The event class referencing this class skips parsing. """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__( - from_packet, event_size, table_map, ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) self.packet.advance(event_size) From 6a4ee795a63f2e7b588daabf984e5e4d8d6124d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9D=80=EC=A0=95=20=EB=B0=A9?= <108053947+ebang091@users.noreply.github.com> Date: Fri, 1 Sep 2023 17:38:36 +0900 Subject: [PATCH 23/23] Update event.py --- pymysqlreplication/event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 3038069b..5a0e2df1 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -21,7 +21,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, freeze_schema=False, fail_on_table_metadata_unavailable=False, ignore_decode_errors=False): - self.packet: Type[MysqlPacket] = from_packet + self.packet: MysqlPacket = from_packet self.table_map: Dict[int, Table] = table_map self.event_type: int = self.packet.event_type self.timestamp: int = self.packet.timestamp