Commit 35e211f
Author: Dementii Priadko

Merge branch 'mvp' into 'main'

Fixed a bug in reports related to an invalid expression type in a Range query.

See merge request postgres-ai/postgres_ai!13

2 parents eec9325 + 0126519

File tree

1 file changed: +327, -3 lines changed

reporter/postgres_reports.py

Lines changed: 327 additions & 3 deletions
@@ -2,7 +2,7 @@
 """
 PostgreSQL Reports Generator using PromQL
 
-This script generates reports for specific PostgreSQL check types (A002, A003, A004, A007, H001, F005, F004)
+This script generates reports for specific PostgreSQL check types (A002, A003, A004, A007, H001, F005, F004, K001, K003)
 by querying Prometheus metrics using PromQL queries.
 """
 
@@ -428,6 +428,325 @@ def generate_f004_heap_bloat_report(self, cluster: str = "local", node_name: str
             "total_bloat_size_pretty": self.format_bytes(total_bloat_size)
         }, node_name)
 
+    def generate_k001_query_calls_report(self, cluster: str = "local", node_name: str = "node-01",
+                                         time_range_minutes: int = 60) -> Dict[str, Any]:
+        """
+        Generate K001 Globally Aggregated Query Metrics report (sorted by calls).
+
+        Args:
+            cluster: Cluster name
+            node_name: Node name
+            time_range_minutes: Time range in minutes for metrics collection
+
+        Returns:
+            Dictionary containing query metrics sorted by calls
+        """
+        print("Generating K001 Globally Aggregated Query Metrics report...")
+
+        # Calculate time range
+        end_time = datetime.now()
+        start_time = end_time - timedelta(minutes=time_range_minutes)
+
+        # Get pg_stat_statements metrics using the same logic as CSV endpoint
+        query_metrics = self._get_pgss_metrics_data(cluster, node_name, start_time, end_time)
+
+        # Sort by calls (descending)
+        sorted_metrics = sorted(query_metrics, key=lambda x: x.get('calls', 0), reverse=True)
+
+        # Calculate totals
+        total_calls = sum(q.get('calls', 0) for q in sorted_metrics)
+        total_time = sum(q.get('total_time', 0) for q in sorted_metrics)
+        total_rows = sum(q.get('rows', 0) for q in sorted_metrics)
+
+        return self.format_report_data("K001", {
+            "query_metrics": sorted_metrics,
+            "summary": {
+                "total_queries": len(sorted_metrics),
+                "total_calls": total_calls,
+                "total_time_ms": total_time,
+                "total_rows": total_rows,
+                "time_range_minutes": time_range_minutes,
+                "start_time": start_time.isoformat(),
+                "end_time": end_time.isoformat()
+            }
+        }, node_name)
+
+    def generate_k003_top_queries_report(self, cluster: str = "local", node_name: str = "node-01",
+                                         time_range_minutes: int = 60, limit: int = 50) -> Dict[str, Any]:
+        """
+        Generate K003 Top-50 Queries by total_time report.
+
+        Args:
+            cluster: Cluster name
+            node_name: Node name
+            time_range_minutes: Time range in minutes for metrics collection
+            limit: Number of top queries to return (default: 50)
+
+        Returns:
+            Dictionary containing top queries sorted by total execution time
+        """
+        print("Generating K003 Top-50 Queries by total_time report...")
+
+        # Calculate time range
+        end_time = datetime.now()
+        start_time = end_time - timedelta(minutes=time_range_minutes)
+
+        # Get pg_stat_statements metrics using the same logic as CSV endpoint
+        query_metrics = self._get_pgss_metrics_data(cluster, node_name, start_time, end_time)
+
+        # Sort by total_time (descending) and limit to top N
+        sorted_metrics = sorted(query_metrics, key=lambda x: x.get('total_time', 0), reverse=True)[:limit]
+
+        # Calculate totals for the top queries
+        total_calls = sum(q.get('calls', 0) for q in sorted_metrics)
+        total_time = sum(q.get('total_time', 0) for q in sorted_metrics)
+        total_rows = sum(q.get('rows', 0) for q in sorted_metrics)
+
+        return self.format_report_data("K003", {
+            "top_queries": sorted_metrics,
+            "summary": {
+                "queries_returned": len(sorted_metrics),
+                "total_calls": total_calls,
+                "total_time_ms": total_time,
+                "total_rows": total_rows,
+                "time_range_minutes": time_range_minutes,
+                "start_time": start_time.isoformat(),
+                "end_time": end_time.isoformat(),
+                "limit": limit
+            }
+        }, node_name)
+
+    def _get_pgss_metrics_data(self, cluster: str, node_name: str, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
+        """
+        Get pg_stat_statements metrics data between two time points.
+        Adapted from the logic in flask-backend/app.py get_pgss_metrics_csv().
+
+        Args:
+            cluster: Cluster name
+            node_name: Node name
+            start_time: Start datetime
+            end_time: End datetime
+
+        Returns:
+            List of query metrics with calculated differences
+        """
+        # Metric name mapping for cleaner output
+        METRIC_NAME_MAPPING = {
+            'calls': 'calls',
+            'exec_time_total': 'total_time',
+            'rows': 'rows',
+            'shared_bytes_hit_total': 'shared_blks_hit',
+            'shared_bytes_read_total': 'shared_blks_read',
+            'shared_bytes_dirtied_total': 'shared_blks_dirtied',
+            'shared_bytes_written_total': 'shared_blks_written',
+            'block_read_total': 'blk_read_time',
+            'block_write_total': 'blk_write_time'
+        }
+
+        # Build filters
+        filters = [f'cluster="{cluster}"', f'node_name="{node_name}"']
+        filter_str = '{' + ','.join(filters) + '}'
+
+        # Get all pg_stat_statements metrics
+        all_metrics = [
+            'pgwatch_pg_stat_statements_calls',
+            'pgwatch_pg_stat_statements_exec_time_total',
+            'pgwatch_pg_stat_statements_rows',
+            'pgwatch_pg_stat_statements_shared_bytes_hit_total',
+            'pgwatch_pg_stat_statements_shared_bytes_read_total',
+            'pgwatch_pg_stat_statements_shared_bytes_dirtied_total',
+            'pgwatch_pg_stat_statements_shared_bytes_written_total',
+            'pgwatch_pg_stat_statements_block_read_total',
+            'pgwatch_pg_stat_statements_block_write_total'
+        ]
+
+        # Get metrics at start and end times
+        start_data = []
+        end_data = []
+
+        for metric in all_metrics:
+            metric_with_filters = f'{metric}{filter_str}'
+
+            try:
+                # Query metrics around start time - use instant queries at specific timestamps
+                start_result = self.query_range(metric_with_filters, start_time - timedelta(minutes=1), start_time + timedelta(minutes=1))
+                if start_result:
+                    start_data.extend(start_result)
+
+                # Query metrics around end time
+                end_result = self.query_range(metric_with_filters, end_time - timedelta(minutes=1), end_time + timedelta(minutes=1))
+                if end_result:
+                    end_data.extend(end_result)
+
+            except Exception as e:
+                print(f"Warning: Failed to query metric {metric}: {e}")
+                continue
+
+        # Process the data to calculate differences
+        return self._process_pgss_data(start_data, end_data, start_time, end_time, METRIC_NAME_MAPPING)
+
+    def query_range(self, query: str, start_time: datetime, end_time: datetime, step: str = "30s") -> List[Dict[str, Any]]:
+        """
+        Execute a range PromQL query.
+
+        Args:
+            query: PromQL query string
+            start_time: Start time
+            end_time: End time
+            step: Query step interval
+
+        Returns:
+            List of query results
+        """
+        params = {
+            'query': query,
+            'start': start_time.timestamp(),
+            'end': end_time.timestamp(),
+            'step': step
+        }
+
+        try:
+            response = requests.get(f"{self.base_url}/query_range", params=params)
+            if response.status_code == 200:
+                result = response.json()
+                if result.get('status') == 'success':
+                    return result.get('data', {}).get('result', [])
+            else:
+                print(f"Range query failed with status {response.status_code}: {response.text}")
+        except Exception as e:
+            print(f"Range query error: {e}")
+
+        return []
+
+    def _process_pgss_data(self, start_data: List[Dict], end_data: List[Dict],
+                           start_time: datetime, end_time: datetime,
+                           metric_mapping: Dict[str, str]) -> List[Dict[str, Any]]:
+        """
+        Process pg_stat_statements data and calculate differences between start and end times.
+        Adapted from the logic in flask-backend/app.py process_pgss_data().
+        """
+        # Convert Prometheus data to dictionaries
+        start_metrics = self._prometheus_to_dict(start_data, start_time)
+        end_metrics = self._prometheus_to_dict(end_data, end_time)
+
+        if not start_metrics and not end_metrics:
+            return []
+
+        # Create a combined dictionary with all unique query identifiers
+        all_keys = set()
+        all_keys.update(start_metrics.keys())
+        all_keys.update(end_metrics.keys())
+
+        result_rows = []
+
+        # Calculate differences for each query
+        for key in all_keys:
+            start_metric = start_metrics.get(key, {})
+            end_metric = end_metrics.get(key, {})
+
+            # Extract identifier components from key
+            db_name, query_id, user, instance = key
+
+            # Calculate actual duration from metric timestamps
+            start_timestamp = start_metric.get('timestamp')
+            end_timestamp = end_metric.get('timestamp')
+
+            if start_timestamp and end_timestamp:
+                start_dt = datetime.fromisoformat(start_timestamp)
+                end_dt = datetime.fromisoformat(end_timestamp)
+                actual_duration = (end_dt - start_dt).total_seconds()
+            else:
+                # Fallback to query parameter duration if timestamps are missing
+                actual_duration = (end_time - start_time).total_seconds()
+
+            # Create result row
+            row = {
+                'queryid': query_id,
+                'database': db_name,
+                'user': user,
+                'duration_seconds': actual_duration
+            }
+
+            # Numeric columns to calculate differences for (using original metric names)
+            numeric_cols = list(metric_mapping.keys())
+
+            # Calculate differences and rates
+            for col in numeric_cols:
+                start_val = start_metric.get(col, 0)
+                end_val = end_metric.get(col, 0)
+                diff = end_val - start_val
+
+                # Use simplified display name
+                display_name = metric_mapping[col]
+
+                # Convert bytes to blocks for block-related metrics (PostgreSQL uses 8KB blocks)
+                if 'blks' in display_name and 'bytes' in col:
+                    diff = diff / 8192  # Convert bytes to 8KB blocks
+
+                row[display_name] = diff
+
+                # Calculate rates per second
+                if row['duration_seconds'] > 0:
+                    row[f'{display_name}_per_sec'] = diff / row['duration_seconds']
+                else:
+                    row[f'{display_name}_per_sec'] = 0
+
+                # Calculate per-call averages
+                calls_diff = row.get('calls', 0)
+                if calls_diff > 0:
+                    row[f'{display_name}_per_call'] = diff / calls_diff
+                else:
+                    row[f'{display_name}_per_call'] = 0
+
+            result_rows.append(row)
+
+        return result_rows
+
+    def _prometheus_to_dict(self, prom_data: List[Dict], timestamp: datetime) -> Dict:
+        """
+        Convert Prometheus API response to dictionary keyed by query identifiers.
+        Adapted from the logic in flask-backend/app.py prometheus_to_dict().
+        """
+        if not prom_data:
+            return {}
+
+        metrics_dict = {}
+
+        for metric_data in prom_data:
+            metric = metric_data.get('metric', {})
+            values = metric_data.get('values', [])
+
+            if not values:
+                continue
+
+            # Get the closest value to our timestamp
+            closest_value = min(values, key=lambda x: abs(float(x[0]) - timestamp.timestamp()))
+
+            # Create unique key for this query
+            key = (
+                metric.get('datname', ''),
+                metric.get('queryid', ''),
+                metric.get('user', ''),
+                metric.get('instance', '')
+            )
+
+            # Initialize metric dict if not exists
+            if key not in metrics_dict:
+                metrics_dict[key] = {
+                    'timestamp': datetime.fromtimestamp(float(closest_value[0])).isoformat(),
+                }
+
+            # Add metric value
+            metric_name = metric.get('__name__', 'pgwatch_pg_stat_statements_calls')
+            clean_name = metric_name.replace('pgwatch_pg_stat_statements_', '')
+
+            try:
+                metrics_dict[key][clean_name] = float(closest_value[1])
+            except (ValueError, IndexError):
+                metrics_dict[key][clean_name] = 0
+
+        return metrics_dict
+
     def format_bytes(self, bytes_value: float) -> str:
         """Format bytes value for human readable display."""
         if bytes_value == 0:
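
A note for reviewers on the hunk above: the new helpers implement a two-snapshot counter delta. The pg_stat_statements counters exported to Prometheus only grow, so _process_pgss_data subtracts the sample taken near the start of the window from the one near the end, then derives per-second and per-call figures. A minimal standalone sketch of that arithmetic, with invented snapshot values (an illustration, not the committed code):

from datetime import timedelta

# Invented counter snapshots for a single queryid, one hour apart
start_snap = {'calls': 1000.0, 'exec_time_total': 50000.0}   # exec time in ms
end_snap = {'calls': 7000.0, 'exec_time_total': 410000.0}
duration_seconds = timedelta(hours=1).total_seconds()

row = {}
for col in ('calls', 'exec_time_total'):
    diff = end_snap[col] - start_snap[col]
    row[col] = diff
    # Rate per second over the observed window, guarded against zero duration
    row[f'{col}_per_sec'] = diff / duration_seconds if duration_seconds > 0 else 0

# Per-call average, guarded against a window with no calls
calls = row['calls']
row['exec_time_total_per_call'] = row['exec_time_total'] / calls if calls > 0 else 0

print(row['calls'])                     # 6000.0 calls in the window
print(row['exec_time_total_per_sec'])   # 100.0 ms of execution time per second
print(row['exec_time_total_per_call'])  # 60.0 ms per call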
@@ -619,6 +938,8 @@ def generate_all_reports(self, cluster: str = "local", node_name: str = "node-01
         reports['H001'] = self.generate_h001_invalid_indexes_report(cluster, node_name)
         reports['F005'] = self.generate_f005_btree_bloat_report(cluster, node_name)
         reports['F004'] = self.generate_f004_heap_bloat_report(cluster, node_name)
+        reports['K001'] = self.generate_k001_query_calls_report(cluster, node_name)
+        reports['K003'] = self.generate_k003_top_queries_report(cluster, node_name)
 
         return reports
     def create_report(self, api_url, token, project, epoch):
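
One more note on the helpers in the large hunk further up: query_range returns the result list of a Prometheus /api/v1/query_range matrix response, and _prometheus_to_dict keys each series by (datname, queryid, user, instance), keeping the sample closest to the requested timestamp. A toy illustration of that closest-sample selection; the series below is hand-written and its label values are invented:

from datetime import datetime

# Shaped like one entry of data.result from Prometheus's query_range API
series = {
    'metric': {
        '__name__': 'pgwatch_pg_stat_statements_calls',
        'datname': 'appdb', 'queryid': '12345', 'user': 'app', 'instance': '10.0.0.5:9187',
    },
    # [unix_timestamp, value-as-string] pairs, 30 s apart
    'values': [[1735729200.0, '1000'], [1735729230.0, '1010'], [1735729260.0, '1025']],
}

target = datetime.fromtimestamp(1735729255.0)
closest = min(series['values'], key=lambda v: abs(float(v[0]) - target.timestamp()))
print(closest)  # [1735729260.0, '1025']: the sample nearest the target time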
@@ -673,7 +994,7 @@ def main():
                         help='Cluster name (default: local)')
     parser.add_argument('--node-name', default='node-01',
                         help='Node name (default: node-01)')
-    parser.add_argument('--check-id', choices=['A002', 'A003', 'A004', 'A007', 'H001', 'F005', 'F004', 'ALL'],
+    parser.add_argument('--check-id', choices=['A002', 'A003', 'A004', 'A007', 'H001', 'F005', 'F004', 'K001', 'K003', 'ALL'],
                         help='Specific check ID to generate (default: ALL)')
     parser.add_argument('--output', default='-',
                         help='Output file (default: stdout)')
@@ -704,7 +1025,6 @@ def main():
         if not args.no_upload:
             generator.upload_report_file(args.api_url, args.token, report_id, f"{report}.json")
         if args.output == '-':
-
             pass
         else:
             with open(args.output, 'w') as f:
@@ -726,6 +1046,10 @@ def main():
         report = generator.generate_f005_btree_bloat_report(args.cluster, args.node_name)
     elif args.check_id == 'F004':
         report = generator.generate_f004_heap_bloat_report(args.cluster, args.node_name)
+    elif args.check_id == 'K001':
+        report = generator.generate_k001_query_calls_report(args.cluster, args.node_name)
+    elif args.check_id == 'K003':
+        report = generator.generate_k003_top_queries_report(args.cluster, args.node_name)
 
     if args.output == '-':
         print(json.dumps(report, indent=2))
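
Given the argparse change above, the new checks should be runnable like the existing ones. A plausible invocation using only the flags visible in this diff (the script also references no-upload, api-url, and token style options elsewhere, but their definitions are outside these hunks):

# Print the K001 report as JSON on stdout (--output defaults to '-')
python reporter/postgres_reports.py --check-id K001 --cluster local --node-name node-01

# Write the K003 top-queries report to a file
python reporter/postgres_reports.py --check-id K003 --output k003.json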
