Skip to content

Commit

Permalink
ARROW-6316: [Go] implement new ARROW format with 32b-aligned buffers
Browse files Browse the repository at this point in the history
Closes apache#5251 from sbinet/issue-6316 and squashes the following commits:

7427340 <Sebastien Binet> ARROW-6316: implement new ARROW format with 32b-aligned buffers

Authored-by: Sebastien Binet <[email protected]>
Signed-off-by: Wes McKinney <[email protected]>
  • Loading branch information
sbinet authored and wesm committed Sep 13, 2019
1 parent f0d7760 commit c64033d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 12 deletions.
5 changes: 3 additions & 2 deletions go/arrow/ipc/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ const (
)

var (
paddingBytes [kArrowAlignment]byte
kEOS = [4]byte{0, 0, 0, 0} // end of stream message
paddingBytes [kArrowAlignment]byte
kEOS = [8]byte{0, 0, 0, 0, 0, 0, 0, 0} // end of stream message
kIPCContToken uint32 = 0xFFFFFFFF // 32b continuation indicator for FlatBuffers 8b alignment
)

func paddedLength(nbytes int64, alignment int32) int64 {
Expand Down
27 changes: 23 additions & 4 deletions go/arrow/ipc/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,31 @@ func (r *MessageReader) Message() (*Message, error) {
var buf = make([]byte, 4)
_, err := io.ReadFull(r.r, buf)
if err != nil {
return nil, errors.Wrap(err, "arrow/ipc: could not read message length")
return nil, errors.Wrap(err, "arrow/ipc: could not read continuation indicator")
}
msgLen := int32(binary.LittleEndian.Uint32(buf))
if msgLen == 0 {
// optional 0 EOS control message
var (
cid = binary.LittleEndian.Uint32(buf)
msgLen int32
)
switch cid {
case 0:
// EOS message.
return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error?
case kIPCContToken:
_, err = io.ReadFull(r.r, buf)
if err != nil {
return nil, errors.Wrap(err, "arrow/ipc: could not read message length")
}
msgLen = int32(binary.LittleEndian.Uint32(buf))
if msgLen == 0 {
// optional 0 EOS control message
return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error?
}

default:
// ARROW-6314: backwards compatibility for reading old IPC
// messages produced prior to version 0.15.0
msgLen = int32(cid)
}

buf = make([]byte, msgLen)
Expand Down
31 changes: 25 additions & 6 deletions go/arrow/ipc/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,19 @@ func (blk fileBlock) NewMessage() (*Message, error) {
if err != nil {
return nil, errors.Wrap(err, "arrow/ipc: could not read message metadata")
}
meta := memory.NewBufferBytes(buf[4:]) // drop buf-size already known from blk.Meta

prefix := 0
switch binary.LittleEndian.Uint32(buf) {
case 0:
case kIPCContToken:
prefix = 8
default:
// ARROW-6314: backwards compatibility for reading old IPC
// messages produced prior to version 0.15.0
prefix = 4
}

meta := memory.NewBufferBytes(buf[prefix:]) // drop buf-size already known from blk.Meta

buf = make([]byte, blk.Body)
_, err = io.ReadFull(r, buf)
Expand Down Expand Up @@ -1002,19 +1014,26 @@ func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error)
)

// ARROW-3212: we make no assumption about whether the output stream is aligned.
paddedMsgLen := int32(msg.Len()) + 4
paddedMsgLen := int32(msg.Len()) + 8
remainder := paddedMsgLen % alignment
if remainder != 0 {
paddedMsgLen += alignment - remainder
}

tmp := make([]byte, 4)

// write continuation indicator, to address 8-byte alignment requirement from FlatBuffers.
binary.LittleEndian.PutUint32(tmp, kIPCContToken)
_, err = w.Write(tmp)
if err != nil {
return 0, errors.Wrap(err, "arrow/ipc: could not write continuation bit indicator")
}

// the returned message size includes the continuation indicator, the length prefix, the flatbuffer, and padding
n = int(paddedMsgLen)

tmp := make([]byte, 4)

// write the flatbuffer size prefix, including padding
sizeFB := paddedMsgLen - 4
sizeFB := paddedMsgLen - 8
binary.LittleEndian.PutUint32(tmp, uint32(sizeFB))
_, err = w.Write(tmp)
if err != nil {
Expand All @@ -1028,7 +1047,7 @@ func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error)
}

// write any padding
padding := paddedMsgLen - int32(msg.Len()) - 4
padding := paddedMsgLen - int32(msg.Len()) - 8
if padding > 0 {
_, err = w.Write(paddingBytes[:padding])
if err != nil {
Expand Down

0 comments on commit c64033d

Please sign in to comment.