ConcurrentSkipListMap
本文最后更新于 1569 天前,其中的信息可能已经有所发展或是发生改变。

参考文章http://cmsblogs.com/chenssy

简介

到目前为止,我们在Java世界里看到了两种实现key-value的数据结构:Hash Map、Tree Map,这两种数据结构各自都有着优缺点。

  1. Hash表:插入、查找最快,为O(1);如使用链表实现则可实现无锁;数据有序化需要显式的排序操作。
  2. 红黑树:插入、查找为O(logn),但常数项较小;无锁实现的复杂性很高,一般需要加锁;数据天然有序

然而,这次介绍第三种实现key-value的数据结构:SkipList。SkipList有着不低于红黑树的效率,但是其原理和实现的复杂度要比红黑树简单多了。

SkipList

什么是SkipList?SkipList称之为跳表,它是一种可以替代平衡树的数据结构,其数据元素默认按照key值升序,天然有序。SkipList让已排序的数据分布在多层链表中,以0-1随机数决定一个数据的向上攀升与否,通过“空间来换取时间”的一个算法,在每个节点中增加了向前的指针,在插入、删除、查找时可以忽略一些不可能涉及到的结点,从而提高了效率。

我们先看一个简单的链表,如下:

如果我们需要查询9、21、30,则需要比较次数为3 + 6 + 8 = 17 次,那么有没有优化方案呢?有!我们将该链表中的某些元素提炼出来作为一个比较“索引”,如下:

我们先与这些索引进行比较来决定下一个元素是往右还是下走,由于存在“索引”的缘故,导致在检索的时候会大大减少比较的次数。当然元素不是很多,很难体现出优势,当元素足够多的时候,这种索引结构就会大显身手。

SkipList的特性

SkipList具备如下特性:

  1. 由很多层结构组成,level是通过一定的概率随机产生的
  2. 每一层都是一个有序的链表,默认是升序,也可以根据创建映射时所提供的Comparator进行排序,具体取决于使用的构造方法
  3. 最底层(Level 1)的链表包含所有元素
  4. 如果一个元素出现在Level i 的链表中,则它在Level i 之下的链表也都会出现
  5. 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素

我们将上图再做一些扩展就可以变成一个典型的SkipList结构了

SkipList的查找

SkipList的查找算法较为简单,对于上面我们我们要查找元素21,其过程如下:

  1. 比较3,大于,往后找(9)
  2. 比9大,继续往后找(25),但是比25小,则从9的下一层开始找(16)
  3. 16的后面节点依然为25,则继续从16的下一层找
  4. 找到21

如图:

红色虚线代表路径。

SkipList的插入

SkipList的插入操作主要包括:

  1. 查找合适的位置。这里需要明确一点就是在确认新节点要占据的层次K时,采用丢硬币的方式,完全随机。如果占据的层次K大于链表的层次,则重新申请新的层,否则插入指定层次
  2. 申请新的节点
  3. 调整指针

假定我们要插入的元素为23,经过查找可以确认她是位于25后,9、16、21前。当然需要考虑申请的层次K。

如果层次K > 3

需要申请新层次(Level 4)

如果层次 K = 2

直接在Level 2 层插入即可

SkipList的删除

删除节点和插入节点思路基本一致:找到节点,删除节点,调整指针。

比如删除节点9,如下:

ConcurrentSkipListMap

通过上面我们知道SkipList采用空间换时间的算法,其插入和查找的效率O(logn),其效率不低于红黑树,但是其原理和实现的复杂度要比红黑树简单多了。一般来说会操作链表List,就会对SkipList毫无压力。

ConcurrentSkipListMap其内部采用SkipList数据结构实现。为了实现SkipList,ConcurrentSkipListMap提供了三个内部类来构建这样的链表结构:Node、Index、HeadIndex。其中Node表示最底层的单链表有序节点、Index表示为基于Node的索引层,Head Index用来维护索引层次。到这里我们可以这样说ConcurrentSkipListMap是通过HeadIndex维护索引层次,通过Index从最上层开始往下层查找,一步一步缩小查询范围,最后到达最底层Node时,就只需要比较很小一部分数据了。在JDK中的关系如下图:

Node

static final class Node<K,V> {

    final K key;
    volatile Object value;
    volatile Node<K,V> next;

    Node(K key, Object value, Node<K,V> next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }

Node的结构和一般的单链表毫无区别,key-value和一个指向下一个节点的next。

Index

static class Index<K,V> {

    final Node<K,V> node;
    final Index<K,V> down;
    volatile Index<K,V> right;

    Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }

Index提供了一个基于Node节点的索引Node,一个指向下一个Index的right,一个指向下层的down节点。

HeadIndex

static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

HeadIndex内部就一个level来定义层级。

ConcurrentSkipListMap提供了四个构造函数,每个构造函数都会调用initialize()方法进行初始化工作。

private void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                              null, null, 1);
}

注意,initialize()方法不仅仅只在构造函数中被调用,如clone,clear、readObject时都会调用该方法进行初始化步骤。

put操作

CoucurrentSkipListMap提供了put()方法用于将指定值与此映射中的指定键关联。源码如下:

public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}

首先判断value如果为null,则抛出NullPointerException,否则调用doPut方法,其实如果各位看过JDK的源码的话,应该对这样的操作很熟悉了,JDK源码里面很多方法都是先做一些必要性的验证后,然后通过调用doput()方法进行真正的操作。

doPut()方法内容较多,我们分步分析。

private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {

首先判断key是否为null,如果为null,则抛出NullPointerException,从这里我们可以确认ConcurrentSkipList是不支持key或者value为null的。然后调用findPredecessor()方法,传入key来确认位置。findPredecessor()方法其实就是确认key要插入的位置。

doPut()方法有三个参数,除了key,value外还有一个boolean类型的onlyIfAbsent,该参数作用与如果存在当前key时,该做何动作。当onlyIfAbsent为false时,替换value,为true时,则返回该value。

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    for (;;) {
        // 从head节点开始,head是level最高级别的headIndex
        for (Index<K,V> q = head, r = q.right, d;;) {
            // r != null,表示该节点右边还有节点,需要比较
            if (r != null) {
                Node<K,V> n = r.node;
                K k = n.key;
                // 如果value == null
                if (n.value == null) {
                    if (!q.unlink(r))
                        break;           // restart
                    r = q.right;         // reread r
                    continue;
                }
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            if ((d = q.down) == null)
                return q.node;
            q = d;
            r = d.right;
        }
    }
}

findPredecessor()方法意思非常明确:寻找前继。从最高层的headIndex开始向右一步一步比较,直到right为null或者右边节点的Node的key大于当前key为止,然后再向下寻找,依次重复该过程,直到down为null为止,即找到了前继,看返回的结果注意是Node,不是Index,所以插入的位置应该是最底层的Node链表。

在这个过程中ConcurrentSkipListMap赋予了该方法一个其他的功能,就是通过判断节点的value是否为null,如果为null,表示该节点已经被删除了,通过调用unlink()方法删除该Index节点。

final boolean unlink(Index<K,V> succ) {
    return node.value != null && casRight(succ, succ.right);
}

删除节点过程非常简单,更改下right指针即可。

通过findPredecessor()找到前继节点后,做什么呢?看下面:

for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
    if (n != null) {
        Object v; int c;
        Node<K,V> f = n.next;
        // b的前继被修改了,需要重新操作
        if (n != b.next)               // inconsistent read
            break;
        // n.value == null,说明n被删除了
        if ((v = n.value) == null) {   // n is deleted
            n.helpDelete(b, f);
            break;
        }
        // b.value == null,说明b被删除了
        if (b.value == null || v == n) // b is deleted
            break;
        //如果结果大于0,说明key比前继n的key要大,需要前移
        if ((c = cpr(cmp, key, n.key)) > 0) {
            b = n;
            n = f;
            continue;
        }
        // 如果结果等于0,说明找到了相同key节点,根据onlyIfAbsent来进行相应的操作
        // onlyIfAbsent == false,则通过casValue,替换value
        // onlyIfAbsent == true,返回该value
        if (c == 0) {
            if (onlyIfAbsent || n.casValue(v, value)) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            break; // restart if lost race to replace value
        }
        // else c < 0; fall through
    }
    // 生成新的key和value的节点插入跳表的Node层
    z = new Node<K,V>(key, value, n);
    if (!b.casNext(n, z))
        break;         // restart if lost race to append to b
    break outer;
}

找到合适的位置后,就是在该位置插入节点咯。插入节点的过程比较简单,就是将key-value包装成一个Node,然后通过casNext()方法加入到链表当中。当然是插入之前需要进行一系列的校验工作。

在最下层插入节点后,下一步工作是什么?新建索引。前面博主提过,在插入节点的时候,会根据采用抛硬币的方式来决定新节点所插入的层次,由于存在并发的可能,ConcurrentSkipListMap采用ThreadLocalRandom来生成随机数。

int rnd = ThreadLocalRandom.nextSecondarySeed();
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
    int level = 1, max;
    while (((rnd >>>= 1) & 1) != 0)
        ++level;

nextSecondarySeed方法会产生一个int正数范围的一个随机数,然后通过与rnd的最高位和最低位进行与操作来决定是否生成索引节点,因为ConcurrentSkipListMap中有的节点是没有索引节点的。如果需要生成索引节点,对rnd的每位进行判断,如果为1就加一层。

在阐述SkipList插入节点的时候说明了,决定的层次level会分为两种情况进行处理,一是如果层次level大于最大的层次话则需要新增一层,否则就在相应层次以及小于该level的层次进行节点新增处理。

level <= headIndex.level

// 如果层数小于headIndex.level,就只生成对应key的Index节点就可以了
// 节点是从下往上生成的,这样就不需要对每个Index的down进行额外操作了
if (level <= (max = h.level)) {
    for (int i = 1; i <= level; ++i)
        idx = new Index<K,V>(z, idx, null);
}

从底层开始生成Index,并在生成的过程中将上层的down设置完成,这样就不需要对每个Index的down进行额外操作了,这部操作只是纵向连接,横向连接会下一步进行。

level > headIndex.level

else { // try to grow by one level
    // 将层数增加一层
    level = max + 1; // hold in array and later pick the one to use
    // 初始化 level+1个节点,0节点不使用
    @SuppressWarnings("unchecked")Index<K,V>[] idxs =
        (Index<K,V>[])new Index<?,?>[level+1];
    // 将刚刚初始化的节点进行连接
    for (int i = 1; i <= level; ++i)
        idxs[i] = idx = new Index<K,V>(z, idx, null);
    for (;;) {
        h = head;
        int oldLevel = h.level;
        // 如果level小于原来的level,说明有其他的线程修改了level,需要重新开始
        if (level <= oldLevel) // lost race to add level
            break;
        HeadIndex<K,V> newh = h;
        Node<K,V> oldbase = h.node;
        // 这是在新的一层生成新的HeadIndex,并将head设为新的HeadIndex
        for (int j = oldLevel+1; j <= level; ++j)
            newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
        if (casHead(h, newh)) {
            h = newh;
            idx = idxs[level = oldLevel];
            break;
        }
    }
}

如果随机的层数比当前层数大,并不是将层数设置为随机的层数。而是在原来的基础上再增加一层,其实对key的操作与上面的情况相同,唯一的区别是初始化一个数组,通过这个数组就可以与接下来生成的HeadIndex连接,这样新增加的一层就不需要处理了。这样在下面就不用区分进行横向连接了。

因为生成新的HeadIndex后,原来的head就不是指向最上层的HeadIndex,需要将head设为新的HeadIndex。

下面就需要进行Index的纵向连接了,代码如下:

// insertionLevel 等于当前有节点需要连接的层数
splice: for (int insertionLevel = level;;) {
    int j = h.level;
    // 从headIndex开始向右连接
    for (Index<K,V> q = h, r = q.right, t = idx;;) {
        // 如果q或t== null,说明节点被删除
        if (q == null || t == null)
            break splice;
        // 如果r != null说明右边有节点,需要进行比较,判断是否需要连接
        if (r != null) {
            Node<K,V> n = r.node;
            // compare before deletion check avoids needing recheck
            int c = cpr(cmp, key, n.key);
            if (n.value == null) {
                if (!q.unlink(r))
                    break;
                r = q.right;
                continue;
            }
            // 如果 c > 0,说明需要向右移动
            if (c > 0) {
                q = r;
                r = r.right;
                continue;
            }
        }
        // 如果j == insertionLevel,说明当前j层需要连接节点
        if (j == insertionLevel) {
            if (!q.link(r, t))
                break; // restart
            // 如果t.node.value == null,说明刚刚插入的key又被删除了,就不用继续连接了
            if (t.node.value == null) {
                findNode(key);
                break splice;
            }
            // 插入成功后insertionLevel减1,之后如果等于0,说明所有节点都被连接。
            if (--insertionLevel == 0)
                break splice;
        }
        // 此处为当前层节点已经全部遍历,需要j减1,如果之后j >= insertionLevel && j < level,说明当前的t已经连接成功,需要下移
        if (--j >= insertionLevel && j < level)
            t = t.down;
        q = q.down;
        r = q.right;
    }
}

这段代码分为两部分看,一部分是找到相应层次的该节点插入的位置,第二部分在该位置插入,然后下移。

至此,ConcurrentSkipListMap的put操作到此就结束了。代码量有点儿多,这里总结下:

  1. 首先通过findPredecessor()方法找到前继节点Node。
  2. 根据返回的前继节点以及key-value,新建Node节点,同时通过CAS设置next。
  3. 设置节点Node,再设置索引节点。采取抛硬币方式决定层次,如果所决定的层次大于现存的最大层次,则新增一层,然后新建一个Item链表。
  4. 最后,将新建的Item链表插入到SkipList结构中。

get操作

相比于put操作 ,get操作会简单很多,其过程其实就只相当于put操作的第一步:

private V doGet(Object key) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)                // inconsistent read
                break;
            if ((v = n.value) == null) {    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)  // b is deleted
                break;
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            if (c < 0)
                break outer;
            b = n;
            n = f;
        }
    }
    return null;
}

与put操作第一步相似,首先调用findPredecessor()方法找到前继节点,然后顺着right一直往右找即可,同时在这个过程中同样承担了一个删除value为null的节点的职责。

remove操作

remove操作为删除指定key节点,如下:

public V remove(Object key) {
    return doRemove(key, null);
}

直接调用doRemove()方法,这里remove有两个参数,一个是key,另外一个是value,所以doRemove方法即提供remove key,也提供同时满足key-value。

final V doRemove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            // 如果n != b.next说明有其他线程修改b的后继,需要重新开始
            if (n != b.next)                    // inconsistent read
                break;
            // 如果n.value == null 说明n已经被删除
            if ((v = n.value) == null) {        // n is deleted
                n.helpDelete(b, f);
                break;
            }
            // 如果b.value == null 说明b已经被删除
            if (b.value == null || v == n)      // b is deleted
                break;
            // 如果b的后继都比key小,说明这个key在map里不存在
            if ((c = cpr(cmp, key, n.key)) < 0)
                break outer;
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }
            //如果value不为空,需要key和value同时满足才进行删除
            if (value != null && !value.equals(v))
                break outer;
            if (!n.casValue(v, null))
                break;
            if (!n.appendMarker(f) || !b.casNext(n, f))
                findNode(key);                  // retry via findNode
            else {
                findPredecessor(key, cmp);      // clean index
                if (head.right == null)
                    tryReduceLevel();
            }
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;
        }
    }
    return null;
}

调用findPredecessor()方法找到前辈节点,然后通过右移,然后比较,找到后利用CAS把value替换为null,然后判断该节点是不是这层唯一的index,如果是的话,调用tryReduceLevel()方法把这层干掉,完成删除。findPredecessor()方法是查找到key插入位置的前继。

在这个过程中ConcurrentSkipListMap赋予了该方法一个其他的功能,就是通过判断节点的value是否为null,如果为null,表示该节点已经被删除了,通过调用unlink()方法删除该Index节点。

final boolean unlink(Index<K,V> succ) {
    return node.value != null && casRight(succ, succ.right);
}

调用findNode方法用来删除对应key的Node节点。

private Node<K,V> findNode(Object key) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)                // inconsistent read
                break;
            // 这里判断,如果n的value == null 就说明n被删除了
            // 这里的help方法就是删除Node节点的方法
            if ((v = n.value) == null) {    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)  // b is deleted
                break;
            if ((c = cpr(cmp, key, n.key)) == 0)
                return n;
            if (c < 0)
                break outer;
            b = n;
            n = f;
        }
    }
    return null;
}

其实调用findNode方法是为了验证并调用helpDelete来真正删除元素。

void helpDelete(Node<K,V> b, Node<K,V> f) {
    if (f == next && this == b.next) {
        if (f == null || f.value != f) // not already marked
            casNext(f, new Node<K,V>(f));
        else
            b.casNext(this, f.next);
    }
}

这个方法在get和put方法中都有调用,因为在删除的过程中,会有其他线程遍历到删除的节点,这样他就会调用helpDelete来帮助删除,首先判断n.next == f并且b.next == n这个是删除节点的前继和后继是否发生了变化,如果没有变化就会进行f == null || f.value != f,这个是用来判断删除Node节点后面是否添加了空白节点。如果已经添加了,就可以进行节点的删除了。

为什么要其他线程来帮助删除呢,因为ConcurrentSkipListMap删除节点分为四步:

  1. 将删除的key对应的Node节点value设置为null。
  2. 将删除的Node节点和后继节点之间插入一个空白节点。
  3. 将删除的Node节点的前继节点next设置为后继节点。
  4. 调用findPredecessor来删除key对应的Index节点。

在这四步中,每一步都可能会有其他的线程遍历或修改删除的Node节点,从而导致失败,所以在进行其他操作之前,如果发现Node节点的value为null,就需要当前线程调用helpDelete方法来帮助其他线程删除节点。这样防止因为删除节点,影响其他线程。

为什么要在删除节点后面加一个空白节点呢:

  1. 在删除节点后面加入一个空白节点,这样,就不会修改删除节点的next指针,而是修改空白节点的next指针,这样就会保证删除过程中,删除节点的后继不会发生变化。
  2. 在删除节点后面加入一个空白节点,并发情况下,其他线程都会帮助删除节点,这样如果添加成功空白节点,就会使所有当前在helpDelete方法的线程CAS全部失败,从而节点成功删除。

因为删除操作只删除了Node节点,还需要调用findPredecessor方法来删除Index节点。

size操作

ConcurrentSkipListMap的size()操作和ConcurrentHashMap不同,它并没有维护一个全局变量来统计元素的个数,所以每次调用该方法的时候都需要去遍历。

public int size() {
    long count = 0;
    for (Node<K,V> n = findFirst(); n != null; n = n.next) {
        if (n.getValidValue() != null)
            ++count;
    }
    return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}

调用findFirst()方法找到第一个Node,然后利用node的next去统计。最后返回统计数据,最多能返回Integer.MAX_VALUE。注意这里在线程并发下是安全的。

原创声明
本文由 makese 于2018年09月26日发表在 我的博客
如未特殊声明,本站所有文章均为原创;你可以在保留作者及原文地址的情况下转载
转载请注明:ConcurrentSkipListMap | 我的博客
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
下一篇