Skip to content

Commit

Permalink
FS: added support for byte range requests
Browse files Browse the repository at this point in the history
  • Loading branch information
valyala committed Jan 5, 2016
1 parent 4bca54c commit 8a83396
Show file tree
Hide file tree
Showing 5 changed files with 414 additions and 49 deletions.
233 changes: 184 additions & 49 deletions fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ type FS struct {
// Transparent compression is disabled by default.
Compress bool

// Enables byte range requests if set to true.
//
// Byte range requests are disabled by default.
AcceptByteRange bool

// Path rewriting function.
//
// By default request path is not modified.
Expand Down Expand Up @@ -150,6 +155,7 @@ func FSHandler(root string, stripSlashes int) RequestHandler {
Root: root,
IndexNames: []string{"index.html"},
GenerateIndexPages: true,
AcceptByteRange: true,
PathRewrite: NewPathSlashesStripper(stripSlashes),
}
return fs.NewRequestHandler()
Expand Down Expand Up @@ -193,6 +199,7 @@ func (fs *FS) NewRequestHandler() RequestHandler {
pathRewrite: fs.PathRewrite,
generateIndexPages: fs.GenerateIndexPages,
compress: fs.Compress,
acceptByteRange: fs.AcceptByteRange,
cacheDuration: cacheDuration,
cache: make(map[string]*fsFile),
compressedCache: make(map[string]*fsFile),
Expand All @@ -215,6 +222,7 @@ type fsHandler struct {
pathRewrite PathRewriteFunc
generateIndexPages bool
compress bool
acceptByteRange bool
cacheDuration time.Duration

cache map[string]*fsFile
Expand Down Expand Up @@ -257,14 +265,16 @@ func (ff *fsFile) smallFileReader() io.Reader {
v := ff.h.smallFileReaderPool.Get()
if v == nil {
r := &fsSmallFileReader{
ff: ff,
ff: ff,
endPos: ff.contentLength,
}
return r
}
r := v.(*fsSmallFileReader)
r.ff = ff
if r.offset > 0 {
panic("BUG: fsSmallFileReader with non-nil offset found in the pool")
r.endPos = ff.contentLength
if r.startPos > 0 {
panic("BUG: fsSmallFileReader with non-nil startPos found in the pool")
}
return r
}
Expand Down Expand Up @@ -302,6 +312,7 @@ func (ff *fsFile) bigFileReader() (io.Reader, error) {
return &bigFileReader{
f: f,
ff: ff,
r: f,
}, nil
}

Expand Down Expand Up @@ -333,23 +344,36 @@ func (ff *fsFile) decReadersCount() {
type bigFileReader struct {
f *os.File
ff *fsFile
r io.Reader
lr io.LimitedReader
}

func (r *bigFileReader) UpdateByteRange(startPos, endPos int) error {
if _, err := r.f.Seek(int64(startPos), 0); err != nil {
return err
}
r.r = &r.lr
r.lr.R = r.f
r.lr.N = int64(endPos - startPos + 1)
return nil
}

func (r *bigFileReader) Read(p []byte) (int, error) {
return r.f.Read(p)
return r.r.Read(p)
}

func (r *bigFileReader) WriteTo(w io.Writer) (int64, error) {
if rf, ok := w.(io.ReaderFrom); ok {
// fast path. Senfile must be triggered
return rf.ReadFrom(r.f)
return rf.ReadFrom(r.r)
}

// slow path
return copyZeroAlloc(w, r.f)
return copyZeroAlloc(w, r.r)
}

func (r *bigFileReader) Close() error {
r.r = r.f
n, err := r.f.Seek(0, 0)
if err == nil {
if n != 0 {
Expand All @@ -368,74 +392,93 @@ func (r *bigFileReader) Close() error {
}

type fsSmallFileReader struct {
ff *fsFile
offset int64
ff *fsFile
startPos int
endPos int
}

func (r *fsSmallFileReader) Close() error {
ff := r.ff
ff.decReadersCount()
r.ff = nil
r.offset = 0
r.startPos = 0
r.endPos = 0
ff.h.smallFileReaderPool.Put(r)
return nil
}

func (r *fsSmallFileReader) UpdateByteRange(startPos, endPos int) error {
r.startPos = startPos
r.endPos = endPos + 1
return nil
}

func (r *fsSmallFileReader) Read(p []byte) (int, error) {
ff := r.ff
tailLen := r.endPos - r.startPos
if tailLen <= 0 {
return 0, io.EOF
}
if len(p) > tailLen {
p = p[:tailLen]
}

ff := r.ff
if ff.f != nil {
n, err := ff.f.ReadAt(p, r.offset)
r.offset += int64(n)
n, err := ff.f.ReadAt(p, int64(r.startPos))
r.startPos += n
return n, err
}

if r.offset == int64(len(ff.dirIndex)) {
return 0, io.EOF
}
n := copy(p, ff.dirIndex[r.offset:])
r.offset += int64(n)
n := copy(p, ff.dirIndex[r.startPos:])
r.startPos += n
return n, nil
}

func (r *fsSmallFileReader) WriteTo(w io.Writer) (int64, error) {
if r.offset != 0 {
panic("BUG: non-zero offset! Read() mustn't be called before WriteTo()")
if r.startPos != 0 {
panic("BUG: non-zero startPos! Read() mustn't be called before WriteTo()")
}

ff := r.ff

var n int
var err error
if ff.f != nil {
if rf, ok := w.(io.ReaderFrom); ok {
return rf.ReadFrom(r)
}

bufv := copyBufPool.Get()
buf := bufv.([]byte)
for err != nil {
n, err = ff.f.ReadAt(buf, r.offset)
nw, errw := w.Write(buf[:n])
r.offset += int64(nw)
if errw == nil && nw != n {
panic("BUG: Write(p) returned (n, nil), where n != len(p)")
}
if err == nil {
err = errw
}
}
copyBufPool.Put(bufv)
if ff.f == nil {
n, err = w.Write(ff.dirIndex[r.startPos:r.endPos])
return int64(n), err
}

if err == io.EOF {
err = nil
if rf, ok := w.(io.ReaderFrom); ok {
return rf.ReadFrom(r)
}

curPos := r.startPos
bufv := copyBufPool.Get()
buf := bufv.([]byte)
for err != nil {
tailLen := r.endPos - curPos
if tailLen <= 0 {
break
}
if len(buf) > tailLen {
buf = buf[:tailLen]
}
n, err = ff.f.ReadAt(buf, int64(curPos))
nw, errw := w.Write(buf[:n])
curPos += nw
if errw == nil && nw != n {
panic("BUG: Write(p) returned (n, nil), where n != len(p)")
}
if err == nil {
err = errw
}
return r.offset, err
}
copyBufPool.Put(bufv)

n, err = w.Write(ff.dirIndex)
r.offset += int64(n)
return r.offset, err
if err == io.EOF {
err = nil
}
return int64(curPos - r.startPos), err
}

func (h *fsHandler) cleanCache(pendingFiles []*fsFile) []*fsFile {
Expand Down Expand Up @@ -502,7 +545,8 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {

mustCompress := false
fileCache := h.cache
if h.compress && ctx.Request.Header.HasAcceptEncodingBytes(strGzip) {
byteRange := ctx.Request.Header.peek(strRange)
if len(byteRange) == 0 && h.compress && ctx.Request.Header.HasAcceptEncodingBytes(strGzip) {
mustCompress = true
fileCache = h.compressedCache
}
Expand Down Expand Up @@ -570,13 +614,104 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
return
}

hdr := &ctx.Response.Header
if ff.compressed {
ctx.Response.Header.SetCanonical(strContentEncoding, strGzip)
hdr.SetCanonical(strContentEncoding, strGzip)
}
ctx.Response.Header.SetCanonical(strLastModified, ff.lastModifiedStr)
ctx.SetBodyStream(r, ff.contentLength)

statusCode := StatusOK
contentLength := ff.contentLength
if h.acceptByteRange {
hdr.SetCanonical(strAcceptRanges, strBytes)
if len(byteRange) > 0 {
startPos, endPos, err := ParseByteRange(byteRange, contentLength)
if err != nil {
r.(io.Closer).Close()
ctx.Logger().Printf("cannot parse byte range %q for path=%q: %s", byteRange, path, err)
ctx.Error("Range Not Satisfiable", StatusRequestedRangeNotSatisfiable)
return
}

if err = r.(byteRangeUpdater).UpdateByteRange(startPos, endPos); err != nil {
r.(io.Closer).Close()
ctx.Logger().Printf("cannot seek byte range %q for path=%q: %s", byteRange, path, err)
ctx.Error("Internal Server Error", StatusInternalServerError)
return
}

hdr.SetContentRange(startPos, endPos, contentLength)
contentLength = endPos - startPos + 1
statusCode = StatusPartialContent
}
}

hdr.SetCanonical(strLastModified, ff.lastModifiedStr)
ctx.SetBodyStream(r, contentLength)
ctx.SetContentType(ff.contentType)
ctx.SetStatusCode(StatusOK)
ctx.SetStatusCode(statusCode)
}

type byteRangeUpdater interface {
UpdateByteRange(startPos, endPos int) error
}

var (
errUnsupportedRangeUnits = errors.New("unsupported range units")
errInvalidByteRange = errors.New("invalid byte range")
)

// ParseByteRange parses 'Range: bytes=...' header value.
func ParseByteRange(byteRange []byte, contentLength int) (startPos, endPos int, err error) {
b := byteRange
if !bytes.HasPrefix(b, strBytes) {
return 0, 0, errUnsupportedRangeUnits
}

b = b[len(strBytes):]
if len(b) == 0 || b[0] != '=' {
return 0, 0, errInvalidByteRange
}
b = b[1:]

n := bytes.IndexByte(b, '-')
if n < 0 {
return 0, 0, errInvalidByteRange
}

if n == 0 {
v, err := ParseUint(b[n+1:])
if err != nil {
return 0, 0, err
}
startPos := contentLength - v
if startPos < 0 {
return 0, 0, errInvalidByteRange
}
return startPos, contentLength - 1, nil
}

if startPos, err = ParseUint(b[:n]); err != nil {
return 0, 0, err
}
if startPos >= contentLength {
return 0, 0, errInvalidByteRange
}

b = b[n+1:]
if len(b) == 0 {
return startPos, contentLength - 1, nil
}

if endPos, err = ParseUint(b); err != nil {
return 0, 0, err
}
if endPos >= contentLength {
return 0, 0, errInvalidByteRange
}
if endPos < startPos {
return 0, 0, errInvalidByteRange
}
return startPos, endPos, nil
}

func (h *fsHandler) openIndexFile(ctx *RequestCtx, dirPath string, mustCompress bool) (*fsFile, error) {
Expand Down
Loading

0 comments on commit 8a83396

Please sign in to comment.