你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

负载均衡——一致性Hash算法

2021/12/19 0:21:55

负载均衡——一致性Hash算法

讲一致性Hash之前要先将Hash,对于Hash来说,只需要O(1)的时间复杂度,然后就能计算出结果,像我们的HashMap中,在计算key的下标时,用到hash算法,不过那个是计算了hashcode后然后将hash值进行散列,其实也都差不多,对于同一个key,每次计算的结果都是一样的,这个就是一致性,但是对于一致性Hash,比Hash又多了些什么东西呢

普通Hash:假设我们有3台服务器,我们可以像HashMap中添加元素那样,利用key的Hash来计算出下标,因为同一个key每次计算出的hash是一致的,所以我们的同一个请求可以路由到同一台服务器上,这样能有效利用缓存等机制,减少重复请求带来的开销之类的,简单易用,但是有缺陷,当我们需要增加机器或者减少机器,那么会出现什么结果?
首先增加机器,我们的hash与下标直接的结果导致了服务器和请求的不匹配,被路由到另一台机器上了,这样不就乱套了?
于是提出了一致性Hash

一致性Hash:主要的特点就是Hash环,我们的请求可以构建成一个Hash环,按照顺时针记录hash和请求
当我们的服务挂了A时,我们只需要将A的请求交给A后面的B处理
当我们需要增加服务器C时,我们只需要在Hash环上划一块范围,然后交给C
这样就可以实现动态的扩容和缩容。
一致性哈希用于解决分布式缓存系统中的节点选择和在增删服务器后,节点减少带来的数据缓存的消失与重新分配问题


public class ConsistentHash {

    //哈希环
    private static TreeMap<Integer,String> virtual=new TreeMap<>();
    //初始化hash环
    static {
        for (String ip: IPs.LIST){

            for (int i=0;i<VIRTUAL_NODES;i++){
                //对ip生成hash
                int hash=getHash(ip);
                virtual.put(hash,ip);
            }
        }
    }

	//随意,可加可不加
    private static int getHash(String str){
        final int p=16777619;
        int hash=(int) 2166136261L;
        for (int i=0;i<str.length();i++){
            hash=(hash^str.charAt(i))*p;
        }
        hash+=hash<<13;
        hash^=hash>>7;
        hash+=hash<<3;
        hash^=hash>>17;
        hash+=hash<<5;
        if (hash<0){
            hash=Math.abs(hash);
        }
        return hash;
    }

    public static String getServer(String client){
		//拿到Hash
        int hash=getHash(client);
       
        //寻找大于这个hash,virtual的字数的firstKey
        SortedMap<Integer, String> subMap = virtual.tailMap(hash);
        Integer firstKey = null;
        if (subMap==null){
        	//
            firstKey =  virtual.firstKey();
        }else {
            firstKey=subMap.firstKey();
        }
        return virtual.get(firstKey);
    }

    public static void main(String[] args) {
        System.out.println(getServer("client"));
    }
}

但是有没有考虑过一个问题,就是我们的服务器结点的倾斜问题,即,服务器管理的Hash范围不均匀,导致大量请求只砸中到一个服务器上,但是其他几个服务器的压力却很小,就想这样(图片自己画的,丑爆了)
关于结点倾斜问题

既然出现了上面的问题,我们可以怎么解决呢?

带虚拟结点的一致性Hash:增加N个虚拟结点,将真实节点计算多个哈希形成多个虚拟节点并放置到哈希环上,定位算法不变,只是多了一步虚拟节点到真实节点映射的过程,然后当服务器挂了,其他服务器可以分享他的虚拟结点

加了虚拟结点的Hash环
就像这样

这里就还得提一下redis中的Hash槽了,Hash槽是在redis cluster中,用于数据分片的,用来进行数据的存储和读取,默认是16384个槽,用于计算槽区的算法是CRC16(key) mod 16384,每个服务器结点都会被分配指定个数的Hash槽,当我们的节点挂掉后,会由其他的服务器结点来承担数据,导致重新进行分配槽区,注意,Hash槽并不像一致性Hash一样是一个闭环,他是一个槽,定位服务器结点的规则也不一样,我们的一致性Hash是需要计算后然后顺时针进行寻找,而槽的话只需要计算出槽区就行了

最后让我们再来看看Dubbo怎么实现一致性Hash算法的

//Dubbo中的一致性Hash负载均衡实现
public class ConsistentHashLoadBalance extends AbstractLoadBalance {

    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int identityHashCode = System.identityHashCode(invokers);
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.getIdentityHashCode() != identityHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }

    private static final class ConsistentHashSelector<T> {

        //Hash环
        private final TreeMap<Long, Invoker<T>> virtualInvokers;
        //虚拟结点树
        private final int replicaNumber;
        //HashCode
        private final int identityHashCode;
        //参数索引数组
        private final int[] argumentIndex;

        public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            //使用treeMap来保存我们的节点
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();

            this.identityHashCode = System.identityHashCode(invokers);
            //获取到我们的Url
            //dubbo://127.0.0.1:20880/service.DemoService?anyhost=true&application=srcAnalysisClient&check=false&dubbo=2.8.4&generic=false&interface=service.DemoService&loadbalance=consistenthash&methods=sayHello,retMap&pid=14648&sayHello.timeout=20000&side=consumer&timestamp=1493522325563
            URL url = invokers.get(0).getUrl();
            //默认160个虚拟结点
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            //遍历我们的Invoker,并且加入到我们的环中
            for (Invoker<T> invoker : invokers) {
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(invoker.getUrl().toFullString() + i);
                    for (int h = 0; h < 4; h++) {
                        //计算出Hash
                        long m = hash(digest, h);
                        //放入到我们的Hash环中
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        public int getIdentityHashCode() {
            return identityHashCode;
        }

        //选择节点
        public Invoker<T> select(Invocation invocation) {
            //根据参数来生成Key
            String key = toKey(invocation.getArguments());
            //MD5后hash
            byte[] digest = md5(key);
            //选择节点
            Invoker<T> invoker = sekectForKey(hash(digest, 0));
            return invoker;
        }

        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }

        private Invoker<T> sekectForKey(long hash) {
            Invoker<T> invoker;
            Long key = hash;
            //如果没有这个Key
            if (!virtualInvokers.containsKey(key)) {
                //没有的话,就找到比这个key要大的最近的key节点
                SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
                //map为空,我们需要直接拿第一个节点
                if (tailMap.isEmpty()) {
                    key = virtualInvokers.firstKey();
                } else {
                    //
                    key = tailMap.firstKey();
                }
            }
            //直接获取节点,然后返回
            invoker = virtualInvokers.get(key);
            return invoker;
        }

        private long hash(byte[] digest, int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[0 + number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }

        private byte[] md5(String value) {
            MessageDigest md5;
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.reset();
            byte[] bytes = null;
            try {
                bytes = value.getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.update(bytes);
            return md5.digest();
        }

    }

}