diff --git a/common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java b/common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java index fe983dc2b7..bc068abdc2 100644 --- a/common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java +++ b/common/src/main/java/com/alibaba/otter/canal/common/zookeeper/running/ServerRunningMonitor.java @@ -38,7 +38,7 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle { private ServerRunningData serverData; // 当前实际运行的节点状态信息 private volatile ServerRunningData activeData; - private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService delayExecutor; private int delayTime = 5; private ServerRunningListener listener; @@ -73,7 +73,7 @@ public void handleDataDeleted(String dataPath) throws Exception { initRunning(); } else { // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作 - delayExector.schedule(() -> initRunning(), delayTime, TimeUnit.SECONDS); + delayExecutor.schedule(() -> initRunning(), delayTime, TimeUnit.SECONDS); } } @@ -90,6 +90,7 @@ public synchronized void start() { try { processStart(); if (zkClient != null) { + delayExecutor = Executors.newScheduledThreadPool(1); // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); @@ -122,6 +123,10 @@ public synchronized void stop() { if (zkClient != null) { String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.unsubscribeDataChanges(path, dataListener); + if (delayExecutor != null) { + delayExecutor.shutdown(); + delayExecutor = null; + } releaseRunning(); // 尝试一下release } else { diff --git a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java index 53de822c9e..33e5e1cb58 100644 --- a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java +++ b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java @@ -558,6 +558,11 @@ public void stop() throws Throwable { } ZkClientx.clearClients(); + + // 需要释放 CanalServerWithEmbedded 否则主线程退出后,进程无法自动完整退出... + if (embededCanalServer != null) { + embededCanalServer.stop(); + } } private void initCid(String path) {