博客 使用 Apache Curator 实现 集群 Primary 选举
Post
Cancel

使用 Apache Curator 实现 集群 Primary 选举

前言

最近公司的项目里在实现高可用时HA时,部署了多个节点,但这部署的节点不能同时工作,集群里只有一个节点作为主节点进行作业,别的节点只能是作为从节点,对主节点的消息数据进行实时的复制,当主节点出现宕机时,将会多个从节点会选举出一个从节点作为主节点,其他从节点开始从新的主节点开始同步数据。

大概的构架图:

出现Fail Over 后如下图:

Node1 节点宕机后,Node2 被选举成为 Primary 基其他节点对 Node2 节点进行数据两步。

从功能的需求上能想到,要实现这个功能需要解决以下两个问题

  • Primary 选举机制,主节点宕机后,从节点有一个被选 出来当主节点。
  • 当主节点进行变更后,其他节点需要被通知,主节点变更事件,从而更新数据同步策略。

实现

将使用 Apache Curator 的两个组件来实现,通过 LeaderSelector 来实现主节点选举,这个组件只提供选举功能,并不支持主节点变更事件,所以如果只用这个组件,其中一个节点成为了主节点,从节点毫无感知,所以这个时候就用到它的另一个组件NodeCache , 这个组件从名字了解,它是一个缓存用的,其实里面最实用的功能是监听,就是它能监听节点的变化,提供回调事件, 具体实现如下:

  1. 实现 Primary 节点选举
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class MyLeaderSelectorClient {

    private static final Logger logger = LoggerFactory.getLogger(MyLeaderSelectorClient.class);
    private String hostId;
    public static final String leaderInfoPath = "/test/leader";
    private String leaderSelectorPath = "/test/selector";
    private CuratorFramework client;
    private LeaderSelector leaderSelector;

    public MyLeaderSelectorClient(String hostId) {
        this.hostId = hostId;
    }

    public void start() throws Exception {

        client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryForever(100));
        client.start();

        logger.info("leader selector start");
        leaderSelector = new LeaderSelector(client, leaderSelectorPath, new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                logger.info("Host[{}] is taking leader ship ", hostId);
                
                //更新当前节点信息到ZK
                updatePrimaryInfoToZK();
                
                logger.info("Host[{}] is doing something...", hostId);
                TimeUnit.DAYS.sleep(1);
                
                logger.info("Host[{}] end taking leader ship", hostId);
            }
        });

        leaderSelector.setId(hostId);
        leaderSelector.start();
        leaderSelector.autoRequeue();
        logger.info("leader selector started and auto requeue");
        
        //启动 Primary 节点监听
        PrimaryListener primaryListener = new PrimaryListener(client, hostId);
        primaryListener.start();
        logger.info("Primary listener started");
    }

    private void updatePrimaryInfoToZK() throws Exception {
        client.create()
                .orSetData()
                .creatingParentContainersIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(leaderInfoPath, hostId.getBytes());
    }

    public void close(){
        leaderSelector.close();
        client.close();
    }
}

  1. 实现 Primary 节点变化监听

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    
    public class PrimaryListener {
        private static final Logger LOGGER = LoggerFactory.getLogger(PrimaryListener.class);
        private NodeCache nodeCache;
        private String hostId;
        private CuratorFramework client;
       
        public PrimaryListener(CuratorFramework client, String hostId) {
            this.client = client;
            this.hostId = hostId;
        }
       
        public void start() throws Exception {
            nodeCache = new NodeCache(client, leaderInfoPath);
            nodeCache.getListenable().addListener(this::primaryChanged);
            nodeCache.start();
        }
       
        private void primaryChanged() {
            LOGGER.info("found primary changed!");
            ChildData currentData = nodeCache.getCurrentData();
            if (currentData != null ) {
                String newPrimary = new String(currentData.getData());
                if(!hostId.equals(newPrimary)){
                    LOGGER.info("Host[{}] will follow the new primary[{}]", hostId, newPrimary);
                }
            }
        }
    }
       
    

效果如下:

1
2
3
4
5
6
7
21:21:18.685 [main] INFO com.powerjun.curator.MyLeaderSelectorClient - leader selector started and auto requeue
21:21:18.726 [main] INFO com.powerjun.curator.MyLeaderSelectorClient - Primary listener started 
21:21:18.762 [main-EventThread] INFO com.powerjun.curator.PrimaryListener - found primary changed!
21:21:18.762 [main-EventThread] INFO com.powerjun.curator.PrimaryListener - Host[2] will follow the new primary[1]
21:21:38.891 [Curator-LeaderSelector-0] INFO com.powerjun.curator.MyLeaderSelectorClient - Host[2] is taking leader ship 
21:21:38.939 [main-EventThread] INFO com.powerjun.curator.PrimaryListener - found primary changed!

这个节点2 的日志,从一开始Follow 节点1,当主节点节点1宕机后,自己则成为主节点

1
2
3
4
5
6
 leader selector started and auto requeue
21:21:27.233 [main] INFO com.powerjun.curator.MyLeaderSelectorClient - Primary listener started
21:21:27.273 [main-EventThread] INFO com.powerjun.curator.PrimaryListener - found primary changed!
21:21:27.273 [main-EventThread] INFO com.powerjun.curator.PrimaryListener - Host[3] will follow the new primary[1]
21:21:38.945 [main-EventThread] INFO com.powerjun.curator.PrimaryListener - found primary changed!
21:21:38.945 [main-EventThread] INFO com.powerjun.curator.PrimaryListener - Host[3] will follow the new primary[2]

这是节点3 的日志,它能监听到主节点从节点1 变成节点2

具体实现代码

https://github.com/helloworld28/leader-selector-demo

This post is licensed under CC BY 4.0 by the author.

Spring Cloud 脚手架项目

Spring scheduler 定时任务避坑使用指南及原理剖析

Comments powered by Disqus.