Skip to content

Commit

Permalink
Fix WLP deadlock and EDP discovery performance improvement (eProsima#…
Browse files Browse the repository at this point in the history
…2712)

* Refs #14450: Moved liveliness writer addition/removal outside lock scope

Signed-off-by: Javier Santiago <[email protected]>

* Refs #14450: Reordered writer removal from wlp

Signed-off-by: Javier Santiago <[email protected]>

* Refs #14457: Only check for new IPs when openning new UDP output channels if the user has requested it

Signed-off-by: Eduardo Ponz <[email protected]>

* Refs 14457. Avoid taking the endpoint mutex on discovery callbacks.

Signed-off-by: Miguel Barro <[email protected]>

Co-authored-by: Javier Santiago <[email protected]>
Co-authored-by: Miguel Barro <[email protected]>
  • Loading branch information
3 people authored Jun 8, 2022
1 parent 4270656 commit 39ec819
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 188 deletions.
209 changes: 109 additions & 100 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,118 +185,123 @@ bool StatefulReader::matched_writer_add(
{
assert(wdata.guid() != c_Guid_Unknown);

std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);

if (!is_alive_)
{
return false;
}
std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);

bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid());
bool is_datasharing = !is_same_process && is_datasharing_compatible_with(wdata);
if (!is_alive_)
{
return false;
}

for (WriterProxy* it : matched_writers_)
{
if (it->guid() == wdata.guid())
bool is_same_process = RTPSDomainImpl::should_intraprocess_between(m_guid, wdata.guid());
bool is_datasharing = !is_same_process && is_datasharing_compatible_with(wdata);

for (WriterProxy* it : matched_writers_)
{
logInfo(RTPS_READER, "Attempting to add existing writer, updating information");
it->update(wdata);
if (!is_same_process)
if (it->guid() == wdata.guid())
{
for (const Locator_t& locator : it->remote_locators_shrinked())
logInfo(RTPS_READER, "Attempting to add existing writer, updating information");
it->update(wdata);
if (!is_same_process)
{
getRTPSParticipant()->createSenderResources(locator);
for (const Locator_t& locator : it->remote_locators_shrinked())
{
getRTPSParticipant()->createSenderResources(locator);
}
}

if (nullptr != mp_listener)
{
// call the listener without the lock taken
guard.unlock();
mp_listener->on_writer_discovery(this, WriterDiscoveryInfo::CHANGED_QOS_WRITER, wdata.guid(),
&wdata);
}
return false;
}
if (nullptr != mp_listener)
{
mp_listener->on_writer_discovery(this, WriterDiscoveryInfo::CHANGED_QOS_WRITER, wdata.guid(), &wdata);
}
return false;
}
}

// Get a writer proxy from the inactive pool (or create a new one if necessary and allowed)
WriterProxy* wp = nullptr;
if (matched_writers_pool_.empty())
{
size_t max_readers = matched_writers_pool_.max_size();
if (getMatchedWritersSize() + matched_writers_pool_.size() < max_readers)
// Get a writer proxy from the inactive pool (or create a new one if necessary and allowed)
WriterProxy* wp = nullptr;
if (matched_writers_pool_.empty())
{
const RTPSParticipantAttributes& part_att = mp_RTPSParticipant->getRTPSParticipantAttributes();
wp = new WriterProxy(this, part_att.allocation.locators, proxy_changes_config_);
size_t max_readers = matched_writers_pool_.max_size();
if (getMatchedWritersSize() + matched_writers_pool_.size() < max_readers)
{
const RTPSParticipantAttributes& part_att = mp_RTPSParticipant->getRTPSParticipantAttributes();
wp = new WriterProxy(this, part_att.allocation.locators, proxy_changes_config_);
}
else
{
logWarning(RTPS_READER, "Maximum number of reader proxies (" << max_readers << \
") reached for writer " << m_guid);
return false;
}
}
else
{
logWarning(RTPS_READER, "Maximum number of reader proxies (" << max_readers << \
") reached for writer " << m_guid);
return false;
wp = matched_writers_pool_.back();
matched_writers_pool_.pop_back();
}
}
else
{
wp = matched_writers_pool_.back();
matched_writers_pool_.pop_back();
}

SequenceNumber_t initial_sequence;
add_persistence_guid(wdata.guid(), wdata.persistence_guid());
initial_sequence = get_last_notified(wdata.guid());
SequenceNumber_t initial_sequence;
add_persistence_guid(wdata.guid(), wdata.persistence_guid());
initial_sequence = get_last_notified(wdata.guid());

wp->start(wdata, initial_sequence, is_datasharing);
wp->start(wdata, initial_sequence, is_datasharing);

if (!is_same_process)
{
for (const Locator_t& locator : wp->remote_locators_shrinked())
if (!is_same_process)
{
getRTPSParticipant()->createSenderResources(locator);
for (const Locator_t& locator : wp->remote_locators_shrinked())
{
getRTPSParticipant()->createSenderResources(locator);
}
}
}

if (is_datasharing)
{
if (datasharing_listener_->add_datasharing_writer(wdata.guid(),
m_att.durabilityKind == VOLATILE,
mp_history->m_att.maximumReservedCaches))
{
matched_writers_.push_back(wp);
logInfo(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId
<< " with data sharing");
}
else
if (is_datasharing)
{
logError(RTPS_READER, "Failed to add Writer Proxy " << wdata.guid()
<< " to " << this->m_guid.entityId
<< " with data sharing.");
wp->stop();
matched_writers_pool_.push_back(wp);
return false;
}
if (datasharing_listener_->add_datasharing_writer(wdata.guid(),
m_att.durabilityKind == VOLATILE,
mp_history->m_att.maximumReservedCaches))
{
matched_writers_.push_back(wp);
logInfo(RTPS_READER, "Writer Proxy " << wdata.guid() << " added to " << this->m_guid.entityId
<< " with data sharing");
}
else
{
logError(RTPS_READER, "Failed to add Writer Proxy " << wdata.guid()
<< " to " << this->m_guid.entityId
<< " with data sharing.");
wp->stop();
matched_writers_pool_.push_back(wp);
return false;
}

// Intraprocess manages durability itself
if (VOLATILE == m_att.durabilityKind)
{
std::shared_ptr<ReaderPool> pool = datasharing_listener_->get_pool_for_writer(wp->guid());
SequenceNumber_t last_seq = pool->get_last_read_sequence_number();
if (SequenceNumber_t::unknown() != last_seq)
// Intraprocess manages durability itself
if (VOLATILE == m_att.durabilityKind)
{
SequenceNumberSet_t sns(last_seq + 1);
send_acknack(wp, sns, wp, false);
wp->lost_changes_update(last_seq + 1);
std::shared_ptr<ReaderPool> pool = datasharing_listener_->get_pool_for_writer(wp->guid());
SequenceNumber_t last_seq = pool->get_last_read_sequence_number();
if (SequenceNumber_t::unknown() != last_seq)
{
SequenceNumberSet_t sns(last_seq + 1);
send_acknack(wp, sns, wp, false);
wp->lost_changes_update(last_seq + 1);
}
}
else if (!is_same_process)
{
// simulate a notification to force reading of transient changes
datasharing_listener_->notify(false);
}
}
else if (!is_same_process)
else
{
// simulate a notification to force reading of transient changes
datasharing_listener_->notify(false);
matched_writers_.push_back(wp);
logInfo(RTPS_READER, "Writer Proxy " << wp->guid() << " added to " << m_guid.entityId);
}
}
else
{
matched_writers_.push_back(wp);
logInfo(RTPS_READER, "Writer Proxy " << wp->guid() << " added to " << m_guid.entityId);
}

if (liveliness_lease_duration_ < c_TimeInfinite)
{
auto wlp = this->mp_RTPSParticipant->wlp();
Expand All @@ -317,37 +322,39 @@ bool StatefulReader::matched_writer_add(
{
mp_listener->on_writer_discovery(this, WriterDiscoveryInfo::DISCOVERED_WRITER, wdata.guid(), &wdata);
}

return true;
}

bool StatefulReader::matched_writer_remove(
const GUID_t& writer_guid,
bool removed_by_lease)
{

if (is_alive_ && liveliness_lease_duration_ < c_TimeInfinite)
{
auto wlp = this->mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_);
}
else
{
logError(RTPS_LIVELINESS,
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
}
}

std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
WriterProxy* wproxy = nullptr;
if (is_alive_)
{
//Remove cachechanges belonging to the unmatched writer
mp_history->writer_unmatched(writer_guid, get_last_notified(writer_guid));

if (liveliness_lease_duration_ < c_TimeInfinite)
{
auto wlp = this->mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
wlp->sub_liveliness_manager_->remove_writer(
writer_guid,
liveliness_kind_,
liveliness_lease_duration_);
}
else
{
logError(RTPS_LIVELINESS,
"Finite liveliness lease duration but WLP not enabled, cannot remove writer");
}
}

for (ResourceLimitedVector<WriterProxy*>::iterator it = matched_writers_.begin();
it != matched_writers_.end();
++it)
Expand Down Expand Up @@ -377,6 +384,8 @@ bool StatefulReader::matched_writer_remove(
matched_writers_pool_.push_back(wproxy);
if (nullptr != mp_listener)
{
// call the listener without the lock taken
lock.unlock();
mp_listener->on_writer_discovery(this, WriterDiscoveryInfo::REMOVED_WRITER, writer_guid, nullptr);
}
}
Expand Down
Loading

0 comments on commit 39ec819

Please sign in to comment.