Skip to content

Commit

Permalink
Fix datarace on WriterProxy stop while TimedEvent being triggered (eP…
Browse files Browse the repository at this point in the history
…rosima#3097)

* Revert "Revert eProsima#3046 (eProsima#3083)"

This reverts commit a374872.

* Add regression test

Signed-off-by: Juan López Fernández <[email protected]>

* New writer proxies manipulation mutex

Signed-off-by: Juan López Fernández <[email protected]>

* Revert "New writer proxies manipulation mutex"

This reverts commit b18a5272dd8f7d2bc1f01eda0134b8d572ea86a1.

* Unlock reader's mutex before stopping WriterProxy

Signed-off-by: Juan López Fernández <[email protected]>

* Refs #16341. Fix link error on unit test

Signed-off-by: Miguel Company <[email protected]>

Signed-off-by: Juan López Fernández <[email protected]>
Signed-off-by: Miguel Company <[email protected]>
Co-authored-by: Miguel Company <[email protected]>
  • Loading branch information
juanlofer-eprosima and MiguelCompany authored Dec 13, 2022
1 parent 5ebca9e commit edf8ef7
Show file tree
Hide file tree
Showing 15 changed files with 374 additions and 33 deletions.
6 changes: 6 additions & 0 deletions include/fastdds/rtps/reader/StatefulReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ class StatefulReader : public RTPSReader
return mp_RTPSParticipant;
}

/**
* Get reference to associated RTPS partiicipant's \c ResourceEvent
* @return Reference to associated RTPS partiicipant's \c ResourceEvent
*/
ResourceEvent& getEventResource() const;

/**
* Read the next unread CacheChange_t from the history
* @param change Pointer to pointer of CacheChange_t
Expand Down
6 changes: 6 additions & 0 deletions include/fastdds/rtps/resources/TimedEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ class TimedEvent
void restart_timer(
const std::chrono::steady_clock::time_point& timeout);

/*!
* @brief Unregisters the event, sets its state to INACTIVE, and re-registers it.
* It may be seen as a blocking version of \c cancel_timer
*/
void recreate_timer();

/**
* Update event interval.
* When updating the interval, the timer is not restarted and the new interval will only be used the next time you call restart_timer().
Expand Down
21 changes: 19 additions & 2 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,13 @@ bool StatefulReader::matched_writer_add(
EPROSIMA_LOG_ERROR(RTPS_READER, "Failed to add Writer Proxy " << wdata.guid()
<< " to " << this->m_guid.entityId
<< " with data sharing.");
wp->stop();
{
// Release reader's lock to avoid deadlock when waiting for event (requiring mutex) to finish
guard.unlock();
assert(!guard.owns_lock());
wp->stop();
guard.lock();
}
matched_writers_pool_.push_back(wp);
return false;
}
Expand Down Expand Up @@ -389,7 +395,13 @@ bool StatefulReader::matched_writer_remove(
(void)removed_from_listener;
remove_changes_from(writer_guid, true);
}
wproxy->stop();
{
// Release reader's lock to avoid deadlock when waiting for event (requiring mutex) to finish
lock.unlock();
assert(!lock.owns_lock());
wproxy->stop();
lock.lock();
}
matched_writers_pool_.push_back(wproxy);
if (nullptr != mp_listener)
{
Expand Down Expand Up @@ -1221,6 +1233,11 @@ void StatefulReader::remove_changes_from(
}
}

ResourceEvent& StatefulReader::getEventResource() const
{
return mp_RTPSParticipant->getEventResource();
}

bool StatefulReader::nextUntakenCache(
CacheChange_t** change,
WriterProxy** wpout)
Expand Down
39 changes: 36 additions & 3 deletions src/cpp/rtps/reader/WriterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ WriterProxy::WriterProxy(
, locators_entry_(loc_alloc.max_unicast_locators, loc_alloc.max_multicast_locators)
, is_datasharing_writer_(false)
, received_at_least_one_heartbeat_(false)
, state_(StateCode::STOPPED)
{
//Create Events
ResourceEvent& event_manager = reader_->getRTPSParticipant()->getEventResource();
ResourceEvent& event_manager = reader_->getEventResource();
auto heartbeat_lambda = [this]() -> bool
{
perform_heartbeat_response();
Expand Down Expand Up @@ -144,6 +145,7 @@ void WriterProxy::start(
filter_remote_locators(locators_entry_,
reader_->getAttributes().external_unicast_locators, reader_->getAttributes().ignore_non_matching_locators);
is_datasharing_writer_ = is_datasharing;
state_.store(StateCode::IDLE);
initial_acknack_->restart_timer();
loaded_from_storage(initial_sequence);
received_at_least_one_heartbeat_ = false;
Expand All @@ -168,7 +170,18 @@ void WriterProxy::update(

void WriterProxy::stop()
{
initial_acknack_->cancel_timer();
StateCode prev_code;
if ((prev_code = state_.exchange(StateCode::STOPPED)) == StateCode::BUSY)
{
// TimedEvent being performed, wait for it to finish.
// It does not matter which of the two events is the one on execution, but we must wait on initial_acknack_ as
// it could be restarted if only cancelled while its callback is being triggered.
initial_acknack_->recreate_timer();
}
else
{
initial_acknack_->cancel_timer();
}
heartbeat_response_->cancel_timer();

clear();
Expand Down Expand Up @@ -477,6 +490,13 @@ bool WriterProxy::perform_initial_ack_nack()
{
bool ret_value = false;

StateCode expected = StateCode::IDLE;
if (!state_.compare_exchange_strong(expected, StateCode::BUSY))
{
// Stopped from another thread -> abort
return ret_value;
}

if (!is_datasharing_writer_)
{
// Send initial NACK.
Expand All @@ -500,12 +520,25 @@ bool WriterProxy::perform_initial_ack_nack()
}
}

expected = StateCode::BUSY;
state_.compare_exchange_strong(expected, StateCode::IDLE);

return ret_value;
}

void WriterProxy::perform_heartbeat_response()
{
StateCode expected = StateCode::IDLE;
if (!state_.compare_exchange_strong(expected, StateCode::BUSY))
{
// Stopped from another thread -> abort
return;
}

reader_->send_acknack(this, this, heartbeat_final_flag_.load());

expected = StateCode::BUSY;
state_.compare_exchange_strong(expected, StateCode::IDLE);
}

bool WriterProxy::process_heartbeat(
Expand All @@ -523,7 +556,7 @@ bool WriterProxy::process_heartbeat(
#endif // SHOULD_DEBUG_LINUX

assert_liveliness = false;
if (last_heartbeat_count_ < count)
if (state_ != StateCode::STOPPED && last_heartbeat_count_ < count)
{
// If it is the first heartbeat message, we can try to cancel initial ack.
// TODO: This timer cancelling should be checked if needed with the liveliness implementation.
Expand Down
9 changes: 9 additions & 0 deletions src/cpp/rtps/reader/WriterProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,13 @@ class WriterProxy : public RTPSMessageSenderInterface

private:

enum StateCode
{
IDLE = 0, //! Writer Proxy is not performing any critical operations.
BUSY, //! Writer Proxy is performing a critical operation. Some actions (e.g. stop) should wait for its completion.
STOPPED, //! Writer Proxy has been requested to \c stop.
};

/**
* Set initial value for last acked sequence number.
* @param[in] seq_num last acked sequence number.
Expand Down Expand Up @@ -408,6 +415,8 @@ class WriterProxy : public RTPSMessageSenderInterface
bool is_datasharing_writer_;
//! Wether at least one heartbeat was recevied.
bool received_at_least_one_heartbeat_;
//! Current state of this Writer Proxy
std::atomic<StateCode> state_;

using ChangeIterator = decltype(changes_received_)::iterator;

Expand Down
7 changes: 7 additions & 0 deletions src/cpp/rtps/resources/TimedEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ void TimedEvent::restart_timer(
}
}

void TimedEvent::recreate_timer()
{
service_.unregister_timer(impl_);
impl_->go_cancel();
service_.register_timer(impl_);
}

bool TimedEvent::update_interval(
const Duration_t& inter)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef _FASTDDS_RTPS_READER_STATEFULREADER_H_
#define _FASTDDS_RTPS_READER_STATEFULREADER_H_

#include <fastdds/rtps/resources/ResourceEvent.h>
#include <fastrtps/rtps/reader/RTPSReader.h>
#include <fastrtps/rtps/attributes/ReaderAttributes.h>
#include <fastrtps/rtps/common/Guid.h>
Expand All @@ -33,7 +34,11 @@ class StatefulReader : public RTPSReader
{
public:

StatefulReader() = default;
StatefulReader()
{
ON_CALL(*this, getEventResource())
.WillByDefault(::testing::ReturnRef(service_));
}

StatefulReader(
ReaderHistory* history,
Expand Down Expand Up @@ -88,6 +93,8 @@ class StatefulReader : public RTPSReader
return nullptr;
}

MOCK_METHOD0(getEventResource, ResourceEvent & ());

bool send_sync_nts(
CDRMessage_t* /*message*/,
const LocatorsIterator& /*destination_locators_begin*/,
Expand All @@ -100,6 +107,7 @@ class StatefulReader : public RTPSReader
private:

ReaderTimes times_;
ResourceEvent service_;
};

} // namespace rtps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TimedEvent
MOCK_METHOD0(restart_timer, void());
MOCK_METHOD1(restart_timer, void(const std::chrono::steady_clock::time_point& timeout));
MOCK_METHOD0(cancel_timer, void());
MOCK_METHOD0(recreate_timer, void());
MOCK_METHOD1(update_interval, bool(const Duration_t&));
MOCK_METHOD1(update_interval_millisec, bool(double));
};
Expand Down
1 change: 1 addition & 0 deletions test/unittest/rtps/history/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ target_compile_definitions(ReaderHistoryTests PRIVATE FASTRTPS_NO_LIB
)
target_include_directories(ReaderHistoryTests PRIVATE
${PROJECT_SOURCE_DIR}/test/mock/rtps/Endpoint
${PROJECT_SOURCE_DIR}/test/mock/rtps/ResourceEvent
${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSReader
${PROJECT_SOURCE_DIR}/test/mock/rtps/StatefulReader
${PROJECT_SOURCE_DIR}/src/cpp
Expand Down
59 changes: 57 additions & 2 deletions test/unittest/rtps/reader/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.


###########################################################################
# WriterProxyTests
###########################################################################
set(WRITERPROXYTESTS_SOURCE WriterProxyTests.cpp
${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp
${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp
${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp
${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp
${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp
${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp)
${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp
)

if(WIN32)
add_definitions(-D_WIN32_WINNT=0x0601)
Expand Down Expand Up @@ -53,4 +58,54 @@ add_gtest(WriterProxyTests SOURCES ${WRITERPROXYTESTS_SOURCE})

if(ANDROID)
set_property(TARGET WriterProxyTests PROPERTY CROSSCOMPILING_EMULATOR "adb;shell;cd;${CMAKE_CURRENT_BINARY_DIR};&&")
endif()
endif()


###########################################################################
# WriterProxyStopTest
###########################################################################
set(WRITERPROXYSTOPTEST_SOURCE WriterProxyStopTest.cpp
${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp
${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp
${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp
${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp
${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/resources/TimedEventImpl.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/resources/TimedEvent.cpp
${PROJECT_SOURCE_DIR}/src/cpp/rtps/resources/ResourceEvent.cpp
${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp
${PROJECT_SOURCE_DIR}/src/cpp/utils/TimedConditionVariable.cpp
)

if(WIN32)
add_definitions(-D_WIN32_WINNT=0x0601)
endif()

add_executable(WriterProxyStopTest ${WRITERPROXYSTOPTEST_SOURCE})
target_compile_definitions(WriterProxyStopTest PRIVATE FASTRTPS_NO_LIB
$<$<AND:$<NOT:$<BOOL:${WIN32}>>,$<STREQUAL:"${CMAKE_BUILD_TYPE}","Debug">>:__DEBUG>
$<$<BOOL:${INTERNAL_DEBUG}>:__INTERNALDEBUG> # Internal debug activated.
)
target_include_directories(WriterProxyStopTest PRIVATE
${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader
${PROJECT_SOURCE_DIR}/test/mock/rtps/Endpoint
${PROJECT_SOURCE_DIR}/test/mock/rtps/ExternalLocatorsProcessor
${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSReader
${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSWriter
${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSParticipantImpl
${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSDomainImpl
${PROJECT_SOURCE_DIR}/test/mock/rtps/StatefulReader
${PROJECT_SOURCE_DIR}/test/mock/rtps/WriterProxyData
${PROJECT_SOURCE_DIR}/test/mock/dds/QosPolicies
${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include
${PROJECT_SOURCE_DIR}/src/cpp
)
target_link_libraries(WriterProxyStopTest foonathan_memory
GTest::gmock
${CMAKE_DL_LIBS})
add_gtest(WriterProxyStopTest SOURCES ${WRITERPROXYSTOPTEST_SOURCE})

if(ANDROID)
set_property(TARGET WriterProxyStopTest PROPERTY CROSSCOMPILING_EMULATOR "adb;shell;cd;${CMAKE_CURRENT_BINARY_DIR};&&")
endif()
Loading

0 comments on commit edf8ef7

Please sign in to comment.