File tree Expand file tree Collapse file tree 1 file changed +7
-3
lines changed Expand file tree Collapse file tree 1 file changed +7
-3
lines changed Original file line number Diff line number Diff line change @@ -166,7 +166,7 @@ def handle_binlog_stream(config):
166
166
cache .set_log_file (binlogevent .next_binlog )
167
167
cache .set_log_pos (binlogevent .position )
168
168
else :
169
- print ( binlogevent .packet .log_pos )
169
+ print binlogevent .packet .log_pos
170
170
for row in binlogevent .rows :
171
171
event = {"host" : binlogevent ._ctl_connection .host , "schema" : binlogevent .schema ,
172
172
"table" : binlogevent .table ,
@@ -191,7 +191,11 @@ def handle_binlog_stream(config):
191
191
pipeline = job ["pipeline" ]
192
192
rows = do_pipeline (pipeline , event ["values" ])
193
193
dest = job ["dest" ]
194
- to_dest (dest , rows )
194
+ if isinstance (dest , list ):
195
+ for d in dest :
196
+ to_dest (d , rows )
197
+ else :
198
+ to_dest (dest , rows )
195
199
196
200
cache .set_log_pos (binlogevent .packet .log_pos )
197
201
logging .info (json .dumps (event , cls = DateEncoder ))
@@ -208,4 +212,4 @@ def handle_binlog_stream(config):
208
212
if config_module .STREAM == "INIT" :
209
213
handle_init_stream (config_module )
210
214
elif config_module .STREAM == "BINLOG" :
211
- handle_binlog_stream (config_module )
215
+ handle_binlog_stream (config_module )
You can’t perform that action at this time.
0 commit comments