-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathcauchy_decoding
executable file
·79 lines (63 loc) · 2.57 KB
/
cauchy_decoding
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env python
from __future__ import division
import numpy
import random
import logging
import logging.config
import operator
import gevent
import itertools
from clusterdfs.networking import Client
from clusterdfs.bufferedio import IOBuffer, InputStream
from clusterdfs.cauchyec import CauchyEC
from clusterdfs.datanode import DataNodeHeader
def iterzipbuffers(iterables):
iterators = [iter(i) for i in iterables]
while True:
yield [next(i) for i in iterators]
class CauchyDecoding(object):
def __init__(self, n, k):
m = n-k
w = 4
nodes = [('thinclient-%02d'%(i),7777) for i in xrange(51)]
random.shuffle(nodes)
#nodes = nodes[:n-1]+[('localhost',7777)]
nodes = nodes[:n-1]
clients = [Client(*node) for node in nodes]
raw_clients = clients[:k]
encoded_clients = clients[k:]
for i,client in enumerate(raw_clients):
client.send({'op':DataNodeHeader.OP_RETRIEVE, 'id':'girl.64mb.part%d'%(i)})
#client.send({'op':DataNodeHeader.OP_RETRIEVE, 'id':'part%d'%(i)})
for i,client in enumerate(encoded_clients):
client.send({'op':DataNodeHeader.OP_STORE, 'id':'ec_coded'})
#client.send({'op':DataNodeHeader.OP_STORE, 'id':'ec_coded_%d'%(i)})
stream_readers = [c.get_reader() for c in raw_clients]
size = None
for r in stream_readers:
if size!=None:
assert r.input_stream.size==size
else:
size = r.input_stream.size
input_stream = InputStream(size)
for client in encoded_clients:
client.send(input_stream)
code = CauchyEC(k, m, bitfield=w)
encoded_iobuffers = [IOBuffer() for i in xrange(m)]
encoded_buffers = [iobuffer.as_numpy_byte_array() for iobuffer in encoded_iobuffers]
for raw_iobuffers in iterzipbuffers(stream_readers):
raw_buffers = [iobuffer.as_numpy_byte_array() for iobuffer in raw_iobuffers]
code.encode(raw_buffers, encoded_buffers)
length = None
for iobuffer in raw_iobuffers:
if length==None:
length = iobuffer.length
else:
assert length==iobuffer.length
iobuffer.reset()
for iobuffer, client in itertools.izip(encoded_iobuffers, encoded_clients):
iobuffer.length = length
client.send(iobuffer)
for client in itertools.chain(encoded_clients, raw_clients):
client.assert_ack()
CauchyCoding(16, 11)