Skip to content

Commit

Permalink
[FLINK-21208][python] Make Arrow Coder serialize schema info in every…
Browse files Browse the repository at this point in the history
… batch

This closes apache#14844
  • Loading branch information
HuangXingBo committed Feb 7, 2021
1 parent 8e69b2f commit 78f4b1f
Show file tree
Hide file tree
Showing 16 changed files with 58 additions and 11 deletions.
8 changes: 4 additions & 4 deletions flink-python/pyflink/fn_execution/ResettableIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@ class ResettableIO(io.RawIOBase):
def set_input_bytes(self, b):
self._input_bytes = b
self._input_offset = 0
self._size = len(b)

def readinto(self, b):
"""
Read up to len(b) bytes into the writable buffer *b* and return
the number of bytes read. If no bytes are available, None is returned.
"""
input_len = len(self._input_bytes)
output_buffer_len = len(b)
remaining = input_len - self._input_offset
remaining = self._size - self._input_offset

if remaining >= output_buffer_len:
b[:] = self._input_bytes[self._input_offset:self._input_offset + output_buffer_len]
self._input_offset += output_buffer_len
return output_buffer_len
elif remaining > 0:
b[:remaining] = self._input_bytes[self._input_offset:self._input_offset + remaining]
self._input_offset = input_len
self._input_offset = self._size
return remaining
else:
return None
Expand All @@ -66,7 +66,7 @@ def seekable(self):
return False

def readable(self):
return True
return self._size - self._input_offset

def writable(self):
return True
10 changes: 5 additions & 5 deletions flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,13 +620,13 @@ def __init__(self, schema, row_type, timezone):
self._timezone = timezone
self._resettable_io = ResettableIO()
self._batch_reader = ArrowCoderImpl._load_from_stream(self._resettable_io)
self._batch_writer = pa.RecordBatchStreamWriter(self._resettable_io, self._schema)
self.data_out_stream = create_OutputStream()
self._resettable_io.set_output_stream(self.data_out_stream)

def encode_to_stream(self, cols, out_stream, nested):
data_out_stream = self.data_out_stream
self._batch_writer.write_batch(
batch_writer = pa.RecordBatchStreamWriter(self._resettable_io, self._schema)
batch_writer.write_batch(
pandas_to_arrow(self._schema, self._timezone, self._field_types, cols))
out_stream.write_var_int64(data_out_stream.size())
out_stream.write(data_out_stream.get())
Expand All @@ -638,9 +638,9 @@ def decode_from_stream(self, in_stream, nested):

@staticmethod
def _load_from_stream(stream):
reader = pa.ipc.open_stream(stream)
for batch in reader:
yield batch
while stream.readable():
reader = pa.ipc.open_stream(stream)
yield reader.read_next_batch()

def _decode_one_batch_from_stream(self, in_stream: create_InputStream, size: int) -> List:
self._resettable_io.set_input_bytes(in_stream.read(size))
Expand Down
4 changes: 2 additions & 2 deletions flink-python/pyflink/table/tests/test_udaf.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def test_mixed_with_built_in_functions_with_retract(self):
expected = Row('Hi,Hi,hello,hello2', 'Hi', 'hello', 4, 5, 'Hi,Hi,hello2,hello',
'Hi|Hi|hello2|hello', 10, 11.0, 2, Decimal(3.0), 24, 28.0, 6, 7.0,
3.1622777, 3.6514838, 10.0, 13.333333)
expected.set_row_kind(RowKind.INSERT)
expected.set_row_kind(RowKind.UPDATE_AFTER)
self.assertEqual(result[len(result) - 1], expected)

def test_mixed_with_built_in_functions_without_retract(self):
Expand Down Expand Up @@ -338,7 +338,7 @@ def test_mixed_with_built_in_functions_without_retract(self):
result = [i for i in result_table.execute().collect()]
expected = Row('Hi,Hi,hello,hello2', 'Hi', 'hello', 4, 5, 'Hi,Hi,hello2,hello',
'Hi|Hi|hello2|hello', 10, 11.0, 2, Decimal(3.0), 24, 28.0)
expected.set_row_kind(RowKind.INSERT)
expected.set_row_kind(RowKind.UPDATE_AFTER)
self.assertEqual(result[len(result) - 1], expected)

def test_using_decorator(self):
Expand Down
8 changes: 8 additions & 0 deletions flink-python/pyflink/testing/test_case_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ def setUp(self):
self.env,
environment_settings=EnvironmentSettings.new_instance()
.in_streaming_mode().use_old_planner().build())
self.t_env.get_config().get_configuration().set_string(
"python.fn-execution.bundle.size", "1")


class PyFlinkBatchTableTestCase(PyFlinkTestCase):
Expand All @@ -169,6 +171,8 @@ def setUp(self):
self.env = ExecutionEnvironment.get_execution_environment()
self.env.set_parallelism(2)
self.t_env = BatchTableEnvironment.create(self.env, TableConfig())
self.t_env.get_config().get_configuration().set_string(
"python.fn-execution.bundle.size", "1")

def collect(self, table):
j_table = table._j_table
Expand All @@ -191,6 +195,8 @@ def setUp(self):
self.t_env = StreamTableEnvironment.create(
self.env, environment_settings=EnvironmentSettings.new_instance()
.in_streaming_mode().use_blink_planner().build())
self.t_env.get_config().get_configuration().set_string(
"python.fn-execution.bundle.size", "1")


class PyFlinkBlinkBatchTableTestCase(PyFlinkTestCase):
Expand All @@ -204,6 +210,8 @@ def setUp(self):
environment_settings=EnvironmentSettings.new_instance()
.in_batch_mode().use_blink_planner().build())
self.t_env._j_tenv.getPlanner().getExecEnv().setParallelism(2)
self.t_env.get_config().get_configuration().set_string(
"python.fn-execution.bundle.size", "1")


class PythonAPICompletenessTestCase(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,20 @@ public abstract class ArrowSerializer<T> {
/** Writer which is responsible for convert the arrow format data into byte array. */
private transient ArrowStreamWriter arrowStreamWriter;

/** Reusable InputStream used to holding the execution results to be deserialized. */
private transient InputStream bais;

/** Reusable OutputStream used to holding the serialized input elements. */
private transient OutputStream baos;

public ArrowSerializer(RowType inputType, RowType outputType) {
this.inputType = inputType;
this.outputType = outputType;
}

public void open(InputStream bais, OutputStream baos) throws Exception {
this.bais = bais;
this.baos = baos;
allocator = ArrowUtils.getRootAllocator().newChildAllocator("allocator", 0, Long.MAX_VALUE);
arrowStreamReader = new ArrowStreamReader(bais, allocator);

Expand Down Expand Up @@ -126,4 +134,15 @@ public void finishCurrentBatch() throws Exception {
arrowStreamWriter.writeBatch();
arrowWriter.reset();
}

public void resetReader() throws IOException {
arrowReader = null;
arrowStreamReader.close();
arrowStreamReader = new ArrowStreamReader(bais, allocator);
}

public void resetWriter() throws IOException {
arrowStreamWriter = new ArrowStreamWriter(rootWriter, null, baos);
arrowStreamWriter.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws IOException {
for (int i = 0; i < rowCount; i++) {
resultCollector.collect(Row.join(forwardedInputQueue.poll(), arrowSerializer.read(i)));
}
arrowSerializer.resetReader();
}

@Override
Expand Down Expand Up @@ -121,6 +122,7 @@ private void invokeCurrentBatch() throws Exception {
pythonFunctionRunner.process(baos.toByteArray());
checkInvokeFinishBundleByCount();
baos.reset();
arrowSerializer.resetWriter();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected void invokeCurrentBatch() throws Exception {
elementCount += currentBatchCount;
checkInvokeFinishBundleByCount();
currentBatchCount = 0;
arrowSerializer.resetWriter();
}
}

Expand Down Expand Up @@ -106,5 +107,6 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
RowData result = arrowSerializer.read(i);
rowDataWrapper.collect(reuseJoinedRow.replace(key, result));
}
arrowSerializer.resetReader();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
windowAggResult.replace(key, arrowSerializer.read(i));
rowDataWrapper.collect(reuseJoinedRow.replace(windowAggResult, windowProperty));
}
arrowSerializer.resetReader();
}

private void triggerWindowProcess() throws Exception {
Expand All @@ -195,6 +196,7 @@ private void triggerWindowProcess() throws Exception {
checkInvokeFinishBundleByCount();
currentBatchCount = 0;
baos.reset();
arrowSerializer.resetWriter();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ && isInCurrentOverWindow(
elementCount += currentBatchCount;
checkInvokeFinishBundleByCount();
currentBatchCount = 0;
arrowSerializer.resetWriter();
}
lastKeyDataStartPos = forwardedInputQueue.size();
}
Expand All @@ -248,6 +249,7 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
reuseJoinedRow.setRowKind(input.getRowKind());
rowDataWrapper.collect(reuseJoinedRow.replace(input, arrowSerializer.read(i)));
}
arrowSerializer.resetReader();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
rowDataWrapper.collect(reuseJoinedRow.replace(ele, data));
}
}
arrowSerializer.resetReader();
}

void registerCleanupTimer(long timestamp, TimeDomain domain) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
reuseJoinedRow.setRowKind(key.getRowKind());
rowDataWrapper.collect(reuseJoinedRow.replace(key, data));
}
arrowSerializer.resetReader();
}

void registerProcessingCleanupTimer(long currentTime) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ void invokeCurrentBatch() throws Exception {
checkInvokeFinishBundleByCount();
currentBatchCount = 0;
baos.reset();
arrowSerializer.resetWriter();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
windowAggResult.replace(key, arrowSerializer.read(i));
rowDataWrapper.collect(reuseJoinedRow.replace(windowAggResult, windowProperty));
}
arrowSerializer.resetReader();
}

@Override
Expand Down Expand Up @@ -313,6 +314,7 @@ private void triggerWindowProcess(W window) throws Exception {
checkInvokeFinishBundleByCount();
currentBatchCount = 0;
baos.reset();
arrowSerializer.resetWriter();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
cRowWrapper.setChange(input.change());
cRowWrapper.collect(Row.join(input.row(), arrowSerializer.read(i)));
}
arrowSerializer.resetReader();
}

@Override
Expand All @@ -125,6 +126,7 @@ private void invokeCurrentBatch() throws Exception {
pythonFunctionRunner.process(baos.toByteArray());
checkInvokeFinishBundleByCount();
baos.reset();
arrowSerializer.resetWriter();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
reuseJoinedRow.setRowKind(input.getRowKind());
rowDataWrapper.collect(reuseJoinedRow.replace(input, arrowSerializer.read(i)));
}
arrowSerializer.resetReader();
}

@Override
Expand All @@ -126,6 +127,7 @@ private void invokeCurrentBatch() throws Exception {
pythonFunctionRunner.process(baos.toByteArray());
checkInvokeFinishBundleByCount();
baos.reset();
arrowSerializer.resetWriter();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,16 @@ protected void startBundle() {
RowData firstData = arrowSerializer.read(lowerBoundary);
arrowSerializer.write(firstData);
}
arrowSerializer.resetReader();
} else {
arrowSerializer.load();
arrowSerializer.write(arrowSerializer.read(0));
arrowSerializer.resetReader();
}
arrowSerializer.finishCurrentBatch();
buffer.add(baos.toByteArray());
baos.reset();
arrowSerializer.resetWriter();
};
}

Expand Down

0 comments on commit 78f4b1f

Please sign in to comment.