Skip to content

Commit

Permalink
Reduce usage of unwrap in p2p crate (mimblewimble#2627)
Browse files Browse the repository at this point in the history
Also change store crate a bit
  • Loading branch information
hashmap authored Feb 25, 2019
1 parent 224a315 commit fe9fa51
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 132 deletions.
13 changes: 6 additions & 7 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,18 @@ impl<'a> Response<'a> {
resp_type: Type,
body: T,
stream: &'a mut dyn Write,
) -> Response<'a> {
let body = ser::ser_vec(&body).unwrap();
Response {
) -> Result<Response<'a>, Error> {
let body = ser::ser_vec(&body)?;
Ok(Response {
resp_type,
body,
stream,
attachment: None,
}
})
}

fn write(mut self, sent_bytes: Arc<RwLock<RateCounter>>) -> Result<(), Error> {
let mut msg =
ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64)).unwrap();
let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64))?;
msg.append(&mut self.body);
write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?;
// Increase sent bytes counter
Expand Down Expand Up @@ -177,7 +176,7 @@ impl Tracker {
where
T: ser::Writeable,
{
let buf = write_to_buf(body, msg_type);
let buf = write_to_buf(body, msg_type)?;
let buf_len = buf.len();
self.send_channel.try_send(buf)?;

Expand Down
38 changes: 19 additions & 19 deletions p2p/src/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,26 +160,26 @@ pub fn read_message<T: Readable>(stream: &mut dyn Read, msg_type: Type) -> Resul
read_body(&header, stream)
}

pub fn write_to_buf<T: Writeable>(msg: T, msg_type: Type) -> Vec<u8> {
pub fn write_to_buf<T: Writeable>(msg: T, msg_type: Type) -> Result<Vec<u8>, Error> {
// prepare the body first so we know its serialized length
let mut body_buf = vec![];
ser::serialize(&mut body_buf, &msg).unwrap();
ser::serialize(&mut body_buf, &msg)?;

// build and serialize the header using the body size
let mut msg_buf = vec![];
let blen = body_buf.len() as u64;
ser::serialize(&mut msg_buf, &MsgHeader::new(msg_type, blen)).unwrap();
ser::serialize(&mut msg_buf, &MsgHeader::new(msg_type, blen))?;
msg_buf.append(&mut body_buf);

msg_buf
Ok(msg_buf)
}

pub fn write_message<T: Writeable>(
stream: &mut dyn Write,
msg: T,
msg_type: Type,
) -> Result<(), Error> {
let buf = write_to_buf(msg, msg_type);
let buf = write_to_buf(msg, msg_type)?;
stream.write_all(&buf[..])?;
Ok(())
}
Expand Down Expand Up @@ -268,11 +268,11 @@ impl Writeable for Hand {
[write_u32, self.capabilities.bits()],
[write_u64, self.nonce]
);
self.total_difficulty.write(writer).unwrap();
self.sender_addr.write(writer).unwrap();
self.receiver_addr.write(writer).unwrap();
writer.write_bytes(&self.user_agent).unwrap();
self.genesis.write(writer).unwrap();
self.total_difficulty.write(writer)?;
self.sender_addr.write(writer)?;
self.receiver_addr.write(writer)?;
writer.write_bytes(&self.user_agent)?;
self.genesis.write(writer)?;
Ok(())
}
}
Expand Down Expand Up @@ -323,9 +323,9 @@ impl Writeable for Shake {
[write_u32, self.version],
[write_u32, self.capabilities.bits()]
);
self.total_difficulty.write(writer).unwrap();
writer.write_bytes(&self.user_agent).unwrap();
self.genesis.write(writer).unwrap();
self.total_difficulty.write(writer)?;
writer.write_bytes(&self.user_agent)?;
self.genesis.write(writer)?;
Ok(())
}
}
Expand Down Expand Up @@ -379,7 +379,7 @@ impl Writeable for PeerAddrs {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
writer.write_u32(self.peers.len() as u32)?;
for p in &self.peers {
p.write(writer).unwrap();
p.write(writer)?;
}
Ok(())
}
Expand Down Expand Up @@ -484,8 +484,8 @@ pub struct Ping {

impl Writeable for Ping {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.total_difficulty.write(writer).unwrap();
self.height.write(writer).unwrap();
self.total_difficulty.write(writer)?;
self.height.write(writer)?;
Ok(())
}
}
Expand All @@ -511,8 +511,8 @@ pub struct Pong {

impl Writeable for Pong {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.total_difficulty.write(writer).unwrap();
self.height.write(writer).unwrap();
self.total_difficulty.write(writer)?;
self.height.write(writer)?;
Ok(())
}
}
Expand All @@ -537,7 +537,7 @@ pub struct BanReason {
impl Writeable for BanReason {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
let ban_reason_i32 = self.ban_reason as i32;
ban_reason_i32.write(writer).unwrap();
ban_reason_i32.write(writer)?;
Ok(())
}
}
Expand Down
105 changes: 32 additions & 73 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ pub struct Peer {
connection: Option<Mutex<conn::Tracker>>,
}

macro_rules! connection {
($holder:expr) => {
match $holder.connection.as_ref() {
Some(conn) => conn.lock(),
None => return Err(Error::Internal),
}
};
}

impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, adapter: Arc<dyn NetAdapter>) -> Peer {
Expand Down Expand Up @@ -233,41 +242,23 @@ impl Peer {
total_difficulty,
height,
};
self.connection
.as_ref()
.unwrap()
.lock()
.send(ping_msg, msg::Type::Ping)
connection!(self).send(ping_msg, msg::Type::Ping)
}

/// Send the ban reason before banning
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) {
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) -> Result<(), Error> {
let ban_reason_msg = BanReason { ban_reason };
match self
.connection
.as_ref()
.unwrap()
.lock()
connection!(self)
.send(ban_reason_msg, msg::Type::BanReason)
{
Ok(_) => debug!("Sent ban reason {:?} to {}", ban_reason, self.info.addr),
Err(e) => error!(
"Could not send ban reason {:?} to {}: {:?}",
ban_reason, self.info.addr, e
),
};
.map(|_| ())
}

/// Sends the provided block to the remote peer. The request may be dropped
/// if the remote peer is known to already have the block.
pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send block {} to {}", b.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(b, msg::Type::Block)?;
connection!(self).send(b, msg::Type::Block)?;
Ok(true)
} else {
debug!(
Expand All @@ -282,11 +273,7 @@ impl Peer {
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send compact block {} to {}", b.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(b, msg::Type::CompactBlock)?;
connection!(self).send(b, msg::Type::CompactBlock)?;
Ok(true)
} else {
debug!(
Expand All @@ -301,11 +288,7 @@ impl Peer {
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(bh.hash()) {
debug!("Send header {} to {}", bh.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(bh, msg::Type::Header)?;
connection!(self).send(bh, msg::Type::Header)?;
Ok(true)
} else {
debug!(
Expand All @@ -320,11 +303,7 @@ impl Peer {
pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(h) {
debug!("Send tx kernel hash {} to {}", h, self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(h, msg::Type::TransactionKernel)?;
connection!(self).send(h, msg::Type::TransactionKernel)?;
Ok(true)
} else {
debug!(
Expand Down Expand Up @@ -352,11 +331,7 @@ impl Peer {

if !self.tracking_adapter.has_recv(kernel.hash()) {
debug!("Send full tx {} to {}", tx.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(tx, msg::Type::Transaction)?;
connection!(self).send(tx, msg::Type::Transaction)?;
Ok(true)
} else {
debug!(
Expand All @@ -373,59 +348,38 @@ impl Peer {
/// embargo).
pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
debug!("Send (stem) tx {} to {}", tx.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(tx, msg::Type::StemTransaction)?;
Ok(())
connection!(self).send(tx, msg::Type::StemTransaction)
}

/// Sends a request for block headers from the provided block locator
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
self.connection
.as_ref()
.unwrap()
.lock()
.send(&Locator { hashes: locator }, msg::Type::GetHeaders)
connection!(self).send(&Locator { hashes: locator }, msg::Type::GetHeaders)
}

pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> {
debug!(
"Requesting tx (kernel hash) {} from peer {}.",
h, self.info.addr
);
self.connection
.as_ref()
.unwrap()
.lock()
.send(&h, msg::Type::GetTransaction)
connection!(self).send(&h, msg::Type::GetTransaction)
}

/// Sends a request for a specific block by hash
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h);
self.connection
.as_ref()
.unwrap()
.lock()
.send(&h, msg::Type::GetBlock)
connection!(self).send(&h, msg::Type::GetBlock)
}

/// Sends a request for a specific compact block by hash
pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting compact block {} from {}", h, self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(&h, msg::Type::GetCompactBlock)
connection!(self).send(&h, msg::Type::GetCompactBlock)
}

pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
trace!("Asking {} for more peers {:?}", self.info.addr, capab);
self.connection.as_ref().unwrap().lock().send(
connection!(self).send(
&GetPeerAddrs {
capabilities: capab,
},
Expand All @@ -438,19 +392,24 @@ impl Peer {
"Asking {} for txhashset archive at {} {}.",
self.info.addr, height, hash
);
self.connection.as_ref().unwrap().lock().send(
connection!(self).send(
&TxHashSetRequest { hash, height },
msg::Type::TxHashSetRequest,
)
}

/// Stops the peer, closing its connection
pub fn stop(&self) {
stop_with_connection(&self.connection.as_ref().unwrap().lock());
if let Some(conn) = self.connection.as_ref() {
stop_with_connection(&conn.lock());
}
}

fn check_connection(&self) -> bool {
let connection = self.connection.as_ref().unwrap().lock();
let connection = match self.connection.as_ref() {
Some(conn) => conn.lock(),
None => return false,
};
match connection.error_channel.try_recv() {
Ok(Error::Serialization(e)) => {
let need_stop = {
Expand Down
Loading

0 comments on commit fe9fa51

Please sign in to comment.