Skip to content

Commit

Permalink
mumax3-server: use stateless httpfs
Browse files Browse the repository at this point in the history
  • Loading branch information
barnex committed Sep 19, 2014
1 parent f60e271 commit 2baab02
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 37 deletions.
21 changes: 8 additions & 13 deletions cmd/mumax3-server/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package main

import (
"fmt"
"github.com/mumax/3/httpfs"
"github.com/mumax/3/util"
"io"
"log"
"os/exec"
"strings"
"time"

"github.com/mumax/3/httpfs"
"github.com/mumax/3/util"
)

// Runs a compute service on this node, if GPUs are available.
Expand All @@ -20,7 +20,7 @@ func (n *Node) RunComputeService() {
return
}

// stack of available GPU numbers
// queue of available GPU numbers
idle := make(chan int, len(n.GPUs))
for i := range n.GPUs {
idle <- i
Expand Down Expand Up @@ -50,7 +50,7 @@ func (n *Node) RunComputeService() {
n.unlock()

status := runJob(job)
n.FSServer.CloseAll(JobOutputDir(JobInputFile(job.URL)))
//n.FSServer.CloseAll(JobOutputDir(JobInputFile(job.URL)))

// remove from "running" list
n.lock()
Expand Down Expand Up @@ -110,7 +110,7 @@ func (n *Node) KillJob(url string) error {
}

// prepare exec.Cmd to run mumax3 compute process
func makeProcess(inputURL string, gpu int, webAddr string) (*exec.Cmd, *httpfs.File) {
func makeProcess(inputURL string, gpu int, webAddr string) (*exec.Cmd, io.WriteCloser) {
// prepare command
command := *flag_mumax
gpuFlag := fmt.Sprint(`-gpu=`, gpu)
Expand All @@ -120,14 +120,9 @@ func makeProcess(inputURL string, gpu int, webAddr string) (*exec.Cmd, *httpfs.F
cmd := exec.Command(command, gpuFlag, httpFlag, cacheFlag, forceFlag, inputURL)

// Pipe stdout, stderr to log file over httpfs
fs, errFS := httpfs.Dial("http://" + JobHost(inputURL) + "/fs")
if errFS != nil {
log.Println(errFS)
return nil, nil
}
outDir := util.NoExt(JobInputFile(inputURL)) + ".out"
fs.Mkdir(outDir, 0777)
out, errD := fs.Create(outDir + "/stdout.txt")
httpfs.Mkdir(outDir)
out, errD := httpfs.Create(outDir + "/stdout.txt")
if errD != nil {
log.Println(errD)
return nil, nil
Expand Down
10 changes: 6 additions & 4 deletions cmd/mumax3-server/job.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package main

import (
"github.com/mumax/3/util"
"net/url"
"os/exec"
"strings"
"time"

"github.com/mumax/3/util"
)

// compute Job
type Job struct {
URL string // URL of the input file, e.g., http://hostname/fs/user/inputfile.mx3
URL string // URL of the input file, e.g., http://hostname/fs/user/inputfile.mx3
// all of this is cache:
outputURL string // URL of the output directory, access via OutputURL()
Node string // Address of the node that runs/ran this job, if any. E.g.: computenode2:35360
GPU int // GPU number on the compute node that runs/ran this job, if any
Expand Down Expand Up @@ -47,7 +47,9 @@ func (j *Job) Path() string {
return j.URL[len("http://"):]
}

func NewJob(URL string) Job { return Job{URL: URL} }
func NewJob(URL string) Job {
return Job{URL: URL}
}

// Returns how long this job has been running
func (j *Job) Runtime() time.Duration {
Expand Down
4 changes: 1 addition & 3 deletions cmd/mumax3-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func main() {

node = &Node{
Addr: laddr,
RootDir: "./",
upSince: time.Now(),
MumaxVersion: DetectMumax(),
GPUs: DetectGPUs(),
Expand All @@ -57,8 +56,7 @@ func main() {
http.HandleFunc("/call/", node.HandleRPC)
http.HandleFunc("/do/", node.HandleHumanRPC)
http.HandleFunc("/", node.HandleStatus)
node.FSServer = httpfs.NewServer(node.RootDir, "/fs/")
http.Handle("/fs/", node.FSServer)
httpfs.Handle()

go func() {
log.Println("serving at", laddr)
Expand Down
4 changes: 0 additions & 4 deletions cmd/mumax3-server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@ import (
"reflect"
"sync"
"time"

"github.com/mumax/3/httpfs"
)

type Node struct {
Addr string // canonical (unique) address of node, read-only
RootDir string // httpfs storage root
MumaxVersion string
upSince time.Time
FSServer *httpfs.Server

// compute service
GPUs []GPU
Expand Down
13 changes: 6 additions & 7 deletions cmd/mumax3-server/que.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"math"
"path"
"strings"
"time"
)
Expand Down Expand Up @@ -76,12 +75,12 @@ func (n *Node) AddJob(fname string) {
defer n.unlock()
//log.Println("Push job:", fname)

if path.IsAbs(fname) {
if !strings.HasPrefix(fname, n.RootDir) {
panic("AddJob " + fname + ": not in root: " + n.RootDir) // TODO: handle gracefully
}
fname = fname[len(n.RootDir):] // strip root prefix
}
// if path.IsAbs(fname) {
// if !strings.HasPrefix(fname, n.RootDir) {
// panic("AddJob " + fname + ": not in root: " + n.RootDir) // TODO: handle gracefully
// }
// fname = fname[len(n.RootDir):] // strip root prefix
// }

split := strings.Split(fname, "/")
first := ""
Expand Down
6 changes: 1 addition & 5 deletions cmd/mumax3-server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,7 @@ Uptime: {{.Uptime}} <br/>
</p>
<h2>HTTPFS service</h2><p>
<b>Storage root:</b> <a href="http://{{.Addr}}/fs/">{{.RootDir}}</a>
<br/><b>Open Files:</b><br/>
{{range .FSServer.LsOF}}
{{.}}<br/>
{{end}}
:-)
</p>
Expand Down
1 change: 0 additions & 1 deletion httpfs/lockserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func (l *LockServer) Lock(owner, key string) (ok bool) {
l.leases[key] = &lease{owner: owner, t: now}
return true
}

}

// remove expired leases. a run is linear in the total number of leases,
Expand Down

0 comments on commit 2baab02

Please sign in to comment.