
Commit

fixings
git-svn-id: http://sands.sce.ntu.edu.sg/svn/lluis/trunk@5 6fa9df1f-5871-4ed8-8e73-65a2acd6172f
lluis committed Jan 6, 2012
1 parent 38cf263 commit 41d9455
Showing 5 changed files with 36 additions and 26 deletions.
3 changes: 3 additions & 0 deletions clusterdfs/__init__.py
@@ -0,0 +1,3 @@
"""This is the main module."""

__all__ = ['bufferedio','datablock','processname','networking','namenode','datanode','dfs']
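
The new package __init__.py carries only a docstring and an __all__ list, which controls what a star-import of clusterdfs exposes. A minimal usage sketch, assuming the package and the listed submodules (plus their gevent/numpy dependencies) are importable; nothing beyond the names in the diff above is taken from the commit:

# Hypothetical sketch: __all__ limits what `from clusterdfs import *` pulls in.
from clusterdfs import *       # binds bufferedio, datablock, ..., dfs as module names
import clusterdfs
print(clusterdfs.__doc__)      # "This is the main module."
print(clusterdfs.__all__)      # the seven submodule names declared above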
37 changes: 16 additions & 21 deletions clusterdfs/bufferedio.py
@@ -1,21 +1,25 @@
 import io
-import numpy
 import gevent
 import gevent.queue
 import gevent.socket
+import numpy
 
 class IOBuffer(object):
     def __init__(self):
-        self.buff = bytearray(io.DEFAULT_BUFFER_SIZE)
-        #self.buff = numpy.ndarray(shape=(io.DEFAULT_BUFFER_SIZE), dtype=numpy.byte)
+        #self.buff = bytearray(io.DEFAULT_BUFFER_SIZE)
+        self.buff = numpy.ndarray(shape=(io.DEFAULT_BUFFER_SIZE), dtype=numpy.uint8)
         self.mem = memoryview(self.buff)
         self.length = 0
 
     def data(self):
         return self.mem[0:self.length]
 
 class BufferedIO(object):
-    def __init__(self, callback, num_buffers=2):
+    def __init__(self, size=0, callback=None, num_buffers=2):
+        assert callback!=None
+        assert size>0
         assert num_buffers>0
 
         self.write_queue = gevent.queue.Queue()
         self.read_queue = gevent.queue.Queue()

@@ -26,6 +30,7 @@ def __init__(self, callback, num_buffers=2):
         self.callback = callback
         self.end_reading = False
         self.processed = 0
+        self.size = size
 
     def process(self):
         while (not self.end_reading) or self.read_queue.qsize()>0:
@@ -37,7 +42,7 @@ def fill_buffer(self):
         assert False, 'Unimplemented method'
 
     def finished(self):
-        assert False, 'Unimplemented method'
+        return self.processed >= self.size
 
     def run(self):
         while not self.finished():
@@ -50,38 +55,28 @@ def run(self):
         self.reader.join()
 
 class FileBufferedIO(BufferedIO):
-    def __init__(self, f, *args, **kwargs):
+    def __init__(self, f, *args, offset=0, **kwargs):
         BufferedIO.__init__(self, *args, **kwargs)
         self.fio = io.open(f, 'rb')
-        self.tofinish = False
-
-    def finished(self):
-        return self.tofinish
+        self.fio.seek(offset)
 
     def fill_buffer(self, iobuffer):
-        assert not self.tofinish
         num = self.fio.readinto(iobuffer.buff)
         iobuffer.length = num
-        if num==0: self.tofinish = True
         return num
 
 class SocketBufferedIO(BufferedIO):
-    def __init__(self, socket, size, *args, **kwargs):
+    def __init__(self, socket, *args, **kwargs):
         BufferedIO.__init__(self, *args, **kwargs)
         self.socket = socket
-        self.received = 0
-        self.size = size
 
-    def finished(self):
-        return self.received==self.size
-
     def fill_buffer(self, iobuffer):
-        assert self.received<self.size
-
-        num = self.socket.recv_into(iobuffer.buff, min(len(iobuffer.buff), self.size-self.received))
+        num = self.socket.recv_into(iobuffer.buff, min(len(iobuffer.buff), self.size-self.processed))
         if num>0:
             iobuffer.length = num
-            self.received += num
             return num
         else:
             raise IOError("Socket disconnected.")
 
 __all__ = ['SocketBufferedIO','FileBufferedIO']
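
With this commit BufferedIO owns the transfer size and the completion test (finished() now compares self.processed against self.size), so FileBufferedIO and SocketBufferedIO shrink to little more than fill_buffer(). A rough usage sketch of the new constructor signature; note that the run()/process() internals are collapsed in the diff above, and the callback contract (assumed here to receive the filled data, as datanode.py's retrieve_block_callback does) is inferred rather than shown, so the file path and callback below are purely illustrative:

# Hypothetical sketch: read 4096 bytes starting at offset 128 from a local file
# and hand each filled buffer to a callback, using the post-commit signatures.
from clusterdfs.bufferedio import FileBufferedIO

def on_data(data):
    # Assumed callback contract: receives the bytes read into the current IOBuffer,
    # mirroring retrieve_block_callback in datanode.py, which just sendall()s them.
    print('%d bytes buffered' % len(data))

reader = FileBufferedIO('/tmp/blk_test',   # hypothetical block file
                        size=4096,         # BufferedIO now requires size > 0
                        callback=on_data,  # and a non-None callback
                        offset=128)        # new in this commit: seek before the first read
reader.run()

One caveat: offset=0 declared after *args is keyword-only (Python 3) syntax; under Python 2 the same default would have to be popped out of **kwargs.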
22 changes: 17 additions & 5 deletions clusterdfs/datanode.py
100755 → 100644
@@ -32,7 +32,7 @@ class DataNodeHeader(object):
     OP_RETRIEVE = 1
     OP_REMOVE = 2
 
-class DataNodeQuery(ServerHandle):
+class DataNodeStore(ServerHandle):
     def process_query(self):
         if self.header['op']==DataNodeHeader.OP_STORE:
             return self.store_block()
@@ -101,19 +101,31 @@ def store_block(self):
     def retrieve_block(self):
         # Read block properties
         block_id = self.header['id']
-        path = os.path.join(self.server.config.datadir, block_id)
+        block_path = os.path.join(self.server.config.datadir, block_id)
         block_size = os.path.getsize(path)
-        logging.info("Sending block '%s' (%d bytes) to %s."%(block_id, block_size, self.address))
+        block_offset = self.header['offset'] if ('offset' in self.header) else 0
+        block_length = self.header['length'] if ('length' in self.header) else block_size
+
+        # Do error control
+        if block_length+block_offset < block_size:
+            return ServerResponse.error(msg='The requested data is larger than block_size.')
+
+        # Measuring size
+        logging.info("Sending block '%s' (%d bytes, %d offset) to %s."%(block_id, block_length, block_offset, self.address))
 
         # Send block size
-        self.send(block_size)
+        self.send(block_length)
 
         # Process block
         for data in FileIterable(path):
             self.socket.sendall(data)
 
         return ServerResponse.ok(msg='Block retrieved successfully.')
+
+    def retrieve_block_callback(self, data):
+        """Called by FileBufferedIO in :py:meth:retrieve_block"""
+        self.socket.sendall(data)
 
 class DataNodeNotifier(object):
     def __init__(self, config, server):
         self.config = config
@@ -146,7 +158,7 @@ class DataNode(Server):
     def __init__(self, config):
         self.config = config
         logging.info("Configuring DataNode to listen on localhost:%d"%(self.config.port))
-        Server.__init__(self, DataNodeQuery, port=self.config.port)
+        Server.__init__(self, DataNodeStore, port=self.config.port)
         self.notifier = DataNodeNotifier(self.config, self)
         self.lock_file = os.path.join(self.config.datadir, '.lock')

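As committed, retrieve_block still computes block_size from the old name path (renamed to block_path one line earlier) and still streams via FileIterable(path), even though the new retrieve_block_callback docstring says FileBufferedIO is meant to drive it; the bounds check also fires when offset plus length is smaller than the block rather than larger. Below is a hedged sketch of where the method appears to be heading, using only names from this diff plus the FileBufferedIO signature from bufferedio.py above; it is a reading of the commit's direction, not code the commit contains:

    # Hypothetical follow-up, NOT part of this commit: retrieve_block wired to the new reader.
    # Assumes FileBufferedIO is imported from clusterdfs.bufferedio.
    def retrieve_block(self):
        block_id = self.header['id']
        block_path = os.path.join(self.server.config.datadir, block_id)
        block_size = os.path.getsize(block_path)   # use the renamed variable
        block_offset = self.header['offset'] if ('offset' in self.header) else 0
        # Default: read to the end of the block (assumption).
        block_length = self.header['length'] if ('length' in self.header) else block_size - block_offset

        # Reject requests that run past the end of the block (direction flipped vs. the commit).
        if block_offset + block_length > block_size:
            return ServerResponse.error(msg='The requested data is larger than block_size.')

        logging.info("Sending block '%s' (%d bytes, %d offset) to %s."%(block_id, block_length, block_offset, self.address))
        self.send(block_length)

        # Stream the requested range; retrieve_block_callback sendall()s each chunk.
        FileBufferedIO(block_path, size=block_length,
                       callback=self.retrieve_block_callback,
                       offset=block_offset).run()
        return ServerResponse.ok(msg='Block retrieved successfully.')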
Empty file modified clusterdfs/dfs.py
100755 → 100644
Empty file.
Empty file modified clusterdfs/namenode.py
100755 → 100644
Empty file.
