Skip to content

Commit

Permalink
httpfs: append handler with size check
Browse files Browse the repository at this point in the history
  • Loading branch information
barnex committed Jan 7, 2015
1 parent c63a982 commit 8e28923
Showing 1 changed file with 61 additions and 29 deletions.
90 changes: 61 additions & 29 deletions httpfs/httpfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
"net/url"
"os"
"path"
"strconv"
"strings"
"sync"
)

var (
Logging = false // enables logging
Logging = true // enables logging
wd = "" // working directory, see SetWD
lock sync.Mutex // synchronous local FS access to avoid races
)
Expand Down Expand Up @@ -87,15 +88,22 @@ func Read(URL string) ([]byte, error) {
}
}

func Append(URL string, p []byte) error {
// Append p to the file given by URL,
// but first assure that the file had the expected size.
// Used to avoid accidental concurrent writes by two processes to the same file.
func AppendSize(URL string, p []byte, size int64) error {
URL = cleanup(URL)
if isRemote(URL) {
return httpAppend(URL, p)
return httpAppend(URL, p, size)
} else {
return localAppend(URL, p)
return localAppend(URL, p, size)
}
}

func Append(URL string, p []byte) error {
return AppendSize(URL, p, -1)
}

func Put(URL string, p []byte) error {
URL = cleanup(URL)
if isRemote(URL) {
Expand Down Expand Up @@ -151,58 +159,64 @@ func Handle() {
}

// general handler func for file name, input data and response writer.
type handlerFunc func(fname string, data []byte, w io.Writer) error
type handlerFunc func(fname string, data []byte, w io.Writer, query url.Values) error

func newHandler(prefix action, f handlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {

//log.Println("SLEEPING")
//time.Sleep(50 * time.Millisecond)

//defer r.Body.Close()
fname := r.URL.Path[len(prefix)+2:] // strip "/prefix/"
query := r.URL.Query()
data, err := ioutil.ReadAll(r.Body)

Log("httpfs req:", prefix, fname, len(data), "B payload")
Log("httpfs req:", prefix, fname, query.Encode(), len(data), "B payload")

if err != nil {
Log("httpfs err:", prefix, fname, ":", err)
http.Error(w, err.Error(), http.StatusBadRequest)
}

err2 := f(fname, data, w)
err2 := f(fname, data, w, query)
if err2 != nil {
Log("httpfs err:", prefix, fname, ":", err2)
http.Error(w, err2.Error(), http.StatusInternalServerError)
}
}
}

func handleAppend(fname string, data []byte, w io.Writer) error {
return localAppend(fname, data)
func handleAppend(fname string, data []byte, w io.Writer, q url.Values) error {
size := int64(-1)
s := q.Get("size")
if s != "" {
var err error
size, err = strconv.ParseInt(s, 0, 64)
if err != nil {
return err
}
}
return localAppend(fname, data, size)
}

func handlePut(fname string, data []byte, w io.Writer) error {
func handlePut(fname string, data []byte, w io.Writer, q url.Values) error {
return localPut(fname, data)
}

func handleLs(fname string, data []byte, w io.Writer) error {
func handleLs(fname string, data []byte, w io.Writer, q url.Values) error {
ls, err := localLs(fname)
if err != nil {
return err
}
return json.NewEncoder(w).Encode(ls)
}

func handleMkdir(fname string, data []byte, w io.Writer) error {
func handleMkdir(fname string, data []byte, w io.Writer, q url.Values) error {
return localMkdir(fname)
}

func handleTouch(fname string, data []byte, w io.Writer) error {
func handleTouch(fname string, data []byte, w io.Writer, q url.Values) error {
return localTouch(fname)
}

func handleRead(fname string, data []byte, w io.Writer) error {
func handleRead(fname string, data []byte, w io.Writer, q url.Values) error {
b, err := localRead(fname)
if err != nil {
return err
Expand All @@ -211,13 +225,15 @@ func handleRead(fname string, data []byte, w io.Writer) error {
return err2
}

func handleRemove(fname string, data []byte, w io.Writer) error {
func handleRemove(fname string, data []byte, w io.Writer, q url.Values) error {
return localRemove(fname)
}

func do(a action, URL string, body []byte) (resp []byte, err error) {
// TODO: query values
func do(a action, URL string, body []byte, query url.Values) (resp []byte, err error) {
u, err := url.Parse(URL)
u.Path = string(a) + path.Clean("/"+u.Path)
u.RawQuery = query.Encode()
response, errR := http.Post(u.String(), "data", bytes.NewReader(body))
if errR != nil {
return nil, mkErr(a, URL, errR)
Expand Down Expand Up @@ -251,17 +267,17 @@ func mkErr(a action, URL string, err error) error {
// client-side, remote server

func httpMkdir(URL string) error {
_, err := do(MKDIR, URL, nil)
_, err := do(MKDIR, URL, nil, nil)
return err
}

func httpTouch(URL string) error {
_, err := do(TOUCH, URL, nil)
_, err := do(TOUCH, URL, nil, nil)
return err
}

func httpLs(URL string) (ls []string, err error) {
r, errHTTP := do(LS, URL, nil)
r, errHTTP := do(LS, URL, nil, nil)
if errHTTP != nil {
return nil, errHTTP
}
Expand All @@ -272,22 +288,26 @@ func httpLs(URL string) (ls []string, err error) {
return ls, nil
}

func httpAppend(URL string, data []byte) error {
_, err := do(APPEND, URL, data)
func httpAppend(URL string, data []byte, size int64) error {
var query map[string][]string
if size >= 0 {
query = map[string][]string{"size": {fmt.Sprint(size)}}
}
_, err := do(APPEND, URL, data, query)
return err
}

func httpPut(URL string, data []byte) error {
_, err := do(PUT, URL, data)
_, err := do(PUT, URL, data, nil)
return err
}

func httpRead(URL string) ([]byte, error) {
return do(READ, URL, nil)
return do(READ, URL, nil, nil)
}

func httpRemove(URL string) error {
_, err := do(RM, URL, nil)
_, err := do(RM, URL, nil, nil)
return err
}

Expand Down Expand Up @@ -323,14 +343,26 @@ func localLs(fname string) ([]string, error) {
return ls, nil
}

func localAppend(fname string, data []byte) error {
func localAppend(fname string, data []byte, size int64) error {
lock.Lock()
defer lock.Unlock()

f, err := os.OpenFile(fname, os.O_APPEND|os.O_WRONLY, FilePerm)
if err != nil {
return err
}

if size >= 0 {
fi, errFi := f.Stat()
if errFi != nil {
return errFi
}

if size != fi.Size() {
return fmt.Errorf(`httpfs: file size mismatch, possible concurrent access. size=%v B, expected=%v B`, fi.Size(), size)
}
}

defer f.Close()
_, err2 := f.Write(data)
return err2
Expand Down

0 comments on commit 8e28923

Please sign in to comment.