Skip to content

Commit

Permalink
add stream upload
Browse files Browse the repository at this point in the history
  • Loading branch information
longbai committed Nov 4, 2016
1 parent 59868ee commit eb5c15e
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#Changelog

## 7.2.2 (2016-11-04)
### 增加
* stream 方式上传

## 7.2.1 (2016-11-03)
### 修正
* streaming publish url 过期时间单位问题
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def versionName() {
return version
}

def versionNameToCode(String version) {
static def versionNameToCode(String version) {
String v = version.replaceAll(/\./, '')
return v.toLong()
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/qiniu/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public final class Constants {
/**
* 版本号
*/
public static final String VERSION = "7.2.1";
public static final String VERSION = "7.2.2";
/**
* 块大小,不能改变
*/
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/qiniu/common/Zone.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static Zone zone2() {
}

// 北美
public static Zone zone_na0() {
public static Zone zoneNa0() {
return new FixedZone("http://up-na0.qiniu.com", "http://upload-na0.qiniu.com",
"", "http://rs-na0.qbox.me", "http://rsf-na0.qbox.me", "http://iovip-na0.qbox.me",
"https://up-na0.qbox.me", "http://api-na0.qiniu.com");
Expand Down
170 changes: 170 additions & 0 deletions src/main/java/com/qiniu/storage/StreamUploader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package com.qiniu.storage;

import com.qiniu.common.Constants;
import com.qiniu.common.QiniuException;
import com.qiniu.http.Client;
import com.qiniu.http.Response;
import com.qiniu.storage.model.ResumeBlockInfo;
import com.qiniu.util.StringMap;
import com.qiniu.util.StringUtils;
import com.qiniu.util.UrlSafeBase64;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;

/**
* Created by long on 2016/11/4.
*/
public final class StreamUploader {
private final String upToken;
private final String key;
private final StringMap params;
private final String mime;
private final ArrayList<String> contexts;
private final Configuration configuration;
private final Client client;
private final byte[] blockBuffer;
private final InputStream stream;
private long size;
private String host;

StreamUploader(Client client, String upToken, String key, InputStream stream,
StringMap params, String mime, Configuration configuration) {
this.configuration = configuration;
this.client = client;
this.upToken = upToken;
this.key = key;
this.params = params;
this.mime = mime == null ? Client.DefaultMime : mime;
this.contexts = new ArrayList<>();
this.blockBuffer = new byte[Constants.BLOCK_SIZE];
this.stream = stream;
}

public Response upload() throws QiniuException {
if (host == null) {
this.host = configuration.zone.upHost(upToken);
}

long uploaded = 0;
int ret = 0;
boolean retry = false;
int contextIndex = 0;

while (size == 0) {
int bufferIndex = 0;
while (ret != -1 && bufferIndex != blockBuffer.length) {
try {
ret = stream.read(blockBuffer, bufferIndex, blockBuffer.length - bufferIndex);
} catch (IOException e) {
close();
throw new QiniuException(e);
}
if (ret != -1) {
bufferIndex += ret;
if (ret == 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else {
size = uploaded + bufferIndex;
}
}

Response response = null;
try {
response = makeBlock(blockBuffer, bufferIndex);
} catch (QiniuException e) {
if (e.code() < 0) {
host = configuration.zone.upHostBackup(upToken);
}
if (e.response == null || e.response.needRetry()) {
retry = true;
} else {
close();
throw e;
}
}
if (retry) {
try {
response = makeBlock(blockBuffer, bufferIndex);
retry = false;
} catch (QiniuException e) {
close();
throw e;
}

}
ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class);
//TODO check return crc32
// if blockInfo.crc32 != crc{}
contexts.add(blockInfo.ctx);
uploaded += bufferIndex;
}
close();

try {
return makeFile();
} catch (QiniuException e) {
try {
return makeFile();
} catch (QiniuException e1) {
throw e1;
}
}
}

private Response makeBlock(byte[] block, int blockSize) throws QiniuException {
String url = host + "/mkblk/" + blockSize;
return post(url, block, 0, blockSize);
}

private void close() {
try {
stream.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private String fileUrl() {
String url = host + "/mkfile/" + size + "/mimeType/"
+ UrlSafeBase64.encodeToString(mime);
final StringBuilder b = new StringBuilder(url);
if (key != null) {
b.append("/key/");
b.append(UrlSafeBase64.encodeToString(key));
}
if (params != null) {
params.forEach(new StringMap.Consumer() {
@Override
public void accept(String key, Object value) {
b.append("/");
b.append(key);
b.append("/");
b.append(UrlSafeBase64.encodeToString("" + value));
}
});
}
return b.toString();
}

private Response makeFile() throws QiniuException {
String url = fileUrl();
String s = StringUtils.join(contexts, ",");
return post(url, StringUtils.utf8Bytes(s));
}

private Response post(String url, byte[] data) throws QiniuException {
return client.post(url, data, new StringMap().put("Authorization", "UpToken " + upToken));
}

private Response post(String url, byte[] data, int offset, int size) throws QiniuException {
return client.post(url, data, offset, size, new StringMap().put("Authorization", "UpToken " + upToken),
Client.DefaultMime);
}
}
38 changes: 37 additions & 1 deletion src/main/java/com/qiniu/storage/UploadManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;

/**
* 七牛文件上传管理器
Expand Down Expand Up @@ -173,7 +174,17 @@ public Response put(File file, String key, String token, StringMap params,
return uploader.upload();
}


/**
* 异步上传数据
*
* @param data 上传的数据
* @param key 上传数据保存的文件名
* @param token 上传凭证
* @param params 自定义参数,如 params.put("x:foo", "foo")
* @param mime 指定文件mimetype
* @param checkCrc 是否验证crc32
* @param handler 上传完成的回调函数
*/
public void asyncPut(final byte[] data, final String key, final String token, StringMap params,
String mime, boolean checkCrc, UpCompletionHandler handler) throws IOException {
checkArgs(key, data, null, token);
Expand All @@ -183,4 +194,29 @@ public void asyncPut(final byte[] data, final String key, final String token, St
params = filterParam(params);
new FormUploader(client, token, key, data, params, mime, checkCrc, configuration).asyncUpload(handler);
}

/**
* 流式上传,通常情况建议文件上传,可以使用持久化的断点记录。
*
* @param stream sha
* @param key 上传文件保存的文件名
* @param token 上传凭证
* @param params 自定义参数,如 params.put("x:foo", "foo")
* @param mime 指定文件mimetype
*/
public Response put(InputStream stream, String key, String token, StringMap params,
String mime) throws QiniuException {
String message = null;
if (stream == null) {
message = "no input data";
} else if (token == null || token.equals("")) {
message = "no token";
}
if (message != null) {
throw new IllegalArgumentException(message);
}
StreamUploader uploader = new StreamUploader(client, token, key, stream,
params, mime, configuration);
return uploader.upload();
}
}
10 changes: 10 additions & 0 deletions src/test/java/com/qiniu/storage/BucketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,5 +322,15 @@ public void testBatch() {
e.printStackTrace();
fail();
}

BucketManager.Batch opsDel = new BucketManager.Batch().delete(TestConfig.bucket,
key, key1, key2, key3, key4);

try {
bucketManager.batch(opsDel);
} catch (QiniuException e) {
e.printStackTrace();
fail();
}
}
}
3 changes: 3 additions & 0 deletions src/test/java/com/qiniu/storage/ResumeUploadTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.qiniu.common.Zone;
import com.qiniu.http.Client;
import com.qiniu.http.Response;
import com.qiniu.util.Etag;
import com.qiniu.util.StringMap;
import org.junit.Test;

Expand Down Expand Up @@ -52,6 +53,7 @@ private void template(int size, boolean https) throws IOException {
UploadManager uploadManager = new UploadManager(c);
final String expectKey = "\r\n?&r=" + size + "k";
final File f = TempFile.createFile(size);
final String etag = Etag.file(f);
final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\""
+ ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\"}";
String token = TestConfig.testAuth.uploadToken(TestConfig.bucket, expectKey, 3600,
Expand All @@ -64,6 +66,7 @@ private void template(int size, boolean https) throws IOException {
MyRet ret = r.jsonToObject(MyRet.class);
assertEquals(expectKey, ret.key);
assertEquals(f.getName(), ret.fname);
assertEquals(etag, ret.hash);
} catch (QiniuException e) {
assertEquals("", e.response.bodyString());
fail();
Expand Down
Loading

0 comments on commit eb5c15e

Please sign in to comment.