Skip to content

Commit

Permalink
primary: add interface for querying information about a primary's wor…
Browse files Browse the repository at this point in the history
…kers
  • Loading branch information
bmwill committed Aug 1, 2022
1 parent b4a41fd commit 90d7fe0
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 14 deletions.
10 changes: 5 additions & 5 deletions narwhal/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ pub struct PrimaryAddresses {
}

#[derive(Clone, Serialize, Deserialize, Eq, Hash, PartialEq, Debug)]
pub struct WorkerAddresses {
pub struct WorkerInfo {
/// Address to receive client transactions (WAN).
pub transactions: Multiaddr,
/// Address to receive messages from other workers (WAN).
Expand All @@ -316,7 +316,7 @@ pub struct Authority {
/// The network addresses of the primary.
pub primary: PrimaryAddresses,
/// Map of workers' id and their network addresses.
pub workers: HashMap<WorkerId, WorkerAddresses>,
pub workers: HashMap<WorkerId, WorkerInfo>,
}

pub type SharedCommittee = Arc<ArcSwap<Committee>>;
Expand Down Expand Up @@ -402,7 +402,7 @@ impl Committee {
}

/// Returns the addresses of a specific worker (`id`) of a specific authority (`to`).
pub fn worker(&self, to: &PublicKey, id: &WorkerId) -> Result<WorkerAddresses, ConfigError> {
pub fn worker(&self, to: &PublicKey, id: &WorkerId) -> Result<WorkerInfo, ConfigError> {
self.authorities
.iter()
.find(|(name, _)| *name == to)
Expand All @@ -417,7 +417,7 @@ impl Committee {
.ok_or_else(|| ConfigError::NotInCommittee((*to).encode_base64()))
}
/// Returns the addresses of all our workers.
pub fn our_workers(&self, myself: &PublicKey) -> Result<Vec<WorkerAddresses>, ConfigError> {
pub fn our_workers(&self, myself: &PublicKey) -> Result<Vec<WorkerInfo>, ConfigError> {
let res = self
.authorities
.iter()
Expand All @@ -437,7 +437,7 @@ impl Committee {
&self,
myself: &PublicKey,
id: &WorkerId,
) -> Vec<(PublicKey, WorkerAddresses)> {
) -> Vec<(PublicKey, WorkerInfo)> {
self.authorities
.iter()
.filter(|(name, _)| *name != myself)
Expand Down
4 changes: 2 additions & 2 deletions narwhal/node/src/generate_format.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use config::{Authority, Committee, Epoch, PrimaryAddresses, WorkerAddresses};
use config::{Authority, Committee, Epoch, PrimaryAddresses, WorkerInfo};
use crypto::{
traits::{KeyPair as _, Signer},
Digest, Hash, KeyPair,
Expand Down Expand Up @@ -52,7 +52,7 @@ fn get_registry() -> Result<Registry> {
};
let workers = vec![(
0,
WorkerAddresses {
WorkerInfo {
primary_to_worker: format!("/ip4/127.0.0.1/tcp/{}/http", 300 + i)
.parse()
.unwrap(),
Expand Down
4 changes: 2 additions & 2 deletions narwhal/node/tests/staged/narwhal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Authority:
MAP:
KEY: U32
VALUE:
TYPENAME: WorkerAddresses
TYPENAME: WorkerInfo
Batch:
NEWTYPESTRUCT:
SEQ:
Expand Down Expand Up @@ -104,7 +104,7 @@ ReconfigureNotification:
TYPENAME: Committee
1:
Shutdown: UNIT
WorkerAddresses:
WorkerInfo:
STRUCT:
- transactions: BYTES
- worker_to_worker: BYTES
Expand Down
31 changes: 28 additions & 3 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
BlockRemover, CertificatesResponse, DeleteBatchMessage, PayloadAvailabilityResponse,
};
use async_trait::async_trait;
use config::{Parameters, SharedCommittee, WorkerId};
use config::{Parameters, SharedCommittee, WorkerId, WorkerInfo};
use consensus::dag::Dag;
use crypto::{
traits::{EncodeDecodeBase64, Signer},
Expand All @@ -28,6 +28,7 @@ use multiaddr::{Multiaddr, Protocol};
use network::{metrics::Metrics, PrimaryNetwork, PrimaryToWorkerNetwork};
use prometheus::Registry;
use std::{
collections::HashMap,
net::Ipv4Addr,
sync::{atomic::AtomicU64, Arc},
time::Duration,
Expand All @@ -45,8 +46,8 @@ use tracing::{info, log::error};
use types::{
error::DagError, BatchDigest, BatchMessage, BincodeEncodedPayload, Certificate,
CertificateDigest, Empty, Header, HeaderDigest, PrimaryToPrimary, PrimaryToPrimaryServer,
ReconfigureNotification, WorkerPrimaryError, WorkerPrimaryMessage, WorkerToPrimary,
WorkerToPrimaryServer,
ReconfigureNotification, WorkerInfoResponse, WorkerPrimaryError, WorkerPrimaryMessage,
WorkerToPrimary, WorkerToPrimaryServer,
};
pub use types::{PrimaryMessage, PrimaryWorkerMessage};

Expand Down Expand Up @@ -190,6 +191,14 @@ impl Primary {
// used for cleanup. The only task that write into this variable is `GarbageCollector`.
let consensus_round = Arc::new(AtomicU64::new(0));

let our_workers = committee
.load()
.authorities
.get(&name)
.expect("Our public key or worker id is not in the committee")
.workers
.clone();

// Spawn the network receiver listening to messages from the other primaries.
let address = committee
.load()
Expand Down Expand Up @@ -232,6 +241,7 @@ impl Primary {
tx_batches,
tx_batch_removal,
tx_state_handler,
our_workers,
metrics: node_metrics.clone(),
}
.spawn(address.clone(), tx_reconfigure.subscribe());
Expand Down Expand Up @@ -599,6 +609,7 @@ struct WorkerReceiverHandler {
tx_batches: Sender<BatchResult>,
tx_batch_removal: Sender<DeleteBatchResult>,
tx_state_handler: Sender<ReconfigureNotification>,
our_workers: HashMap<WorkerId, WorkerInfo>,
metrics: Arc<PrimaryMetrics>,
}

Expand Down Expand Up @@ -703,4 +714,18 @@ impl WorkerToPrimary for WorkerReceiverHandler {

Ok(Response::new(Empty {}))
}

async fn worker_info(
&self,
_request: Request<Empty>,
) -> Result<Response<BincodeEncodedPayload>, Status> {
let workers = WorkerInfoResponse {
workers: self.our_workers.clone(),
};

let response = BincodeEncodedPayload::try_from(&workers)
.map_err(|e| Status::internal(e.to_string()))?;

Ok(Response::new(response))
}
}
2 changes: 2 additions & 0 deletions narwhal/types/proto/narwhal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ service WorkerToWorker {
service WorkerToPrimary {
// Sends a message
rpc SendMessage(BincodeEncodedPayload) returns (Empty) {}

rpc WorkerInfo(Empty) returns (BincodeEncodedPayload) {}
}

// The primary-to-worker interface
Expand Down
10 changes: 8 additions & 2 deletions narwhal/types/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};
use blake2::{digest::Update, VarBlake2b};
use bytes::Bytes;
use config::{Committee, Epoch, WorkerId};
use config::{Committee, Epoch, WorkerId, WorkerInfo};
use crypto::{
traits::{EncodeDecodeBase64, Signer, VerifyingKey},
Digest, Hash, PublicKey, Signature, SignatureService, Verifier, DIGEST_LEN,
Expand All @@ -17,7 +17,7 @@ use derive_builder::Builder;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
fmt,
fmt::Formatter,
};
Expand Down Expand Up @@ -659,3 +659,9 @@ pub enum WorkerPrimaryError {
#[error("An error occurred while deleting batches. None deleted")]
ErrorWhileDeletingBatches(Vec<BatchDigest>),
}

#[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct WorkerInfoResponse {
/// Map of workers' id and their network addresses.
pub workers: HashMap<WorkerId, WorkerInfo>,
}

0 comments on commit 90d7fe0

Please sign in to comment.