前言
在分布式系统中经常会遇到某个业务仅需要单个节点执行的场景,通常这样做是为了解决并发引起的状态不一致问题。但是为了防止出现单点故障,又需要为这些节点做故障转移的实现。简单的方案是同时起多个节点,但是只有一个节点作为主节点执行业务,其他的作为备份节点需要实时跟踪主节点运行状态,一旦发现主节点挂掉就将自己转变为主节点进行业务处理,这也就是所谓的“多节点抢注(主)”。
实现
实现一个简单的多节点抢注功能并不复杂,只需要借助一些中间件进行状态维护就可以做到,这里使用大家常用的redis作为实现方案。这个最小实现方案中仅涉及三个参与角色,分别是:
WorkNode — 工作节点
Redis — Redis缓存
CallbackHandler — 节点注册成功回调(代表业务处理)
具体实现代码如下(仅供参考,相关细节需要自己根据业务进一步实现)
WorkNode
package me.kongdl.ezwok.work; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * @author: kongdl * @date: 2022/6/7 14:55 * @description: 分布式节点抢占注册模型 **/ public class WorkNode extends Thread { // 业务代码 private String code; // 节点ID private String nodeId; // redis private Redis redis; // 主节点标志 private AtomicBoolean master; // 注册成功回调接口 private CallbackHandler callbackHandler; public WorkNode(String code, String nodeId, CallbackHandler callbackHandler) { this.code = code; this.nodeId = nodeId; this.redis = new Redis(); this.master = new AtomicBoolean(false); this.callbackHandler = callbackHandler; } @Override public void run() { // master:event-handler => <NODE_ID> String key = "master:" + code; while (!registerAsMaster(key)) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); return; } } redis.expire(key, 5, TimeUnit.SECONDS); while (true) { // 节点续期 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); break; } redis.expire(key, 5, TimeUnit.SECONDS); } } /** * 尝试注册为主节点 * @param key * @return true-成功,false-失败 */ private boolean registerAsMaster(String key) { boolean result = redis.setnx(key, nodeId); if (result) { master.set(true); // callback in async mode new Thread(() -> callbackHandler.handle(this)).start(); } return result; } /** * 当前节点是否为主节点 * @return */ public boolean isMaster() { return master.get(); } /** * 业务代码 * @return */ public String getCode() { return code; } /** * 节点ID * @return */ public String getNodeId() { return nodeId; } }
View Code
Redis
package me.kongdl.ezwok.work; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; import org.springframework.data.redis.connection.jedis.JedisClientConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import redis.clients.jedis.JedisPoolConfig; import java.util.concurrent.TimeUnit; /** * @author: kongdl * @date: 2022/6/7 14:58 * @description: Redis **/ public class Redis { private final StringRedisTemplate redisTemplate; public Redis() { this.redisTemplate = initRedisTemplate(); } private StringRedisTemplate initRedisTemplate() { RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration(); standaloneConfig.setHostName("localhost"); standaloneConfig.setPort(6379); standaloneConfig.setPassword("Redis#321"); standaloneConfig.setDatabase(0); JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMinIdle(2); poolConfig.setMaxIdle(10); poolConfig.setMaxTotal(100); JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder().usePooling().poolConfig(poolConfig).build(); JedisConnectionFactory connectionFactory = new JedisConnectionFactory(standaloneConfig, clientConfiguration); connectionFactory.afterPropertiesSet(); StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory); redisTemplate.afterPropertiesSet(); return redisTemplate; } public boolean setnx(String key, String value) { return redisTemplate.opsForValue().setIfAbsent(key, value); } public boolean expire(String key, long timeout, TimeUnit unit) { return redisTemplate.expire(key, timeout, unit); } }
View Code
CallbackHandler
package me.kongdl.ezwok.work; /** * @author: kongdl * @date: 2022/6/7 15:58 * @description: 回调处理接口 **/ public interface CallbackHandler { /** * 回调处理 */ void handle(WorkNode node); }
View Code
测试
/** * 模拟多节点下的运行状况 **/ public class Demo { public static void main(String[] args) { // 业务代码,用以区分不同的业务 String code = "event-handler"; // 节点注册成功后的回调处理 final CallbackHandler callbackHandler = node -> { // 执行业务操作 System.out.println(node.getCode() + "<" + node.getNodeId() + "> registered as master ok!"); Random random = new Random(System.currentTimeMillis()); try { // 模拟线程随机挂掉 TimeUnit.SECONDS.sleep(random.nextInt(10)); node.interrupt(); } catch (InterruptedException e) { e.printStackTrace(); } }; // 启动多个节点 for (int i = 1; i <= 10; i++) { String nodeId = "node-" + i; new WorkNode(code, nodeId, callbackHandler).start(); } } }
结论
测试代码执行后会发现某个节点注册成功,运行一段时间(几秒)后挂掉,后续备份节点会自动注册成为主节点并接替执行业务,从而证明了该模型具备了基本的节点抢注和故障转移功能。当然,实际生产环境具有更复杂的场景和业务需求,但是可以认为都是在这个基础上进行了相关扩展。