Skip to content

Commit

Permalink
Refs eProsima#2013. Removed history record on ReaderHistory
Browse files Browse the repository at this point in the history
  • Loading branch information
richiprosima committed May 3, 2017
1 parent 42b4408 commit 26ddec3
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 87 deletions.
13 changes: 0 additions & 13 deletions include/fastrtps/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

#include "History.h"
#include "../common/CacheChange.h"
#include <map>
#include <set>
#include <fastrtps/utils/Semaphore.h>

namespace eprosima {
Expand Down Expand Up @@ -96,22 +94,11 @@ class ReaderHistory : public History {
//!Wait for the semaphore
RTPS_DllAPI void waitSemaphore();

RTPS_DllAPI bool thereIsRecordOf(GUID_t& guid, SequenceNumber_t& seq);

RTPS_DllAPI bool thereIsUpperRecordOf(GUID_t& guid, SequenceNumber_t& seq);

protected:
//!Pointer to the reader
RTPSReader* mp_reader;
//!Pointer to the semaphore, used to halt execution until new message arrives.
Semaphore* mp_semaphore;
//!Information about changes already in History
private:
std::map<GUID_t, std::set<SequenceNumber_t>> m_historyRecord;//TODO sustituir por una clase que sea más efectiva,
//que no guarde todos los numeros de secuencia rebidiso sino los que falten
//
std::set<SequenceNumber_t>* m_cachedRecordLocation;
GUID_t m_cachedGUID;
};

}
Expand Down
9 changes: 7 additions & 2 deletions include/fastrtps/rtps/reader/StatelessReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
#define STATELESSREADER_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <mutex>

#include "RTPSReader.h"

#include <mutex>
#include <map>

namespace eprosima {
namespace fastrtps{
namespace rtps {
Expand Down Expand Up @@ -140,10 +141,14 @@ class StatelessReader: public RTPSReader

bool acceptMsgFrom(GUID_t& entityId);

bool thereIsUpperRecordOf(GUID_t& guid, SequenceNumber_t& seq);

//!List of GUID_t os matched writers.
//!Is only used in the Discovery, to correctly notify the user using SubscriptionListener::onSubscriptionMatched();
std::vector<RemoteWriterAttributes> m_matched_writers;

//!Information about changes already in History
std::map<GUID_t, SequenceNumber_t> m_historyRecord;
};

}
Expand Down
2 changes: 2 additions & 0 deletions include/fastrtps/rtps/reader/WriterProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ namespace eprosima
*/
SequenceNumber_t nextCacheChangeToBeNotified();

bool change_was_received(const SequenceNumber_t& seq_num);

private:

/*!
Expand Down
67 changes: 6 additions & 61 deletions src/cpp/rtps/history/ReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,8 @@ inline bool sort_ReaderHistoryCache(CacheChange_t*c1, CacheChange_t*c2)
ReaderHistory::ReaderHistory(const HistoryAttributes& att):
History(att),
mp_reader(nullptr),
mp_semaphore(new Semaphore(0)),
m_cachedRecordLocation(nullptr),
m_cachedGUID()

mp_semaphore(new Semaphore(0))
{

}

ReaderHistory::~ReaderHistory()
Expand All @@ -62,22 +58,6 @@ bool ReaderHistory::received_change(CacheChange_t* change, size_t)
return add_change(change);
}

// TODO(Javier) Maybe a mechanism to check if a change has to be stored should be outside, using WriterProxy info.
static void CleanSequentials(std::set<SequenceNumber_t>& set, size_t maximum)
{
auto end = set.end();
auto set_it = set.begin();
if (set_it == end)
return;

while ( next(set_it) != end &&
((*next(set_it) == (*set_it + 1) ||
(set.size() > maximum && *set_it == SequenceNumber_t()))))
set_it++;

set.erase(set.begin(), set_it);
}

bool ReaderHistory::add_change(CacheChange_t* a_change)
{

Expand All @@ -101,29 +81,12 @@ bool ReaderHistory::add_change(CacheChange_t* a_change)
logError(RTPS_HISTORY,"The Writer GUID_t must be defined");
}

m_changes.push_back(a_change);
sortCacheChanges();
updateMaxMinSeqNum();
logInfo(RTPS_HISTORY, "Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes");

if (a_change->writerGUID != m_cachedGUID || !m_cachedRecordLocation)
{
m_cachedRecordLocation = &m_historyRecord[a_change->writerGUID];
if (m_cachedRecordLocation->empty())
m_cachedRecordLocation->insert(SequenceNumber_t());
m_cachedGUID = a_change->writerGUID;
}

if(*m_cachedRecordLocation->begin() < a_change->sequenceNumber && m_cachedRecordLocation->insert(a_change->sequenceNumber).second)
{
m_changes.push_back(a_change);
sortCacheChanges();
updateMaxMinSeqNum();
logInfo(RTPS_HISTORY, "Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes");

CleanSequentials(*m_cachedRecordLocation, m_att.maximumReservedCaches);

return true;
}

logInfo(RTPS_HISTORY, "Change "<< a_change->sequenceNumber << " from "<< a_change->writerGUID << " not added.");
return false;
return true;
}

bool ReaderHistory::remove_change(CacheChange_t* a_change)
Expand Down Expand Up @@ -228,24 +191,6 @@ void ReaderHistory::waitSemaphore() //TODO CAMBIAR NOMBRE PARA que el usuario se
return mp_semaphore->wait();
}

bool ReaderHistory::thereIsRecordOf(GUID_t& guid, SequenceNumber_t& seq)
{
std::lock_guard<std::recursive_mutex> guard(*mp_mutex);
if (guid == m_cachedGUID)
return m_cachedRecordLocation->find(seq) != m_cachedRecordLocation->end();

return m_historyRecord.find(guid) != m_historyRecord.end() && m_historyRecord[guid].find(seq) != m_historyRecord[guid].end();
}

bool ReaderHistory::thereIsUpperRecordOf(GUID_t& guid, SequenceNumber_t& seq)
{
std::lock_guard<std::recursive_mutex> guard(*mp_mutex);
if (guid == m_cachedGUID)
return m_cachedRecordLocation->upper_bound(seq) != m_cachedRecordLocation->end();

return m_historyRecord.find(guid) != m_historyRecord.end() && m_historyRecord[guid].upper_bound(seq) != m_historyRecord[guid].end();
}

}
} /* namespace rtps */
} /* namespace eprosima */
7 changes: 3 additions & 4 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ bool StatefulReader::processDataFragMsg(CacheChange_t *incomingChange, uint32_t
if(acceptMsgFrom(incomingChange->writerGUID, &pWP))
{
// Check if CacheChange was received.
if(!getHistory()->thereIsRecordOf(incomingChange->writerGUID, incomingChange->sequenceNumber))
if(!pWP->change_was_received(incomingChange->sequenceNumber))
{
logInfo(RTPS_MSG_IN, IDSTRING"Trying to add fragment " << incomingChange->sequenceNumber.to64long() << " TO reader: " << getGuid().entityId);

Expand Down Expand Up @@ -486,10 +486,9 @@ bool StatefulReader::change_received(CacheChange_t* a_change, WriterProxy* prox,

size_t unknown_missing_changes_up_to = prox->unknown_missing_changes_up_to(a_change->sequenceNumber);

// TODO Check order
if(this->mp_history->received_change(a_change, unknown_missing_changes_up_to))
if(prox->received_change_set(a_change->sequenceNumber))
{
if(prox->received_change_set(a_change->sequenceNumber))
if(this->mp_history->received_change(a_change, unknown_missing_changes_up_to))
{
GUID_t proxGUID = prox->m_att.guid;
writerProxyLock.unlock();
Expand Down
13 changes: 11 additions & 2 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ bool StatelessReader::matched_writer_remove(RemoteWriterAttributes& wdata)
{
logInfo(RTPS_READER,"Writer " <<wdata.guid<< " removed from "<<m_guid.entityId);
m_matched_writers.erase(it);
m_historyRecord.erase(wdata.guid);
return true;
}
}
Expand All @@ -95,10 +96,12 @@ bool StatelessReader::change_received(CacheChange_t* change, std::unique_lock<st
{
// Only make visible the change if there is not other with bigger sequence number.
// TODO Revisar si no hay que incluirlo.
if(!mp_history->thereIsUpperRecordOf(change->writerGUID, change->sequenceNumber))
if(!thereIsUpperRecordOf(change->writerGUID, change->sequenceNumber))
{
if(mp_history->received_change(change, 0))
{
m_historyRecord[change->writerGUID] = change->sequenceNumber;

if(getListener() != nullptr)
{
lock.unlock();
Expand Down Expand Up @@ -223,7 +226,7 @@ bool StatelessReader::processDataFragMsg(CacheChange_t *incomingChange, uint32_t
if (acceptMsgFrom(incomingChange->writerGUID))
{
// Check if CacheChange was received.
if(!getHistory()->thereIsRecordOf(incomingChange->writerGUID, incomingChange->sequenceNumber))
if(!thereIsUpperRecordOf(incomingChange->writerGUID, incomingChange->sequenceNumber))
{
logInfo(RTPS_MSG_IN, IDSTRING"Trying to add fragment " << incomingChange->sequenceNumber.to64long() << " TO reader: " << getGuid().entityId);

Expand Down Expand Up @@ -318,3 +321,9 @@ bool StatelessReader::acceptMsgFrom(GUID_t& writerId)

return false;
}

bool StatelessReader::thereIsUpperRecordOf(GUID_t& guid, SequenceNumber_t& seq)
{
std::lock_guard<std::recursive_mutex> guard(*mp_mutex);
return m_historyRecord.find(guid) != m_historyRecord.end() && m_historyRecord[guid] >= seq;
}
29 changes: 24 additions & 5 deletions src/cpp/rtps/reader/WriterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,22 @@ bool WriterProxy::received_change_set(const SequenceNumber_t& seqNum, bool is_re

if(chit != m_changesFromW.begin())
{
ChangeFromWriter_t newch(*chit);
newch.setStatus(RECEIVED);
newch.setRelevance(is_relevance);
if(chit->getStatus() != RECEIVED)
{
ChangeFromWriter_t newch(*chit);
newch.setStatus(RECEIVED);
newch.setRelevance(is_relevance);

auto hint = m_changesFromW.erase(chit);
auto hint = m_changesFromW.erase(chit);

m_changesFromW.insert(hint, newch);
m_changesFromW.insert(hint, newch);
}
else
return false;
}
else
{
assert(chit->getStatus() != RECEIVED);
changesFromWLowMark_ = seqNum;
m_changesFromW.erase(chit);
cleanup();
Expand Down Expand Up @@ -323,7 +329,20 @@ const std::vector<ChangeFromWriter_t> WriterProxy::missing_changes()
return returnedValue;
}

bool WriterProxy::change_was_received(const SequenceNumber_t& seq_num)
{
std::lock_guard<std::recursive_mutex> guard(*mp_mutex);

if(seq_num <= changesFromWLowMark_)
return true;

auto chit = m_changesFromW.find(ChangeFromWriter_t(seq_num));

if(chit != m_changesFromW.end() && chit->getStatus() == RECEIVED)
return true;

return false;
}

const SequenceNumber_t WriterProxy::available_changes_max() const
{
Expand Down

0 comments on commit 26ddec3

Please sign in to comment.