Compass is a big data task diagnosis platform, which aims to improve the efficiency of user troubleshooting and reduce the cost of abnormal tasks for users.
The key features:
-
Non-invasive, instant diagnosis, you can experience the diagnostic effect without modifying the existing scheduling platform.
-
Supports multiple scheduling platforms(DolphinScheduler 2.x or 3.x, Airflow, or self-developed etc.)
-
Supports Spark 2.x or 3.x, Flink 1.2~, Hadoop 2.x or 3.x troubleshooting.
-
Supports workflow layer exception diagnosis, identifies various failures and baseline time-consuming abnormal problems.
-
Supports Spark engine layer exception diagnosis, including 14 types of exceptions such as data skew, large table scanning, and memory waste.
-
Supports various log matching rule writing and abnormal threshold adjustment, and can be optimized according to actual scenarios.
-
Supports Flink engine layer resource and exception diagnosis,such as low memory utilization,low cpu utilization.
Compass has supported the concept of diagnostic types:
Engine | Diagnostic Dimensions | Diagnostic Type | Type Description |
Spark | Failure analysis | Run failure | Tasks that ultimately fail to run |
First failure | Tasks that have been retried more than once | ||
Long term failure | Tasks that have failed to run in the last ten days | ||
Time analysis | Baseline time abnormality | Tasks that end earlier or later than the historical normal end time | |
Baseline time-consuming abnormality | Tasks that run for too long or too short relative to the historical normal running time | ||
Long running time | Tasks that run for more than two hours | ||
Error analysis | SQL failure | Tasks that fail due to SQL execution issues | |
Shuffle failure | Tasks that fail due to shuffle execution issues | ||
Memory overflow | Tasks that fail due to memory overflow issues | ||
Resource analysis | Memory waste | Tasks with a peak memory usage to total memory ratio that is too low | |
CPU waste | Tasks with a driver/executor calculation time to total CPU calculation time ratio that is too low | ||
Efficiency analysis | Large table scanning | Tasks with too many scanned rows due to no partition restrictions | |
OOM warning | Tasks with a cumulative memory of broadcast tables and a high memory ratio of driver or executor | ||
Data skew | Tasks where the maximum amount of data processed by the task in the stage is much larger than the median | ||
Job time-consuming abnormality | Tasks with a high ratio of idle time to job running time | ||
Stage time-consuming abnormality | Tasks with a high ratio of idle time to stage running time | ||
Task long tail | Tasks where the maximum running time of the task in the stage is much larger than the median | ||
HDFS stuck | Tasks where the processing rate of tasks in the stage is too slow | ||
Too many speculative execution tasks | Tasks in which speculative execution of tasks frequently occurs in the stage | ||
Global sorting abnormality | Tasks with long running time due to global sorting | ||
MapReduce | Resource analysis | Memory waste | Tasks with a peak memory usage to total memory ratio that is too low |
Efficiency analysis | Large table scanning | Tasks with too many scanned rows | |
Task long tail | Tasks where the maximum running time of the task in the map/reduce is much larger than the median | ||
Data skew | Tasks where the maximum amount of data processed by the task in the map/reduce is much larger than the median | ||
Too many speculative execution tasks | Tasks in which speculative execution of tasks frequently occurs in the map/reduce | ||
GC abnormal | Tasks with a high ratio of GC time to CPU time | ||
Flink | Resource analysis | High memory utilization | Calculates the utilization of memory, if it's higher than threshold,then increase the memory config |
Low memory utilization | Calculates the utilization of memory, if it's lower than threshold,then decrease the memory config | ||
Job manager memory | Calculates the memory of job manager according to tm numbers | ||
Job no data flow | Calculates if the job has no data flow | ||
Task manager manage memory optimization | Calculates manage memory utilization of job, give the advice of manage memory config | ||
Task managers run without data flow | Calculates if a part of task managers running without data flow | ||
Parallel not enough | Calculates whether the parallel of job is not enough | ||
Cpu utilization high | Calculates the cpu utilization of job, if it's higher than threshold then increase the cpu config | ||
Cpu utilization low | Calculates the cpu utilization of job, if it's lower than threshold then decrease the cpu config | ||
Cpu peek utilization high | Calculates the peek cpu utilization of job, if it's higher than threshold then increase the cpu config | ||
Exception analysis | Slow vertices | Calculates if the job has slow vertices | |
Back pressure | Calculates if the job has back pressure | ||
High delay | Calculates if the job has high data delay |
Use JDK 8 and maven 3.6.0+ to Compile
git clone https://github.com/cubefs/compass.git
cd compass
mvn package -DskipTests
cd dist/compass
vi bin/compass_env.sh
# Scheduler MySQL
export SCHEDULER_MYSQL_ADDRESS="ip:port"
export SCHEDULER_MYSQL_DB="scheduler"
export SCHEDULER_DATASOURCE_USERNAME="user"
export SCHEDULER_DATASOURCE_PASSWORD="pwd"
# Compass MySQL
export COMPASS_MYSQL_ADDRESS="ip:port"
export COMPASS_MYSQL_DB="compass"
export SPRING_DATASOURCE_USERNAME="user"
export SPRING_DATASOURCE_PASSWORD="pwd"
# Kafka (default version: 3.4.0)
export SPRING_KAFKA_BOOTSTRAPSERVERS="ip1:port,ip2:port"
# Redis (cluster mode)
export SPRING_REDIS_CLUSTER_NODES="ip1:port,ip2:port"
# Zookeeper (default version: 3.4.5, used by canal)
export SPRING_ZOOKEEPER_NODES="ip1:port,ip2:port"
# Elasticsearch (default version: 7.17.9)
export SPRING_ELASTICSEARCH_NODES="ip1:port,ip2:port"
# Flink metric prometheus
export FLINK_PROMETHEUS_HOST="host"
export FLINK_PROMETHEUS_TOKEN=""
export FLINK_PROMETHEUS_DATABASE=""
vi conf/application-hadoop.yml
hadoop:
namenodes:
- nameservices: logs-hdfs # the value of dfs.nameservices
namenodesAddr: [ "machine1.example.com", "machine2.example.com" ] # the value of dfs.namenode.rpc-address.[nameservice ID].[name node ID]
namenodes: [ "nn1", "nn2" ] # the value of dfs.ha.namenodes.[nameservice ID]
user: hdfs
password:
port: 8020
# scheduler platform hdfs log path keyword identification, used by task-application
matchPathKeys: [ "flume" ]
# kerberos
enableKerberos: false
# /etc/krb5.conf
krb5Conf: ""
# hdfs/*@EXAMPLE.COM
principalPattern: ""
# admin
loginUser: ""
# /var/kerberos/krb5kdc/admin.keytab
keytabPath: ""
yarn:
- clusterName: "bigdata"
resourceManager: [ "machine1:8088", "machine2:8088" ] # the value of yarn.resourcemanager.webapp.address
jobHistoryServer: "machine3:19888" # the value of mapreduce.jobhistory.webapp.address
spark:
sparkHistoryServer: [ "machine4:18080" ] # the value of spark.history.ui
The Compass table structure consists of two parts, one is compass.sql, and the other is a table that depends on the scheduling platform (dolphinscheduler.sql or airflow.sql, etc.)
-
Please execute document/sql/compass.sql first
-
If you are using the DolphinScheduler scheduling platform, please execute document/sql/dolphinscheduler.sql(It needs to be modified according to the actual version used); if you are using the Airflow scheduling platform, please execute document/sql/airflow.sql(It needs to be modified according to the actual version used)
-
If you are using a self-developed scheduling platform, please refer to the task-syncer module to determine the tables that need to be synchronized
./bin/start_all.sh
Third party system can send flink metadata to compass by kafka stream or http API, user do not have to run canal to capture metadata from scheduler. the format of metadata as following:
format parameter:
{
// fields required
"startTime":"2023-06-01", // job startrd time
"projectName":"test", // project name
"flowName":"test", // flow name
"taskName":"test", // task name
"jobName":"job_name", // job name
"username":"test", // user name
"flinkTrackUrl":"tracking url", // job tracking url
"taskState":"RUNNING", // running state
"parallel":150, // job parallel
"tmSlot":1, // tm slot
"tmCore":2, // tm core
"jmMem":1024, // jobmanager memory MB
"tmMem":4096, // taskmanager memory MB
// fields optionally required
"userId":1, // user id from scheduler
"projectId":1, // project id
"flowId":1, // flow id
"taskId":1, // task id
"taskInstanceId":1, // task instance id
"executionTime":"2023-06-01", // execution time
"allocatedMb":1, // yarn allocated memory
"allocatedVcores":1, // yarn allocated core
"runningContainers":1, // running containers
"engineType":"flink", // engine type
"duration":"1", // job duration time
"endTime":"2023-06-01", // job end time
"vcoreSeconds":1, // job vcore seconds
"memorySeconds":1, // job memory seconds
"queue":"flink", // yarn queue
"clusterName":"flink", // yarn cluster name
"retryTimes":1, // retry times
"executeUser":"user", // execute user
"createTime":"2023-06-01", // created time
"updateTime":"2023-06-01", // updated time
"diagnosis":"1", // yarn diagnosis
"applicationId":"app id" // app id
}
Kafka:
Send the json content to flink-task-app topic. If you want to change the topic
name,then modify the spring.kafka.flinkTaskApp property of application.yml file in
task-flink module.
Http:
Fill the json content to http body and send the post request to
http://[compass_host]/compass/api/flink/saveRealtimeTaskApp,
Welcome to join the community for the usage or development of Compass. Here is the way to get help:
- Submit an issue.
- Join the wechat group, search and add WeChat ID
daiwei_cn
orzebozhuang
. Please indicate your intention in the verification information. After verification, we will invite you to the community group. - If you like our product, please star our repository-compass, your support will be our motivation to create better product.
Compass is licensed under the Apache License, Version 2.0 For detail see LICENSE and NOTICE.