Skip to content

Commit

Permalink
Remove todo from code and provide some preliminary peer review config…
Browse files Browse the repository at this point in the history
… values (FuelLabs#1380)

This was left behind from this work:
FuelLabs#1356

As we did for the syncing peer config, we left hard-coded values for
each variant. The followup work for that is tracked here:
FuelLabs#1340

---------

Co-authored-by: Green Baneling <[email protected]>
  • Loading branch information
MitchTurner and xgreenx authored Sep 25, 2023
1 parent 2558946 commit 2152e39
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 17 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ Description of the upcoming release here.

### Changed

- [#1377](https://github.com/FuelLabs/fuel-core/pull/1377): Remove `DiscoveryEvent` and use `KademliaEvent` directly in `DiscoveryBehavior`
- [#1380](https://github.com/FuelLabs/fuel-core/pull/1380): Add preliminary, hard-coded config values for heartbeat peer reputation, removing `todo`.
- [#1377](https://github.com/FuelLabs/fuel-core/pull/1377): Remove `DiscoveryEvent` and use `KademliaEvent` directly in `DiscoveryBehavior`.
- [#1366](https://github.com/FuelLabs/fuel-core/pull/1366): Improve caching during docker builds in CI by replacing gha
- [#1358](https://github.com/FuelLabs/fuel-core/pull/1358): Upgraded the Rust version used in CI to 1.72.0. Also includes associated Clippy changes.
- [#1318](https://github.com/FuelLabs/fuel-core/pull/1318): Modified block synchronization to use asynchronous task execution when retrieving block headers.
Expand Down
79 changes: 63 additions & 16 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,6 @@ pub enum HeartBeatPeerReportReason {
LowHeartBeatFrequency,
}

impl PeerReport for HeartBeatPeerReportReason {
fn get_score_from_report(&self) -> AppScore {
todo!()
}
}

pub trait TaskP2PService: Send {
fn get_peer_ids(&self) -> Vec<PeerId>;
fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)>;
Expand Down Expand Up @@ -242,7 +236,7 @@ pub trait Broadcast: Send {
fn report_peer(
&self,
peer_id: FuelPeerId,
report: HeartBeatPeerReportReason,
report: AppScore,
reporting_service: &'static str,
) -> anyhow::Result<()>;

Expand All @@ -258,7 +252,7 @@ impl Broadcast for SharedState {
fn report_peer(
&self,
peer_id: FuelPeerId,
report: HeartBeatPeerReportReason,
report: AppScore,
reporting_service: &'static str,
) -> anyhow::Result<()> {
self.report_peer(peer_id, report, reporting_service)
Expand Down Expand Up @@ -293,6 +287,13 @@ pub struct Task<P, D, B> {
heartbeat_max_avg_interval: Duration,
heartbeat_max_time_since_last: Duration,
next_check_time: Instant,
heartbeat_peer_reputation_config: HeartbeatPeerReputationConfig,
}

#[derive(Clone)]
pub struct HeartbeatPeerReputationConfig {
old_heartbeat_penalty: AppScore,
low_heartbeat_frequency_penalty: AppScore,
}

impl<D> Task<FuelP2PService<PostcardCodec>, D, SharedState> {
Expand All @@ -313,6 +314,13 @@ impl<D> Task<FuelP2PService<PostcardCodec>, D, SharedState> {
let (tx_broadcast, _) = broadcast::channel(100);
let (block_height_broadcast, _) = broadcast::channel(100);

// Hardcoded for now, but left here to be configurable in the future.
// TODO: https://github.com/FuelLabs/fuel-core/issues/1340
let heartbeat_peer_reputation_config = HeartbeatPeerReputationConfig {
old_heartbeat_penalty: -5.,
low_heartbeat_frequency_penalty: -5.,
};

let next_block_height = block_importer.next_block_height();
let p2p_service = FuelP2PService::new(config, PostcardCodec::new(max_block_size));

Expand All @@ -337,6 +345,7 @@ impl<D> Task<FuelP2PService<PostcardCodec>, D, SharedState> {
heartbeat_max_avg_interval,
heartbeat_max_time_since_last,
next_check_time,
heartbeat_peer_reputation_config,
}
}
}
Expand All @@ -348,21 +357,39 @@ impl<P: TaskP2PService, D, B: Broadcast> Task<P, D, B> {
{
tracing::debug!("Peer {:?} has old heartbeat", peer_id);
let report = HeartBeatPeerReportReason::OldHeartBeat;
let service = "p2p";
let peer_id = convert_peer_id(peer_id)?;
self.broadcast.report_peer(peer_id, report, service)?;
self.report_peer(peer_id, report)?;
} else if peer_info.heartbeat_data.average_time_between_heartbeats()
> self.heartbeat_max_avg_interval
{
tracing::debug!("Peer {:?} has low heartbeat frequency", peer_id);
let report = HeartBeatPeerReportReason::LowHeartBeatFrequency;
let service = "p2p";
let peer_id = convert_peer_id(peer_id)?;
self.broadcast.report_peer(peer_id, report, service)?;
self.report_peer(peer_id, report)?;
}
}
Ok(())
}

fn report_peer(
&self,
peer_id: FuelPeerId,
report: HeartBeatPeerReportReason,
) -> anyhow::Result<()> {
let app_score = match report {
HeartBeatPeerReportReason::OldHeartBeat => {
self.heartbeat_peer_reputation_config.old_heartbeat_penalty
}
HeartBeatPeerReportReason::LowHeartBeatFrequency => {
self.heartbeat_peer_reputation_config
.low_heartbeat_frequency_penalty
}
};
let reporting_service = "p2p";
self.broadcast
.report_peer(peer_id, app_score, reporting_service)?;
Ok(())
}
}

fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result<FuelPeerId> {
Expand Down Expand Up @@ -976,14 +1003,14 @@ pub mod tests {
}

struct FakeBroadcast {
pub peer_reports: mpsc::Sender<(FuelPeerId, HeartBeatPeerReportReason, String)>,
pub peer_reports: mpsc::Sender<(FuelPeerId, AppScore, String)>,
}

impl Broadcast for FakeBroadcast {
fn report_peer(
&self,
peer_id: FuelPeerId,
report: HeartBeatPeerReportReason,
report: AppScore,
reporting_service: &'static str,
) -> anyhow::Result<()> {
self.peer_reports.try_send((
Expand Down Expand Up @@ -1044,6 +1071,12 @@ pub mod tests {
// Greater than actual
let heartbeat_max_time_since_last = Duration::from_secs(40);

// Arbitrary values
let heartbeat_peer_reputation_config = HeartbeatPeerReputationConfig {
old_heartbeat_penalty: 5.6,
low_heartbeat_frequency_penalty: 20.45,
};

let mut task = Task {
p2p_service,
db: Arc::new(FakeDB),
Expand All @@ -1055,6 +1088,7 @@ pub mod tests {
heartbeat_max_avg_interval,
heartbeat_max_time_since_last,
next_check_time: Instant::now(),
heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(),
};
let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started);
let mut watcher = StateWatcher::from(watch_receiver);
Expand All @@ -1072,7 +1106,10 @@ pub mod tests {
FuelPeerId::from(peer_id.to_bytes().to_vec()),
report_peer_id
);
assert_eq!(report, HeartBeatPeerReportReason::LowHeartBeatFrequency);
assert_eq!(
report,
heartbeat_peer_reputation_config.low_heartbeat_frequency_penalty
);
assert_eq!(reporting_service, "p2p");
}

Expand Down Expand Up @@ -1112,6 +1149,12 @@ pub mod tests {
// Less than actual
let heartbeat_max_time_since_last = Duration::from_secs(40);

// Arbitrary values
let heartbeat_peer_reputation_config = HeartbeatPeerReputationConfig {
old_heartbeat_penalty: 5.6,
low_heartbeat_frequency_penalty: 20.45,
};

let mut task = Task {
p2p_service,
db: Arc::new(FakeDB),
Expand All @@ -1123,6 +1166,7 @@ pub mod tests {
heartbeat_max_avg_interval,
heartbeat_max_time_since_last,
next_check_time: Instant::now(),
heartbeat_peer_reputation_config: heartbeat_peer_reputation_config.clone(),
};
let (watch_sender, watch_receiver) = tokio::sync::watch::channel(State::Started);
let mut watcher = StateWatcher::from(watch_receiver);
Expand All @@ -1140,7 +1184,10 @@ pub mod tests {
FuelPeerId::from(peer_id.to_bytes().to_vec()),
report_peer_id
);
assert_eq!(report, HeartBeatPeerReportReason::OldHeartBeat);
assert_eq!(
report,
heartbeat_peer_reputation_config.old_heartbeat_penalty
);
assert_eq!(reporting_service, "p2p");
}
}
6 changes: 6 additions & 0 deletions crates/types/src/services/p2p/peer_reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ pub trait PeerReport {
/// Extracts PeerScore from the Report
fn get_score_from_report(&self) -> AppScore;
}

impl PeerReport for AppScore {
fn get_score_from_report(&self) -> AppScore {
*self
}
}

0 comments on commit 2152e39

Please sign in to comment.