SIGBUS in boost::interprocess when /dev/shm is full (eProsima#1248)
* Refs #8593. Fix boost::interprocess by using fallocate() instead of ftruncate().

Signed-off-by: AdolfoMartinez <[email protected]>

* Refs #8593. Regression test.

Signed-off-by: AdolfoMartinez <[email protected]>

* Refs #8593. Uncrustify.

Signed-off-by: AdolfoMartinez <[email protected]>
adolfomarver authored Jul 7, 2020
1 parent 3cba049 commit 38a4a5e
Showing 4 changed files with 531 additions and 343 deletions.
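
Why the fix works: ftruncate() only sets the size of the shared-memory file, and tmpfs pages under /dev/shm are allocated lazily, on first write. When the filesystem is already full, that first write raises SIGBUS in whatever process touches the page, so boost::interprocess never gets a chance to report the failure. Reserving the backing store eagerly at segment creation turns the out-of-space condition into an error return instead. The actual change lives in this commit's shared-memory segment code; what follows is only a minimal sketch of the idea in plain POSIX calls (posix_fallocate() here, while the commit message refers to the Linux-specific fallocate()), with hypothetical names rather than the project's real wrappers:

```cpp
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#include <cerrno>
#include <cstring>
#include <stdexcept>
#include <string>

// Hypothetical helper, for illustration only.
void* create_shm_mapping(
        const char* name,
        size_t size)
{
    int fd = shm_open(name, O_CREAT | O_RDWR, 0644);
    if (fd == -1)
    {
        throw std::runtime_error(std::string("shm_open: ") + std::strerror(errno));
    }

    // ftruncate(fd, size) would succeed here even with /dev/shm full,
    // deferring the failure to a SIGBUS on the first write to a mapped page.
    // posix_fallocate() reserves the blocks now and reports ENOSPC instead.
    int rc = posix_fallocate(fd, 0, static_cast<off_t>(size));
    if (rc != 0) // the error code is returned directly, not through errno
    {
        close(fd);
        shm_unlink(name);
        throw std::runtime_error(std::string("posix_fallocate: ") + std::strerror(rc));
    }

    void* addr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);
    if (addr == MAP_FAILED)
    {
        shm_unlink(name);
        throw std::runtime_error(std::string("mmap: ") + std::strerror(errno));
    }
    return addr;
}
```

Either way the reservation happens before any page is touched, so a full /dev/shm becomes a catchable exception at segment creation rather than an asynchronous crash in a reader or writer.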
106 changes: 65 additions & 41 deletions src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp
@@ -39,7 +39,7 @@ class SharedMemManager
private:

struct BufferNode
-{
+{
struct Status
{
// When buffers are enqued in a port this validity_id is copied to the BufferDescriptor in the port.
@@ -50,23 +50,23 @@ class SharedMemManager
// This counter is incremented each time the buffer is enqueued in a port, an decremented when pop
// from the port to be processed
uint64_t enqueued_count : 20;
-// When listener processes start processing this buffer increments the processing_count. This way
+// When listener processes start processing this buffer increments the processing_count. This way
// the sender can know whether the buffer is been processed or is just only enqueued in some ports.
uint64_t processing_count : 20;
};

std::atomic<Status> status;
uint32_t data_size;
SharedMemSegment::Offset data_offset;

/**
* Atomically invalidates a buffer.
*/
inline void invalidate_buffer()
{
auto s = status.load(std::memory_order_relaxed);
while (!status.compare_exchange_weak(s,
-{ (uint64_t)s.validity_id+1, (uint64_t)0u, (uint64_t)0u },
+{ (uint64_t)s.validity_id + 1, (uint64_t)0u, (uint64_t)0u },
std::memory_order_release,
std::memory_order_relaxed))
{
@@ -83,7 +83,7 @@ class SharedMemManager
auto s = status.load(std::memory_order_relaxed);
while (listener_validity_id == s.validity_id &&
!status.compare_exchange_weak(s,
-{ (uint64_t)s.validity_id+1, (uint64_t)0u, (uint64_t)0u },
+{ (uint64_t)s.validity_id + 1, (uint64_t)0u, (uint64_t)0u },
std::memory_order_release,
std::memory_order_relaxed))
{
@@ -102,7 +102,7 @@ class SharedMemManager
// If the buffer is not beeing processed by any listener => is invalidated
while (s.processing_count == 0 &&
!status.compare_exchange_weak(s,
-{ (uint64_t)s.validity_id+1, (uint64_t)0u, (uint64_t)0u},
+{ (uint64_t)s.validity_id + 1, (uint64_t)0u, (uint64_t)0u},
std::memory_order_release,
std::memory_order_relaxed))
{
@@ -119,7 +119,7 @@ class SharedMemManager
{
return (status.load(std::memory_order_relaxed).validity_id == listener_validity_id);
}

/**
* Atomically decrease enqueued count & increase the buffer processing counts, only, if the buffer is valid.
* @return true when succedded, false when the buffer has been invalidated.
@@ -130,7 +130,7 @@ class SharedMemManager
auto s = status.load(std::memory_order_relaxed);
while (listener_validity_id == s.validity_id &&
!status.compare_exchange_weak(s,
-{ (uint64_t)s.validity_id, (uint64_t)s.enqueued_count-1, (uint64_t)s.processing_count+1 },
+{ (uint64_t)s.validity_id, (uint64_t)s.enqueued_count - 1, (uint64_t)s.processing_count + 1 },
std::memory_order_release,
std::memory_order_relaxed))
{
@@ -149,7 +149,7 @@ class SharedMemManager
auto s = status.load(std::memory_order_relaxed);
while (listener_validity_id == s.validity_id &&
!status.compare_exchange_weak(s,
-{ (uint64_t)s.validity_id, (uint64_t)s.enqueued_count, (uint64_t)s.processing_count+1 },
+{ (uint64_t)s.validity_id, (uint64_t)s.enqueued_count, (uint64_t)s.processing_count + 1 },
std::memory_order_release,
std::memory_order_relaxed))
{
@@ -168,7 +168,7 @@ class SharedMemManager
auto s = status.load(std::memory_order_relaxed);
while (listener_validity_id == s.validity_id &&
!status.compare_exchange_weak(s,
-{ (uint64_t)s.validity_id, (uint64_t)s.enqueued_count+1, (uint64_t)s.processing_count },
+{ (uint64_t)s.validity_id, (uint64_t)s.enqueued_count + 1, (uint64_t)s.processing_count },
std::memory_order_release,
std::memory_order_relaxed))
{
@@ -187,7 +187,7 @@ class SharedMemManager
auto s = status.load(std::memory_order_relaxed);
while (listener_validity_id == s.validity_id &&
!status.compare_exchange_weak(s,
-{ (uint64_t)s.validity_id, (uint64_t)s.enqueued_count-1, (uint64_t)s.processing_count },
+{ (uint64_t)s.validity_id, (uint64_t)s.enqueued_count - 1, (uint64_t)s.processing_count },
std::memory_order_release,
std::memory_order_relaxed))
{
@@ -212,14 +212,15 @@ class SharedMemManager
auto s = status.load(std::memory_order_relaxed);
while (listener_validity_id == s.validity_id &&
!status.compare_exchange_weak(s,
-{ (uint64_t)s.validity_id, (uint64_t)s.enqueued_count, (uint64_t)s.processing_count-1 },
+{ (uint64_t)s.validity_id, (uint64_t)s.enqueued_count, (uint64_t)s.processing_count - 1 },
std::memory_order_release,
std::memory_order_relaxed))
{
}

return (listener_validity_id == s.validity_id);
-}
+}
+
};

public:
@@ -241,7 +242,7 @@ class SharedMemManager

per_allocation_extra_size_ =
SharedMemSegment::compute_per_allocation_extra_size(std::alignment_of<BufferNode>::value,
-domain_name);
+domain_name);
}

class Buffer
@@ -303,12 +304,14 @@ class SharedMemManager
return original_validity_id_;
}

-void inc_enqueued_count(uint32_t validity_id)
+void inc_enqueued_count(
+uint32_t validity_id)
{
buffer_node_->inc_enqueued_count(validity_id);
}

-void dec_enqueued_count(uint32_t validity_id)
+void dec_enqueued_count(
+uint32_t validity_id)
{
buffer_node_->dec_enqueued_count(validity_id);
}
@@ -362,7 +365,7 @@ class SharedMemManager
(boost::interprocess::anonymous_instance)[max_allocations]();

// All buffer nodes are free
-for (uint32_t i = 0; i<max_allocations; i++)
+for (uint32_t i = 0; i < max_allocations; i++)
{
buffers_nodes[i].status.exchange({0, 0, 0});
buffers_nodes[i].data_size = 0;
Expand Down Expand Up @@ -417,16 +420,16 @@ class SharedMemManager
buffer_node = pop_free_node();

data = segment_->get().allocate(size);
-free_bytes_-= size;
+free_bytes_ -= size;

buffer_node->data_offset = segment_->get_offset_from_address(data);
buffer_node->data_size = size;

auto validity_id = buffer_node->status.load(std::memory_order_relaxed).validity_id;

// TODO(Adolfo) : Dynamic allocation. Use foonathan to convert it to static allocation
-new_buffer = std::make_shared<SharedMemBuffer>(segment_, segment_id_, buffer_node,
-static_cast<uint32_t>(validity_id));
+new_buffer = std::make_shared<SharedMemBuffer>(segment_, segment_id_, buffer_node,
+static_cast<uint32_t>(validity_id));

if (new_buffer)
{
@@ -436,7 +439,7 @@ class SharedMemManager
{
throw std::runtime_error("alloc_buffer: out of memory");
}

// TODO(Adolfo) : Dynamic allocation. Use foonathan to convert it to static allocation
allocated_buffers_.push_back(buffer_node);
}
@@ -521,7 +524,7 @@ class SharedMemManager

void release_buffer(
BufferNode* buffer_node)
-{
+{
segment_->get().deallocate(
segment_->get_address_from_offset(buffer_node->data_offset));

@@ -533,11 +536,12 @@ class SharedMemManager
* processed by any listener (until enough free bytes is gathered to solve the overflow).
* @return true if at least required_data_size bytes are free after the recovery, false otherwise.
*/
-bool recover_buffers(uint32_t required_data_size)
+bool recover_buffers(
+uint32_t required_data_size)
{
auto it = allocated_buffers_.begin();

-while(it != allocated_buffers_.end())
+while (it != allocated_buffers_.end())
{
// There is enough space to allocate the buffer
if (free_bytes_ >= required_data_size)
@@ -600,13 +604,13 @@ class SharedMemManager

~Listener()
{
-if(global_port_)
+if (global_port_)
{
try
{
global_port_->unregister_listener(&global_listener_, listener_index_);
}
-catch(const std::exception& e)
+catch (const std::exception& e)
{
logWarning(RTPS_TRANSPORT_SHM, e.what());
}
@@ -641,7 +645,7 @@ class SharedMemManager

try
{
-while(!is_buffer_valid)
+while (!is_buffer_valid)
{
bool was_cell_freed;

@@ -668,7 +672,8 @@ class SharedMemManager

auto segment = shared_mem_manager_->find_segment(buffer_descriptor.source_segment_id);
auto buffer_node =
-static_cast<BufferNode*>(segment->get_address_from_offset(buffer_descriptor.buffer_node_offset));
+static_cast<BufferNode*>(segment->get_address_from_offset(buffer_descriptor.
+buffer_node_offset));

// TODO(Adolfo) : Dynamic allocation. Use foonathan to convert it to static allocation
buffer_ref = std::make_shared<SharedMemBuffer>(segment, buffer_descriptor.source_segment_id,
@@ -682,7 +687,7 @@ class SharedMemManager
{
if (was_cell_freed)
{
-// Atomically increase processing & decrease enqueued
+// Atomically increase processing & decrease enqueued
is_buffer_valid = buffer_node->dec_enqueued_inc_processing_counts(
buffer_descriptor.validity_id);
}
@@ -697,7 +702,7 @@ class SharedMemManager
{
buffer_node->dec_enqueued_count(buffer_descriptor.validity_id);
}

throw std::runtime_error("pop() : out of memory");
}
}
@@ -837,7 +842,7 @@ class SharedMemManager
private:

void regenerate_port()
-{
+{
auto new_port = shared_mem_manager_->regenerate_port(global_port_, open_mode_);

*this = std::move(*new_port);
@@ -860,15 +865,27 @@ class SharedMemManager
std::shared_ptr<Segment> create_segment(
uint32_t size,
uint32_t max_allocations)
{
+return std::make_shared<Segment>(size + segment_allocation_extra_size(max_allocations), size, max_allocations,
+global_segment_.domain_name());
}

+/**
+* Computes the segment's extra size needed to store allocator internal structures
+* @param in max_allocations The maximum buffer allocations supported.
+* @return the extra size in bytes.
+*/
+uint32_t segment_allocation_extra_size(
+uint32_t max_allocations) const
+{
// Every buffer allocation of 'n-bytes', consumes an extra 'per_allocation_extra_size_' bytes.
// This is due to the allocator internal structures (also residing in the shared-memory segment)
-// used to manage the allocation algorithm.
+// used to manage the allocation algorithm.
// So with an estimation of 'max_allocations' user buffers, the total segment extra size is computed.
uint32_t allocation_extra_size = (max_allocations * sizeof(BufferNode)) + per_allocation_extra_size_ +
-max_allocations * per_allocation_extra_size_;
+max_allocations * per_allocation_extra_size_;

-return std::make_shared<Segment>(size + allocation_extra_size, size, max_allocations, global_segment_.domain_name());
+return allocation_extra_size;
}

std::shared_ptr<Port> open_port(
@@ -885,7 +902,8 @@ class SharedMemManager
/**
* Remove a port from the system.
*/
-void remove_port(uint32_t port_id)
+void remove_port(
+uint32_t port_id)
{
global_segment_.remove_port(port_id);
}
@@ -901,7 +919,9 @@ class SharedMemManager

private:

-std::shared_ptr<Port> regenerate_port(std::shared_ptr<SharedMemGlobal::Port> port, SharedMemGlobal::Port::OpenMode open_mode)
+std::shared_ptr<Port> regenerate_port(
+std::shared_ptr<SharedMemGlobal::Port> port,
+SharedMemGlobal::Port::OpenMode open_mode)
{
return std::make_shared<Port>(this, global_segment_.regenerate_port(port, open_mode), open_mode);
}
@@ -925,7 +945,7 @@ class SharedMemManager
{
}

-SegmentWrapper& operator=(
+SegmentWrapper& operator =(
SegmentWrapper&& other)
{
segment_ = other.segment_;
@@ -936,7 +956,10 @@ class SharedMemManager
return *this;
}

-std::shared_ptr<SharedMemSegment> segment() { return segment_; }
+std::shared_ptr<SharedMemSegment> segment()
+{
+return segment_;
+}

private:

@@ -959,7 +982,7 @@ class SharedMemManager

std::shared_ptr<SharedMemSegment> segment;

-// TODO (Adolfo): Garbage collector for opened but unused segments????
+// TODO(Adolfo): Garbage collector for opened but unused segments????

try
{
@@ -977,6 +1000,7 @@ class SharedMemManager

return segment;
}
+
};

} // namespace rtps
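
A note on the segment_allocation_extra_size() helper factored out above: it charges one BufferNode per prospective buffer, the allocator's per-allocation overhead once per buffer, and one extra per_allocation_extra_size_, which reads as the overhead of the buffer-node array allocation itself. A worked example with made-up figures, since the real per_allocation_extra_size_ is computed at runtime by SharedMemSegment::compute_per_allocation_extra_size() and sizeof(BufferNode) is platform-dependent:

```cpp
#include <cstdint>

// Hypothetical figures, for illustration only.
constexpr uint32_t max_allocations = 100;    // estimated number of user buffers
constexpr uint32_t node_size = 16;           // stand-in for sizeof(BufferNode)
constexpr uint32_t per_alloc_extra = 64;     // stand-in for per_allocation_extra_size_

// Mirrors the formula in segment_allocation_extra_size():
constexpr uint32_t extra_size =
        (max_allocations * node_size)        // one BufferNode per buffer
        + per_alloc_extra                    // the node-array allocation's own overhead
        + max_allocations * per_alloc_extra; // allocator overhead per user buffer

static_assert(extra_size == 8064, "100*16 + 64 + 100*64 = 8064 bytes");
```

create_segment() adds this on top of the user-requested size, so the shared-memory segment is created large enough for both payload and allocator bookkeeping.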