搜尋
首頁資料庫mysql教程非阻塞同步算法实战(一)

感谢trytocache投递本文。 前言 本文写给对ConcurrentLinkedQueue的实现和非阻塞同步算法的实现原理有一定了解,但缺少实践经验的朋友,文中包括了实战中的尝试、所走的弯路,经验和教训。 背景介绍 上个月,我被安排独自负责一个聊天系统的服务端,因为一些

感谢trytocache投递本文。

前言

本文写给对ConcurrentLinkedQueue的实现和非阻塞同步算法的实现原理有一定了解,但缺少实践经验的朋友,文中包括了实战中的尝试、所走的弯路,经验和教训。

背景介绍

上个月,我被安排独自负责一个聊天系统的服务端,因为一些原因,我没使用现成的开源框架,网络那块直接使用AIO,收数据时,因为只会从channel里过来,所以不需要考虑同步问题;但是发送数据时,因为有聊天消息的转发,所以必需处理这个同步问题。AIO中,是处理完一个注册的操作后,再执行我们定义的方法,此时,如果还有数据需要写,则继续注册写操作,如果没有则不注册;提交数据时,如果当前没有注册写操作,则注册一个,否则仅提交(此时再注册则会报异常)。这样,需要同步的点就是:如何知道当前还有没有数据需要发送(因为其它线程也可以提交数据到此处),和如何知道此次提交时,有没有注册写操作。总之,要完成:有数据要发送时,必需有写操作被注册,并且只能注册一次;没有数据时,不能有写操作被注册。

问题分析

经过分析,上面的问题,可以抽象成:我需要知道当往队列里插入一条数据之前,该队列是否为空,如果为空则应该注册新的写操作。当从队列里取出一条数据后,该队列是否为非空,如果非空则应该继续注册写操作。(本文之后以“关注的操作”来表示这种场景下的插入或取出操作)

目前的问题是,我使用的队列是ConcurrentLinkedQueue,但是它的取出数据的方法,没有返回值告诉我们从队列里取出数据之后队列是否为空,如果是使用size或peek方法来执行判断,那就必需加锁了,否则在拿到队列大小时,可能队列大小已经变化了。所以我首先想到的是,如何对该队列进行改造,让它提供该信息。

注意:这里指的不是当次而是之后,所以如果我们使用队列的peek()方法返回null,就知道队列是否为空,但是不知道之后是否为空 ,并且,当关注的操作发生时,在插入或取出操作的返回值里告知此信息,来指导是否继续注册写操作。

用锁实现

如果使用锁的话很容易处理,用锁同步插入和取出方法,下面是锁实现的参考:

    public E poll() {
        synchronized (this) {
            E re = q.poll();
            // 获取元素后,队列是空,表示是我关注的操作
            if (q.peek() == null) {
            }
            return re;
        }
    }
    public void offer(E e) {
        synchronized (this) {
            // 插入元素前,队列是空,表示是我关注的操作
            if (q.peek() == null) {
            }
            q.offer(e);
        }
    }

但因为是服务端,我想用非阻塞同步算法来实现。

尝试方案一

我第一次想到的改造办法是,将head占位节点改成固定的,头节点移动时,只将head节点的next指向新的节点,在插入数据时,如果是在head节点上成功执行的该操作,那么该插入就是关注的的操作;在取出时,如果将head节点的next置为了null,那么该取出就是关注的操作(?因为之前的占位节点是变化的,所以没法比较,必需用同步,现在是固定的了,所以可以直接与head节点进行比较?)。如此一来,问题好像被解决了。改造完之后,出于严谨,我仔细读了一遍代码,发现引入了新的问题,我的取出操作是这样写的

/**
 * @author trytocatch@163.com
 */
public E poll(){
    for(;;){
        Node n = head.nextRef.get();//head指向固定的head节点,为final
        if(n == null)
            return null;
        Node m = n.nextRef.get();
        if(head.next.compareAndSet(n,m){
            if(m==null)
                ;//此时为关注的操作(为了简化代码显示,不将该信息当作返回值返回了,仅注释)
            return n.itemRef.get();
        }
    }
}

这里有一个致命的问题:如果m为null,在CAS期间,插入了新节点,n的next由null变成了非null,紧接着又把head的next更新为了null,那么链就断了,该方法还存在一些其它的问题,如当队列为空的时候,尾节点指向了错误的位置,本应该是head的。我认为最根本的原因在于,head不能设为固定的,否则会引发一堆问题。第一次尝试宣告失败。

尝试方案二

这次我尝试将head跟tail两个引用包装成一个对象,然后对这个对象进行CAS更新(这仅为一处理论上的尝试,因为从性能上面来讲,已经大打折扣了,创建了很多额外的对象),如果head跟tail指向了同一个节点,则认为队列是空的,根据此信息来判断一个操作是不是关注的操作。但该尝试仅停留在了理论分析阶段,期间发现了一些小问题,没法解决,后来我发现,我把ConcurrentLinkedQueue原本可以分成两步执行的插入和取出操作(更新节点的next或item引用,然后再更新head或tail引用),变成了必需一步完成,ConcurrentLinkedQueue尚不能一步完成,我何德何能,可将它们一步完成?所以直接放弃了。

解决方案一

经过两次的失败尝试,我几乎绝望了,我怀疑这是不是不能判断出是否为关注的操作。

因为是在做项目,周末已经过去了,不能再搞这些“研究”了,所以我退而求其次,想了个不太漂亮的办法,在队列外部维护一个变量,记录队列有多大,在插入或取出后,更新该变量,使用的是AtomicInteger,如果更新时,将变量从1变成0,或从0变成了1,就认为该插入或取出为关注的操作。

    private AtomicInteger size = new AtomicInteger(0);
    public E poll() {
        E re = q.poll();
        if (re == null)
            return null;
        for(int old;;){
            old = size.get();
            if(size.compareAndSet(old,old-1)){
                // 获取元素后,队列是空,表示是我关注的操作
                if(old == 1){
                }
                break;
            }
        }
        return re;
    }
    public void offer(E e) {
        q.offer(e);
        for(int old;;){
            old = size.get();
            if(size.compareAndSet(old,old+1)){
                // 插入元素前,队列是空,表示是我关注的操作
                if(old == 0){
                }
                break;
            }
        }
    }

此时,也许细心的朋友会问,因为没有使用锁,这个变量并不能真实反映队列的大小,也就不能确定它是不是关注的操作。没错,是不能真实反映,但是,我获取关注的操作的目的是用来指导我:该不该注册新的写操作,该变量的值变化就能提供正确的指导,所以,同样是正确的,只不过途径不同而已。理论上的分析和后来的项目正确运行都印证了该方法的正确性。

解决方案二

因为上面的方法额外加了一次lock-free级别的CAS操作,我心里总不太舒服,空余时间总在琢磨,真的就没有办法,在不增加额外lock-free级别CAS开支的情况下,知晓一个操作是不是关注的操作?

后来经分析,如果要知晓是不是关注的操作,跟两个数据有关,真实的头节点跟尾节点(不同于head跟tail,因为它们是滞后的,之前将它们包装成一个对象就是犯了该错误),ConcurrentLinkedQueue的实现中,这两个节点是没有竞争的,一个是更新item,一个是更新next,必需得让他们竞争同一个东西,才能解决该问题,于是我想到了一个办法,取出完成后,如果该节点的next为null,就将其用CAS置为一个特殊的值,若成功则认为是关注的操作;插入成功后,如果next被替换掉的值不是null而是这个特殊值,那么该插入也为关注的操作。这仅增加了一次wait-free级别的CAS操作(取出后的那次CAS),perfect!

因为ConcurrentLinkedQueue的很多变量、内部类都是私有的,没法通过继承来改造,没办法,只得自行实现。对于队列里使用的Node,实现的方式有很多种,可以使用AtomicReference、AtomicReferenceFieldUpdater来实现,如果你愿意的话,甚至是像ConcurrentLinkedQueue一样,用sun.misc.Unsafe来实现(注意:一般来说,sun包下的类是不推荐使用的),各有优缺点吧,所以我就不提供该队列的具体实现了,下面给出在ConcurrentLinkedQueue(版本:1.7.0_10)基础上进行的改造,供参考。注意,如果需要用到size等方法,因为特殊值的引入,影响了之前的判断逻辑,应重新编写。

   /**
    * @author trytocatch@163.com
    */
    private static final Node MARK_NODE = new Node(null);
    public boolean offer(E e) {
        checkNotNull(e);
        final Node newNode = new Node(e);
        for (Node t = tail, p = t;;) {
            Node q = p.next;
            if (q == null || q == MARK_NODE) {//修改1:加入条件:或q == MARK_NODE
                // p is last node
                if (p.casNext(q, newNode)) {//修改2:将null改为q
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (q == MARK_NODE)//修改3:
                        ;//此时为关注的操作(为了简化代码显示,仅注释)
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode); // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list. If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable. Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node h = head, p = h, q;;) {
                E item = p.item;
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    if (p.casNext(null,MARK_NODE))//修改1:
                        ;//此时为关注的操作
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

小结

设计非阻塞算法的关键在于,找出竞争点,如果获取的某个信息跟两个操作有关,那么应该让这两个操作竞争同一个东西,这样才能反应出它们的关系。

陳述
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
解决win11中同时播放耳机和音响的问题解决win11中同时播放耳机和音响的问题Jan 06, 2024 am 08:50 AM

一般来说,我们只需要同时使用耳机或者音响的其中一个设备,但是有些朋友反映在win11系统中,遇到了耳机和音响一起响的问题,其实我们可以在realtek面板中将它关闭,就可以了,下面一起来看一下吧。win11耳机和音响一起响怎么办1、首先在桌面上找到并打开“控制面板”2、进入控制面板,在其中找到并打开“硬件和声音”3、然后再找到一个喇叭图标的“Realtek高清晰音频管理器”4、选择“扬声器”再点击“后面板”进入扬声器设置。5、打开之后我们可以看到设备类型,如果要关闭耳机就取消勾选“耳机”,如果要

MySql的数据迁移和同步:如何实现多台服务器之间的MySQL数据迁移和同步MySql的数据迁移和同步:如何实现多台服务器之间的MySQL数据迁移和同步Jun 15, 2023 pm 07:48 PM

MySQL是一个非常流行的开源关系型数据库管理系统,广泛应用于各种Web应用、企业系统等。在现代业务的应用场景下,大多数的MySQL数据库需要部署在多台服务器上,以提供更高的可用性和性能,这就需要进行MySQL数据的迁移和同步。本文将介绍如何实现多台服务器之间的MySQL数据迁移和同步。一.MySQL数据迁移MySQL数据迁移指的是将MySQL服务器中的数

教你如何将win10剪贴板与手机同步教你如何将win10剪贴板与手机同步Jan 06, 2024 am 09:18 AM

win10剪贴板有个非常好用的功能就是跨设备云储存功能,非常的好用可以帮助用户PC设备和手机设备同步复制黏贴。设置的方法非常简单,只要在系统里的剪切板设置就好。win10剪贴板同步到手机1、首先点击左下角的开始,进入设置。2、然后去点击“系统”。3、选择左侧的“剪贴板”。4、最后在右边的“跨设备同步”中点击登录,然后选择手机就好了。

如何在 Windows 11 的 OneDrive 中选择要同步的特定文件夹如何在 Windows 11 的 OneDrive 中选择要同步的特定文件夹Apr 13, 2023 pm 04:22 PM

您系统上的 OneDrive 应用程序将所有文件和文件夹存储在云端。但有时用户不希望某些文件或文件夹被存储并占用限制为 5 GB 的 OneDrive 空间而无需订阅。为此,OneDrive 应用程序中有一个设置,允许用户选择要在云上同步的文件或文件夹。如果您也在寻找这个,那么这篇文章将帮助您在 Windows 11 系统的 OneDrive 中选择要同步的文件夹或文件。如何在 Windows 11 的 OneDrive 中选择要同步的某些文件夹注意:确保 OneDrive 应用程序已连接并同步

Python 并发编程中的锁与同步:保持你的代码安全可靠Python 并发编程中的锁与同步:保持你的代码安全可靠Feb 19, 2024 pm 02:30 PM

并发编程中的锁与同步在并发编程中,多个进程或线程同时运行,这可能会导致资源争用和不一致性问题。为了解决这些问题,需要使用锁和同步机制来协调对共享资源的访问。锁的概念锁是一种机制,它允许一次只有一个线程或进程访问共享资源。当一个线程或进程获得锁时,其他线程或进程将被阻止访问该资源,直到锁被释放。锁的类型python中有几种类型的锁:互斥锁(Mutex):确保一次只有一个线程或进程可以访问资源。条件变量:允许线程或进程等待某个条件,然后获取锁。读写锁:允许多个线程同时读取资源,但只允许一个线程写入资

iPhone与win10日历同步iPhone与win10日历同步Jan 02, 2024 pm 12:53 PM

很多小伙伴想把win10电脑日历和自己的苹果手机日历同步,这个该如何解决呢?我们可以通过iCloud账号,分别登陆AppleID,获取密码,再登陆日历的账户解决这个问题,下面一起来看详细的教程吧。win10日历同步iphone的方法1、先打开AppleID的登陆界面,通过双重验证,输入手机的验证码,然后登陆向左2、在安全中选择“生成密码”3、输入密码标签,然后官网就会给你一个密码。将这个密码复制下来(不是你原来登陆iCloud的密码,是一个新的密码)4、打开日历——添加账户——选择iCloud—

了解Go语言中阻塞的实现方法与优势了解Go语言中阻塞的实现方法与优势Mar 24, 2024 am 08:36 AM

Go语言是一种并发特性十分强大的编程语言,它采用了goroutine的概念来实现并发,同时也提供了丰富的工具和方法来处理阻塞。在Go语言中,阻塞的实现方法与优势是我们需要了解的重要内容。本文将介绍Go语言中阻塞的实现方法及其优势,并提供具体的代码示例来帮助读者更好地理解。阻塞的实现方法在Go语言中,阻塞可以通过多种方式实现,其中包括通道(channel)、互

vivo云服务怎么同步vivo云服务怎么同步Feb 05, 2024 pm 05:05 PM

vivo软件中有很多的功能可以使用,在其中用户最为好奇的就是vivo云服务怎么同步呢?现在就来看一下小编给大家带来的vivo云服务同步的方法是什么吧。1、首先打开云服务软件进入到首页之后【登录】;2、然后选择需要备份的内容;3、最后在点击【备份】即可;

See all articles

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
2 週前By尊渡假赌尊渡假赌尊渡假赌
倉庫:如何復興隊友
1 個月前By尊渡假赌尊渡假赌尊渡假赌
Hello Kitty Island冒險:如何獲得巨型種子
4 週前By尊渡假赌尊渡假赌尊渡假赌

熱工具

MantisBT

MantisBT

Mantis是一個易於部署的基於Web的缺陷追蹤工具,用於幫助產品缺陷追蹤。它需要PHP、MySQL和一個Web伺服器。請查看我們的演示和託管服務。

VSCode Windows 64位元 下載

VSCode Windows 64位元 下載

微軟推出的免費、功能強大的一款IDE編輯器

Dreamweaver Mac版

Dreamweaver Mac版

視覺化網頁開發工具

SublimeText3 英文版

SublimeText3 英文版

推薦:為Win版本,支援程式碼提示!

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器