From 0aef19946ba526b486a1d59cc7e53d1f9111487f Mon Sep 17 00:00:00 2001 From: Oleksii Kachaiev Date: Wed, 7 Nov 2018 22:31:54 +0200 Subject: [PATCH] Multipart decoder (#411) * Basic implementation for multipart chunks decoder * Convert in-memory and file upload Netty's attributes to maps * Correct implementation for reading chunks from body and offering them to decoder * Get content for file upload attribute * Prevent destroying to be called twice * Minor formatting changes * Test case to cover multipart decoder functionality * Do not try to get content type and transfer encoding for string attributes * Remove debug inspection * Read data from input stream, not only manifold stream * Test cases to cover both raw and ring handlers * Rename decoder, optional configuration, do not read file content to memory * Fix test cases to use new decoder name * Do not get attribute once again, use locally saved version * Flexible memory limit configuration * Reimplement stream processing using s/connect-via * Release chunk of request body after offering it to a decoder * Acquire buffer before reading content to the input stream to prevent failure on cleanup * Rewrite TODO comment --- src/aleph/http/multipart.clj | 148 ++++++++++++++++++++++++----- test/aleph/http/multipart_test.clj | 66 ++++++++++++- 2 files changed, 186 insertions(+), 28 deletions(-) diff --git a/src/aleph/http/multipart.clj b/src/aleph/http/multipart.clj index dd6ac125..8e531ffa 100644 --- a/src/aleph/http/multipart.clj +++ b/src/aleph/http/multipart.clj @@ -1,29 +1,41 @@ (ns aleph.http.multipart (:require - [clojure.core :as cc] - [byte-streams :as bs] - [aleph.http.encoding :refer [encode]] - [aleph.netty :as netty]) + [clojure.core :as cc] + [byte-streams :as bs] + [aleph.http.encoding :refer [encode]] + [aleph.http.core :as http-core] + [aleph.netty :as netty] + [manifold.stream :as s] + [clojure.tools.logging :as log] + [manifold.deferred :as d]) (:import - [java.util - Locale] - [java.io - File] - 
[java.nio - ByteBuffer] - [java.nio.charset - Charset] - [java.net - URLConnection] - [io.netty.util.internal - ThreadLocalRandom] - [io.netty.handler.codec.http - DefaultHttpRequest - FullHttpRequest - HttpConstants] - [io.netty.handler.codec.http.multipart - HttpPostRequestEncoder - MemoryAttribute])) + [java.util + Locale] + [java.io + File] + [java.nio + ByteBuffer] + [java.nio.charset + Charset] + [java.net + URLConnection] + [io.netty.util.internal + ThreadLocalRandom] + [io.netty.handler.codec.http + DefaultHttpContent + DefaultHttpRequest + FullHttpRequest + HttpConstants] + [io.netty.handler.codec.http.multipart + Attribute + MemoryAttribute + FileUpload + HttpDataFactory + DefaultHttpDataFactory + HttpPostRequestDecoder + HttpPostRequestEncoder + InterfaceHttpData + InterfaceHttpData$HttpDataType])) (defn boundary [] (-> (ThreadLocalRandom/current) .nextLong Long/toHexString .toLowerCase)) @@ -146,3 +158,93 @@ (.addBodyHttpData encoder attr)))) (let [req' (.finalizeRequest encoder)] [req' (when (.isChunked encoder) encoder)]))) + +(defmulti http-data->map + (fn [^InterfaceHttpData data] + (.getHttpDataType data))) + +(defmethod http-data->map InterfaceHttpData$HttpDataType/Attribute + [^Attribute attr] + (let [content (.getValue attr)] + {:part-name (.getName attr) + :content content + :name nil + :charset (-> attr .getCharset .toString) + :mime-type nil + :transfer-encoding nil + :memory? (.isInMemory attr) + :file? false + :file nil + :size (count content)})) + +(defmethod http-data->map InterfaceHttpData$HttpDataType/FileUpload + [^FileUpload data] + (let [memory? (.isInMemory data)] + {:part-name (.getName data) + :content (when memory? + (bs/to-input-stream (netty/acquire (.content data)))) + :name (.getFilename data) + :charset (-> data .getCharset .toString) + :mime-type (.getContentType data) + :transfer-encoding (.getContentTransferEncoding data) + :memory? memory? + :file? true + :file (when-not memory? 
(.getFile data)) + :size (.length data)})) + +(defn- read-attributes [^HttpPostRequestDecoder decoder parts] + (while (.hasNext decoder) + (s/put! parts (http-data->map (.next decoder))))) + +(defn decode-request + "Takes a ring request and returns a manifold stream which yields + parts of the multipart/form-data encoded body. If the size of + a part's content exceeds the `:memory-limit` (16KB by default), the + corresponding payload is written to a temp file. Check the `:memory?` + flag to know whether the content can be read directly from `:content` or + should be fetched from the file specified in `:file`. + + Note that if your handler works with multipart requests only, + it's better to set `:raw-stream?` to `true` to avoid additional + input stream coercion." + ([req] (decode-request req {})) + ([{:keys [body] :as req} + {:keys [body-buffer-size + memory-limit] + :or {body-buffer-size 65536 + memory-limit DefaultHttpDataFactory/MINSIZE}}] + (let [body (if (s/stream? body) + body + (netty/to-byte-buf-stream body body-buffer-size)) + destroyed? (atom false) + req' (http-core/ring-request->netty-request req) + factory (DefaultHttpDataFactory. (long memory-limit)) + decoder (HttpPostRequestDecoder. factory req') + parts (s/stream)] + + ;; on each HttpContent chunk, put it into the decoder + ;; and resume our attempts to get the next attribute available + (s/connect-via + body + (fn [chunk] + (let [content (DefaultHttpContent. chunk)] + (.offer decoder content) + (read-attributes decoder parts) + ;; note that releasing the chunk right here relies on + ;; the internals of the decoder. in case those + ;; internals are changed in the future, this flow of + ;; manipulations should also be reconsidered + (netty/release chunk) + (d/success-deferred true))) + parts) + + (s/on-closed + parts + (fn [] + (when (compare-and-set! destroyed? 
false true) + (try + (.destroy decoder) + (catch Exception e + (log/warn e "exception when cleaning up multipart decoder")))))) + + parts))) diff --git a/test/aleph/http/multipart_test.clj b/test/aleph/http/multipart_test.clj index b07f6076..26e413d3 100644 --- a/test/aleph/http/multipart_test.clj +++ b/test/aleph/http/multipart_test.clj @@ -5,7 +5,10 @@ [aleph.http :as http] [aleph.http.multipart :as mp] [byte-streams :as bs] - [manifold.deferred :as d]) + [manifold.deferred :as d] + [manifold.stream :as s] + [clojure.string :as str] + [clojure.edn :as edn]) (:import [java.io File])) @@ -113,8 +116,12 @@ (is (.contains body-str "Content-Type: application/png\r\n")) (is (.contains body-str "Content-Transfer-Encoding: base64\r\n")))) -(def port 26003) -(def url (str "http://localhost:" port)) +(def port1 26023) +(def port2 26024) +(def port3 26025) +(def url1 (str "http://localhost:" port1)) +(def url2 (str "http://localhost:" port2)) +(def url3 (str "http://localhost:" port3)) (def parts [{:part-name "#0-string" :content "CONTENT1"} @@ -137,9 +144,9 @@ :body body}) (deftest test-send-multipart-request - (let [s (http/start-server echo-handler {:port port}) + (let [s (http/start-server echo-handler {:port port1}) ^String resp @(d/chain' - (http/post url {:multipart parts}) + (http/post url1 {:multipart parts}) :body bs/to-string)] ;; part names @@ -161,3 +168,52 @@ (is (.contains resp "filename=\"text-file-to-send.txt\"")) (.close ^java.io.Closeable s))) + +(defn- pack-chunk [{:keys [content] :as chunk}] + (cond-> (dissoc chunk :file) + (not (string? content)) + (dissoc :content))) + +(defn- decode-handler [req] + (let [chunks (-> req + mp/decode-request + s/stream->seq)] + {:status 200 + :body (pr-str (map pack-chunk chunks))})) + +(defn- test-decoder [port url raw-stream?] + (let [s (http/start-server decode-handler {:port port + :raw-stream? 
raw-stream?}) + chunks (-> (http/post url {:multipart parts}) + (deref 1e3 {:body "timeout"}) + :body + bs/to-string + clojure.edn/read-string + vec)] + (is (= 6 (count chunks))) + + ;; part-names + (is (= (map :part-name parts) + (map :part-name chunks))) + + ;; content + (is (= "CONTENT1" (get-in chunks [0 :content]))) + + ;; mime type + (is (= "text/plain" (get-in chunks [2 :mime-type]))) + (is (= "application/png" (get-in chunks [3 :mime-type]))) + + ;; filename + (is (= "file.txt" (get-in chunks [3 :name]))) + (is (= "text-file-to-send.txt" (get-in chunks [4 :name]))) + + ;; charset + (is (= "ISO-8859-1" (get-in chunks [5 :charset]))) + + (.close ^java.io.Closeable s))) + +(deftest test-mutlipart-request-decode-with-ring-handler + (test-decoder port2 url2 false)) + +(deftest test-mutlipart-request-decode-with-raw-handler + (test-decoder port3 url3 true))