Skip to content

Commit

Permalink
fixed bug regarding small buffer sizes, better retrieval informatics
Browse files Browse the repository at this point in the history
  • Loading branch information
sjarvie committed Mar 13, 2014
1 parent 0b2ed2f commit dce2d56
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 66 deletions.
119 changes: 67 additions & 52 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

const SERVERADDR = "localhost:8080"
const SIZEOFBLOCK = 1000
const SIZEOFBLOCK = 4096

var id string // client id
var state = HB // internal statemachine
Expand Down Expand Up @@ -51,7 +51,7 @@ type Block struct {
type BlockHeader struct {
DatanodeID string // ID of datanode which holds the block
Filename string //the remote name of the block including the path "/test/0"
Size uint64 // size of Block in bytes
Size int64 // size of Block in bytes
BlockNum int // the 0 indexed position of Block within file
NumBlocks int // total number of Blocks in file
}
Expand All @@ -66,6 +66,20 @@ type Packet struct {
Headers []BlockHeader // optional BlockHeader list
}

// Error formatting stucture
type errorString struct {
s string
}

func (e *errorString) Error() string {
return e.s
}

// New returns an error that formats as the given text.
func New(text string) error {
return &errorString{text}
}

// SendPackets encodes packets and transmits them to their proper recipients
func SendPackets(encoder *json.Encoder, ch chan Packet) {
for p := range ch {
Expand Down Expand Up @@ -119,9 +133,9 @@ func BlockHeadersFromFile(localname, remotename string) []BlockHeader {
remotename = "/" + remotename
}

n := uint64(SIZEOFBLOCK)
n := int64(SIZEOFBLOCK)
if blocknum == (numblocks - 1) {
n = uint64(info.Size()) % SIZEOFBLOCK
n = int64(info.Size()) % SIZEOFBLOCK

}

Expand Down Expand Up @@ -153,97 +167,95 @@ func WriteJSON(fileName string, key interface{}) {

// BlocksFromFile split a File into Blocks for storage on the filesystem
// in the future this will read a fixed number of blocks at a time from disc for reasonable memory utilization
func BlocksFromFile(localname, remotename string) []Block {

var bls []Block
func DistributeBlocksFromFile(localname, remotename string) error {

info, err := os.Lstat(localname)
if err != nil {
panic(err)
return err
}

// get read buffer
fi, err := os.Open(localname)
if err != nil {
panic(err)
return err
}
defer func() {
if err := fi.Close(); err != nil {
panic(err)
}
}()
r := bufio.NewReader(fi)

// Create Blocks
total := int((info.Size() / SIZEOFBLOCK) + 1)
bls = make([]Block, 0, total)

num := 0

for {
for num < total {

// read a chunk from file uint64o Block data buffer
// read a chunk from file into Block data buffer
buf := make([]byte, SIZEOFBLOCK)
w := bytes.NewBuffer(nil)

n, err := r.Read(buf)
if err != nil && err != io.EOF {
panic(err)
return err
}
if n == 0 {
break
}

if _, err := w.Write(buf[:n]); err != nil {
panic(err)
return err
}

// write full Block to disc
if strings.Index(remotename, "/") != 0 {
remotename = "/" + remotename
}

h := BlockHeader{"", remotename, uint64(n), num, total}
h := BlockHeader{"", remotename, int64(n), num, total}

data := make([]byte, 0, n)
data = w.Bytes()[0:n]
b := Block{h, data}
bls = append(bls, b)

err = DistributeBlock(b)
if err != nil {
return err
}

// generate new Block
num += 1

}
return bls
}
if err := fi.Close(); err != nil {
return err
}

type errorString struct {
s string
return nil
}

func (e *errorString) Error() string {
return e.s
}
func DistributeBlock(b Block) error {

// New returns an error that formats as the given text.
func New(text string) error {
return &errorString{text}
p := new(Packet)
p.SRC = id
p.DST = "NN"
p.CMD = DISTRIBUTE
p.Data = b
encoder.Encode(*p)

var r Packet
decoder.Decode(&r)
if r.CMD != ACK {
return errors.New("Could not distribute block to namenode")
}
return nil
}

func DistributeBlocks(blocks []Block) error {

for _, b := range blocks {
p := new(Packet)
p.SRC = id
p.DST = "NN"
p.CMD = DISTRIBUTE
p.Data = b
encoder.Encode(*p)

var r Packet
decoder.Decode(&r)
if r.CMD != ACK {
return errors.New("Could not distribute block to namenode")
err := DistributeBlock(b)

if err != nil {
return errors.New("Could not distribute blocks to namenode")
}
}
return nil
Expand All @@ -266,7 +278,7 @@ func RetrieveFile(localname, remotename string) {
p.SRC = id
p.CMD = GETHEADERS
p.Headers = make([]BlockHeader, 1, 1)
p.Headers[0] = BlockHeader{"", remotename, uint64(0), 0, 0}
p.Headers[0] = BlockHeader{"", remotename, int64(0), 0, 0}
encoder.Encode(*p)

// get header list
Expand All @@ -290,12 +302,13 @@ func RetrieveFile(localname, remotename string) {
return
}
defer outFile.Close()
w := bufio.NewWriter(outFile)
w := bufio.NewWriterSize(outFile, SIZEOFBLOCK)

// for each header, retrieve its block and write to disc
headers := r.Headers

fmt.Println("Received File Headers for ", p.Headers[0].Filename, ". Retrieving Blocks ")
fmt.Println("Received File Headers for ", p.Headers[0].Filename, ". Retrieving ", r.Headers[0].NumBlocks, " Blocks ")

for _, h := range headers {

// send request
Expand All @@ -305,6 +318,7 @@ func RetrieveFile(localname, remotename string) {
p.CMD = RETRIEVEBLOCK
p.Headers = make([]BlockHeader, 1, 1)
p.Headers[0] = h

encoder.Encode(*p)

// receive block
Expand All @@ -315,15 +329,18 @@ func RetrieveFile(localname, remotename string) {
fmt.Println("Received bad packet ", p)
return
}

b := r.Data
n := b.Header.Size

_, err := w.Write(b.Data[:n])
if err != nil {
panic(err)
}
fmt.Printf(".")
w.Flush()
}
w.Flush()

fmt.Printf(" Done! \n")
fmt.Println("Wrote file to disc at ", localname)
}

Expand Down Expand Up @@ -355,18 +372,16 @@ func ReceiveInput() {
}

// generate blocks from new File for distribution
blocks := BlocksFromFile(localname, remotename)

fmt.Println("Distributing ", len(blocks), "file blocks")
fmt.Println("Distributing file blocks")
err = DistributeBlocksFromFile(localname, remotename)

// send blocks to namenode for distribution

err = DistributeBlocks(blocks)
if err != nil {
fmt.Println(err)
continue
}

// send blocks to namenode for distribution

} else {
remotename := file1
localname := file2
Expand Down
12 changes: 6 additions & 6 deletions datanode/datanode.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Block struct {
type BlockHeader struct {
DatanodeID string // ID of datanode which holds the block
Filename string //the remote name of the block including the path "/test/0"
Size uint64 // size of Block in bytes
Size int64 // size of Block in bytes
BlockNum int // the 0 indexed position of Block within file
NumBlocks int // total number of Blocks in file
}
Expand Down Expand Up @@ -126,10 +126,10 @@ func WriteBlock(b Block) {
fname := h.Filename + "/" + strconv.Itoa(h.BlockNum)

for _, dir := range list {
fmt.Println("Looking for directory ", h.Filename)
fmt.Println("Comparing to ", dir.Name())

if "/"+dir.Name() == h.Filename {
WriteJSON(root+fname, b)
log.Println("Wrote Block ", root+fname, "to disc")
return
}
}
Expand All @@ -142,7 +142,7 @@ func WriteBlock(b Block) {

fname = h.Filename + "/" + strconv.Itoa(h.BlockNum)
WriteJSON(root+fname, b)
log.Println("Wrote Block ", fname, "to disc")
log.Println("Wrote Block ", root+fname, "to disc")
return

}
Expand Down Expand Up @@ -191,11 +191,11 @@ func BlockFromHeader(h BlockHeader) Block {
var b Block
if "/"+dir.Name() == h.Filename {
ReadJSON(root+fname, &b)
fmt.Println("Found block ", b)
fmt.Println("Found block ", root+fname)
return b
}
}
fmt.Println("Block not found!")
fmt.Println("Block not found ", root+fname)
return errBlock
}

Expand Down
2 changes: 1 addition & 1 deletion namenode/examplenamenode.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
<ConfigOption key="namenodeid">NN</ConfigOption>
<ConfigOption key="listenhost">localhost</ConfigOption>
<ConfigOption key="listenport">8080</ConfigOption>
<ConfigOption key="sizeofblock">1000</ConfigOption>
<ConfigOption key="sizeofblock">4096</ConfigOption>
</ConfigOptionList>
19 changes: 12 additions & 7 deletions namenode/namenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
)

// Config Options
var host string // listen host
var port string // listen port
var SIZEOFBLOCK int //size of block in bytes
var id string // the namenode id
var host string // listen host
var port string // listen port
var SIZEOFBLOCK int64 //size of block in bytes
var id string // the namenode id

var headerChannel chan BlockHeader // processes headers into filesystem
var sendChannel chan Packet // enqueued packets for transmission
Expand Down Expand Up @@ -71,7 +71,7 @@ type Block struct {
type BlockHeader struct {
DatanodeID string // ID of datanode which holds the block
Filename string //the remote name of the block including the path "/test/0"
Size uint64 // size of Block in bytes
Size int64 // size of Block in bytes
BlockNum int // the 0 indexed position of Block within file
NumBlocks int // total number of Blocks in file
}
Expand All @@ -98,7 +98,7 @@ type filenode struct {
type datanode struct {
ID string
listed bool
size uint64
size int64
}

// By is used to select the fields used when comparing datanodes
Expand Down Expand Up @@ -535,10 +535,15 @@ func ParseConfigXML(configpath string) error {
case "listenport":
port = o.Value
case "sizeofblock":
n, err := strconv.Atoi(o.Value)
n, err := strconv.ParseInt(o.Value, 0, 64)
if err != nil {
return err
}

if n < int64(4096) {
return errors.New("Buffer size must be greater than or equal to 4096 bytes")
}
fmt.Println("SIZEOFBLOCK", n)
SIZEOFBLOCK = n
default:
return errors.New("Bad ConfigOption received Key : " + o.Key + " Value : " + o.Value)
Expand Down

0 comments on commit dce2d56

Please sign in to comment.