Skip to content

Commit

Permalink
Report consensus phase changes in the server subscription stream
Browse files Browse the repository at this point in the history
  • Loading branch information
movitto authored and manojsdoshi committed Oct 30, 2019
1 parent 726dd69 commit 15c5f9c
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 2 deletions.
6 changes: 6 additions & 0 deletions src/ripple/app/consensus/RCLConsensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,12 @@ class RCLConsensus
return adaptor_.mode();
}

ConsensusPhase
phase() const
{
return consensus_.phase();
}

//! @see Consensus::getJson
Json::Value
getJson(bool full) const;
Expand Down
75 changes: 73 additions & 2 deletions src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ class NetworkOPsImp final
boost::optional<std::chrono::milliseconds> consensusDelay) override;
uint256 getConsensusLCL () override;
void reportFeeChange () override;
void reportConsensusStateChange(ConsensusPhase phase);

void updateLocalTx (ReadView const& view) override
{
Expand Down Expand Up @@ -486,6 +487,9 @@ class NetworkOPsImp final
bool unsubPeerStatus (std::uint64_t uListener) override;
void pubPeerStatus (std::function<Json::Value(void)> const&) override;

bool subConsensus (InfoSub::ref ispListener) override;
bool unsubConsensus (std::uint64_t uListener) override;

InfoSub::pointer findRpcSub (std::string const& strUrl) override;
InfoSub::pointer addRpcSub (
std::string const& strUrl, InfoSub::ref) override;
Expand Down Expand Up @@ -543,6 +547,7 @@ class NetworkOPsImp final
bool isAccepted);

void pubServer ();
void pubConsensus (ConsensusPhase phase);

std::string getHostId (bool forAdmin);

Expand Down Expand Up @@ -570,6 +575,8 @@ class NetworkOPsImp final

RCLConsensus mConsensus;

ConsensusPhase mLastConsensusPhase;

LedgerMaster& m_ledgerMaster;
std::shared_ptr<InboundLedger> mAcquiringLedger;

Expand All @@ -587,9 +594,10 @@ class NetworkOPsImp final
sRTTransactions, // All proposed and accepted transactions.
sValidations, // Received validations.
sPeerStatus, // Peer status changes.
sConsensusPhase, // Consensus phase

sLastEntry = sPeerStatus // as this name implies, any new entry must
// be ADDED ABOVE this one
sLastEntry = sConsensusPhase // as this name implies, any new entry must
// be ADDED ABOVE this one
};
std::array<SubMapType, SubTypes::sLastEntry+1> mStreamMaps;

Expand Down Expand Up @@ -769,6 +777,13 @@ void NetworkOPsImp::processHeartbeatTimer ()

mConsensus.timerEntry (app_.timeKeeper().closeTime());

const ConsensusPhase currPhase = mConsensus.phase();
if (mLastConsensusPhase != currPhase)
{
reportConsensusStateChange(currPhase);
mLastConsensusPhase = currPhase;
}

setHeartbeatTimer ();
}

Expand Down Expand Up @@ -1464,6 +1479,13 @@ bool NetworkOPsImp::beginConsensus (uint256 const& networkClosed)
prevLedger,
changes.removed);

const ConsensusPhase currPhase = mConsensus.phase();
if (mLastConsensusPhase != currPhase)
{
reportConsensusStateChange(currPhase);
mLastConsensusPhase = currPhase;
}

JLOG(m_journal.debug()) << "Initiating consensus engine";
return true;
}
Expand Down Expand Up @@ -1711,6 +1733,33 @@ void NetworkOPsImp::pubServer ()
}
}

void NetworkOPsImp::pubConsensus (ConsensusPhase phase)
{
std::lock_guard sl (mSubLock);

auto& streamMap = mStreamMaps[sConsensusPhase];
if (!streamMap.empty ())
{
Json::Value jvObj (Json::objectValue);
jvObj [jss::type] = "consensusPhase";
jvObj [jss::consensus] = to_string(phase);

for (auto i = streamMap.begin ();
i != streamMap.end (); )
{
if (auto p = i->second.lock())
{
p->send (jvObj, true);
++i;
}
else
{
i = streamMap.erase (i);
}
}
}
}


void NetworkOPsImp::pubValidation (STValidation::ref val)
{
Expand Down Expand Up @@ -2517,6 +2566,13 @@ void NetworkOPsImp::reportFeeChange ()
}
}

void NetworkOPsImp::reportConsensusStateChange (ConsensusPhase phase)
{
m_job_queue.addJob (
jtCLIENT, "reportConsensusStateChange->pubConsensus",
[this, phase] (Job&) { pubConsensus(phase); });
}

// This routine should only be used to publish accepted or validated
// transactions.
Json::Value NetworkOPsImp::transJson(
Expand Down Expand Up @@ -2973,6 +3029,21 @@ bool NetworkOPsImp::unsubPeerStatus (std::uint64_t uSeq)
return mStreamMaps[sPeerStatus].erase (uSeq);
}

// <-- bool: true=added, false=already there
bool NetworkOPsImp::subConsensus (InfoSub::ref isrListener)
{
std::lock_guard sl (mSubLock);
return mStreamMaps[sConsensusPhase].emplace (
isrListener->getSeq (), isrListener).second;
}

// <-- bool: true=erased, false=was not there
bool NetworkOPsImp::unsubConsensus (std::uint64_t uSeq)
{
std::lock_guard sl (mSubLock);
return mStreamMaps[sConsensusPhase].erase (uSeq);
}

InfoSub::pointer NetworkOPsImp::findRpcSub (std::string const& strUrl)
{
std::lock_guard sl (mSubLock);
Expand Down
6 changes: 6 additions & 0 deletions src/ripple/consensus/Consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,12 @@ class Consensus
return prevLedgerID_;
}

ConsensusPhase
phase() const
{
return phase_;
}

/** Get the Json state of the consensus process.
Called by the consensus_info RPC.
Expand Down
3 changes: 3 additions & 0 deletions src/ripple/net/InfoSub.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ class InfoSub
virtual bool unsubPeerStatus (std::uint64_t uListener) = 0;
virtual void pubPeerStatus (std::function<Json::Value(void)> const&) = 0;

virtual bool subConsensus (ref ispListener) = 0;
virtual bool unsubConsensus (std::uint64_t uListener) = 0;

// VFALCO TODO Remove
// This was added for one particular partner, it
// "pushes" subscription data to a particular URL.
Expand Down
1 change: 1 addition & 0 deletions src/ripple/net/impl/InfoSub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ InfoSub::~InfoSub ()
m_source.unsubServer (mSeq);
m_source.unsubValidations (mSeq);
m_source.unsubPeerStatus (mSeq);
m_source.unsubConsensus (mSeq);

// Use the internal unsubscribe so that it won't call
// back to us and modify its own parameter
Expand Down
4 changes: 4 additions & 0 deletions src/ripple/rpc/handlers/Subscribe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ Json::Value doSubscribe (RPC::Context& context)
return rpcError(rpcNO_PERMISSION);
context.netOps.subPeerStatus (ispSub);
}
else if (streamName == "consensus")
{
context.netOps.subConsensus (ispSub);
}
else
{
return rpcError(rpcSTREAM_MALFORMED);
Expand Down
4 changes: 4 additions & 0 deletions src/ripple/rpc/handlers/Unsubscribe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ Json::Value doUnsubscribe (RPC::Context& context)
{
context.netOps.unsubPeerStatus (ispSub->getSeq ());
}
else if (streamName == "consensus")
{
context.netOps.unsubConsensus (ispSub->getSeq());
}
else
{
return rpcError(rpcSTREAM_MALFORMED);
Expand Down

0 comments on commit 15c5f9c

Please sign in to comment.