ARROW-597: [Python] Add read_pandas convenience to stream and file reader classes. Add some data type docstrings

Author: Wes McKinney <[email protected]>

Closes apache#855 from wesm/ARROW-597 and squashes the following commits:

1c9c3e2 [Wes McKinney] Add read_pandas convenience to stream and file reader classes. Add a bunch of missing API docstrings
wesm committed Jul 17, 2017
1 parent e370174 commit 5fbfd8e
Showing 9 changed files with 233 additions and 64 deletions.
7 changes: 0 additions & 7 deletions python/doc/source/api.rst
@@ -67,7 +67,6 @@ Scalar Value Types
:toctree: generated/

NA
NAType
Scalar
ArrayValue
BooleanValue
@@ -210,12 +209,6 @@ Type Classes
:toctree: generated/

DataType
DecimalType
DictionaryType
FixedSizeBinaryType
Time32Type
Time64Type
TimestampType
Field
Schema

12 changes: 12 additions & 0 deletions python/doc/source/ipc.rst
@@ -136,3 +136,15 @@ batches in the file, and can read any at random:
reader.num_record_batches
b = reader.get_batch(3)
b.equals(batch)
Reading from Stream and File Format for pandas
----------------------------------------------

The stream and file reader classes have a special ``read_pandas`` method to
simplify reading multiple record batches and converting them to a single
DataFrame output:

.. ipython:: python

    df = pa.open_file(buf).read_pandas()
    df
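For reference, read_pandas is a thin convenience over pieces the readers already expose: it reads every remaining record batch into a single Table and then converts that Table. A minimal sketch of the equivalence, reusing the buf from the example above:

    reader = pa.open_file(buf)
    table = reader.read_all()    # pyarrow.Table assembled from all record batches
    df = table.to_pandas()       # same result as pa.open_file(buf).read_pandas()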
2 changes: 2 additions & 0 deletions python/doc/source/pandas.rst
@@ -15,6 +15,8 @@
.. specific language governing permissions and limitations
.. under the License.
.. _pandas:

Using PyArrow with pandas
=========================

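This page documents the pandas integration that read_pandas builds on; for orientation, a minimal round-trip sketch (the DataFrame contents here are a made-up example, not taken from the page):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'a': [1, 2, 3]})
    table = pa.Table.from_pandas(df)   # pandas -> Arrow
    df_back = table.to_pandas()        # Arrow -> pandas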
1 change: 1 addition & 0 deletions python/pyarrow/__init__.py
@@ -37,6 +37,7 @@
float16, float32, float64,
binary, string, decimal,
list_, struct, dictionary, field,
DataType, NAType,
Field,
Schema,
schema,
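With DataType and NAType now exported from the package root, instance checks can use the public names; a small sketch (the int32 type is an arbitrary choice for illustration):

    import pyarrow as pa

    assert isinstance(pa.int32(), pa.DataType)   # concrete types are DataType instances
    assert isinstance(pa.NA, pa.NAType)          # the null singleton is an NAType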
7 changes: 7 additions & 0 deletions python/pyarrow/ipc.pxi
@@ -16,6 +16,9 @@
# under the License.

cdef class Message:
"""
Container for an Arrow IPC message with metadata and optional body
"""
cdef:
unique_ptr[CMessage] message

@@ -100,6 +103,10 @@ body length: {2}""".format(self.type, metadata_len, body_len)


cdef class MessageReader:
"""
Interface for reading Message objects from some source (like an
InputStream)
"""
cdef:
unique_ptr[CMessageReader] reader

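The new docstrings describe Message and MessageReader; a sketch of how the pieces fit together when consuming a stream message by message (stream_buf and schema are hypothetical stand-ins, and read_next_message is assumed to be the reader's method for pulling one message at a time):

    reader = pa.MessageReader.open_stream(pa.BufferReader(stream_buf))
    schema_msg = reader.read_next_message()          # stream format: the schema message comes first
    batch_msg = reader.read_next_message()           # then one message per record batch
    batch = pa.read_record_batch(batch_msg, schema)  # reconstruct a RecordBatch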
23 changes: 21 additions & 2 deletions python/pyarrow/ipc.py
@@ -26,7 +26,26 @@
import pyarrow.lib as lib


class RecordBatchStreamReader(lib._RecordBatchReader):
class _ReadPandasOption(object):

def read_pandas(self, **options):
"""
Read contents of stream and convert to pandas.DataFrame using
Table.to_pandas
Parameters
----------
**options : arguments to forward to Table.to_pandas
Returns
-------
df : pandas.DataFrame
"""
table = self.read_all()
return table.to_pandas(**options)


class RecordBatchStreamReader(lib._RecordBatchReader, _ReadPandasOption):
"""
Reader for the Arrow streaming binary format
@@ -54,7 +73,7 @@ def __init__(self, sink, schema):
self._open(sink, schema)


class RecordBatchFileReader(lib._RecordBatchFileReader):
class RecordBatchFileReader(lib._RecordBatchFileReader, _ReadPandasOption):
"""
Class for reading Arrow record batch data from the Arrow binary file format
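Because _ReadPandasOption is mixed into both reader classes above, the file and stream entry points expose the same method, and any keyword arguments are forwarded to Table.to_pandas. A minimal sketch of how a caller sees it (file_buf and stream_buf are hypothetical buffers holding file-format and stream-format data respectively):

    pa.open_file(file_buf).read_pandas()      # RecordBatchFileReader + mixin
    pa.open_stream(stream_buf).read_pandas()  # RecordBatchStreamReader + mixin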
4 changes: 3 additions & 1 deletion python/pyarrow/scalar.pxi
@@ -20,7 +20,9 @@ NA = None


cdef class NAType(Scalar):

"""
Null (NA) value singleton
"""
def __cinit__(self):
global NA
if NA is not None:
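The docstring added above calls pa.NA the null-value singleton; a quick illustration (the array values are arbitrary):

    arr = pa.array([1, None, 3])
    arr.null_count   # 1 -- the None became an Arrow null
    pa.NA            # the NA singleton exported at the package root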
35 changes: 27 additions & 8 deletions python/pyarrow/tests/test_ipc.py
@@ -63,7 +63,7 @@ def write_batches(self):
batches.append(batch)

writer.close()
return batches
return frames, batches


class TestFile(MessagingTest, unittest.TestCase):
@@ -78,7 +78,7 @@ def test_empty_file(self):
pa.open_file(buf)

def test_simple_roundtrip(self):
batches = self.write_batches()
_, batches = self.write_batches()
file_contents = pa.BufferReader(self._get_source())

reader = pa.open_file(file_contents)
@@ -92,7 +92,7 @@ def test_simple_roundtrip(self):
assert reader.schema.equals(batches[0].schema)

def test_read_all(self):
batches = self.write_batches()
_, batches = self.write_batches()
file_contents = pa.BufferReader(self._get_source())

reader = pa.open_file(file_contents)
@@ -101,6 +101,16 @@ def test_read_all(self):
expected = pa.Table.from_batches(batches)
assert result.equals(expected)

def test_read_pandas(self):
frames, _ = self.write_batches()

file_contents = pa.BufferReader(self._get_source())
reader = pa.open_file(file_contents)
result = reader.read_pandas()

expected = pd.concat(frames)
assert_frame_equal(result, expected)


class TestStream(MessagingTest, unittest.TestCase):

@@ -113,7 +123,7 @@ def test_empty_stream(self):
pa.open_stream(buf)

def test_simple_roundtrip(self):
batches = self.write_batches()
_, batches = self.write_batches()
file_contents = pa.BufferReader(self._get_source())
reader = pa.open_stream(file_contents)

@@ -130,7 +140,7 @@ def test_simple_roundtrip(self):
reader.get_next_batch()

def test_read_all(self):
batches = self.write_batches()
_, batches = self.write_batches()
file_contents = pa.BufferReader(self._get_source())
reader = pa.open_stream(file_contents)

@@ -142,7 +152,7 @@ def test_read_all(self):
class TestMessageReader(MessagingTest, unittest.TestCase):

def _get_example_messages(self):
batches = self.write_batches()
_, batches = self.write_batches()
file_contents = self._get_source()
buf_reader = pa.BufferReader(file_contents)
reader = pa.MessageReader.open_stream(buf_reader)
@@ -187,6 +197,15 @@ def test_read_record_batch(self):
read_batch = pa.read_record_batch(message, batch.schema)
assert read_batch.equals(batch)

def test_read_pandas(self):
frames, _ = self.write_batches()
file_contents = pa.BufferReader(self._get_source())
reader = pa.open_stream(file_contents)
result = reader.read_pandas()

expected = pd.concat(frames)
assert_frame_equal(result, expected)


class TestSocket(MessagingTest, unittest.TestCase):

@@ -249,7 +268,7 @@ def _get_writer(self, sink, schema):

def test_simple_roundtrip(self):
self.start_server(do_read_all=False)
writer_batches = self.write_batches()
_, writer_batches = self.write_batches()
reader_schema, reader_batches = self.stop_and_get_result()

assert reader_schema.equals(writer_batches[0].schema)
@@ -259,7 +278,7 @@ def test_simple_roundtrip(self):

def test_read_all(self):
self.start_server(do_read_all=True)
writer_batches = self.write_batches()
_, writer_batches = self.write_batches()
_, result = self.stop_and_get_result()

expected = pa.Table.from_batches(writer_batches)