Skip to content

Commit

Permalink
[collector]bugfix:To fix the issue of reusing the adminClient durin…
Browse files Browse the repository at this point in the history
…g the process of modifying monitoring instances in the Kafka client
  • Loading branch information
doveLin0818 committed Dec 21, 2024
1 parent 6aff6ed commit aa2d1bd
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hertzbeat.collector.collect.kafka;

import org.apache.commons.lang3.StringUtils;
import org.apache.hertzbeat.collector.collect.common.cache.AbstractConnection;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
Expand All @@ -32,6 +33,8 @@ public class KafkaConnect extends AbstractConnection<AdminClient> {

private static AdminClient adminClient;

private static String preUrl;

public KafkaConnect(String brokerList) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
Expand All @@ -53,12 +56,14 @@ public void closeConnection() {
}

public static synchronized AdminClient getAdminClient(String brokerList) {
if (adminClient == null) {
if (StringUtils.isBlank(preUrl) || !brokerList.equals(preUrl)) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
adminClient = KafkaAdminClient.create(properties);
preUrl = brokerList;
return adminClient;
}
return adminClient;
}

}
}

0 comments on commit aa2d1bd

Please sign in to comment.