Skip to content

Commit

Permalink
testing and error handling for block assignment and distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
sjarvie committed Mar 12, 2014
1 parent 2ec1f33 commit 1a49a23
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 36 deletions.
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
RETRIEVEBLOCK = iota // request to retrieve a Block
DISTRIBUTE = iota // request to distribute a Block to a datanode
GETHEADERS = iota // request to retrieve the headers of a given filename
ERROR = iota // notification of a failed request
ERROR = iota // notification of a failed request
)

// A file is composed of one or more Blocks
Expand All @@ -61,7 +61,7 @@ type Packet struct {
SRC string // source ID
DST string // destination ID
CMD int // command for the handler
Err string // optional error explanation
Err string // optional error explanation
Data Block // optional Block
Headers []BlockHeader // optional BlockHeader list
}
Expand Down
3 changes: 2 additions & 1 deletion datanode/datanode.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ const (
RETRIEVEBLOCK = iota // request to retrieve a Block
DISTRIBUTE = iota // request to distribute a Block to a datanode
GETHEADERS = iota // request to retrieve the headers of a given filename
ERROR = iota // notification of a failed request
ERROR = iota // notification of a failed request
)

// A file is composed of one or more Blocks
type Block struct {
Header BlockHeader // metadata
Expand Down
2 changes: 1 addition & 1 deletion godfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func main() {
cmd := os.Args[1]

if cmd == "namenode" {
namenode.Init()
namenode.Run()
} else if cmd == "datanode" {
id := os.Args[2]
fspath := os.Args[3]
Expand Down
43 changes: 43 additions & 0 deletions namenode/assignblock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package namenode

import (
"testing"
)

func TestInvalidInput(t *testing.T) {

Init()

dn1 := datanode{"DN1", true, 0}
datanodemap["DN1"] = &dn1

// Test a bad block
var b1 Block

_, err := AssignBlock(b1)

if err == nil {
t.Errorf("Distributed invalid Block")
}
}

func TestValidInput(t *testing.T) {

Init()

dn1 := datanode{"DN1", true, 0}
datanodemap["DN1"] = &dn1

// Test a bad block
var b1 Block

inh := BlockHeader{"DN1", "/out.txt", 1, 0, 1}
b1.Header = inh
b1.Data = make([]byte, 1, 1)

_, err := AssignBlock(b1)

if err != nil {
t.Errorf("Could not insert valid block")
}
}
10 changes: 10 additions & 0 deletions namenode/mergenode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,14 @@ func TestDuplicateInsert(t *testing.T) {
t.Errorf("%s", err)
}

blks, ok := filemap["/out.txt"]
if !ok {
t.Errorf("merge failed ")
}
harr1, ok := blks[0]

if len(harr1) != 1 {
t.Errorf("merge failed: Expected 1 , Got " + strconv.Itoa(len(harr1)))
}

}
74 changes: 42 additions & 32 deletions namenode/namenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
RETRIEVEBLOCK = iota // request to retrieve a Block
DISTRIBUTE = iota // request to distribute a Block to a datanode
GETHEADERS = iota // request to retrieve the headers of a given filename
ERROR = iota // notification of a failed request
ERROR = iota // notification of a failed request
)

// A file is composed of one or more Blocks
Expand All @@ -67,7 +67,7 @@ type Packet struct {
SRC string // source ID
DST string // destination ID
CMD int // command for the handler
Err string // optional error explanation
Err string // optional error explanation
Data Block // optional Block
Headers []BlockHeader // optional BlockHeader list
}
Expand Down Expand Up @@ -227,50 +227,46 @@ func MergeNode(h BlockHeader) error {
return nil
}

// DistributeBlocks creates packets based on BlockHeader metadata and enqueues them for transmission
func DistributeBlocks(bls []Block) {
// AssignBlocks creates packets based on BlockHeader metadata and enqueues them for transmission
func AssignBlocks(bls []Block) {
for _, b := range bls {
DistributeBlock(b)
AssignBlock(b)
}
}

// DistributeBlocks chooses a datanode which balances the load across nodes for a block and enqueues
// AssignBlocks chooses a datanode which balances the load across nodes for a block and enqueues
// the block for distribution
func DistributeBlock(b Block) {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from panic ", r)
fmt.Println("Unable to distribute block ", b)
return
}
}()
func AssignBlock(b Block) (Packet, error) {
p := new(Packet)

if &b == nil || &b.Header == nil || &b.Data == nil || b.Header.Filename == "" ||
b.Header.Size <= 0 || b.Header.BlockNum < 0 || b.Header.NumBlocks <= b.Header.BlockNum {
return *p, errors.New("Invalid Block input")
}

if len(datanodemap) < 1 {
fmt.Println("No connected datanodes, cannot save file")
return
return *p, errors.New("Cannot distribute Block, no datanodes are connected")
}

//Random load balancing
// Create Packet and send block
p.SRC = id
p.CMD = BLOCK

//Random load balancing
nodeIDs := make([]string, len(datanodemap), len(datanodemap))
i := 0
for _, v := range datanodemap {
nodeIDs[i] = v.ID
i++
}

rand.Seed(time.Now().UTC().UnixNano())

p := new(Packet)

nodeindex := rand.Intn(len(nodeIDs))
p.DST = nodeIDs[nodeindex]
b.Header.DatanodeID = p.DST

p.SRC = id
p.CMD = BLOCK
p.Data = Block{b.Header, b.Data}
sendChannel <- *p

return *p, nil

}

Expand Down Expand Up @@ -336,7 +332,13 @@ func HandlePacket(p Packet) {
case DISTRIBUTE:
b := p.Data
fmt.Println("Distributing Block", b)
DistributeBlock(b)
p, err := AssignBlock(b)
if err != nil {
r.CMD = ERROR
r.Err = err.Error()
}
sendChannel <- p

r.CMD = ACK
case RETRIEVEBLOCK:
r.CMD = RETRIEVEBLOCK
Expand Down Expand Up @@ -370,13 +372,14 @@ func HandlePacket(p Packet) {
fname := p.Headers[0].Filename
blockMap, ok := filemap[fname]
if !ok {
fmt.Println(filemap)
r.CMD = ERROR
r.Err = "File not found " + fname
fmt.Println("Requested file in filesystem, ", fname)
fmt.Println("Requested file in filesystem not found, ", fname)
break
}

_,ok = blockMap[0]
_, ok = blockMap[0]
if !ok {
r.CMD = ERROR
r.Err = "Could not locate first block in file"
Expand Down Expand Up @@ -499,29 +502,36 @@ func HandleConnection(conn net.Conn) {
func Init() {

// setup filesystem
fmt.Println("Initializing Namenode")
root = &filenode{"/", nil, make([]*filenode, 0, 5)}
filemap = make(map[string]map[int][]BlockHeader)

// setup communication
headerChannel = make(chan BlockHeader)
//blockReceiverChannel = make(chan Block)
sendChannel = make(chan Packet)
sendMap = make(map[string]*json.Encoder)
sendMapLock = sync.Mutex{}
clientMap = make(map[BlockHeader]string)
clientMapLock = sync.Mutex{}

id = "NN"
datanodemap = make(map[string]*datanode)
}

// Run starts the namenode
func Run() {

// setup filesystem
Init()

// Start communication
go HandleBlockHeaders()
go SendPackets()

id = "NN"
listener, err := net.Listen("tcp", SERVERADDR)
if err != nil {
log.Fatal("Fatal error ", err.Error())
}

datanodemap = make(map[string]*datanode)

// listen for datanode connections
for {
conn, err := listener.Accept()
Expand Down

0 comments on commit 1a49a23

Please sign in to comment.