Skip to content

Commit

Permalink
Incremental Backup
Browse files Browse the repository at this point in the history
Backup of files as major overhaul of functionality performed
  • Loading branch information
sjarvie committed Mar 7, 2014
1 parent e7ed63e commit 8d9842a
Show file tree
Hide file tree
Showing 11 changed files with 728 additions and 102 deletions.
155 changes: 95 additions & 60 deletions datanode.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/* Gob EchoClient
*/

package main
import (
"fmt"
Expand All @@ -10,117 +9,153 @@ import (
"io/ioutil"
)

const serverAddr = "localhost:8080"
var id, path string // command line arguments
var action = "HB"

//Action that the data node is currently undergoing
// HB : Heartbeat : Waiting for a command
// LIST : Sending directory contents
const SERVERADDR = "localhost:8080"
const SIZEOFBLOCK = 1000000
var id, blockpath string // command line arguments
var state = "HB"



type Packet struct {
ID string
Command string
Data string
Files [] string

// A file is composed of one or more blocks
type datablock struct {
header blockheader
data []byte
}

func (p Packet) String() string {
s := p.ID + " : " + p.Command
return s
// block headers hold block data information
type blockheader struct {
datanodeID string
filename string //the filename including the path "/data/test.txt"
size, sequenceNum, numBlocks int //size in bytes, the order of the chunk within file
//TODO checksum
}

// packets are sent over the network
type packet struct {
from, to string
cmd string
data block
headers []blockheader
}

func recv(conn net.Conn, packet_channel chan Packet){
// receive a packet and send to syncronized channel
func ReceivePacket(conn net.Conn, p chan Packet){
decoder := gob.NewDecoder(conn)
for {
var responsePacket Packet
decoder.Decode(&responsePacket)
packet_channel <- responsePacket
var r Packet
decoder.Decode(&r)
p <- r
}
}

func (p *packet) String() string {
s := "NN" + p.from
return s
}


func handleResponse(response Packet){
if response.Command == "ACK" {
action = "HB"
} else if response.Command == "LIST" {
action = "LIST"
fmt.Println("action now " + action)
// change state to handle request from server
func HandleResponse(r Packet){
if r.cmd == "ACK" {
state = "HB"
} else if r.cmd == "LIST" {
state = "LIST"
fmt.Println("state now " + state)
}
}

func performAction(encoder *gob.Encoder){
// handle a state's action
func PerformAction(encoder *gob.Encoder){
p := new(Packet);
p.ID = id
if action == "HB" {
p.Command = "HB"
} else if action == "LIST"{
files := listFS()
p.Files = make([]string, len(files))
for i, f := range files {
fmt.Println(f.Name(), "\t", f.Size())
p.Files[i] = f.Name()
p.from = id
p.to = "NN" //TODO better naming conventions
if state == "HB" {
p.cmd = "HB"
} else if state == "LIST"{
list := GetBlockInfos()
p.headers = make([]blockheader, len(list))

for _, b := range list {
apend(p.headers,b)
}
p.Command = "LIST"
p.cmd = "LIST"
}
encoder.Encode(p) // TODO syncronize this call
encoder.Encode(p)
}

//func blocksFromFile

// List directory contents
// List block contents
// Later add recursion and switch to blocks
func listFS() []os.FileInfo {
files, _ := ioutil.ReadDir(path)
return files
func GetBlockHeaders() []blockheader {

list, err := ioutil.ReadDir(blockpath)
CheckError(err)
headers := make([]blockheader, len())

//for _, f := range files {
// fmt.Println(f.Name())
//}
for f in range list {
var b block
ReadGob(f.Name, &b)
append(headers, b)
}
return headers
}


func ReadGob(fileName string, key interface{}) {
inFile, err := os.Open(fileName)
CheckError(err)
decoder := gob.NewDecoder(inFile)
err = decoder.Decode(key)
CheckError(err)
inFile.Close()
}

func WriteGob(fileName string, key interface{}) {
outFile, err := os.Create(fileName)
CheckError(err)
encoder := gob.NewEncoder(outFile)
err = encoder.Encode(key)
CheckError(err)
outFile.Close()
}


func main() {

if len(os.Args) != 3 {
fmt.Println("Usage: ", os.Args[0], "id path")
os.Exit(1)
}
id = os.Args[1]
path = os.Args[2]
blockpath = os.Args[2]

conn, err := net.Dial("tcp", serverAddr)
checkError(err)
conn, err := net.Dial("tcp", SERVERADDR)
CheckError(err)

encoder := gob.NewEncoder(conn)

packet_channel := make(chan Packet)
packetChannel := make(chan Packet)
tick := time.Tick(1000 * time.Millisecond)

go recv(conn,packet_channel)
go ReceivePacket(conn,packet_channel)

for {
select {
case <-tick:
performAction(encoder)
fmt.Println("action : " + action)
case response := <-packet_channel:
handleResponse(response)
fmt.Println("Received : " + response.String())
PerformAction(encoder)
fmt.Println("state : " + state)
case r := <-packetChannel:
PerformAction(r)
fmt.Println("Received : " + r.String())
}

}
os.Exit(0)
}

func checkError(err error) {
func CheckError(err error) {
if err != nil {
fmt.Println("Fatal error ", err.Error())
os.Exit(1)
//os.Exit(1)
}
}
1 change: 1 addition & 0 deletions file.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello World!
Loading

0 comments on commit 8d9842a

Please sign in to comment.