Skip to content

Commit

Permalink
common/RefCountedObj: cleanup con/des
Browse files Browse the repository at this point in the history
Also, don't allow children to set nref (to 0). This is the more significant
change as it required fixing various code to not do this:

    <reftype> ptr = new RefCountedObjectFoo(..., 0);

as a way to create a starting reference with nref==1. This is a pretty
bad code smell so I've converted all the code doing this to use the new
factory method which produces the reference safely:

    auto ptr = ceph::make_ref<RefCountedObjectFoo>(...);

libradosstriper was particularly egregious in its abuse of setting the starting
nref. :(

Signed-off-by: Patrick Donnelly <[email protected]>
  • Loading branch information
batrick authored and tchaikov committed Sep 16, 2019
1 parent 487d344 commit 517bdca
Show file tree
Hide file tree
Showing 71 changed files with 861 additions and 979 deletions.
35 changes: 33 additions & 2 deletions src/common/RefCountedObj.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,27 @@

#include <atomic>

struct RefCountedObject {
/* This class provides mechanisms to make a sub-class work with
* boost::intrusive_ptr (aka ceph::ref_t).
*
* Generally, you'll want to inherit from RefCountedObjectSafe and not from
* RefCountedObject directly. This is because the ::get and ::put methods are
* public and can be used to create/delete references outside of the
* ceph::ref_t pointers with the potential to leak memory.
*
* It is also suggested that you make constructors and destructors private in
* your final class. This prevents instantiation of the object with assignment
* to a raw pointer. Consequently, you'll want to use ceph::make_ref<> to
* create a ceph::ref_t<> holding your object:
*
* auto ptr = ceph::make_ref<Foo>(...);
*
* Use FRIEND_MAKE_REF(ClassName) to allow ceph::make_ref to call the private
* constructors.
*
*/

class RefCountedObject {
public:
void set_cct(class CephContext *c) {
cct = c;
Expand All @@ -46,7 +66,7 @@ struct RefCountedObject {
RefCountedObject& operator=(const RefCountedObject& o) = delete;
RefCountedObject(RefCountedObject&&) = delete;
RefCountedObject& operator=(RefCountedObject&&) = delete;
RefCountedObject(class CephContext* c = nullptr, int n = 1) : cct(c), nref(n) {}
RefCountedObject(class CephContext* c) : cct(c) {}

virtual ~RefCountedObject();

Expand All @@ -62,6 +82,17 @@ struct RefCountedObject {
class CephContext *cct{nullptr};
};

class RefCountedObjectSafe : public RefCountedObject {
public:
RefCountedObject *get() = delete;
const RefCountedObject *get() const = delete;
void put() const = delete;
protected:
template<typename... Args>
RefCountedObjectSafe(Args&&... args) : RefCountedObject(std::forward<Args>(args)...) {}
virtual ~RefCountedObjectSafe() override {}
};

#ifndef WITH_SEASTAR

/**
Expand Down
18 changes: 9 additions & 9 deletions src/journal/Future.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@

namespace journal {

Future::Future() = default;
Future::Future(const Future& o) = default;
Future& Future::operator=(const Future& o) = default;
Future::Future(Future&& o) = default;
Future& Future::operator=(Future&& o) = default;
Future::Future(ceph::ref_t<FutureImpl> future_impl) : m_future_impl(std::move(future_impl)) {}
Future::~Future() = default;

void Future::flush(Context *on_safe) {
m_future_impl->flush(on_safe);
}
Expand All @@ -24,16 +32,8 @@ int Future::get_return_value() const {
return m_future_impl->get_return_value();
}

void intrusive_ptr_add_ref(FutureImpl *p) {
p->get();
}

void intrusive_ptr_release(FutureImpl *p) {
p->put();
}

std::ostream &operator<<(std::ostream &os, const Future &future) {
return os << *future.m_future_impl.get();
return os << *future.m_future_impl;
}

} // namespace journal
Expand Down
33 changes: 16 additions & 17 deletions src/journal/Future.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
#ifndef CEPH_JOURNAL_FUTURE_H
#define CEPH_JOURNAL_FUTURE_H

#include "include/int_types.h"
#include <string>
#include <iosfwd>
#include <boost/intrusive_ptr.hpp>
#include <string>

#include "include/ceph_assert.h"
#include "include/int_types.h"
#include "common/ref.h"

class Context;

Expand All @@ -18,13 +19,16 @@ class FutureImpl;

class Future {
public:
typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr;

Future() {}
Future(const FutureImplPtr &future_impl) : m_future_impl(future_impl) {}

inline bool is_valid() const {
return m_future_impl.get() != nullptr;
Future();
Future(const Future&);
Future& operator=(const Future&);
Future(Future&&);
Future& operator=(Future&&);
Future(ceph::ref_t<FutureImpl> future_impl);
~Future();

bool is_valid() const {
return bool(m_future_impl);
}

void flush(Context *on_safe);
Expand All @@ -37,22 +41,17 @@ class Future {
friend class Journaler;
friend std::ostream& operator<<(std::ostream&, const Future&);

inline FutureImplPtr get_future_impl() const {
const auto& get_future_impl() const {
return m_future_impl;
}

FutureImplPtr m_future_impl;
ceph::ref_t<FutureImpl> m_future_impl;
};

void intrusive_ptr_add_ref(FutureImpl *p);
void intrusive_ptr_release(FutureImpl *p);

std::ostream &operator<<(std::ostream &os, const Future &future);

} // namespace journal

using journal::intrusive_ptr_add_ref;
using journal::intrusive_ptr_release;
using journal::operator<<;

#endif // CEPH_JOURNAL_FUTURE_H
33 changes: 13 additions & 20 deletions src/journal/FutureImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ namespace journal {

FutureImpl::FutureImpl(uint64_t tag_tid, uint64_t entry_tid,
uint64_t commit_tid)
: RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid),
: m_tag_tid(tag_tid),
m_entry_tid(entry_tid),
m_commit_tid(commit_tid),
m_safe(false),
m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
m_consistent_ack(this) {
m_consistent_ack(this)
{
}

void FutureImpl::init(const FutureImplPtr &prev_future) {
void FutureImpl::init(const ceph::ref_t<FutureImpl> &prev_future) {
// chain ourself to the prior future (if any) to that we known when the
// journal is consistent
if (prev_future) {
Expand All @@ -30,7 +30,7 @@ void FutureImpl::flush(Context *on_safe) {

bool complete;
FlushHandlers flush_handlers;
FutureImplPtr prev_future;
ceph::ref_t<FutureImpl> prev_future;
{
std::lock_guard locker{m_lock};
complete = (m_safe && m_consistent);
Expand Down Expand Up @@ -60,20 +60,21 @@ void FutureImpl::flush(Context *on_safe) {
}
}

FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) {
ceph::ref_t<FutureImpl> FutureImpl::prepare_flush(FlushHandlers *flush_handlers) {
std::lock_guard locker{m_lock};
return prepare_flush(flush_handlers, m_lock);
}

FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers,
ceph::ref_t<FutureImpl> FutureImpl::prepare_flush(FlushHandlers *flush_handlers,
ceph::mutex &lock) {
ceph_assert(ceph_mutex_is_locked(m_lock));

if (m_flush_state == FLUSH_STATE_NONE) {
m_flush_state = FLUSH_STATE_REQUESTED;

if (m_flush_handler && flush_handlers->count(m_flush_handler) == 0) {
flush_handlers->insert({m_flush_handler, this});
auto h = m_flush_handler;
if (h) {
flush_handlers->try_emplace(std::move(h), this);
}
}
return m_prev_future;
Expand Down Expand Up @@ -103,10 +104,10 @@ int FutureImpl::get_return_value() const {
return m_return_value;
}

bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) {
bool FutureImpl::attach(FlushHandler::ref flush_handler) {
std::lock_guard locker{m_lock};
ceph_assert(!m_flush_handler);
m_flush_handler = flush_handler;
m_flush_handler = std::move(flush_handler);
return m_flush_state != FLUSH_STATE_NONE;
}

Expand Down Expand Up @@ -163,12 +164,4 @@ std::ostream &operator<<(std::ostream &os, const FutureImpl &future) {
return os;
}

void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p) {
p->get();
}

void intrusive_ptr_release(FutureImpl::FlushHandler *p) {
p->put();
}

} // namespace journal
55 changes: 23 additions & 32 deletions src/journal/FutureImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,21 @@
#include <list>
#include <map>
#include <boost/noncopyable.hpp>
#include <boost/intrusive_ptr.hpp>
#include "include/ceph_assert.h"

class Context;

namespace journal {

class FutureImpl;
typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr;

class FutureImpl : public RefCountedObject, boost::noncopyable {
public:
struct FlushHandler {
virtual ~FlushHandler() {}
virtual void flush(const FutureImplPtr &future) = 0;
virtual void get() = 0;
virtual void put() = 0;
using ref = std::shared_ptr<FlushHandler>;
virtual void flush(const ceph::ref_t<FutureImpl> &future) = 0;
virtual ~FlushHandler() = default;
};
typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr;

FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid);

void init(const FutureImplPtr &prev_future);
void init(const ceph::ref_t<FutureImpl> &prev_future);

inline uint64_t get_tag_tid() const {
return m_tag_tid;
Expand All @@ -56,19 +48,17 @@ class FutureImpl : public RefCountedObject, boost::noncopyable {
return (m_flush_state == FLUSH_STATE_IN_PROGRESS);
}
inline void set_flush_in_progress() {
auto h = std::move(m_flush_handler);
ceph_assert(h);
std::lock_guard locker{m_lock};
ceph_assert(m_flush_handler);
m_flush_handler.reset();
m_flush_state = FLUSH_STATE_IN_PROGRESS;
}

bool attach(const FlushHandlerPtr &flush_handler);
bool attach(FlushHandler::ref flush_handler);
inline void detach() {
std::lock_guard locker{m_lock};
m_flush_handler.reset();
}
inline FlushHandlerPtr get_flush_handler() const {
std::lock_guard locker{m_lock};
inline FlushHandler::ref get_flush_handler() const {
return m_flush_handler;
}

Expand All @@ -77,7 +67,7 @@ class FutureImpl : public RefCountedObject, boost::noncopyable {
private:
friend std::ostream &operator<<(std::ostream &, const FutureImpl &);

typedef std::map<FlushHandlerPtr, FutureImplPtr> FlushHandlers;
typedef std::map<FlushHandler::ref, ceph::ref_t<FutureImpl>> FlushHandlers;
typedef std::list<Context *> Contexts;

enum FlushState {
Expand All @@ -87,41 +77,42 @@ class FutureImpl : public RefCountedObject, boost::noncopyable {
};

struct C_ConsistentAck : public Context {
FutureImplPtr future;
C_ConsistentAck(FutureImpl *_future) : future(_future) {}
ceph::ref_t<FutureImpl> future;
C_ConsistentAck(ceph::ref_t<FutureImpl> _future) : future(std::move(_future)) {}
void complete(int r) override {
future->consistent(r);
future.reset();
}
void finish(int r) override {}
};

FRIEND_MAKE_REF(FutureImpl);
FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid);
~FutureImpl() override = default;

uint64_t m_tag_tid;
uint64_t m_entry_tid;
uint64_t m_commit_tid;

mutable ceph::mutex m_lock = ceph::make_mutex("FutureImpl::m_lock", false);
FutureImplPtr m_prev_future;
bool m_safe;
bool m_consistent;
int m_return_value;
ceph::ref_t<FutureImpl> m_prev_future;
bool m_safe = false;
bool m_consistent = false;
int m_return_value = 0;

FlushHandlerPtr m_flush_handler;
FlushState m_flush_state;
FlushHandler::ref m_flush_handler;
FlushState m_flush_state = FLUSH_STATE_NONE;

C_ConsistentAck m_consistent_ack;
Contexts m_contexts;

FutureImplPtr prepare_flush(FlushHandlers *flush_handlers);
FutureImplPtr prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock);
ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers);
ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock);

void consistent(int r);
void finish_unlock();
};

void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p);
void intrusive_ptr_release(FutureImpl::FlushHandler *p);

std::ostream &operator<<(std::ostream &os, const FutureImpl &future);

} // namespace journal
Expand Down
11 changes: 4 additions & 7 deletions src/journal/JournalMetadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,14 +405,11 @@ JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
const std::string &oid,
const std::string &client_id,
const Settings &settings)
: RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
m_client_id(client_id), m_settings(settings), m_order(0),
m_splay_width(0), m_pool_id(-1), m_initialized(false),
: m_oid(oid),
m_client_id(client_id), m_settings(settings),
m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
m_commit_tid(0), m_watch_ctx(this),
m_watch_handle(0), m_minimum_set(0), m_active_set(0),
m_update_notifications(0), m_commit_position_ctx(NULL),
m_commit_position_task_ctx(NULL) {
m_watch_ctx(this)
{
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
Expand Down
Loading

0 comments on commit 517bdca

Please sign in to comment.