Skip to content

Commit

Permalink
http/file API to send region of the file (clj-commons#485)
Browse files Browse the repository at this point in the history
* Define public API http/file and internal representation

* Reimplement sender side to use internal coercion

* Do not use internal Netty constant

* Fix typo

* Set correct value for content-length header

* Remove unused imports

* Avoid reflection when creating random access file

* Test cases to cover file regions

* More implementation to core.clj to reuse when coercing message

* bs converter for HttpFile

* Mark failed option when running ssl? tests

* Fix issue with offset & size calculation when converting HttpFile to seq of ByteBuffers

* Convenient print representation for aleph.http.core.HttpFile object

* Rename new-http-file -> http-file

* Avoid reflection warnings for print method
  • Loading branch information
kachayev authored and ztellman committed Apr 19, 2019
1 parent bedc468 commit 28f931a
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 41 deletions.
56 changes: 34 additions & 22 deletions src/aleph/http.clj
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
(ns aleph.http
(:refer-clojure :exclude [get])
(:require
[clojure.string :as str]
[manifold.deferred :as d]
[manifold.executor :as executor]
[manifold.stream :as s]
[aleph.flow :as flow]
[aleph.http
[server :as server]
[client :as client]
[client-middleware :as middleware]
[core :as http-core]]
[aleph.netty :as netty])
[clojure.string :as str]
[manifold.deferred :as d]
[manifold.executor :as executor]
[manifold.stream :as s]
[aleph.flow :as flow]
[aleph.http
[server :as server]
[client :as client]
[client-middleware :as middleware]
[core :as http-core]]
[aleph.netty :as netty]
[clojure.java.io :as io])
(:import
[io.aleph.dirigiste Pools]
[aleph.utils
PoolTimeoutException
ConnectionTimeoutException
RequestTimeoutException
ReadTimeoutException]
[java.net
URI
InetSocketAddress]
[java.util.concurrent
TimeoutException]))
[io.aleph.dirigiste Pools]
[aleph.utils
PoolTimeoutException
ConnectionTimeoutException
RequestTimeoutException
ReadTimeoutException]
[java.net
URI
InetSocketAddress]
[java.util.concurrent
TimeoutException]))

(defn start-server
"Starts an HTTP server using the provided Ring `handler`. Returns a server object which can be stopped
Expand Down Expand Up @@ -436,3 +437,14 @@
(let [response (d/deferred)]
(handler request #(d/success! response %) #(d/error! response %))
response)))

(defn file
"Specifies a file or a region of the file to be sent over the network.
Accepts string path to the file, instance of `java.io.File` or instance of
`java.nio.file.Path`."
([path]
(http-core/http-file path nil nil nil))
([path offset length]
(http-core/http-file path offset length nil))
([path offset length chunk-size]
(http-core/http-file path offset length chunk-size)))
138 changes: 123 additions & 15 deletions src/aleph/http/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
[clojure.set :as set]
[clojure.string :as str]
[byte-streams :as bs]
[potemkin :as p])
[byte-streams.graph :as g]
[potemkin :as p]
[clojure.java.io :as io])
(:import
[io.netty.channel
Channel
Expand Down Expand Up @@ -36,7 +38,9 @@
IdleStateEvent
IdleStateHandler]
[io.netty.handler.stream
ChunkedInput ChunkedFile ChunkedWriteHandler]
ChunkedInput
ChunkedFile
ChunkedWriteHandler]
[io.netty.handler.codec.http.websocketx
WebSocketFrame
PingWebSocketFrame
Expand All @@ -47,6 +51,10 @@
File
RandomAccessFile
Closeable]
[java.nio.file Path]
[java.nio.channels
FileChannel
FileChannel$MapMode]
[java.util.concurrent
ConcurrentHashMap
ConcurrentLinkedQueue
Expand Down Expand Up @@ -331,34 +339,128 @@

(netty/write-and-flush ch empty-last-content)))

(defn send-chunked-file [ch ^HttpMessage msg ^File file]
(let [raf (RandomAccessFile. file "r")
len (.length raf)
ci (HttpChunkedInput. (ChunkedFile. raf))]
(try-set-content-length! msg len)
(def default-chunk-size 8192)

(deftype HttpFile [^File fd ^long offset ^long length ^long chunk-size])

(defmethod print-method HttpFile [^HttpFile file ^java.io.Writer w]
(.write w (format "HttpFile[fd:%s offset:%s length:%s]"
(.-fd file)
(.-offset file)
(.-length file))))

(defn http-file
([path]
(http-file path nil nil default-chunk-size))
([path offset length]
(http-file path offset length default-chunk-size))
([path offset length chunk-size]
(let [^File
fd (cond
(string? path)
(io/file path)

(instance? File path)
path

(instance? Path path)
(.toFile ^Path path)

:else
(throw
(IllegalArgumentException.
(str "cannot conver " (class path) " to file, "
"expected either string, java.io.File "
"or java.nio.file.Path"))))
region? (or (some? offset) (some? length))]
(when-not (.exists fd)
(throw
(IllegalArgumentException.
(str fd " file does not exist"))))

(when (.isDirectory fd)
(throw
(IllegalArgumentException.
(str fd " is a directory, file expected"))))

(when (and region? (not (<= 0 offset)))
(throw
(IllegalArgumentException.
"offset of the region should be 0 or greater")))

(when (and region? (not (pos? length)))
(throw
(IllegalArgumentException.
"length of the region should be greater than 0")))

(let [len (.length fd)
[p c] (if region?
[offset length]
[0 len])
chunk-size (or chunk-size default-chunk-size)]
(when (and region? (< len (+ offset length)))
(throw
(IllegalArgumentException.
"the region exceeds the size of the file")))

(HttpFile. fd p c chunk-size)))))

(bs/def-conversion ^{:cost 0} [HttpFile (bs/seq-of ByteBuffer)]
[file {:keys [chunk-size writable?]
:or {chunk-size (int default-chunk-size)
writable? false}}]
(let [^RandomAccessFile raf (RandomAccessFile. ^File (.-fd file)
(if writable? "rw" "r"))
^FileChannel fc (.getChannel raf)
end-offset (+ (.-offset file) (.-length file))
buf-seq (fn buf-seq [offset]
(when-not (<= end-offset offset)
(let [remaining (- end-offset offset)]
(lazy-seq
(cons
(.map fc
(if writable?
FileChannel$MapMode/READ_WRITE
FileChannel$MapMode/READ_ONLY)
offset
(min remaining chunk-size))
(buf-seq (+ offset chunk-size)))))))]
(g/closeable-seq
(buf-seq (.-offset file))
false
#(do
(.close raf)
(.close fc)))))

(defn send-chunked-file [ch ^HttpMessage msg ^HttpFile file]
(let [raf (RandomAccessFile. ^File (.-fd file) "r")
cf (ChunkedFile. raf
(.-offset file)
(.-length file)
(.-chunk-size file))]
(try-set-content-length! msg (.-length file))
(netty/write ch msg)
(netty/write-and-flush ch ci)))
(netty/write-and-flush ch (HttpChunkedInput. cf))))

(defn send-chunked-body [ch ^HttpMessage msg ^ChunkedInput body]
(netty/write ch msg)
(netty/write-and-flush ch body))

(defn send-file-region [ch ^HttpMessage msg ^File file]
(let [raf (RandomAccessFile. file "r")
len (.length raf)
(defn send-file-region [ch ^HttpMessage msg ^HttpFile file]
(let [raf (RandomAccessFile. ^File (.-fd file) "r")
fc (.getChannel raf)
fr (DefaultFileRegion. fc 0 len)]
(try-set-content-length! msg len)
fr (DefaultFileRegion. fc (.-offset file) (.-length file))]
(try-set-content-length! msg (.-length file))
(netty/write ch msg)
(netty/write ch fr)
(netty/write-and-flush ch empty-last-content)))

(defn send-file-body [ch ssl? ^HttpMessage msg ^File file]
(defn send-file-body [ch ssl? ^HttpMessage msg ^HttpFile file]
(cond
ssl?
(send-streaming-body ch msg
(-> file
(bs/to-byte-buffers {:chunk-size 1e6})
(bs/to-byte-buffers {:chunk-size (.-chunk-size file)})
s/->source))

(chunked-writer-enabled? ch)
Expand Down Expand Up @@ -415,6 +517,12 @@
(send-chunked-body ch msg body)

(instance? File body)
(send-file-body ch ssl? msg (http-file body))

(instance? Path body)
(send-file-body ch ssl? msg (http-file body))

(instance? HttpFile body)
(send-file-body ch ssl? msg body)

:else
Expand Down
25 changes: 21 additions & 4 deletions test/aleph/http_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,13 @@

(def port 8082)

(def filepath (str (System/getProperty "user.dir") "/test/file.txt"))

(def string-response "String!")
(def seq-response (map identity ["sequence: " 1 " two " 3.0]))
(def file-response (File. (str (System/getProperty "user.dir") "/test/file.txt")))
(def file-response (File. filepath))
(def http-file-response (http/file filepath))
(def http-file-region-response (http/file filepath 5 4))
(def stream-response "Stream!")

(defn string-handler [request]
Expand All @@ -70,6 +74,14 @@
{:status 200
:body file-response})

(defn http-file-handler [request]
{:status 200
:body http-file-response})

(defn http-file-region-handler [request]
{:status 200
:body http-file-region-response})

(defn stream-handler [request]
{:status 200
:body (bs/to-input-stream stream-response)})
Expand Down Expand Up @@ -122,6 +134,8 @@
"/stream" stream-handler
"/slow" slow-handler
"/file" file-handler
"/httpfile" http-file-handler
"/httpfileregion" http-file-region-handler
"/manifold" manifold-handler
"/seq" seq-handler
"/string" string-handler
Expand All @@ -146,6 +160,8 @@
"stream" stream-response
"manifold" stream-response
"file" "this is a file"
"httpfile" "this is a file"
"httpfileregion" "is a"
"seq" (apply str seq-response)]
(repeat 10)
(apply concat)
Expand Down Expand Up @@ -212,10 +228,11 @@
(with-ssl-handler basic-handler
(doseq [[index [path result]] (map-indexed vector expected-results)]
(is
(= result
(= result
(bs/to-string
(:body
@(http-get (str "https://localhost:" port "/" path)))))))))
(:body
@(http-get (str "https://localhost:" port "/" path)))))
(str path "path failed")))))

(def words (slurp "/usr/share/dict/words"))

Expand Down

0 comments on commit 28f931a

Please sign in to comment.