Skip to content

Commit

Permalink
[INLONG-2711][Manager][SDK] Dataproxy-SDK gets manager IP list error (a…
Browse files Browse the repository at this point in the history
…pache#2712)

* [INLONG-2711][Manager] [Bug][SDK] Dataproxy-SDK gets manager IP list error

* [INLONG-2711][Manager] [Bug][SDK] Change error log info
  • Loading branch information
ciscozhou authored Feb 25, 2022
1 parent 924538a commit 5a5b8cd
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class AgentController {
@Autowired
private ThirdPartyClusterService thirdPartyClusterService;

@GetMapping("/getInLongManagerIp")
@GetMapping("/getManagerIpList")
@ApiOperation(value = "Get inlong manager ip list")
public Response<List<String>> getInLongManagerIp() {
return Response.success(thirdPartyClusterService.listClusterIpByType("inlong-openapi"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,8 @@

package org.apache.inlong.sdk.dataproxy.utils;

import static org.apache.inlong.sdk.dataproxy.ConfigConstants.REQUEST_HEADER_AUTHORIZATION;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
import javax.net.ssl.SSLContext;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.Header;
Expand All @@ -53,116 +39,128 @@
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;

import static org.apache.inlong.sdk.dataproxy.ConfigConstants.REQUEST_HEADER_AUTHORIZATION;

/**
* Created by elroyzhang on 2018/5/10.
* Utils for service discovery
*/
public class ServiceDiscoveryUtils {

private static final Logger log = LoggerFactory.getLogger(ServiceDiscoveryUtils.class);

private static final String GET_MANAGER_IP_LIST_API = "/api/inlong/manager/openapi/agent/getManagerIpList";
private static String latestManagerIPList = "";
private static String arraySed = ",";

public static String getManagerIpList(ProxyClientConfig proxyClientConfig) throws ProxysdkException {
String managerIpList;
String managerAddress = proxyClientConfig.getManagerIP() + ":" + proxyClientConfig.getManagerPort();
/**
* Get Inlong-Manager IP list from the given proxy client config
*/
public static String getManagerIpList(ProxyClientConfig clientConfig) {
String managerAddress = clientConfig.getManagerIP() + ":" + clientConfig.getManagerPort();
if (StringUtils.isBlank(managerAddress)) {
log.error("managerAddress is blank.");
log.error("ServiceDiscovery get managerIpList but managerAddress is blank, just return");
return null;
}

managerIpList = getManagerIpListByHttp(managerAddress, proxyClientConfig);
if (!StringUtils.isBlank(managerIpList)) {
String managerIpList = getManagerIpListByHttp(managerAddress, clientConfig);
if (StringUtils.isNotBlank(managerIpList)) {
latestManagerIPList = managerIpList;
return managerIpList;
}
log.error("ServiceDiscovery get managerIpList from "
+ "managerHost occur error, will try to get from managerIpList.");

log.error("ServiceDiscovery get managerIpList from {} occur error, try to get from latestManagerIPList",
managerAddress);

String[] managerIps = latestManagerIPList.split(arraySed);
if (managerIps.length > 0) {
for (String managerIp : managerIps) {
if (!StringUtils.isBlank(managerIp)) {
String currentAddress = managerIp + ":" + proxyClientConfig.getManagerPort();
managerIpList = getManagerIpListByHttp(currentAddress, proxyClientConfig);
if (!StringUtils.isBlank(managerIpList)) {
latestManagerIPList = managerIpList;
return managerIpList;
} else {
log.error("ServiceDiscovery request " + managerIp
+ " got by latestManagerIPList[" + latestManagerIPList
+ "] got nothing, will try next ip.");
}
} else {
log.error("ServiceDiscovery managerIp is null, "
+ "latestManagerIPList is [" + latestManagerIPList
+ "].");
if (StringUtils.isBlank(managerIp)) {
log.error("ServiceDiscovery managerIp is null, latestManagerIPList is {}", latestManagerIPList);
continue;
}

String currentAddress = managerIp + ":" + clientConfig.getManagerPort();
managerIpList = getManagerIpListByHttp(currentAddress, clientConfig);
if (StringUtils.isBlank(managerIpList)) {
log.error("ServiceDiscovery get latestManagerIPList from {} but got nothing, will try next ip",
managerIp);
continue;
}
latestManagerIPList = managerIpList;
return managerIpList;
}
} else {
log.error("ServiceDiscovery latestManagerIpList["
+ latestManagerIPList + "] format error, or not contain ip");
log.error("ServiceDiscovery latestManagerIpList {} format error, or not contain ip", latestManagerIPList);
}

String existedTdmIpList = getLocalManagerIpList(proxyClientConfig.getManagerIpLocalPath());
if (!StringUtils.isBlank(existedTdmIpList)) {
String[] existedTdmIps = existedTdmIpList.split(arraySed);
if (existedTdmIps.length > 0) {
for (String existedTdmIp : existedTdmIps) {
if (!StringUtils.isBlank(existedTdmIp)) {
String currentAddress = existedTdmIp + ":" + proxyClientConfig.getManagerPort();
managerIpList = getManagerIpListByHttp(currentAddress, proxyClientConfig);
if (!StringUtils.isBlank(managerIpList)) {
latestManagerIPList = managerIpList;
return managerIpList;
} else {
log.error("ServiceDiscovery request " + existedTdmIp + " got by local file["
+ proxyClientConfig.getManagerIpLocalPath() + "] got nothing, will try next ip.");
}
} else {
log.error("ServiceDiscovery get illegal format TdmIpList from local file, "
+ "exist one ip is empty, managerIpList is ["
+ existedTdmIpList + "], "
+ "local file is [" + proxyClientConfig.getManagerIpLocalPath() + "]");
String existedIpList = getLocalManagerIpList(clientConfig.getManagerIpLocalPath());
if (StringUtils.isNotBlank(existedIpList)) {
String[] existedIps = existedIpList.split(arraySed);
if (existedIps.length > 0) {
for (String existedIp : existedIps) {
if (StringUtils.isBlank(existedIp)) {
log.error("ServiceDiscovery get illegal format ipList from local file, "
+ "exist ip is empty, managerIpList is {}, local file is {}",
existedIpList, clientConfig.getManagerIpLocalPath());
continue;
}

String currentAddress = existedIp + ":" + clientConfig.getManagerPort();
managerIpList = getManagerIpListByHttp(currentAddress, clientConfig);
if (StringUtils.isBlank(managerIpList)) {
log.error("ServiceDiscovery get {} from local file {} but got nothing, will try next ip",
existedIp, clientConfig.getManagerIpLocalPath());
continue;
}
latestManagerIPList = managerIpList;
return managerIpList;
}
} else {
log.error("ServiceDiscovery get illegal format TdmIpList from local file, "
+ "managerIpList is [" + existedTdmIpList + "], "
+ "local file is [" + proxyClientConfig.getManagerIpLocalPath() + "]");
log.error("ServiceDiscovery get illegal format ipList from local file, "
+ "exist ip is empty, managerIpList is {}, local file is {}",
existedIpList, clientConfig.getManagerIpLocalPath());
}
} else {
log.error("ServiceDiscovery get empty TdmIpList from local file, "
+ "file path is [" + proxyClientConfig.getManagerIpLocalPath() + "].");
log.error("ServiceDiscovery get empty ipList from local file {}", clientConfig.getManagerIpLocalPath());
}

return managerIpList;
}

public static String getManagerIpListByHttp(String managerIp,
ProxyClientConfig proxyClientConfig) throws ProxysdkException {
/**
* Get Inlong-Manager IP list from the given managerIp and proxy client config
*/
public static String getManagerIpListByHttp(String managerIp, ProxyClientConfig proxyClientConfig) {
String url =
(proxyClientConfig.isLocalVisit() ? "http://" : "https://") + managerIp + "/api/getmanagervirtualip";
(proxyClientConfig.isLocalVisit() ? "http://" : "https://") + managerIp + GET_MANAGER_IP_LIST_API;
ArrayList<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
params.add(new BasicNameValuePair("operation", "query"));
params.add(new BasicNameValuePair("username", proxyClientConfig.getUserName()));

log.info("Begin to get configure from manager {}, param is {}", url, params);
CloseableHttpClient httpClient = null;
HttpPost httpPost = null;
String returnStr = null;
CloseableHttpClient httpClient;
HttpParams myParams = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(myParams, proxyClientConfig.getManagerConnectionTimeout());
HttpConnectionParams.setSoTimeout(myParams, proxyClientConfig.getManagerSocketTimeout());
if (proxyClientConfig.isLocalVisit()) {
httpClient = new DefaultHttpClient(myParams);
} else {
try {
ArrayList<Header> headers = new ArrayList<Header>();
ArrayList<Header> headers = new ArrayList<>();
for (BasicNameValuePair paramItem : params) {
headers.add(new BasicHeader(paramItem.getName(), paramItem.getValue()));
}
Expand All @@ -174,13 +172,13 @@ public static String getManagerIpListByHttp(String managerIp,
SSLConnectionSocketFactory.getDefaultHostnameVerifier());
httpClient = HttpClients.custom().setDefaultHeaders(headers)
.setDefaultRequestConfig(requestConfig).setSSLSocketFactory(sslsf).build();
} catch (Throwable eHttps) {
log.error("Create Https cliet failure, error 1 is ", eHttps);
eHttps.printStackTrace();
} catch (Throwable t) {
log.error("Create Https client failed: ", t);
return null;
}
}

HttpPost httpPost = null;
try {
httpPost = new HttpPost(url);
if (proxyClientConfig.isNeedAuthentication()) {
Expand All @@ -190,22 +188,21 @@ public static String getManagerIpListByHttp(String managerIp,
Utils.getAuthorizenInfo(proxyClientConfig.getUserName(),
proxyClientConfig.getSecretKey(), timestamp, nonce));
}
UrlEncodedFormEntity se = new UrlEncodedFormEntity(params);
httpPost.setEntity(se);
httpPost.setEntity(new UrlEncodedFormEntity(params));
HttpResponse response = httpClient.execute(httpPost);
returnStr = EntityUtils.toString(response.getEntity());
String returnStr = EntityUtils.toString(response.getEntity());
if (Utils.isNotBlank(returnStr) && response.getStatusLine().getStatusCode() == 200) {
log.info("Get configure from manager is " + returnStr);
JsonParser jsonParser = new JsonParser();
JsonObject jb = jsonParser.parse(returnStr).getAsJsonObject();
JsonObject rd = jb.get("resultData").getAsJsonObject();
String ip = rd.get("ip").getAsString();
log.info("ServiceDiscovery updated managerVirtualIP success, ip : " + ip + ", retStr : " + returnStr);
log.info("ServiceDiscovery updated manager ip success, ip : " + ip + ", retStr : " + returnStr);
return ip;
}
return null;
} catch (Throwable e) {
log.error("Connect Manager error, {}", e.getMessage());
} catch (Throwable t) {
log.error("Connect Manager error: ", t);
return null;
} finally {
if (httpPost != null) {
Expand All @@ -217,71 +214,51 @@ public static String getManagerIpListByHttp(String managerIp,
}
}

public static String getLocalManagerIpList(String managerIpLocalPath) {
log.info("ServiceDiscovery start loading config for :{} from file ...", managerIpLocalPath);
BufferedReader reader = null;
/**
* Get Inlong-Manager IP list from local path
*/
public static String getLocalManagerIpList(String localPath) {
log.info("ServiceDiscovery start loading config from file {} ...", localPath);
String newestIp = null;
try {
File managerIpListFile = new File(managerIpLocalPath);
File managerIpListFile = new File(localPath);
if (!managerIpListFile.exists()) {
log.info("ServiceDiscovery no found local groupIdInfo file, "
+ "doesn't matter, path is [" + managerIpLocalPath + "].");
log.info("ServiceDiscovery not found local groupIdInfo file from {}", localPath);
return null;
}
byte[] serialized;
serialized = FileUtils.readFileToByteArray(managerIpListFile);
byte[] serialized = FileUtils.readFileToByteArray(managerIpListFile);
if (serialized == null) {
return null;
}
newestIp = new String(serialized, "UTF-8");
log.info("ServiceDiscovery get manager ip list from local success, result is : {}", newestIp);
} catch (FileNotFoundException e) {
log.error("ServiceDiscovery load manager config error, file not found. Exception info : {}", e);
newestIp = new String(serialized, StandardCharsets.UTF_8);
log.info("ServiceDiscovery get manager ip list from local success, result is: {}", newestIp);
} catch (IOException e) {
log.error("ServiceDiscovery load manager config error. Exception info : {}", e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
log.error("ServiceDiscovery close bufferedReader "
+ "error after loading InLong Manager config . Exception info : {}.", e);
}
}
log.error("ServiceDiscovery load manager config error: ", e);
}

return newestIp;
}

/**
* Update Inlong-Manager info to local file
*/
public static void updateManagerInfo2Local(String storeString, String path) {
if (StringUtils.isBlank(storeString)) {
log.warn("ServiceDiscovery updateTdmInfo2Local error, configMap is empty or managerIpList is blank.");
log.warn("ServiceDiscovery updateTdmInfo2Local error, configMap is empty or managerIpList is blank");
return;
}
BufferedWriter writer = null;
try {
File localPath = new File(path);
if (!localPath.getParentFile().exists()) {
localPath.getParentFile().mkdirs();
}

File localPath = new File(path);
if (!localPath.getParentFile().exists()) {
localPath.getParentFile().mkdirs();
}
writer = new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(localPath), StandardCharsets.UTF_8));
try (BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(localPath), StandardCharsets.UTF_8))) {
writer.write(storeString);
writer.flush();
} catch (UnsupportedEncodingException e) {
log.error("ServiceDiscovery save manager config error1 .", e);
} catch (FileNotFoundException e) {
log.error("ServiceDiscovery save manager config error2 .", e);
} catch (IOException e) {
log.error("ServiceDiscovery save manager config error3 .", e);
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
log.error("ServiceDiscovery close manager writer error.", e);
}
}
log.error("ServiceDiscovery save manager config error: ", e);
}
}

}

0 comments on commit 5a5b8cd

Please sign in to comment.