Skip to content

Commit

Permalink
1. CanalController stop 需要同时将 embededCanalServer.stop (#4477)
Browse files Browse the repository at this point in the history
2. ServerRunningMonitor 线程池未正常回收,线程池管理与 start/stop保持一致
  • Loading branch information
hua74ni authored Nov 16, 2022
1 parent 10ff148 commit fa0c08c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class ServerRunningMonitor extends AbstractCanalLifeCycle {
private ServerRunningData serverData;
// 当前实际运行的节点状态信息
private volatile ServerRunningData activeData;
private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
private ScheduledExecutorService delayExecutor;
private int delayTime = 5;
private ServerRunningListener listener;

Expand Down Expand Up @@ -73,7 +73,7 @@ public void handleDataDeleted(String dataPath) throws Exception {
initRunning();
} else {
// 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
delayExector.schedule(() -> initRunning(), delayTime, TimeUnit.SECONDS);
delayExecutor.schedule(() -> initRunning(), delayTime, TimeUnit.SECONDS);
}
}

Expand All @@ -90,6 +90,7 @@ public synchronized void start() {
try {
processStart();
if (zkClient != null) {
delayExecutor = Executors.newScheduledThreadPool(1);
// 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);
Expand Down Expand Up @@ -122,6 +123,10 @@ public synchronized void stop() {
if (zkClient != null) {
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.unsubscribeDataChanges(path, dataListener);
if (delayExecutor != null) {
delayExecutor.shutdown();
delayExecutor = null;
}

releaseRunning(); // 尝试一下release
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,11 @@ public void stop() throws Throwable {
}

ZkClientx.clearClients();

// 需要释放 CanalServerWithEmbedded 否则主线程退出后,进程无法自动完整退出...
if (embededCanalServer != null) {
embededCanalServer.stop();
}
}

private void initCid(String path) {
Expand Down

0 comments on commit fa0c08c

Please sign in to comment.