Skip to content

Commit

Permalink
Make waypoint_addresses vec (istio#147)
Browse files Browse the repository at this point in the history
* Update waypoint_addresses to Vec

* Fix liny
  • Loading branch information
hzxuzhonghu authored Nov 17, 2022
1 parent 9aa39bd commit f4cb475
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
2 changes: 1 addition & 1 deletion proto/workload.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ message Workload {
string service_account = 7;

// If present, the waypoint proxy for this workload.
bytes waypoint_address = 8;
repeated bytes waypoint_addresses = 8;

// Name of the node the workload runs on
string node = 9;
Expand Down
9 changes: 5 additions & 4 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ impl OutboundConnection {

let us = us.unwrap();
// For case source client has enabled waypoint
if source_workload.waypoint_address.is_some() {
let waypoint_address = source_workload.waypoint_address.unwrap();
if !source_workload.waypoint_addresses.is_empty() {
let waypoint_address = source_workload.choose_waypoint_address().unwrap();
return Ok(Request {
// Always use HBONE here
protocol: Protocol::Hbone,
Expand All @@ -258,7 +258,8 @@ impl OutboundConnection {
});
}
// For case upstream server has enabled waypoint
if us.workload.waypoint_address.is_some() {
if !us.workload.waypoint_addresses.is_empty() {
let waypoint_address = source_workload.choose_waypoint_address().unwrap();
// Even in this case, we are picking a single upstream pod and deciding if it has a remote proxy.
// Typically this is all or nothing, but if not we should probably send to remote proxy if *any* upstream has one.
return Ok(Request {
Expand All @@ -267,7 +268,7 @@ impl OutboundConnection {
source: source_workload,
// Use the original VIP, not translated
destination: target,
gateway: SocketAddr::from((us.workload.waypoint_address.unwrap(), 15006)),
gateway: SocketAddr::from((waypoint_address, 15006)),
// Let the client remote know we are on the inbound path.
direction: Direction::Inbound,
request_type: RequestType::ToServerWaypoint,
Expand Down
27 changes: 18 additions & 9 deletions src/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl TryFrom<Option<xds::istio::workload::Protocol>> for Protocol {
pub struct Workload {
pub workload_ip: IpAddr,
#[serde(default)]
pub waypoint_address: Option<IpAddr>,
pub waypoint_addresses: Vec<IpAddr>,
#[serde(default)]
pub gateway_address: Option<SocketAddr>,
#[serde(default)]
Expand Down Expand Up @@ -98,6 +98,12 @@ impl Workload {
service_account: self.service_account.clone(),
}
}
pub fn choose_waypoint_address(&self) -> Option<IpAddr> {
self.waypoint_addresses
.iter()
.choose(&mut rand::thread_rng())
.copied()
}
}

impl fmt::Display for Workload {
Expand Down Expand Up @@ -157,12 +163,16 @@ impl TryFrom<&XdsWorkload> for Workload {
type Error = WorkloadError;
fn try_from(resource: &XdsWorkload) -> Result<Self, Self::Error> {
let resource: XdsWorkload = resource.to_owned();
let waypoint = byte_to_ip(&resource.waypoint_address)?;

let mut waypoint_addresses: Vec<IpAddr> = Vec::new();
for addr in &resource.waypoint_addresses {
waypoint_addresses.push(byte_to_ip(addr)?.ok_or(WorkloadError::ByteAddressParse(0))?)
}
let address = byte_to_ip(&resource.address)?.ok_or(WorkloadError::ByteAddressParse(0))?;
let workload_type = resource.workload_type().as_str_name().to_lowercase();
Ok(Workload {
workload_ip: address,
waypoint_address: waypoint,
waypoint_addresses,
gateway_address: None,

protocol: Protocol::try_from(xds::istio::workload::Protocol::from_i32(
Expand Down Expand Up @@ -443,10 +453,10 @@ impl WorkloadStore {
if us.workload.gateway_address.is_none() {
us.workload.gateway_address = Some(match us.workload.protocol {
Protocol::Hbone => {
let mut ip = us.workload.workload_ip;
if let Some(addr) = us.workload.waypoint_address {
ip = addr;
}
let ip = us
.workload
.choose_waypoint_address()
.unwrap_or(us.workload.workload_ip);
SocketAddr::from((ip, 15008))
}
Protocol::Tcp => SocketAddr::from((us.workload.workload_ip, us.port)),
Expand Down Expand Up @@ -568,8 +578,7 @@ mod tests {
fn workload_information() {
let default = Workload {
workload_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),

waypoint_address: None,
waypoint_addresses: Vec::new(),
gateway_address: None,
protocol: Default::default(),
name: "".to_string(),
Expand Down

0 comments on commit f4cb475

Please sign in to comment.