Skip to content

Commit

Permalink
Fixed issue #21.
Browse files Browse the repository at this point in the history
  • Loading branch information
admin committed Feb 22, 2022
1 parent abfff2b commit 7097b1b
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 235 deletions.
3 changes: 2 additions & 1 deletion include/teemo/teemo.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ enum Result {
HASH_VERIFY_NOT_PASS,
CALCULATE_HASH_FAILED,
FETCH_FILE_INFO_FAILED,
REDIRECT_URL_DIFFERENT
REDIRECT_URL_DIFFERENT,
NOT_CLEARLY_RESULT,
};

enum DownloadState { STOPPED = 0, DOWNLODING = 1, PAUSED = 2 };
Expand Down
1 change: 1 addition & 0 deletions src/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SortUsingDeclarations: false
BreakStringLiterals: false
BreakConstructorInitializersBeforeComma: true
ReflowComments: false
ColumnLimit: 0
BreakBeforeBraces: Custom
BraceWrapping:
BeforeElse: true
Expand Down
194 changes: 103 additions & 91 deletions src/entry_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,24 @@ Result EntryHandler::asyncTaskProcess() {
user_stopped_.store(false);
state_.store(DownloadState::DOWNLODING);

Result ret = _asyncTaskProcess();
const Result ret = _asyncTaskProcess();

options_->internal_stop_event.set();

if (speed_handler_)
speed_handler_.reset();

if (progress_handler_)
progress_handler_.reset();

if (slice_manager_) {
slice_manager_->cleanup();
slice_manager_.reset();
}

if (options_->result_functor)
options_->result_functor(ret);

return ret;
}

Expand All @@ -171,110 +174,97 @@ Result EntryHandler::_asyncTaskProcess() {
} while (++try_times <= options_->fetch_file_info_retry);

if (!fetch_size_ret) {
OutputVerbose(options_->verbose_functor,
"[teemo] Fetch file size failed.\n");
OutputVerbose(options_->verbose_functor, u8"[teemo] Fetch file size failed.\n");
return FETCH_FILE_INFO_FAILED;
}

// If target file is an empty file, create it.
if (file_info.fileSize == 0) {
OutputVerbose(options_->verbose_functor, "[teemo] File size is 0.\n");
OutputVerbose(options_->verbose_functor, u8"[teemo] File size is 0.\n");
return FileUtil::CreateFixedSizeFile(options_->target_file_path, 0)
? SUCCESSED
: CREATE_TARGET_FILE_FAILED;
}

OutputVerbose(options_->verbose_functor, "[teemo] URL: %s.\n",
options_->url.c_str());
OutputVerbose(options_->verbose_functor, "[teemo] Content MD5: %s.\n",
file_info.contentMd5.c_str());
OutputVerbose(options_->verbose_functor, "[teemo] Redirect URL: %s.\n",
file_info.redirect_url.c_str());
OutputVerbose(options_->verbose_functor, "[teemo] Thread number: %d.\n",
options_->thread_num);
OutputVerbose(options_->verbose_functor, "[teemo] Disk Cache Size: %ld.\n",
options_->disk_cache_size);
OutputVerbose(options_->verbose_functor, "[teemo] Target file path: %s.\n",
options_->target_file_path.c_str());
OutputVerbose(options_->verbose_functor, u8"[teemo] URL: %s.\n", options_->url.c_str());
OutputVerbose(options_->verbose_functor, u8"[teemo] Content MD5: %s.\n", file_info.contentMd5.c_str());
OutputVerbose(options_->verbose_functor, u8"[teemo] Redirect URL: %s.\n", file_info.redirect_url.c_str());
OutputVerbose(options_->verbose_functor, u8"[teemo] Thread number: %d.\n", options_->thread_num);
OutputVerbose(options_->verbose_functor, u8"[teemo] Disk Cache Size: %ld.\n", options_->disk_cache_size);
OutputVerbose(options_->verbose_functor, u8"[teemo] Target file path: %s.\n", options_->target_file_path.c_str());

assert(!slice_manager_);
slice_manager_ =
std::make_shared<SliceManager>(options_, file_info.redirect_url);
slice_manager_ = std::make_shared<SliceManager>(options_, file_info.redirect_url);

if (slice_manager_->loadExistSlice(file_info.fileSize,
file_info.contentMd5) != SUCCESSED) {
if (slice_manager_->loadExistSlice(file_info.fileSize, file_info.contentMd5) != SUCCESSED) {
slice_manager_->setOriginFileSize(file_info.fileSize);
slice_manager_->setContentMd5(file_info.contentMd5);

Result ms_ret = slice_manager_->makeSlices(file_info.acceptRanges);
const Result ms_ret = slice_manager_->makeSlices(file_info.acceptRanges);
if (ms_ret != SUCCESSED) {
return ms_ret;
}
}

Result all_completed_ret = slice_manager_->isAllSliceCompleted(false);
Result all_completed_ret = slice_manager_->isAllSliceCompletedClearly(false);
if (all_completed_ret == SUCCESSED) {
OutputVerbose(options_->verbose_functor,
"[teemo] All of slices been downloaded.\n");
Result ret = slice_manager_->finishDownloadProgress(false, multi_);
return ret;
OutputVerbose(options_->verbose_functor, u8"[teemo] All of slices been downloaded.\n");
return slice_manager_->finishDownloadProgress(false, multi_);
}

multi_ = curl_multi_init();
if (!multi_) {
OutputVerbose(options_->verbose_functor,
"[teemo] curl_multi_init failed.\n");
OutputVerbose(options_->verbose_functor, u8"[teemo] curl_multi_init failed.\n");
return INIT_CURL_MULTI_FAILED;
}

int32_t disk_cache_per_slice = 0L;
int32_t max_speed_per_slice = 0L;
calculateSliceInfo(
std::min(slice_manager_->getUnfetchAndUncompletedSliceNum(),
options_->thread_num),
std::min(slice_manager_->getUnfetchAndUncompletedSliceNum(), options_->thread_num),
&disk_cache_per_slice, &max_speed_per_slice);

OutputVerbose(options_->verbose_functor,
"[teemo] Disk cache per slice: %ld.\n", disk_cache_per_slice);
OutputVerbose(options_->verbose_functor,
"[teemo] Max speed per slice: %ld.\n", max_speed_per_slice);
OutputVerbose(options_->verbose_functor, u8"[teemo] Disk cache per slice: %ld.\n", disk_cache_per_slice);
OutputVerbose(options_->verbose_functor, u8"[teemo] Max speed per slice: %ld.\n", max_speed_per_slice);

Result ss_ret = SUCCESSED;
int32_t selected = 0;
while (true) {
if (selected >= options_->thread_num)
break;
std::shared_ptr<Slice> slice =
slice_manager_->getUncompletedSlice(Slice::UNFETCH);

std::shared_ptr<Slice> slice = slice_manager_->getSlice(Slice::UNFETCH);
if (!slice)
break;

slice->setStatus(Slice::FETCHED);
ss_ret = slice->start(multi_, disk_cache_per_slice, max_speed_per_slice);
if (ss_ret != SUCCESSED) {
OutputVerbose(options_->verbose_functor,
"[teemo] Slice<%d> start downloading failed: %s.\n",
u8"[teemo] Slice<%d> start downloading failed: %s.\n",
slice->index(), GetResultString(ss_ret));

continue;
// fatal error, return immediately!
curl_multi_cleanup(multi_);
multi_ = nullptr;
return ss_ret;
}
OutputVerbose(options_->verbose_functor,
"[teemo] Slice<%d> start downloading.\n", slice->index());
OutputVerbose(options_->verbose_functor, u8"[teemo] Slice<%d> start downloading.\n", slice->index());
selected++;
}

if (selected == 0) {
OutputVerbose(options_->verbose_functor, "[teemo] No available slice.\n");
OutputVerbose(options_->verbose_functor, u8"[teemo] No available slice.\n");
curl_multi_cleanup(multi_);
multi_ = nullptr;
return UNKNOWN_ERROR;
}

if (options_->progress_functor)
progress_handler_ =
std::make_shared<ProgressHandler>(options_, slice_manager_);
progress_handler_ = std::make_shared<ProgressHandler>(options_, slice_manager_);

if (options_->speed_functor)
speed_handler_ = std::make_shared<SpeedHandler>(
slice_manager_->totalDownloaded(), options_, slice_manager_);
speed_handler_ = std::make_shared<SpeedHandler>(slice_manager_->totalDownloaded(), options_, slice_manager_);

// https://curl.haxx.se/libcurl/c/curl_multi_fdset.html
// https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
Expand Down Expand Up @@ -306,7 +296,7 @@ Result EntryHandler::_asyncTaskProcess() {
int still_running = 0;

CURLMcode mcode = curl_multi_perform(multi_, &still_running);
OutputVerbose(options_->verbose_functor, "[teemo] Start downloading.\n");
OutputVerbose(options_->verbose_functor, u8"[teemo] Start downloading.\n");

TimeMeter flush_time_meter;

Expand All @@ -315,15 +305,16 @@ Result EntryHandler::_asyncTaskProcess() {
while (true) {
if (options_->internal_stop_event.wait(50))
break;

if (options_->user_stop_event && options_->user_stop_event->isSetted())
break;

if (!user_paused_.load())
break;
}
}

if (options_->internal_stop_event.isSetted() ||
(options_->user_stop_event && options_->user_stop_event->isSetted()))
if (options_->internal_stop_event.isSetted() || (options_->user_stop_event && options_->user_stop_event->isSetted()))
break;

if (flush_time_meter.Elapsed() >= 10000) { // 10s
Expand All @@ -348,8 +339,7 @@ Result EntryHandler::_asyncTaskProcess() {
mcode = curl_multi_fdset(multi_, &fdread, &fdwrite, &fdexcep, &maxfd);
if (mcode != CURLM_CALL_MULTI_PERFORM && mcode != CURLM_OK) {
OutputVerbose(options_->verbose_functor,
"[teemo] curl_multi_fdset failed, code: %ld(%s).\n",
(long)mcode, curl_multi_strerror(mcode));
"[teemo] curl_multi_fdset failed, code: %ld(%s).\n", (long)mcode, curl_multi_strerror(mcode));
break;
}

Expand All @@ -362,7 +352,7 @@ Result EntryHandler::_asyncTaskProcess() {
zero if the time limit expired, or SOCKET_ERROR if an error occurred.
If the return value is SOCKET_ERROR, WSAGetLastError can be used to retrieve a specific error code.
*/
int rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &select_timeout);
const int rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &select_timeout);
if (rc == -1) { // SOCKET_ERROR
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
Expand All @@ -373,36 +363,49 @@ Result EntryHandler::_asyncTaskProcess() {
if (still_running < options_->thread_num) {
updateSliceStatus();

// Get a slice that not completed and not be fetched.
std::shared_ptr<Slice> slice =
slice_manager_->getUncompletedSlice(Slice::UNFETCH);
// Get a slice that not be fetched(of cause not completed).
std::shared_ptr<Slice> slice = slice_manager_->getSlice(Slice::UNFETCH);
if (!slice) {
// Try to download the slice that is failed previous again.
slice = slice_manager_->getUncompletedSlice(Slice::DOWNLOAD_FAILED);
if (slice && slice->failedTimes() >= options_->slice_max_failed_times)
slice.reset();
slice = slice_manager_->getSlice(Slice::DOWNLOAD_FAILED);
if (slice) {
if (slice->failedTimes() >= options_->slice_max_failed_times)
slice.reset();
else
OutputVerbose(options_->verbose_functor, u8"[teemo] Re-download slice<%d>.\n", slice->index());
}
else {
if (!slice_manager_->getSlice(Slice::DOWNLOADING)) {
slice = slice_manager_->getSlice(Slice::CURL_OK_BUT_STATUS_NOT_SURE);
if (slice) {
const Result ascc_ret = slice_manager_->isAllSliceCompletedClearly(false);
if (ascc_ret == SUCCESSED) {
slice->setStatus(Slice::DOWNLOAD_COMPLETED);
slice.reset();
}
else {
OutputVerbose(options_->verbose_functor, u8"[teemo] Re-download slice<%d>.\n", slice->index());
}
}
}
}
}

if (slice) {
slice->setStatus(Slice::FETCHED);
int32_t disk_cache_per_slice = 0L;
int32_t max_speed_per_slice = 0L;
calculateSliceInfo(still_running + 1, &disk_cache_per_slice,
&max_speed_per_slice);
Result start_ret =
slice->start(multi_, disk_cache_per_slice, max_speed_per_slice);
disk_cache_per_slice = 0L;
max_speed_per_slice = 0L;
calculateSliceInfo(still_running + 1, &disk_cache_per_slice, &max_speed_per_slice);

const Result start_ret = slice->start(multi_, disk_cache_per_slice, max_speed_per_slice);
if (still_running <= 0) {
if (start_ret == SUCCESSED) {
curl_multi_perform(multi_, &still_running);
OutputVerbose(options_->verbose_functor,
"[teemo] Slice<%d> start downloading.\n",
slice->index());
OutputVerbose(options_->verbose_functor, "[teemo] Slice<%d> start downloading.\n", slice->index());
}
else {
still_running = 1;
OutputVerbose(options_->verbose_functor,
"[teemo] Slice<%d> start downloading failed: %s.\n",
slice->index(), GetResultString(start_ret));
OutputVerbose(options_->verbose_functor, "[teemo] Slice<%d> start downloading failed: %s.\n", slice->index(), GetResultString(start_ret));
}
}
}
Expand Down Expand Up @@ -498,15 +501,12 @@ bool EntryHandler::requestFileInfo(const utf8string& url, FileInfo& fileInfo) {
}

char* redirect_url = nullptr;
if (curl_easy_getinfo(curl, CURLINFO_REDIRECT_URL, &redirect_url) ==
CURLE_OK) {
if (redirect_url)
fileInfo.redirect_url = redirect_url;
if (curl_easy_getinfo(curl, CURLINFO_REDIRECT_URL, &redirect_url) == CURLE_OK && redirect_url) {
fileInfo.redirect_url = redirect_url;
}

int http_code = 0;
if ((ret_code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE,
&http_code)) != CURLE_OK) {
if ((ret_code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code)) != CURLE_OK) {
OutputVerbose(
options_->verbose_functor,
"[teemo] Get CURLINFO_RESPONSE_CODE failed, CURLcode: %ld(%s).\n",
Expand Down Expand Up @@ -535,7 +535,7 @@ void EntryHandler::cancelFetchFileInfo() {

void EntryHandler::calculateSliceInfo(int32_t concurrency_num,
int32_t* disk_cache_per_slice,
int32_t* max_speed_per_slice) {
int32_t* max_speed_per_slice) const {
if (concurrency_num <= 0) {
if (disk_cache_per_slice) {
*disk_cache_per_slice = options_->disk_cache_size;
Expand All @@ -561,29 +561,41 @@ void EntryHandler::calculateSliceInfo(int32_t concurrency_num,
void EntryHandler::updateSliceStatus() {
struct CURLMsg* m = nullptr;
do {
int msgq = 0;
m = curl_multi_info_read(multi_, &msgq);
int msg_in_queue = 0;
m = curl_multi_info_read(multi_, &msg_in_queue);

if (m && m->msg == CURLMSG_DONE) {
std::shared_ptr<Slice> slice = slice_manager_->getSlice(m->easy_handle);
const std::shared_ptr<Slice> slice = slice_manager_->getSlice(m->easy_handle);
assert(slice);
if (!slice)
continue;

if (m->data.result == CURLE_OK) {
slice->setStatus(Slice::DOWNLOAD_COMPLETED);
slice->stop(multi_);
if (slice->isDataCompletedClearly()) {
slice->setStatus(Slice::DOWNLOAD_COMPLETED);
slice->stop(multi_, false);
}
else {
if (slice->end() == -1) {
slice->setStatus(Slice::CURL_OK_BUT_STATUS_NOT_SURE);
slice->stop(multi_, false);
}
else {
slice->setStatus(Slice::DOWNLOAD_FAILED);
slice->increaseFailedTimes();
slice->stop(multi_, true);
}
}
}
else {
if (!slice->isDataCompleted()) {
OutputVerbose(options_->verbose_functor,
"[teemo] Slice<%d> download failed %ld(%s).\n",
slice->index(), m->data.result,
curl_easy_strerror(m->data.result));

slice->setStatus(Slice::DOWNLOAD_FAILED);
slice->increaseFailedTimes();
slice->stop(multi_);
}
OutputVerbose(options_->verbose_functor,
u8"[teemo] Slice<%d> download failed %ld(%s).\n",
slice->index(), m->data.result,
curl_easy_strerror(m->data.result));

slice->setStatus(Slice::DOWNLOAD_FAILED);
slice->increaseFailedTimes();
slice->stop(multi_, true);
}
}
} while (m);
Expand Down
2 changes: 1 addition & 1 deletion src/entry_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class EntryHandler {
void cancelFetchFileInfo();
void calculateSliceInfo(int32_t concurrency_num,
int32_t* disk_cache_per_slice,
int32_t* max_speed_per_slice);
int32_t* max_speed_per_slice) const;
void updateSliceStatus();

protected:
Expand Down
Loading

0 comments on commit 7097b1b

Please sign in to comment.