实现简单的多节点抢注(主)功能

前言

  在分布式系统中经常会遇到某个业务仅需要单个节点执行的场景,通常这样做是为了解决并发引起的状态不一致问题。但是为了防止出现单点故障,又需要为这些节点做故障转移的实现。简单的方案是同时起多个节点,但是只有一个节点作为主节点执行业务,其他的作为备份节点需要实时跟踪主节点运行状态,一旦发现主节点挂掉就将自己转变为主节点进行业务处理,这也就是所谓的“多节点抢注(主)”。

 

实现

  实现一个简单的多节点抢注功能并不复杂,只需要借助一些中间件进行状态维护就可以做到,这里使用大家常用的redis作为实现方案。这个最小实现方案中仅涉及三个参与角色,分别是:

  WorkNode — 工作节点

  Redis — Redis缓存

  CallbackHandler — 节点注册成功回调(代表业务处理)

具体实现代码如下(仅供参考,相关细节需要自己根据业务进一步实现)

WorkNode

package me.kongdl.ezwok.work;  import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean;  /**  * @author: kongdl  * @date: 2022/6/7 14:55  * @description: 分布式节点抢占注册模型  **/ public class WorkNode extends Thread {     // 业务代码 private String code;     // 节点ID private String nodeId;     // redis private Redis redis;     // 主节点标志 private AtomicBoolean master;     // 注册成功回调接口 private CallbackHandler callbackHandler;      public WorkNode(String code, String nodeId, CallbackHandler callbackHandler) {         this.code = code;         this.nodeId = nodeId;         this.redis = new Redis();         this.master = new AtomicBoolean(false);         this.callbackHandler = callbackHandler;     }      @Override     public void run() {         // master:event-handler => <NODE_ID>         String key = "master:" + code;         while (!registerAsMaster(key)) {             try {                 Thread.sleep(1000);             } catch (InterruptedException e) {                 e.printStackTrace();                 return;             }         }         redis.expire(key, 5, TimeUnit.SECONDS);         while (true) { // 节点续期 try {                 Thread.sleep(3000);             } catch (InterruptedException e) {                 e.printStackTrace();                 break;             }             redis.expire(key, 5, TimeUnit.SECONDS);         }     }      /**      * 尝试注册为主节点      * @param key      * @return true-成功,false-失败      */ private boolean registerAsMaster(String key) {         boolean result = redis.setnx(key, nodeId);         if (result) {             master.set(true);             // callback in async mode new Thread(() -> callbackHandler.handle(this)).start();         }         return result;     }      /**      * 当前节点是否为主节点      * @return */ public boolean isMaster() {         return master.get();     }      /**      * 业务代码      * @return */ public String getCode() {         return code;     }      /**      * 节点ID      * @return */ public String getNodeId() {         return nodeId;     }  }

View Code

 

Redis

package me.kongdl.ezwok.work;  import org.springframework.data.redis.connection.RedisStandaloneConfiguration; import org.springframework.data.redis.connection.jedis.JedisClientConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import redis.clients.jedis.JedisPoolConfig;  import java.util.concurrent.TimeUnit;  /**  * @author: kongdl  * @date: 2022/6/7 14:58  * @description: Redis  **/ public class Redis {     private final StringRedisTemplate redisTemplate;      public Redis() {         this.redisTemplate = initRedisTemplate();     }      private StringRedisTemplate initRedisTemplate() {         RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration();         standaloneConfig.setHostName("localhost");         standaloneConfig.setPort(6379);         standaloneConfig.setPassword("Redis#321");         standaloneConfig.setDatabase(0);         JedisPoolConfig poolConfig = new JedisPoolConfig();         poolConfig.setMinIdle(2);         poolConfig.setMaxIdle(10);         poolConfig.setMaxTotal(100);         JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder().usePooling().poolConfig(poolConfig).build();         JedisConnectionFactory connectionFactory = new JedisConnectionFactory(standaloneConfig, clientConfiguration);         connectionFactory.afterPropertiesSet();         StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory);         redisTemplate.afterPropertiesSet();         return redisTemplate;     }      public boolean setnx(String key, String value) {         return redisTemplate.opsForValue().setIfAbsent(key, value);     }      public boolean expire(String key, long timeout, TimeUnit unit) {         return redisTemplate.expire(key, timeout, unit);     } }

View Code

 

CallbackHandler

package me.kongdl.ezwok.work;  /**  * @author: kongdl  * @date: 2022/6/7 15:58  * @description: 回调处理接口  **/ public interface CallbackHandler {      /**      * 回调处理      */ void handle(WorkNode node); }

View Code

 

测试

/** * 模拟多节点下的运行状况 **/ public class Demo {      public static void main(String[] args) {         // 业务代码,用以区分不同的业务         String code = "event-handler";         // 节点注册成功后的回调处理 final CallbackHandler callbackHandler = node -> {             // 执行业务操作             System.out.println(node.getCode() + "<" + node.getNodeId() + "> registered as master ok!");             Random random = new Random(System.currentTimeMillis());             try { // 模拟线程随机挂掉                 TimeUnit.SECONDS.sleep(random.nextInt(10));                 node.interrupt();             } catch (InterruptedException e) {                 e.printStackTrace();             }         };         // 启动多个节点 for (int i = 1; i <= 10; i++) {             String nodeId = "node-" + i;             new WorkNode(code, nodeId, callbackHandler).start();         }     } }

 

结论

  测试代码执行后会发现某个节点注册成功,运行一段时间(几秒)后挂掉,后续备份节点会自动注册成为主节点并接替执行业务,从而证明了该模型具备了基本的节点抢注和故障转移功能。当然,实际生产环境具有更复杂的场景和业务需求,但是可以认为都是在这个基础上进行了相关扩展。

 

商匡云商
Logo
对比商品
  • 合计 (0)
对比
0
购物车