Skip to content

Commit

Permalink
Support gaps in qts updates.
Browse files Browse the repository at this point in the history
GitOrigin-RevId: afcae4aa4ac456f5b8d8b2e46b92126a606bdca9
  • Loading branch information
levlam committed Aug 3, 2020
1 parent 3be95dd commit e9d3b48
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 37 deletions.
130 changes: 96 additions & 34 deletions td/telegram/UpdatesManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ void UpdatesManager::fill_seq_gap(void *td) {
fill_gap(td, "seq");
}

void UpdatesManager::fill_qts_gap(void *td) {
fill_gap(td, "qts");
}

void UpdatesManager::fill_get_difference_gap(void *td) {
fill_gap(td, "getDifference");
}
Expand Down Expand Up @@ -298,22 +302,6 @@ Promise<> UpdatesManager::set_pts(int32 pts, const char *source) {
return result;
}

Promise<> UpdatesManager::set_qts(int32 qts) {
Promise<> result;
if (qts > get_qts() || (0 < qts && qts < get_qts() - 399999)) { // qts can only go up or drop cardinally
if (qts < get_qts() - 399999) {
LOG(WARNING) << "Qts decreases from " << get_qts() << " to " << qts;
} else {
LOG(INFO) << "Update qts from " << get_qts() << " to " << qts;
}

result = add_qts(qts);
} else if (qts < get_qts()) {
LOG(ERROR) << "Receive wrong qts = " << qts << " less than current qts = " << get_qts();
}
return result;
}

void UpdatesManager::set_date(int32 date, bool from_update, string date_source) {
if (date > date_) {
LOG(INFO) << "Update date to " << date;
Expand Down Expand Up @@ -811,7 +799,7 @@ void UpdatesManager::on_get_updates_state(tl_object_ptr<telegram_api::updates_st
string full_source = "on_get_updates_state " + oneline(to_string(state)) + " from " + source;
set_pts(state->pts_, full_source.c_str()).set_value(Unit());
set_date(state->date_, false, std::move(full_source));
// set_qts(state->qts_).set_value(Unit());
add_qts(state->qts_).set_value(Unit());

seq_ = state->seq_;
}
Expand Down Expand Up @@ -993,7 +981,7 @@ void UpdatesManager::on_server_pong(tl_object_ptr<telegram_api::updates_state> &

void UpdatesManager::process_get_difference_updates(
vector<tl_object_ptr<telegram_api::Message>> &&new_messages,
vector<tl_object_ptr<telegram_api::EncryptedMessage>> &&new_encrypted_messages, int32 qts,
vector<tl_object_ptr<telegram_api::EncryptedMessage>> &&new_encrypted_messages,
vector<tl_object_ptr<telegram_api::Update>> &&other_updates) {
VLOG(get_difference) << "In get difference receive " << new_messages.size() << " messages, "
<< new_encrypted_messages.size() << " encrypted messages and " << other_updates.size()
Expand Down Expand Up @@ -1038,8 +1026,6 @@ void UpdatesManager::process_get_difference_updates(
}

process_updates(std::move(other_updates), true);

set_qts(qts).set_value(Unit());
}

void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Difference> &&difference_ptr) {
Expand Down Expand Up @@ -1068,7 +1054,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
td_->contacts_manager_->on_get_chats(std::move(difference->chats_), "updates.difference");

process_get_difference_updates(std::move(difference->new_messages_),
std::move(difference->new_encrypted_messages_), difference->state_->qts_,
std::move(difference->new_encrypted_messages_),
std::move(difference->other_updates_));
if (running_get_difference_) {
LOG(ERROR) << "Get difference has run while processing get difference updates";
Expand All @@ -1092,7 +1078,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe

process_get_difference_updates(std::move(difference->new_messages_),
std::move(difference->new_encrypted_messages_),
difference->intermediate_state_->qts_, std::move(difference->other_updates_));
std::move(difference->other_updates_));
if (running_get_difference_) {
LOG(ERROR) << "Get difference has run while processing get difference updates";
break;
Expand Down Expand Up @@ -1315,7 +1301,8 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
if (id == telegram_api::updateNewMessage::ID || id == telegram_api::updateReadMessagesContents::ID ||
id == telegram_api::updateEditMessage::ID || id == telegram_api::updateDeleteMessages::ID ||
id == telegram_api::updateReadHistoryInbox::ID || id == telegram_api::updateReadHistoryOutbox::ID ||
id == telegram_api::updateWebPage::ID) {
id == telegram_api::updateWebPage::ID || id == telegram_api::updateNewEncryptedMessage::ID ||
id == telegram_api::updateChannelParticipant::ID) {
if (!downcast_call(*update, OnUpdate(this, update, false))) {
LOG(ERROR) << "Can't call on some update received from " << source;
}
Expand Down Expand Up @@ -1370,6 +1357,44 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
set_seq_gap_timeout(MAX_UNFILLED_GAP_TIME);
}

void UpdatesManager::add_pending_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts) {
CHECK(update != nullptr);
if (qts <= 1) {
LOG(ERROR) << "Receive wrong qts " << qts << " in " << oneline(to_string(update));
return;
}

int32 old_qts = get_qts();
if (qts < old_qts - 1000001) {
LOG(WARNING) << "Restore qts after qts overflow from " << old_qts << " to " << qts << " by "
<< oneline(to_string(update));
add_qts(qts - 1).set_value(Unit());
CHECK(get_qts() == qts - 1);
old_qts = qts - 1;
}

if (qts <= old_qts) {
LOG(INFO) << "Skip already applied update with qts = " << qts;
return;
}

CHECK(!running_get_difference_);

if (qts > old_qts + 1) {
if (pending_qts_updates_.empty()) {
set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME);
}
bool is_inserted = pending_qts_updates_.emplace(qts, std::move(update)).second;
if (!is_inserted) {
LOG(INFO) << "Receive duplicate update with qts = " << qts;
}
return;
}

process_qts_update(std::move(update), qts);
process_pending_qts_updates();
}

void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, bool force_apply) {
tl_object_ptr<telegram_api::updatePtsChanged> update_pts_changed;
/*
Expand Down Expand Up @@ -1435,6 +1460,22 @@ void UpdatesManager::process_seq_updates(int32 seq_end, int32 date,
}
}

void UpdatesManager::process_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts) {
switch (update->get_id()) {
case telegram_api::updateNewEncryptedMessage::ID: {
auto message = std::move(move_tl_object_as<telegram_api::updateNewEncryptedMessage>(update)->message_);
send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(message), add_qts(qts));
break;
}
case telegram_api::updateChannelParticipant::ID:
// TODO
break;
default:
UNREACHABLE();
break;
}
}

void UpdatesManager::process_pending_seq_updates() {
while (!pending_seq_updates_.empty() && !running_get_difference_) {
auto update_it = pending_seq_updates_.begin();
Expand All @@ -1458,6 +1499,27 @@ void UpdatesManager::process_pending_seq_updates() {
}
}

void UpdatesManager::process_pending_qts_updates() {
if (pending_qts_updates_.empty()) {
return;
}
while (!pending_qts_updates_.empty()) {
CHECK(!running_get_difference_);
auto update_it = pending_qts_updates_.begin();
auto qts = update_it->first;
if (qts > get_qts() + 1) {
return;
}
if (qts == get_qts() + 1) {
process_qts_update(std::move(update_it->second), qts);
}
pending_qts_updates_.erase(update_it);
}
if (pending_qts_updates_.empty()) {
qts_gap_timeout_.cancel_timeout();
}
}

void UpdatesManager::set_seq_gap_timeout(double timeout) {
if (!seq_gap_timeout_.has_timeout()) {
seq_gap_timeout_.set_callback(std::move(fill_seq_gap));
Expand All @@ -1466,6 +1528,13 @@ void UpdatesManager::set_seq_gap_timeout(double timeout) {
}
}

void UpdatesManager::set_qts_gap_timeout(double timeout) {
CHECK(!qts_gap_timeout_.has_timeout());
qts_gap_timeout_.set_callback(std::move(fill_qts_gap));
qts_gap_timeout_.set_callback_data(static_cast<void *>(td_));
qts_gap_timeout_.set_timeout_in(timeout);
}

void UpdatesManager::on_pending_update(tl_object_ptr<telegram_api::Update> update, int32 seq, const char *source) {
vector<tl_object_ptr<telegram_api::Update>> updates;
updates.push_back(std::move(update));
Expand Down Expand Up @@ -1941,19 +2010,12 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEncryption> upd
}

void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update, bool force_apply) {
if (!force_apply) {
if (update->qts_ <= get_qts()) {
LOG(INFO) << "Ignore already processed update with qts " << update->qts_;
return;
}
if (update->qts_ != get_qts() + 1) {
// TODO fill gap
return;
}
if (force_apply) {
return process_qts_update(std::move(update), 0);
}

send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(update->message_),
add_qts(update->qts_));
auto qts = update->qts_;
add_pending_qts_update(std::move(update), qts);
}

void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEncryptedMessagesRead> update, bool /*force_apply*/) {
Expand Down
18 changes: 15 additions & 3 deletions td/telegram/UpdatesManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ class UpdatesManager : public Actor {

Promise<> set_pts(int32 pts, const char *source) TD_WARN_UNUSED_RESULT;

Promise<> set_qts(int32 qts) TD_WARN_UNUSED_RESULT;

static const double MAX_UNFILLED_GAP_TIME;

static void fill_pts_gap(void *td);
Expand Down Expand Up @@ -112,8 +110,12 @@ class UpdatesManager : public Actor {
std::multimap<int32, PendingUpdates> postponed_updates_; // updates received during getDifference
std::multimap<int32, PendingUpdates> pending_seq_updates_; // updates with too big seq

std::map<int32, tl_object_ptr<telegram_api::Update>> pending_qts_updates_; // updates with too big qts

Timeout seq_gap_timeout_;

Timeout qts_gap_timeout_;

int32 retry_time_ = 1;
Timeout retry_timeout_;

Expand All @@ -139,27 +141,37 @@ class UpdatesManager : public Actor {

void process_get_difference_updates(vector<tl_object_ptr<telegram_api::Message>> &&new_messages,
vector<tl_object_ptr<telegram_api::EncryptedMessage>> &&new_encrypted_messages,
int32 qts, vector<tl_object_ptr<telegram_api::Update>> &&other_updates);
vector<tl_object_ptr<telegram_api::Update>> &&other_updates);

void on_pending_update(tl_object_ptr<telegram_api::Update> update, int32 seq, const char *source);

void add_pending_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts);

void on_pending_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, int32 seq_begin, int32 seq_end,
int32 date, const char *source);

void process_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, bool force_apply);

void process_seq_updates(int32 seq_end, int32 date, vector<tl_object_ptr<telegram_api::Update>> &&updates);

void process_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts);

void process_pending_seq_updates();

void process_pending_qts_updates();

static void fill_seq_gap(void *td);

static void fill_qts_gap(void *td);

static void fill_get_difference_gap(void *td);

static void fill_gap(void *td, const char *source);

void set_seq_gap_timeout(double timeout);

void set_qts_gap_timeout(double timeout);

void on_failed_get_difference();

void before_get_difference(bool is_initial);
Expand Down

0 comments on commit e9d3b48

Please sign in to comment.