Skip to content

Commit

Permalink
add autoRegister and cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Aug 31, 2019
1 parent ba24c15 commit 456945f
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.otter.canal.admin.model.BaseModel;
import com.alibaba.otter.canal.admin.model.CanalConfig;
Expand Down Expand Up @@ -40,11 +45,15 @@ public class PollingConfigController {
@GetMapping(value = "/server_polling")
public BaseModel<CanalConfig> canalConfigPoll(@RequestHeader String user, @RequestHeader String passwd,
@RequestParam String ip, @RequestParam Integer port,
@RequestParam String md5, @PathVariable String env) {
@RequestParam String md5, @PathVariable boolean register,
@PathVariable String cluster, @PathVariable String env) {
if (!auth(user, passwd)) {
throw new RuntimeException("auth :" + user + " is failed");
}

if (register) {
// do something
}
CanalConfig canalConfig = pollingConfigServer.getChangedConfig(ip, port, md5);
return BaseModel.getInstance(canalConfig);
}
Expand All @@ -54,8 +63,8 @@ public BaseModel<CanalConfig> canalConfigPoll(@RequestHeader String user, @Reque
*/
@GetMapping(value = "/instance_polling/{destination}")
public BaseModel<CanalInstanceConfig> instanceConfigPoll(@RequestHeader String user, @RequestHeader String passwd,
@PathVariable String env, @PathVariable String destination,
@RequestParam String md5) {
@PathVariable String env,
@PathVariable String destination, @RequestParam String md5) {
if (!auth(user, passwd)) {
throw new RuntimeException("auth :" + user + " is failed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class CanalConstants {
public static final String CANAL_ADMIN_PORT = ROOT + "." + "admin.port";
public static final String CANAL_ADMIN_USER = ROOT + "." + "admin.user";
public static final String CANAL_ADMIN_PASSWD = ROOT + "." + "admin.passwd";
public static final String CANAL_ADMIN_AUTO_REGISTER = ROOT + "." + "admin.auto.register";
public static final String CANAL_ADMIN_AUTO_CLUSTER = ROOT + "." + "admin.auto.cluster";
public static final String CANAL_ZKSERVERS = ROOT + "." + "zkServers";
public static final String CANAL_WITHOUT_NETTY = ROOT + "." + "withoutNetty";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
public class CanalController {

private static final Logger logger = LoggerFactory.getLogger(CanalController.class);
private Long cid;
private String ip;
private String registerIp;
private int port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -46,20 +47,25 @@ public static void main(String[] args) {
}

final CanalStater canalStater = new CanalStater(properties);
String managerAddress = properties.getProperty(CanalConstants.CANAL_ADMIN_MANAGER);
String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
if (StringUtils.isNotEmpty(managerAddress)) {
String user = properties.getProperty(CanalConstants.CANAL_ADMIN_USER);
String passwd = properties.getProperty(CanalConstants.CANAL_ADMIN_PASSWD);
String adminPort = properties.getProperty(CanalConstants.CANAL_ADMIN_PORT, "11110");
String registerIp = properties.getProperty(CanalConstants.CANAL_REGISTER_IP);
String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
if (StringUtils.isEmpty(registerIp)) {
registerIp = AddressUtils.getHostIp();
}
final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
user,
passwd,
registerIp,
Integer.parseInt(adminPort));
Integer.parseInt(adminPort),
autoRegister,
autoCluster);
PlainCanal canalConfig = configClient.findServer(null);
if (canalConfig == null) {
throw new IllegalArgumentException("managerAddress:" + managerAddress
Expand All @@ -69,8 +75,8 @@ public static void main(String[] args) {
Properties managerProperties = canalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
int scanIntervalInSecond = Integer.valueOf(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
"5"));
int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(properties,
CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
executor.scheduleWithFixedDelay(new Runnable() {

private PlainCanal lastCanalConfig;
Expand Down
5 changes: 4 additions & 1 deletion deployer/src/main/resources/canal_local.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ canal.register.ip =
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public class PlainCanalConfigClient extends AbstractCanalLifeCycle implements Ca
private HttpHelper httpHelper;
private String localIp;
private int adminPort;
private boolean autoRegister;
private String autoCluster;

public PlainCanalConfigClient(String configURL, String user, String passwd, String localIp, int adminPort,
boolean autoRegister, String autoCluster){
this(configURL, user, passwd, localIp, adminPort);
this.autoCluster = autoCluster;
this.autoRegister = autoRegister;
}

public PlainCanalConfigClient(String configURL, String user, String passwd, String localIp, int adminPort){
this.configURL = configURL;
Expand Down Expand Up @@ -61,7 +70,8 @@ public PlainCanal findServer(String md5) {
if (StringUtils.isEmpty(md5)) {
md5 = "";
}
String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5;
String url = configURL + "/api/v1/config/server_polling?ip=" + localIp + "&port=" + adminPort + "&md5=" + md5
+ "&register=" + (autoRegister ? 1 : 0) + "&cluster=" + autoCluster;
return queryConfig(url);
}

Expand Down

0 comments on commit 456945f

Please sign in to comment.