Skip to content

Commit

Permalink
configuration parsing for client and namenode, bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sjarvie committed Mar 13, 2014
1 parent dce2d56 commit df71792
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 69 deletions.
95 changes: 79 additions & 16 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"bufio"
"bytes"
"encoding/json"
"encoding/xml"
"strconv"

"errors"
"fmt"
"io"
Expand All @@ -15,10 +18,11 @@ import (
"sync"
)

const SERVERADDR = "localhost:8080"
const SIZEOFBLOCK = 4096

var id string // client id
// Config Options
var serverhost string // serverhost
var serverport string // serverport
var SIZEOFBLOCK int //size of block in bytes
var id string // the namenode id
var state = HB // internal statemachine
var sendChannel chan Packet // for outbound Packets
var receiveChannel chan Packet // for in bound Packets
Expand All @@ -41,6 +45,17 @@ const (
ERROR = iota // notification of a failed request
)

// The XML parsing structures for configuration options
type ConfigOptionList struct {
XMLName xml.Name `xml:"ConfigOptionList"`
ConfigOptions []ConfigOption `xml:"ConfigOption"`
}

type ConfigOption struct {
Key string `xml:"key,attr"`
Value string `xml:",chardata"`
}

// A file is composed of one or more Blocks
type Block struct {
Header BlockHeader // metadata
Expand All @@ -51,7 +66,7 @@ type Block struct {
type BlockHeader struct {
DatanodeID string // ID of datanode which holds the block
Filename string //the remote name of the block including the path "/test/0"
Size int64 // size of Block in bytes
Size int // size of Block in bytes
BlockNum int // the 0 indexed position of Block within file
NumBlocks int // total number of Blocks in file
}
Expand Down Expand Up @@ -124,7 +139,7 @@ func BlockHeadersFromFile(localname, remotename string) []BlockHeader {
}()

// Create Blocks
numblocks := int((info.Size() / SIZEOFBLOCK) + 1)
numblocks := int(info.Size()/int64(SIZEOFBLOCK) + 1)
headers = make([]BlockHeader, numblocks, numblocks)
blocknum := 0

Expand All @@ -133,9 +148,9 @@ func BlockHeadersFromFile(localname, remotename string) []BlockHeader {
remotename = "/" + remotename
}

n := int64(SIZEOFBLOCK)
n := SIZEOFBLOCK
if blocknum == (numblocks - 1) {
n = int64(info.Size()) % SIZEOFBLOCK
n = int(info.Size() % int64(SIZEOFBLOCK))

}

Expand Down Expand Up @@ -182,7 +197,7 @@ func DistributeBlocksFromFile(localname, remotename string) error {
r := bufio.NewReader(fi)

// Create Blocks
total := int((info.Size() / SIZEOFBLOCK) + 1)
total := int((info.Size() / int64(SIZEOFBLOCK)) + 1)

num := 0

Expand All @@ -209,7 +224,7 @@ func DistributeBlocksFromFile(localname, remotename string) error {
remotename = "/" + remotename
}

h := BlockHeader{"", remotename, int64(n), num, total}
h := BlockHeader{"", remotename, n, num, total}

data := make([]byte, 0, n)
data = w.Bytes()[0:n]
Expand All @@ -220,6 +235,7 @@ func DistributeBlocksFromFile(localname, remotename string) error {
return err
}

fmt.Printf(".")
// generate new Block
num += 1

Expand All @@ -228,6 +244,7 @@ func DistributeBlocksFromFile(localname, remotename string) error {
return err
}

fmt.Printf(" Done! \n")
return nil
}

Expand All @@ -250,14 +267,17 @@ func DistributeBlock(b Block) error {

func DistributeBlocks(blocks []Block) error {

fmt.Println("Distributing file blocks")
for _, b := range blocks {

err := DistributeBlock(b)

if err != nil {
return errors.New("Could not distribute blocks to namenode")
return errors.New("Distrubution Error: " + err.Error())
}
fmt.Printf(".")
}
fmt.Printf(" Done! \n")

return nil
}

Expand All @@ -278,7 +298,7 @@ func RetrieveFile(localname, remotename string) {
p.SRC = id
p.CMD = GETHEADERS
p.Headers = make([]BlockHeader, 1, 1)
p.Headers[0] = BlockHeader{"", remotename, int64(0), 0, 0}
p.Headers[0] = BlockHeader{"", remotename, 0, 0, 0}
encoder.Encode(*p)

// get header list
Expand Down Expand Up @@ -372,7 +392,6 @@ func ReceiveInput() {
}

// generate blocks from new File for distribution
fmt.Println("Distributing file blocks")
err = DistributeBlocksFromFile(localname, remotename)

if err != nil {
Expand All @@ -393,11 +412,55 @@ func ReceiveInput() {

}

// Parse Config sets up the node with the provided XML file
func ParseConfigXML(configpath string) error {
xmlFile, err := os.Open(configpath)
if err != nil {
return err
}
defer xmlFile.Close()

var list ConfigOptionList
err = xml.NewDecoder(xmlFile).Decode(&list)
if err != nil {
return err
}

for _, o := range list.ConfigOptions {

switch o.Key {
// single client for now
// case "id":
// id = o.Value
case "serverhost":
serverhost = o.Value
case "serverport":
serverport = o.Value
case "sizeofblock":
n, err := strconv.Atoi(o.Value)
if err != nil {
return err
}

if n < 4096 {
return errors.New("Buffer size must be greater than or equal to 4096 bytes")
}
SIZEOFBLOCK = n
default:
return errors.New("Bad ConfigOption received Key : " + o.Key + " Value : " + o.Value)
}
}

return nil
}

// Initializes the client and begins communication
func Init() {
func Run(configpath string) {

ParseConfigXML(configpath)

id = "C"
conn, err := net.Dial("tcp", SERVERADDR)
conn, err := net.Dial("tcp", serverhost+":"+serverport)
CheckError(err)

encoder = json.NewEncoder(conn)
Expand Down
30 changes: 30 additions & 0 deletions client/configparser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package client

import (
"testing"
)

func TestValidXML(t *testing.T) {

err := ParseConfigXML("exampleclient.xml")
if err != nil {
t.Errorf(err.Error())
}
// Single client for now
// if id != "C" {
// t.Errorf("Config did not set id correctly")
// }

if serverhost != "localhost" {
t.Errorf("Config did not set host correctly")
}

if serverport != "8080" {
t.Errorf("Config did not set port correctly")
}

if SIZEOFBLOCK != 4096 {
t.Errorf("Config did not set SIZEOFBLOCK correctly")
}

}
5 changes: 5 additions & 0 deletions client/exampleclient.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<ConfigOptionList>
<ConfigOption key="serverhost">localhost</ConfigOption>
<ConfigOption key="serverport">8080</ConfigOption>
<ConfigOption key="sizeofblock">4096</ConfigOption>
</ConfigOptionList>
34 changes: 34 additions & 0 deletions datanode/configparser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package datanode

import (
"testing"
)

func TestValidXML(t *testing.T) {

err := ParseConfigXML("exampledatanode.xml")
if err != nil {
t.Errorf(err.Error())
}

if id != "DN1" {
t.Errorf("Config did not set id correctly")
}

if root != "/DN1" {
t.Errorf("Config did not set id correctly")
}

if serverhost != "localhost" {
t.Errorf("Config did not set host correctly")
}

if serverport != "8080" {
t.Errorf("Config did not set port correctly")
}

if SIZEOFBLOCK != 4096 {
t.Errorf("Config did not set SIZEOFBLOCK correctly")
}

}
Loading

0 comments on commit df71792

Please sign in to comment.