Skip to content

Commit

Permalink
Fix for late-joiners KEEP_LAST, RELIABLE, TRANSIENT_LOCAL (eProsima#1314
Browse files Browse the repository at this point in the history
)

* Refs #8896 Add Blackbox Tests

* Large data volatile tests
* Large data transient local tests
* Tests to check #8896 bug (Transient Local, Reliable, Keep Last 1)

Signed-off-by: Laura Martin <[email protected]>

* Refs #8896 Fix history full check in SubscriberHistory

Signed-off-by: Laura Martin <[email protected]>

* Refs #8986 Requested Changes

Signed-off-by: Laura Martin <[email protected]>

* Refs #8986 Separate HistoryAttributes calculation to a separate method

Signed-off-by: Laura Martin <[email protected]>
  • Loading branch information
lauramg15 authored Jul 31, 2020
1 parent 636754a commit ae85f36
Show file tree
Hide file tree
Showing 6 changed files with 454 additions and 80 deletions.
53 changes: 32 additions & 21 deletions src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,35 @@ static void get_sample_info(
info->related_sample_identity = change->write_params.sample_identity();
}

static HistoryAttributes to_history_attributes(
const TopicAttributes& topic_att,
uint32_t payloadMaxSize,
MemoryManagementPolicy_t mempolicy)
{
auto initial_samples = topic_att.resourceLimitsQos.allocated_samples;
auto max_samples = topic_att.resourceLimitsQos.max_samples;

if (topic_att.historyQos.kind != KEEP_ALL_HISTORY_QOS)
{
max_samples = topic_att.historyQos.depth;
if (topic_att.getTopicKind() != NO_KEY)
{
max_samples *= topic_att.resourceLimitsQos.max_instances;
}

initial_samples = std::min(initial_samples, max_samples);
}

return HistoryAttributes(mempolicy, payloadMaxSize, initial_samples, max_samples);
}

SubscriberHistory::SubscriberHistory(
const TopicAttributes& topic_att,
TopicDataType* type,
const ReaderQos& qos,
uint32_t payloadMaxSize,
MemoryManagementPolicy_t mempolicy)
: ReaderHistory(HistoryAttributes(mempolicy, payloadMaxSize,
topic_att.historyQos.kind == KEEP_ALL_HISTORY_QOS ?
topic_att.resourceLimitsQos.allocated_samples :
topic_att.getTopicKind() == NO_KEY ?
std::min(topic_att.resourceLimitsQos.allocated_samples, topic_att.historyQos.depth) :
std::min(topic_att.resourceLimitsQos.allocated_samples, topic_att.historyQos.depth
* topic_att.resourceLimitsQos.max_instances),
topic_att.historyQos.kind == KEEP_ALL_HISTORY_QOS ?
topic_att.resourceLimitsQos.max_samples :
topic_att.getTopicKind() == NO_KEY ?
topic_att.historyQos.depth :
topic_att.historyQos.depth * topic_att.resourceLimitsQos.max_instances))
: ReaderHistory(to_history_attributes(topic_att, payloadMaxSize, mempolicy))
, history_qos_(topic_att.historyQos)
, resource_limited_qos_(topic_att.resourceLimitsQos)
, topic_att_(topic_att)
Expand Down Expand Up @@ -220,7 +231,7 @@ bool SubscriberHistory::add_received_change(

if (add_change(a_change))
{
if (m_changes.size() == static_cast<size_t>(resource_limited_qos_.max_samples) )
if (m_changes.size() == static_cast<size_t>(m_att.maximumReservedCaches))
{
m_isHistoryFull = true;
}
Expand Down Expand Up @@ -248,7 +259,7 @@ bool SubscriberHistory::add_received_change_with_key(

if (add_change(a_change))
{
if (m_changes.size() == static_cast<size_t>(resource_limited_qos_.max_samples))
if (m_changes.size() == static_cast<size_t>(m_att.maximumReservedCaches))
{
m_isHistoryFull = true;
}
Expand Down Expand Up @@ -280,7 +291,7 @@ bool SubscriberHistory::find_key_for_change(
bool is_key_protected = false;
#if HAVE_SECURITY
is_key_protected = mp_reader->getAttributes().security_attributes().is_key_protected;
#endif
#endif // if HAVE_SECURITY
if (!type_->getKey(get_key_object_, &a_change->instanceHandle, is_key_protected))
{
return false;
Expand Down Expand Up @@ -314,13 +325,13 @@ bool SubscriberHistory::deserialize_change(
if (info != nullptr)
{
if (topic_att_.topicKind == WITH_KEY &&
change->instanceHandle == c_InstanceHandle_Unknown &&
change->kind == ALIVE)
change->instanceHandle == c_InstanceHandle_Unknown &&
change->kind == ALIVE)
{
bool is_key_protected = false;
#if HAVE_SECURITY
is_key_protected = mp_reader->getAttributes().security_attributes().is_key_protected;
#endif
#endif // if HAVE_SECURITY
type_->getKey(data, &change->instanceHandle, is_key_protected);
}

Expand Down Expand Up @@ -534,9 +545,9 @@ bool SubscriberHistory::get_next_deadline(
[](
const std::pair<InstanceHandle_t, KeyedChanges>& lhs,
const std::pair<InstanceHandle_t, KeyedChanges>& rhs)
{
return lhs.second.next_deadline_us < rhs.second.next_deadline_us;
});
{
return lhs.second.next_deadline_us < rhs.second.next_deadline_us;
});
handle = min->first;
next_deadline_us = min->second.next_deadline_us;
return true;
Expand Down
17 changes: 16 additions & 1 deletion test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ class PubSubReader
return total_msgs_;
}

void startReception(
eprosima::fastrtps::rtps::SequenceNumber_t startReception(
std::list<type>& msgs)
{
mutex_.lock();
Expand All @@ -395,6 +395,7 @@ class PubSubReader
while (ret);

receiving_.store(true);
return last_seq;
}

void stopReception()
Expand All @@ -410,6 +411,15 @@ class PubSubReader
});
}

void block_for_seq(
eprosima::fastrtps::rtps::SequenceNumber_t seq)
{
block([this, seq]() -> bool
{
return last_seq == seq;
});
}

size_t block_for_at_least(
size_t at_least)
{
Expand Down Expand Up @@ -598,6 +608,11 @@ class PubSubReader
return current_received_count_;
}

eprosima::fastrtps::rtps::SequenceNumber_t get_last_sequence_received()
{
return last_seq;
}

PubSubReader& deactivate_status_listener(
eprosima::fastdds::dds::StatusMask mask)
{
Expand Down
Loading

0 comments on commit ae85f36

Please sign in to comment.