Skip to content

Commit

Permalink
Merge pull request matteobertozzi#2 from bbloniarz-trulia/master
Browse files Browse the repository at this point in the history
Some small additions to hadoop-python
  • Loading branch information
matteobertozzi committed Sep 4, 2011
2 parents 2507dac + e895ede commit f62b9bb
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 35 deletions.
6 changes: 5 additions & 1 deletion python-hadoop/hadoop/io/InputStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def size(self):
return self._count - self._offset

def toByteArray(self):
return self._buffer[self._offset, self._offset+self._count]
return self._buffer[self._offset:self._offset+self._count]

def reset(self, data, offset=0, length=0):
if data and not length:
Expand Down Expand Up @@ -140,6 +140,10 @@ def readByte(self):
data = self._stream.read(1)
return struct.unpack(">b", data)[0]

def readUByte(self):
    """Read one unsigned byte (0-255) from the underlying stream."""
    raw = self._stream.read(1)
    (value,) = struct.unpack("B", raw)
    return value

def readBoolean(self):
    """Read a single byte from the stream and interpret it as a boolean."""
    raw = self._stream.read(1)
    (flag,) = struct.unpack(">?", raw)
    return flag
Expand Down
5 changes: 5 additions & 0 deletions python-hadoop/hadoop/io/OutputStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ def writeByte(self, value):
assert len(data) == 1
return self._stream.write(data)

def writeUByte(self, value):
    """Pack *value* (0-255) as one unsigned byte and write it to the stream."""
    packed = struct.pack("B", value)
    assert len(packed) == 1
    return self._stream.write(packed)

def writeBoolean(self, value):
data = struct.pack(">?", value)
assert len(data) == 1
Expand Down
65 changes: 38 additions & 27 deletions python-hadoop/hadoop/io/SequenceFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,16 +293,20 @@ def getCompressionCodec(self):
return self._codec

def getKeyClass(self):
    """Return the key class, resolving it lazily from its recorded name."""
    if self._key_class:
        return self._key_class
    # First access: materialize the class object from the name read
    # out of the file header.
    self._key_class = hadoopClassFromName(self._key_class_name)
    return self._key_class

def getKeyClassName(self):
return hadoopClassName(self._key_class)
return hadoopClassName(self.getKeyClass())

def getValueClass(self):
    """Return the value class, resolving it lazily from its recorded name."""
    if self._value_class:
        return self._value_class
    # First access: materialize the class object from the name read
    # out of the file header.
    self._value_class = hadoopClassFromName(self._value_class_name)
    return self._value_class

def getValueClassName(self):
return hadoopClassName(self._value_class)
return hadoopClassName(self.getValueClass())

def getPosition(self):
return self._stream.getPos()
Expand All @@ -316,29 +320,27 @@ def isBlockCompressed(self):
def isCompressed(self):
return self._decompress

def nextKey(self, key):
def nextRawKey(self):
if not self._block_compressed:
record_length = self._readRecordLength()
if record_length < 0:
return False
return None

record_data = self._stream.read(record_length + 4)
self._record.reset(record_data)

self._record.readInt() # read key_length
key.readFields(self._record)
key_length = self._stream.readInt()
key = DataInputBuffer(self._stream.read(key_length))
self._record.reset(self._stream.read(record_length - key_length))
return key
else:
if hasattr(self, '_block_index') and \
self._block_index < self._record[0]:
self._sync_seen = False
records, keys_len, keys, values_len, values = self._record
readVInt(keys_len)
key.readFields(keys)
key_length = readVInt(keys_len)
self._block_index += 1
return True
return DataInputBuffer(keys.read(key_length))

if self._stream.getPos() >= self._end:
return False
return None

# Read Sync
self._stream.readInt() # -1
Expand All @@ -362,11 +364,28 @@ def _readBuffer():
self._record = (records, keys_len, keys, values_len, values)
self._block_index = 1

readVInt(keys_len)
key.readFields(keys)
key_length = readVInt(keys_len)
return DataInputBuffer(keys.read(key_length))

def nextKey(self, key):
    """Advance to the next record and deserialize its key into *key*.

    Returns True when a key was read, False at end of input.
    """
    # BUG FIX: nextRawKey() returns a single buffer object (or None at
    # end of input), not a (record, length) tuple -- the old
    # `record, keyLen = self.nextRawKey()` unpacking raised TypeError
    # on every call.
    record = self.nextRawKey()
    if not record:
        return False
    key.readFields(record)
    return True

def nextRawValue(self):
    """Return a stream positioned at the raw bytes of the current
    record's value, without deserializing it.

    NOTE(review): must be called after the matching nextRawKey() so
    that the internal record state points past the key -- confirm
    against callers.
    """
    if not self._block_compressed:
        if self._decompress:
            # Record-compressed file: the remaining record bytes are the
            # compressed value; hand back a decompressing stream from
            # the codec.
            compress_data = self._record.read(self._record.size())
            return self._codec.decompressInputStream(compress_data)
        else:
            # Uncompressed: the value bytes are what remains in the
            # shared record buffer.
            return self._record
    else:
        # Block-compressed file: read the next value length from the
        # parallel value-lengths stream, then slice that many bytes
        # from the values buffer.
        records, keys_len, keys, values_len, values = self._record
        value_length = readVInt(values_len)
        return DataInputBuffer(values.read(value_length))

def next(self, key, value):
more = self.nextKey(key)
if more:
Expand Down Expand Up @@ -430,10 +449,8 @@ def _initialize(self, path, start, length):
# Same as below, but with UTF8 Deprecated Class
raise NotImplementedError
else:
key_class_name = Text.readString(self._stream)
value_class_name = Text.readString(self._stream)
self._key_class = hadoopClassFromName(key_class_name)
self._value_class = hadoopClassFromName(value_class_name)
self._key_class_name = Text.readString(self._stream)
self._value_class_name = Text.readString(self._stream)

if ord(self._version) > 2:
self._decompress = self._stream.readBoolean()
Expand Down Expand Up @@ -482,13 +499,7 @@ def _readRecordLength(self):
return length

def _getCurrentValue(self, value):
stream = nextRawValue()
value.readFields(stream)
if not self._block_compressed:
if self._decompress:
compress_data = self._record.read(self._record.size())
value.readFields(self._codec.decompressInputStream(compress_data))
else:
value.readFields(self._record)
assert self._record.size() == 0
else:
records, keys_len, keys, values_len, values = self._record
value.readFields(values)
9 changes: 3 additions & 6 deletions python-hadoop/hadoop/io/WritableUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@ def readVLong(data_input):

i = 0
for idx in xrange(length - 1):
b = data_input.readByte()
if b < 0:
b = -b | 0x80

b = data_input.readUByte()
i = i << 8
i = i | (b & 0xFF)
i = i | b

return (i ^ -1) if isNegativeVInt(first_byte) else i

Expand Down Expand Up @@ -61,7 +58,7 @@ def writeVLong(data_output, value):
mask = 0xFF << shiftbits

x = (value & mask) >> shiftbits
data_output.writeByte(-(x & 0x7f) if x & 0x80 else x)
data_output.writeUByte(x)

def isNegativeVInt(value):
    """Return True if *value* (the first byte of a variable-length int)
    marks a negative number in Hadoop's vint/vlong encoding."""
    return value < -120 or -112 <= value < 0
Expand Down
1 change: 1 addition & 0 deletions python-hadoop/hadoop/io/compress/CodecPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from BZip2Codec import *
from ZlibCodec import *
from GzipCodec import *

class CodecPool(object):
def __new__(cls, *p, **k):
Expand Down
40 changes: 40 additions & 0 deletions python-hadoop/hadoop/io/compress/GzipCodec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env python
# ========================================================================
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gzip
import io
import StringIO

from hadoop.io.InputStream import DataInputBuffer

class GzipCodec:
    """Compression codec backed by the stdlib gzip module.

    Mirrors the interface of the other codecs (BZip2Codec, ZlibCodec):
    compress()/decompress() operate on byte strings, and
    decompressInputStream() wraps the decompressed bytes in a
    DataInputBuffer for the stream-oriented readers.
    """

    def compress(self, data):
        """Gzip-compress *data* and return the compressed byte string."""
        # io.BytesIO instead of StringIO: this is binary data, and io is
        # available on both Python 2.6+ and 3.
        buf = io.BytesIO()
        f = gzip.GzipFile(fileobj=buf, mode='wb')
        try:
            f.write(data)
        finally:
            # Close flushes the gzip trailer into buf.
            f.close()
        # BUG FIX: was ioObj.getValue() -- StringIO/BytesIO expose
        # getvalue() (lowercase v), so the old call raised AttributeError.
        return buf.getvalue()

    def decompress(self, data):
        """Return the decompressed byte string for gzip-compressed *data*."""
        buf = io.BytesIO(data)
        f = gzip.GzipFile(fileobj=buf, mode='rb')
        try:
            return f.read()
        finally:
            f.close()

    def decompressInputStream(self, data):
        """Decompress *data* and wrap the result in a DataInputBuffer."""
        return DataInputBuffer(self.decompress(data))
2 changes: 1 addition & 1 deletion python-hadoop/hadoop/util/ReflectionUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ def classFromName(class_path):
if not module_name:
raise ValueError('Class name must contain module part.')

module = __import__(module_name, globals(), locals(), [class_name], -1)
module = __import__(module_name, globals(), locals(), [str(class_name)], -1)
return getattr(module, class_name)

0 comments on commit f62b9bb

Please sign in to comment.