-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patheven_handler_row_only.go
93 lines (71 loc) · 2.42 KB
/
even_handler_row_only.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main
import (
"context"
"log/slog"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
)
var _ canal.EventHandler = (*RowOnlyEventHandler)(nil)
type RowOnlyEventHandler struct {
logger *slog.Logger
}
func (e *RowOnlyEventHandler) OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
return nil
}
func (e *RowOnlyEventHandler) OnGTID(header *replication.EventHeader, gtid mysql.BinlogGTIDEvent) error {
return nil
}
func (e *RowOnlyEventHandler) OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error {
return nil
}
func (e *RowOnlyEventHandler) OnRawEvent(event *replication.BinlogEvent) error {
return nil
}
func (e *RowOnlyEventHandler) OnRotate(header *replication.EventHeader, r *replication.RotateEvent) error {
return nil
}
func (e *RowOnlyEventHandler) OnRow(evt *canal.RowsEvent) error {
ctx := context.Background()
attr := make([]slog.Attr, 0)
attr = append(attr,
slog.String("event_type", evt.Header.EventType.String()),
slog.Any("rows", evt.Rows),
slog.String("action", evt.Action))
if evt.Table == nil {
e.logger.LogAttrs(ctx, slog.LevelInfo, "OnRow", attr...)
return nil
}
columns := make([]slog.Attr, 0, len(evt.Table.Columns)*2)
for _, v := range evt.Table.Columns {
columns = append(columns,
slog.String("name", v.Name),
slog.String("raw_type", v.RawType))
}
attr = append(attr,
slog.String("table", evt.Table.String()),
slog.Group("columns", slog.Attr{Value: slog.GroupValue(columns...)}))
e.logger.LogAttrs(ctx, slog.LevelInfo, "OnRow", attr...)
return nil
}
func (e *RowOnlyEventHandler) OnTableChanged(header *replication.EventHeader, schema string, table string) error {
e.logger.Info("OnTableChanged",
slog.String("event_type", header.EventType.String()),
slog.String("table", table),
slog.String("schema", schema))
return nil
}
func (e *RowOnlyEventHandler) OnUnmarshal(data []byte) (interface{}, error) {
e.logger.Info("OnUnmarshal",
slog.String("data", string(data)))
return nil, nil
}
func (e *RowOnlyEventHandler) OnXID(header *replication.EventHeader, pos mysql.Position) error {
return nil
}
func (e *RowOnlyEventHandler) OnRowsQueryEvent(evt *replication.RowsQueryEvent) error {
return nil
}
func (e *RowOnlyEventHandler) String() string {
return "binL.RowOnlyEventHandler"
}