From eb5c15eb4bac09d15a185fa614b88bf74ee1eea6 Mon Sep 17 00:00:00 2001 From: longbai Date: Fri, 4 Nov 2016 21:15:39 +0800 Subject: [PATCH] add stream upload --- CHANGELOG.md | 4 + build.gradle | 2 +- src/main/java/com/qiniu/common/Constants.java | 2 +- src/main/java/com/qiniu/common/Zone.java | 2 +- .../com/qiniu/storage/StreamUploader.java | 170 ++++++++++++++++++ .../java/com/qiniu/storage/UploadManager.java | 38 +++- .../java/com/qiniu/storage/BucketTest.java | 10 ++ .../com/qiniu/storage/ResumeUploadTest.java | 3 + .../com/qiniu/storage/StreamUploadTest.java | 126 +++++++++++++ 9 files changed, 353 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/qiniu/storage/StreamUploader.java create mode 100644 src/test/java/com/qiniu/storage/StreamUploadTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 09a0514c4..31a9bf1ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ #Changelog +## 7.2.2 (2016-11-04) +### 增加 +* stream 方式上传 + ## 7.2.1 (2016-11-03) ### 修正 * streaming publish url 过期时间单位问题 diff --git a/build.gradle b/build.gradle index 4cce28edb..570cf0eec 100755 --- a/build.gradle +++ b/build.gradle @@ -42,7 +42,7 @@ def versionName() { return version } -def versionNameToCode(String version) { +static def versionNameToCode(String version) { String v = version.replaceAll(/\./, '') return v.toLong() } diff --git a/src/main/java/com/qiniu/common/Constants.java b/src/main/java/com/qiniu/common/Constants.java index 51cf764a0..730a6afc8 100644 --- a/src/main/java/com/qiniu/common/Constants.java +++ b/src/main/java/com/qiniu/common/Constants.java @@ -9,7 +9,7 @@ public final class Constants { /** * 版本号 */ - public static final String VERSION = "7.2.1"; + public static final String VERSION = "7.2.2"; /** * 块大小,不能改变 */ diff --git a/src/main/java/com/qiniu/common/Zone.java b/src/main/java/com/qiniu/common/Zone.java index 5e398dbe7..f620519fa 100644 --- a/src/main/java/com/qiniu/common/Zone.java +++ b/src/main/java/com/qiniu/common/Zone.java @@ -27,7 +27,7 @@ public static Zone zone2() { } // 北美 - public static Zone zone_na0() { + public static Zone zoneNa0() { return new FixedZone("http://up-na0.qiniu.com", "http://upload-na0.qiniu.com", "", "http://rs-na0.qbox.me", "http://rsf-na0.qbox.me", "http://iovip-na0.qbox.me", "https://up-na0.qbox.me", "http://api-na0.qiniu.com"); diff --git a/src/main/java/com/qiniu/storage/StreamUploader.java b/src/main/java/com/qiniu/storage/StreamUploader.java new file mode 100644 index 000000000..c600b63a0 --- /dev/null +++ b/src/main/java/com/qiniu/storage/StreamUploader.java @@ -0,0 +1,170 @@ +package com.qiniu.storage; + +import com.qiniu.common.Constants; +import com.qiniu.common.QiniuException; +import com.qiniu.http.Client; +import com.qiniu.http.Response; +import com.qiniu.storage.model.ResumeBlockInfo; +import com.qiniu.util.StringMap; +import com.qiniu.util.StringUtils; +import com.qiniu.util.UrlSafeBase64; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; + +/** + * Created by long on 2016/11/4. + */ +public final class StreamUploader { + private final String upToken; + private final String key; + private final StringMap params; + private final String mime; + private final ArrayList contexts; + private final Configuration configuration; + private final Client client; + private final byte[] blockBuffer; + private final InputStream stream; + private long size; + private String host; + + StreamUploader(Client client, String upToken, String key, InputStream stream, + StringMap params, String mime, Configuration configuration) { + this.configuration = configuration; + this.client = client; + this.upToken = upToken; + this.key = key; + this.params = params; + this.mime = mime == null ? Client.DefaultMime : mime; + this.contexts = new ArrayList<>(); + this.blockBuffer = new byte[Constants.BLOCK_SIZE]; + this.stream = stream; + } + + public Response upload() throws QiniuException { + if (host == null) { + this.host = configuration.zone.upHost(upToken); + } + + long uploaded = 0; + int ret = 0; + boolean retry = false; + int contextIndex = 0; + + while (size == 0) { + int bufferIndex = 0; + while (ret != -1 && bufferIndex != blockBuffer.length) { + try { + ret = stream.read(blockBuffer, bufferIndex, blockBuffer.length - bufferIndex); + } catch (IOException e) { + close(); + throw new QiniuException(e); + } + if (ret != -1) { + bufferIndex += ret; + if (ret == 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } else { + size = uploaded + bufferIndex; + } + } + + Response response = null; + try { + response = makeBlock(blockBuffer, bufferIndex); + } catch (QiniuException e) { + if (e.code() < 0) { + host = configuration.zone.upHostBackup(upToken); + } + if (e.response == null || e.response.needRetry()) { + retry = true; + } else { + close(); + throw e; + } + } + if (retry) { + try { + response = makeBlock(blockBuffer, bufferIndex); + retry = false; + } catch (QiniuException e) { + close(); + throw e; + } + + } + ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class); + //TODO check return crc32 + // if blockInfo.crc32 != crc{} + contexts.add(blockInfo.ctx); + uploaded += bufferIndex; + } + close(); + + try { + return makeFile(); + } catch (QiniuException e) { + try { + return makeFile(); + } catch (QiniuException e1) { + throw e1; + } + } + } + + private Response makeBlock(byte[] block, int blockSize) throws QiniuException { + String url = host + "/mkblk/" + blockSize; + return post(url, block, 0, blockSize); + } + + private void close() { + try { + stream.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private String fileUrl() { + String url = host + "/mkfile/" + size + "/mimeType/" + + UrlSafeBase64.encodeToString(mime); + final StringBuilder b = new StringBuilder(url); + if (key != null) { + b.append("/key/"); + b.append(UrlSafeBase64.encodeToString(key)); + } + if (params != null) { + params.forEach(new StringMap.Consumer() { + @Override + public void accept(String key, Object value) { + b.append("/"); + b.append(key); + b.append("/"); + b.append(UrlSafeBase64.encodeToString("" + value)); + } + }); + } + return b.toString(); + } + + private Response makeFile() throws QiniuException { + String url = fileUrl(); + String s = StringUtils.join(contexts, ","); + return post(url, StringUtils.utf8Bytes(s)); + } + + private Response post(String url, byte[] data) throws QiniuException { + return client.post(url, data, new StringMap().put("Authorization", "UpToken " + upToken)); + } + + private Response post(String url, byte[] data, int offset, int size) throws QiniuException { + return client.post(url, data, offset, size, new StringMap().put("Authorization", "UpToken " + upToken), + Client.DefaultMime); + } +} diff --git a/src/main/java/com/qiniu/storage/UploadManager.java b/src/main/java/com/qiniu/storage/UploadManager.java index 2491d4ba9..01d4e9c97 100644 --- a/src/main/java/com/qiniu/storage/UploadManager.java +++ b/src/main/java/com/qiniu/storage/UploadManager.java @@ -7,6 +7,7 @@ import java.io.File; import java.io.IOException; +import java.io.InputStream; /** * 七牛文件上传管理器 @@ -173,7 +174,17 @@ public Response put(File file, String key, String token, StringMap params, return uploader.upload(); } - + /** + * 异步上传数据 + * + * @param data 上传的数据 + * @param key 上传数据保存的文件名 + * @param token 上传凭证 + * @param params 自定义参数,如 params.put("x:foo", "foo") + * @param mime 指定文件mimetype + * @param checkCrc 是否验证crc32 + * @param handler 上传完成的回调函数 + */ public void asyncPut(final byte[] data, final String key, final String token, StringMap params, String mime, boolean checkCrc, UpCompletionHandler handler) throws IOException { checkArgs(key, data, null, token); @@ -183,4 +194,29 @@ public void asyncPut(final byte[] data, final String key, final String token, St params = filterParam(params); new FormUploader(client, token, key, data, params, mime, checkCrc, configuration).asyncUpload(handler); } + + /** + * 流式上传,通常情况建议文件上传,可以使用持久化的断点记录。 + * + * @param stream sha + * @param key 上传文件保存的文件名 + * @param token 上传凭证 + * @param params 自定义参数,如 params.put("x:foo", "foo") + * @param mime 指定文件mimetype + */ + public Response put(InputStream stream, String key, String token, StringMap params, + String mime) throws QiniuException { + String message = null; + if (stream == null) { + message = "no input data"; + } else if (token == null || token.equals("")) { + message = "no token"; + } + if (message != null) { + throw new IllegalArgumentException(message); + } + StreamUploader uploader = new StreamUploader(client, token, key, stream, + params, mime, configuration); + return uploader.upload(); + } } diff --git a/src/test/java/com/qiniu/storage/BucketTest.java b/src/test/java/com/qiniu/storage/BucketTest.java index 20cf4290a..f43544b3f 100644 --- a/src/test/java/com/qiniu/storage/BucketTest.java +++ b/src/test/java/com/qiniu/storage/BucketTest.java @@ -322,5 +322,15 @@ public void testBatch() { e.printStackTrace(); fail(); } + + BucketManager.Batch opsDel = new BucketManager.Batch().delete(TestConfig.bucket, + key, key1, key2, key3, key4); + + try { + bucketManager.batch(opsDel); + } catch (QiniuException e) { + e.printStackTrace(); + fail(); + } } } diff --git a/src/test/java/com/qiniu/storage/ResumeUploadTest.java b/src/test/java/com/qiniu/storage/ResumeUploadTest.java index 7809f4e1b..dc0d62b2f 100644 --- a/src/test/java/com/qiniu/storage/ResumeUploadTest.java +++ b/src/test/java/com/qiniu/storage/ResumeUploadTest.java @@ -6,6 +6,7 @@ import com.qiniu.common.Zone; import com.qiniu.http.Client; import com.qiniu.http.Response; +import com.qiniu.util.Etag; import com.qiniu.util.StringMap; import org.junit.Test; @@ -52,6 +53,7 @@ private void template(int size, boolean https) throws IOException { UploadManager uploadManager = new UploadManager(c); final String expectKey = "\r\n?&r=" + size + "k"; final File f = TempFile.createFile(size); + final String etag = Etag.file(f); final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\"}"; String token = TestConfig.testAuth.uploadToken(TestConfig.bucket, expectKey, 3600, @@ -64,6 +66,7 @@ private void template(int size, boolean https) throws IOException { MyRet ret = r.jsonToObject(MyRet.class); assertEquals(expectKey, ret.key); assertEquals(f.getName(), ret.fname); + assertEquals(etag, ret.hash); } catch (QiniuException e) { assertEquals("", e.response.bodyString()); fail(); diff --git a/src/test/java/com/qiniu/storage/StreamUploadTest.java b/src/test/java/com/qiniu/storage/StreamUploadTest.java new file mode 100644 index 000000000..04ea000c8 --- /dev/null +++ b/src/test/java/com/qiniu/storage/StreamUploadTest.java @@ -0,0 +1,126 @@ +package com.qiniu.storage; + +import com.qiniu.TempFile; +import com.qiniu.TestConfig; +import com.qiniu.common.QiniuException; +import com.qiniu.common.Zone; +import com.qiniu.http.Client; +import com.qiniu.http.Response; +import com.qiniu.util.Etag; +import com.qiniu.util.StringMap; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Created by long on 2016/11/4. + */ +public class StreamUploadTest { + + @Test + public void testXVar() throws IOException { + final String expectKey = "世/界"; + File f = null; + try { + f = TempFile.createFile(1024 * 4 + 2341); + } catch (IOException e) { + e.printStackTrace(); + } + assert f != null; + StringMap params = new StringMap().put("x:foo", "foo_val"); + final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + + ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\",\"foo\":\"$(x:foo)\"}"; + String token = TestConfig.testAuth.uploadToken(TestConfig.bucket, expectKey, 3600, + new StringMap().put("returnBody", returnBody)); + + try { + UploadManager uploadManager = new UploadManager(new Configuration(Zone.zone0())); + Response res = uploadManager.put(new FileInputStream(f), expectKey, token, params, null); + StringMap m = res.jsonToMap(); + assertEquals("foo_val", m.get("foo")); + } catch (QiniuException e) { + assertEquals("", e.response.bodyString()); + fail(); + } finally { + TempFile.remove(f); + } + } + + private void template(int size, boolean https) throws IOException { + Configuration c = new Configuration(Zone.zone0()); + c.uploadByHttps = https; + UploadManager uploadManager = new UploadManager(c); + final String expectKey = "\r\n?&r=" + size + "k"; + final File f = TempFile.createFile(size); + final String etag = Etag.file(f); + final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\"" + + ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\"}"; + String token = TestConfig.testAuth.uploadToken(TestConfig.bucket, expectKey, 3600, + new StringMap().put("returnBody", returnBody)); + + try { + StreamUploader up = new StreamUploader(new Client(), token, expectKey, + new FileInputStream(f), null, null, new Configuration(Zone.zone0())); + Response r = up.upload(); + StreamUploadTest.MyRet ret = r.jsonToObject(StreamUploadTest.MyRet.class); + assertEquals(expectKey, ret.key); + assertEquals(etag, ret.hash); + } catch (QiniuException e) { + assertEquals("", e.response.bodyString()); + fail(); + } + TempFile.remove(f); + } + + @Test + public void test1K() throws Throwable { + template(1, false); + } + + @Test + public void test600k() throws Throwable { + template(600, true); + } + + @Test + public void test600k2() throws IOException { + template(600, false); + } + + @Test + public void test4M() throws Throwable { + if (TestConfig.isTravis()) { + return; + } + template(1024 * 4, false); + } + + @Test + public void test8M1k() throws Throwable { + if (TestConfig.isTravis()) { + return; + } + template(1024 * 8 + 1, false); + } + + @Test + public void test8M1k2() throws Throwable { + if (TestConfig.isTravis()) { + return; + } + template(1024 * 8 + 1, true); + } + + class MyRet { + public String hash; + public String key; + public String fsize; + public String fname; + public String mimeType; + } +}