Skip to content

Commit

Permalink
Multipart decoder (clj-commons#411)
Browse files Browse the repository at this point in the history
* Basic implementation for multipart chunks decoder

* Convert in-memory and file upload Netty's attributes to maps

* Correct implementation for reading chunks from body and offering them to decoder

* Get content for file upload attribute

* Prevent destroying to be called twice

* Minor formatting changes

* Test case to cover multipart decoder functionality

* Do not try to get content type and transfer encoding for string attributes

* Remove debug inspection

* Read data from input stream, not only manifold stream

* Test cases to cover both raw and ring handlers

* Rename decoder, optional configuration, do not read file content to memory

* Fix test cases to use new decoder name

* Do not get attribute once again, use locally saved version

* Flexible memory limit configuration

* Reimplement stream processing using s/connect-via

* Release chunk of request body after offering it to a decoder

* Acquire buffer before reading content to the input stream to prevent failure on cleanup

* Rewrite TODO comment
  • Loading branch information
kachayev authored and ztellman committed Nov 7, 2018
1 parent 9f886bd commit 0aef199
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 28 deletions.
148 changes: 125 additions & 23 deletions src/aleph/http/multipart.clj
Original file line number Diff line number Diff line change
@@ -1,29 +1,41 @@
(ns aleph.http.multipart
(:require
[clojure.core :as cc]
[byte-streams :as bs]
[aleph.http.encoding :refer [encode]]
[aleph.netty :as netty])
[clojure.core :as cc]
[byte-streams :as bs]
[aleph.http.encoding :refer [encode]]
[aleph.http.core :as http-core]
[aleph.netty :as netty]
[manifold.stream :as s]
[clojure.tools.logging :as log]
[manifold.deferred :as d])
(:import
[java.util
Locale]
[java.io
File]
[java.nio
ByteBuffer]
[java.nio.charset
Charset]
[java.net
URLConnection]
[io.netty.util.internal
ThreadLocalRandom]
[io.netty.handler.codec.http
DefaultHttpRequest
FullHttpRequest
HttpConstants]
[io.netty.handler.codec.http.multipart
HttpPostRequestEncoder
MemoryAttribute]))
[java.util
Locale]
[java.io
File]
[java.nio
ByteBuffer]
[java.nio.charset
Charset]
[java.net
URLConnection]
[io.netty.util.internal
ThreadLocalRandom]
[io.netty.handler.codec.http
DefaultHttpContent
DefaultHttpRequest
FullHttpRequest
HttpConstants]
[io.netty.handler.codec.http.multipart
Attribute
MemoryAttribute
FileUpload
HttpDataFactory
DefaultHttpDataFactory
HttpPostRequestDecoder
HttpPostRequestEncoder
InterfaceHttpData
InterfaceHttpData$HttpDataType]))

(defn boundary []
(-> (ThreadLocalRandom/current) .nextLong Long/toHexString .toLowerCase))
Expand Down Expand Up @@ -146,3 +158,93 @@
(.addBodyHttpData encoder attr))))
(let [req' (.finalizeRequest encoder)]
[req' (when (.isChunked encoder) encoder)])))

(defmulti http-data->map
(fn [^InterfaceHttpData data]
(.getHttpDataType data)))

(defmethod http-data->map InterfaceHttpData$HttpDataType/Attribute
[^Attribute attr]
(let [content (.getValue attr)]
{:part-name (.getName attr)
:content content
:name nil
:charset (-> attr .getCharset .toString)
:mime-type nil
:transfer-encoding nil
:memory? (.isInMemory attr)
:file? false
:file nil
:size (count content)}))

(defmethod http-data->map InterfaceHttpData$HttpDataType/FileUpload
[^FileUpload data]
(let [memory? (.isInMemory data)]
{:part-name (.getName data)
:content (when memory?
(bs/to-input-stream (netty/acquire (.content data))))
:name (.getFilename data)
:charset (-> data .getCharset .toString)
:mime-type (.getContentType data)
:transfer-encoding (.getContentTransferEncoding data)
:memory? memory?
:file? true
:file (when-not memory? (.getFile data))
:size (.length data)}))

(defn- read-attributes [^HttpPostRequestDecoder decoder parts]
(while (.hasNext decoder)
(s/put! parts (http-data->map (.next decoder)))))

(defn decode-request
"Takes a ring request and returns a manifold stream which yields
parts of the mutlipart/form-data encoded body. In case the size of
a part content exceeds `:memory-limit` limit (16KB by default),
corresponding payload would be written to a temp file. Check `:memory?`
flag to know whether content might be read directly from `:content` or
should be fetched from the file specified in `:file`.
Note, that if your handler works with multipart requests only,
it's better to set `:raw-stream?` to `true` to avoid additional
input stream coercion."
([req] (decode-request req {}))
([{:keys [body] :as req}
{:keys [body-buffer-size
memory-limit]
:or {body-buffer-size 65536
memory-limit DefaultHttpDataFactory/MINSIZE}}]
(let [body (if (s/stream? body)
body
(netty/to-byte-buf-stream body body-buffer-size))
destroyed? (atom false)
req' (http-core/ring-request->netty-request req)
factory (DefaultHttpDataFactory. (long memory-limit))
decoder (HttpPostRequestDecoder. factory req')
parts (s/stream)]

;; on each HttpContent chunk, put it into the decoder
;; and resume our attempts to get the next attribute available
(s/connect-via
body
(fn [chunk]
(let [content (DefaultHttpContent. chunk)]
(.offer decoder content)
(read-attributes decoder parts)
;; note, that releasing chunk right here relies on
;; the internals of the decoder. in case those
;; internal are changed in future, this flow of
;; manipulations should be also reconsidered
(netty/release chunk)
(d/success-deferred true)))
parts)

(s/on-closed
parts
(fn []
(when (compare-and-set! destroyed? false true)
(try
(.destroy decoder)
(catch Exception e
(log/warn e "exception when cleaning up multipart decoder"))))))

parts)))
66 changes: 61 additions & 5 deletions test/aleph/http/multipart_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
[aleph.http :as http]
[aleph.http.multipart :as mp]
[byte-streams :as bs]
[manifold.deferred :as d])
[manifold.deferred :as d]
[manifold.stream :as s]
[clojure.string :as str]
[clojure.edn :as edn])
(:import
[java.io
File]))
Expand Down Expand Up @@ -113,8 +116,12 @@
(is (.contains body-str "Content-Type: application/png\r\n"))
(is (.contains body-str "Content-Transfer-Encoding: base64\r\n"))))

(def port 26003)
(def url (str "http://localhost:" port))
(def port1 26023)
(def port2 26024)
(def port3 26025)
(def url1 (str "http://localhost:" port1))
(def url2 (str "http://localhost:" port2))
(def url3 (str "http://localhost:" port3))

(def parts [{:part-name "#0-string"
:content "CONTENT1"}
Expand All @@ -137,9 +144,9 @@
:body body})

(deftest test-send-multipart-request
(let [s (http/start-server echo-handler {:port port})
(let [s (http/start-server echo-handler {:port port1})
^String resp @(d/chain'
(http/post url {:multipart parts})
(http/post url1 {:multipart parts})
:body
bs/to-string)]
;; part names
Expand All @@ -161,3 +168,52 @@
(is (.contains resp "filename=\"text-file-to-send.txt\""))

(.close ^java.io.Closeable s)))

(defn- pack-chunk [{:keys [content] :as chunk}]
(cond-> (dissoc chunk :file)
(not (string? content))
(dissoc :content)))

(defn- decode-handler [req]
(let [chunks (-> req
mp/decode-request
s/stream->seq)]
{:status 200
:body (pr-str (map pack-chunk chunks))}))

(defn- test-decoder [port url raw-stream?]
(let [s (http/start-server decode-handler {:port port
:raw-stream? raw-stream?})
chunks (-> (http/post url {:multipart parts})
(deref 1e3 {:body "timeout"})
:body
bs/to-string
clojure.edn/read-string
vec)]
(is (= 6 (count chunks)))

;; part-names
(is (= (map :part-name parts)
(map :part-name chunks)))

;; content
(is (= "CONTENT1" (get-in chunks [0 :content])))

;; mime type
(is (= "text/plain" (get-in chunks [2 :mime-type])))
(is (= "application/png" (get-in chunks [3 :mime-type])))

;; filename
(is (= "file.txt" (get-in chunks [3 :name])))
(is (= "text-file-to-send.txt" (get-in chunks [4 :name])))

;; charset
(is (= "ISO-8859-1" (get-in chunks [5 :charset])))

(.close ^java.io.Closeable s)))

(deftest test-mutlipart-request-decode-with-ring-handler
(test-decoder port2 url2 false))

(deftest test-mutlipart-request-decode-with-raw-handler
(test-decoder port3 url3 true))

0 comments on commit 0aef199

Please sign in to comment.