Skip to content

Commit

Permalink
synchronized agent channel read/write
Browse files Browse the repository at this point in the history
  • Loading branch information
chrislusf committed Oct 20, 2015
1 parent 673c684 commit ed47f82
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 40 deletions.
16 changes: 10 additions & 6 deletions agent/agent_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type AgentServer struct {
Port int
name2Store map[string]*LiveDataStore
dir string
name2StoreLock sync.Mutex
name2StoreCond *sync.Cond
wg sync.WaitGroup
l net.Listener
computeResource *resource.ComputeResource
Expand All @@ -71,12 +71,16 @@ func NewAgentServer(option *AgentServerOption) *AgentServer {
}
println("starting in", absoluteDir)
option.Dir = &absoluteDir

var lock sync.Mutex

as := &AgentServer{
Option: option,
Master: *option.Master,
Port: *option.Port,
dir: absoluteDir,
name2Store: make(map[string]*LiveDataStore),
Option: option,
Master: *option.Master,
Port: *option.Port,
dir: absoluteDir,
name2Store: make(map[string]*LiveDataStore),
name2StoreCond: sync.NewCond(&lock),
computeResource: &resource.ComputeResource{
CPUCount: *option.MaxExecutor,
CPULevel: *option.CPULevel,
Expand Down
6 changes: 3 additions & 3 deletions agent/agent_server_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func (as *AgentServer) handleStart(conn net.Conn,
reply := &cmd.StartResponse{}

dir := path.Join(*as.Option.Dir, startRequest.GetDir())
os.MkdirAll(dir, os.ModeDir)
os.MkdirAll(dir, 0755)
err := rsync.FetchFilesTo(startRequest.GetHost()+":"+strconv.Itoa(int(startRequest.GetPort())), dir)
if err != nil {
log.Printf("Failed to download file: %v", err)
*reply.Error = err.Error()
reply.Error = proto.String(err.Error())
return reply
}

Expand All @@ -67,7 +67,7 @@ func (as *AgentServer) handleStart(conn net.Conn,
if err != nil {
log.Printf("Failed to start command %s under %s: %v",
cmd.Path, cmd.Dir, err)
*reply.Error = err.Error()
reply.Error = proto.String(err.Error())
} else {
reply.Pid = proto.Int32(int32(cmd.Process.Pid))
}
Expand Down
25 changes: 12 additions & 13 deletions agent/agent_server_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,25 @@ import (
"net"
"time"

"github.com/chrislusf/glow/io/store"
"github.com/chrislusf/glow/util"
)

func (as *AgentServer) handleLocalReadConnection(conn net.Conn, name string, offset int64) {
as.name2StoreLock.Lock()
ds, ok := as.name2Store[name]
if !ok {
s, err := store.NewLocalFileDataStore(as.dir, fmt.Sprintf("%s-%d", name, as.Port))
if err != nil {
// log.Printf("Failed to create queue on disk: %v", err)
as.name2StoreLock.Unlock()
return
var ds *LiveDataStore
var ok bool

as.name2StoreCond.L.Lock()
for {
ds, ok = as.name2Store[name]
if ok {
break
}
as.name2Store[name] = NewLiveDataStore(s)
ds = as.name2Store[name]
println(name, "is waiting to read...")
as.name2StoreCond.Wait()
}
as.name2StoreLock.Unlock()
as.name2StoreCond.L.Unlock()

// println(name, "start reading from:", offset)
println(name, "start reading from:", offset)

closeSignal := make(chan bool, 1)

Expand Down
38 changes: 24 additions & 14 deletions agent/agent_server_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,25 @@ import (

func (as *AgentServer) handleWriteConnection(r io.Reader, name string) {

as.name2StoreLock.Lock()
as.name2StoreCond.L.Lock()
ds, ok := as.name2Store[name]
if !ok {
s, err := store.NewLocalFileDataStore(as.dir, fmt.Sprintf("%s-%d", name, as.Port))
if err != nil {
log.Printf("Failed to create a queue on disk: %v", err)
as.name2StoreLock.Unlock()
return
}
as.name2Store[name] = NewLiveDataStore(s)
ds = as.name2Store[name]
if ok {
as.doDelete(name)
}

s, err := store.NewLocalFileDataStore(as.dir, fmt.Sprintf("%s-%d", name, as.Port))
if err != nil {
log.Printf("Failed to create a queue on disk: %v", err)
as.name2StoreCond.L.Unlock()
return
}
as.name2StoreLock.Unlock()

as.name2Store[name] = NewLiveDataStore(s)
ds = as.name2Store[name]
println(name, "is broadcasting...")
as.name2StoreCond.Broadcast()

as.name2StoreCond.L.Unlock()

//register stream
go client.NewHeartBeater(as.Port, *as.Option.Master).StartChannelHeartBeat(ds.killHeartBeater, name)
Expand All @@ -52,15 +57,20 @@ func (as *AgentServer) handleWriteConnection(r io.Reader, name string) {

func (as *AgentServer) handleDelete(name string) {

as.name2StoreLock.Lock()
as.name2StoreCond.L.Lock()
defer as.name2StoreCond.L.Unlock()

as.doDelete(name)
}

func (as *AgentServer) doDelete(name string) {

ds, ok := as.name2Store[name]
if !ok {
as.name2StoreLock.Unlock()
return
}

delete(as.name2Store, name)
as.name2StoreLock.Unlock()

ds.Destroy()
}
5 changes: 4 additions & 1 deletion driver/context_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,8 @@ func (fcd *FlowContextDriver) CloseOutputChannels(fc *flow.FlowContext) {
}

func (option *DriverOption) RelatedFileNames() []string {
return strings.Split(option.RelatedFiles, strconv.QuoteRune(os.PathListSeparator))
if option.RelatedFiles != "" {
return strings.Split(option.RelatedFiles, strconv.QuoteRune(os.PathListSeparator))
}
return []string{}
}
5 changes: 5 additions & 0 deletions driver/rsync/fetch_url.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"fmt"
"io/ioutil"
"path/filepath"
"sync"

"github.com/chrislusf/glow/util"
)

var fetchLock sync.Mutex

type ListFileResult struct {
Files []FileHash `json:"files,omitempty"`
}
Expand All @@ -27,6 +30,8 @@ func ListFiles(server string) ([]FileHash, error) {
}

func FetchFilesTo(driverAddress string, dir string) error {
fetchLock.Lock()
defer fetchLock.Unlock()

fileList, err := ListFiles(driverAddress)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions driver/rsync/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func NewRsyncServer(file string, relatedFiles []string) (*RsyncServer, error) {
RelatedFiles: relatedFiles,
}
if fh, err := GenerateFileHash(file); err != nil {
log.Printf("Failed to read %s: %v", file, err)
log.Printf("Failed1 to read %s: %v", file, err)
} else {
rs.fileHashes = append(rs.fileHashes, *fh)
}
for _, f := range rs.RelatedFiles {
if fh, err := GenerateFileHash(f); err != nil {
log.Printf("Failed to read %s: %v", f, err)
log.Printf("Failed2 to read %s: %v", f, err)
} else {
rs.fileHashes = append(rs.fileHashes, *fh)
}
Expand Down
3 changes: 2 additions & 1 deletion glow.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ func main() {
wg.Wait()

case receiver.FullCommand():
target := r.FindTarget(*receiveFromChanName, *receiverMaster)
rc := r.NewReceiveChannel(*receiveFromChanName, 0)
recvChan, err := rc.GetDirectChannel(*receiverMaster)
recvChan, err := rc.GetDirectChannel(target)
if err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions io/store/rotating_file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ func (l *RotatingFileStore) Destroy() {
// oldLogFiles returns the list of backup log files stored in the same
// directory as the current log file, sorted by ModTime
func (l *RotatingFileStore) listOldLogFiles() ([]logInfo, error) {
os.MkdirAll(l.dir(), 0755)
files, err := ioutil.ReadDir(l.dir())
if err != nil {
return nil, fmt.Errorf("can't read log file directory: %s", err)
Expand Down

0 comments on commit ed47f82

Please sign in to comment.