Skip to content

Commit

Permalink
Improve Prometheus exporter output (postgresml#795)
Browse files Browse the repository at this point in the history
* Prometheus metrics updates:

 * Add username label to deconflict metrics that would otherwise
   have duplicate labels across different pools.
 * Group metrics by name and only print HELP and TYPE once per
   metric name.
 * Sort labels for a deterministic output.

---------

Co-authored-by: Curtis Myzie <[email protected]>
Co-authored-by: Towhid Khan
  • Loading branch information
drdrsh and myzie authored Sep 5, 2024
1 parent 7d047c6 commit f0865ca
Showing 1 changed file with 69 additions and 14 deletions.
83 changes: 69 additions & 14 deletions src/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,18 +200,17 @@ struct PrometheusMetric<Value: fmt::Display> {

impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let formatted_labels = self
.labels
let mut sorted_labels: Vec<_> = self.labels.iter().collect();
sorted_labels.sort_by_key(|&(key, _)| key);
let formatted_labels = sorted_labels
.iter()
.map(|(key, value)| format!("{}=\"{}\"", key, value))
.collect::<Vec<_>>()
.join(",");
write!(
f,
"# HELP {name} {help}\n# TYPE {name} {ty}\n{name}{{{formatted_labels}}} {value}\n",
"{name}{{{formatted_labels}}} {value}",
name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
formatted_labels = formatted_labels,
value = self.value
)
Expand Down Expand Up @@ -247,7 +246,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
labels.insert("username", address.username.clone());

Self::from_name(&format!("databases_{}", name), value, labels)
}
Expand All @@ -264,7 +263,8 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("pool", address.pool_name.clone());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
labels.insert("username", address.username.clone());

Self::from_name(&format!("servers_{}", name), value, labels)
}

Expand All @@ -276,7 +276,7 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
labels.insert("role", address.role.to_string());
labels.insert("index", address.address_index.to_string());
labels.insert("database", address.database.to_string());
labels.insert("user", address.username.clone());
labels.insert("username", address.username.clone());

Self::from_name(&format!("stats_{}", name), value, labels)
}
Expand All @@ -288,6 +288,15 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {

Self::from_name(&format!("pools_{}", name), value, labels)
}

fn get_header(&self) -> String {
format!(
"\n# HELP {name} {help}\n# TYPE {name} {ty}",
name = format_args!("pgcat_{}", self.name),
help = self.help,
ty = self.ty,
)
}
}

async fn prometheus_stats(
Expand All @@ -313,6 +322,7 @@ async fn prometheus_stats(

// Adds metrics shown in a SHOW STATS admin command.
fn push_address_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
Expand All @@ -322,41 +332,64 @@ fn push_address_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_address(address, &key, value)
{
lines.push(prometheus_metric.to_string());
grouped_metrics
.entry(key)
.or_default()
.push(prometheus_metric);
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
}
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}

// Adds relevant metrics shown in a SHOW POOLS admin command.
fn push_pool_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
let pool_stats = PoolStats::construct_pool_lookup();
for (pool_id, stats) in pool_stats.iter() {
for (name, value) in stats.clone() {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_pool(pool_id.clone(), &name, value)
{
lines.push(prometheus_metric.to_string());
grouped_metrics
.entry(name)
.or_default()
.push(prometheus_metric);
} else {
debug!("Metric {} not implemented for ({})", name, *pool_id);
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}

// Adds relevant metrics shown in a SHOW DATABASES admin command.
fn push_database_stats(lines: &mut Vec<String>) {
let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u32>>> = HashMap::new();
for (_, pool) in get_all_pools() {
let pool_config = pool.settings.clone();
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let pool_state = pool.pool_state(shard, server);

let metrics = vec![
("pool_size", pool_config.user.pool_size),
("current_connections", pool_state.connections),
Expand All @@ -365,14 +398,25 @@ fn push_database_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u32>::from_database_info(address, key, value)
{
lines.push(prometheus_metric.to_string());
grouped_metrics
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
}
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}

// Adds relevant metrics shown in a SHOW SERVERS admin command.
Expand Down Expand Up @@ -405,7 +449,7 @@ fn push_server_stats(lines: &mut Vec<String>) {
crate::stats::ServerState::Idle => entry.idle_count += 1,
}
}

let mut grouped_metrics: HashMap<String, Vec<PrometheusMetric<u64>>> = HashMap::new();
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
Expand All @@ -428,7 +472,10 @@ fn push_server_stats(lines: &mut Vec<String>) {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_server_info(address, key, value)
{
lines.push(prometheus_metric.to_string());
grouped_metrics
.entry(key.to_string())
.or_default()
.push(prometheus_metric);
} else {
debug!("Metric {} not implemented for {}", key, address.name());
}
Expand All @@ -437,6 +484,14 @@ fn push_server_stats(lines: &mut Vec<String>) {
}
}
}
for (_key, metrics) in grouped_metrics {
if !metrics.is_empty() {
lines.push(metrics[0].get_header());
for metric in metrics {
lines.push(metric.to_string());
}
}
}
}

pub async fn start_metric_server(http_addr: SocketAddr) {
Expand Down

0 comments on commit f0865ca

Please sign in to comment.