찾다
데이터 베이스MySQL 튜토리얼非阻塞同步算法实战(一)

非阻塞同步算法实战(一)

Jun 07, 2016 pm 04:31 PM
동기식실제 전투연산차단하다

感谢trytocache投递本文。 前言 本文写给对ConcurrentLinkedQueue的实现和非阻塞同步算法的实现原理有一定了解,但缺少实践经验的朋友,文中包括了实战中的尝试、所走的弯路,经验和教训。 背景介绍 上个月,我被安排独自负责一个聊天系统的服务端,因为一些

感谢trytocache投递本文。

前言

本文写给对ConcurrentLinkedQueue的实现和非阻塞同步算法的实现原理有一定了解,但缺少实践经验的朋友,文中包括了实战中的尝试、所走的弯路,经验和教训。

背景介绍

上个月,我被安排独自负责一个聊天系统的服务端,因为一些原因,我没使用现成的开源框架,网络那块直接使用AIO,收数据时,因为只会从channel里过来,所以不需要考虑同步问题;但是发送数据时,因为有聊天消息的转发,所以必需处理这个同步问题。AIO中,是处理完一个注册的操作后,再执行我们定义的方法,此时,如果还有数据需要写,则继续注册写操作,如果没有则不注册;提交数据时,如果当前没有注册写操作,则注册一个,否则仅提交(此时再注册则会报异常)。这样,需要同步的点就是:如何知道当前还有没有数据需要发送(因为其它线程也可以提交数据到此处),和如何知道此次提交时,有没有注册写操作。总之,要完成:有数据要发送时,必需有写操作被注册,并且只能注册一次;没有数据时,不能有写操作被注册。

问题分析

经过分析,上面的问题,可以抽象成:我需要知道当往队列里插入一条数据之前,该队列是否为空,如果为空则应该注册新的写操作。当从队列里取出一条数据后,该队列是否为非空,如果非空则应该继续注册写操作。(本文之后以“关注的操作”来表示这种场景下的插入或取出操作)

目前的问题是,我使用的队列是ConcurrentLinkedQueue,但是它的取出数据的方法,没有返回值告诉我们从队列里取出数据之后队列是否为空,如果是使用size或peek方法来执行判断,那就必需加锁了,否则在拿到队列大小时,可能队列大小已经变化了。所以我首先想到的是,如何对该队列进行改造,让它提供该信息。

注意:这里指的不是当次而是之后,所以如果我们使用队列的peek()方法返回null,就知道队列是否为空,但是不知道之后是否为空 ,并且,当关注的操作发生时,在插入或取出操作的返回值里告知此信息,来指导是否继续注册写操作。

用锁实现

如果使用锁的话很容易处理,用锁同步插入和取出方法,下面是锁实现的参考:

    public E poll() {
        synchronized (this) {
            E re = q.poll();
            // 获取元素后,队列是空,表示是我关注的操作
            if (q.peek() == null) {
            }
            return re;
        }
    }
    public void offer(E e) {
        synchronized (this) {
            // 插入元素前,队列是空,表示是我关注的操作
            if (q.peek() == null) {
            }
            q.offer(e);
        }
    }

但因为是服务端,我想用非阻塞同步算法来实现。

尝试方案一

我第一次想到的改造办法是,将head占位节点改成固定的,头节点移动时,只将head节点的next指向新的节点,在插入数据时,如果是在head节点上成功执行的该操作,那么该插入就是关注的的操作;在取出时,如果将head节点的next置为了null,那么该取出就是关注的操作(?因为之前的占位节点是变化的,所以没法比较,必需用同步,现在是固定的了,所以可以直接与head节点进行比较?)。如此一来,问题好像被解决了。改造完之后,出于严谨,我仔细读了一遍代码,发现引入了新的问题,我的取出操作是这样写的

/**
 * @author trytocatch@163.com
 */
public E poll(){
    for(;;){
        Node n = head.nextRef.get();//head指向固定的head节点,为final
        if(n == null)
            return null;
        Node m = n.nextRef.get();
        if(head.next.compareAndSet(n,m){
            if(m==null)
                ;//此时为关注的操作(为了简化代码显示,不将该信息当作返回值返回了,仅注释)
            return n.itemRef.get();
        }
    }
}

这里有一个致命的问题:如果m为null,在CAS期间,插入了新节点,n的next由null变成了非null,紧接着又把head的next更新为了null,那么链就断了,该方法还存在一些其它的问题,如当队列为空的时候,尾节点指向了错误的位置,本应该是head的。我认为最根本的原因在于,head不能设为固定的,否则会引发一堆问题。第一次尝试宣告失败。

尝试方案二

这次我尝试将head跟tail两个引用包装成一个对象,然后对这个对象进行CAS更新(这仅为一处理论上的尝试,因为从性能上面来讲,已经大打折扣了,创建了很多额外的对象),如果head跟tail指向了同一个节点,则认为队列是空的,根据此信息来判断一个操作是不是关注的操作。但该尝试仅停留在了理论分析阶段,期间发现了一些小问题,没法解决,后来我发现,我把ConcurrentLinkedQueue原本可以分成两步执行的插入和取出操作(更新节点的next或item引用,然后再更新head或tail引用),变成了必需一步完成,ConcurrentLinkedQueue尚不能一步完成,我何德何能,可将它们一步完成?所以直接放弃了。

解决方案一

经过两次的失败尝试,我几乎绝望了,我怀疑这是不是不能判断出是否为关注的操作。

因为是在做项目,周末已经过去了,不能再搞这些“研究”了,所以我退而求其次,想了个不太漂亮的办法,在队列外部维护一个变量,记录队列有多大,在插入或取出后,更新该变量,使用的是AtomicInteger,如果更新时,将变量从1变成0,或从0变成了1,就认为该插入或取出为关注的操作。

    private AtomicInteger size = new AtomicInteger(0);
    public E poll() {
        E re = q.poll();
        if (re == null)
            return null;
        for(int old;;){
            old = size.get();
            if(size.compareAndSet(old,old-1)){
                // 获取元素后,队列是空,表示是我关注的操作
                if(old == 1){
                }
                break;
            }
        }
        return re;
    }
    public void offer(E e) {
        q.offer(e);
        for(int old;;){
            old = size.get();
            if(size.compareAndSet(old,old+1)){
                // 插入元素前,队列是空,表示是我关注的操作
                if(old == 0){
                }
                break;
            }
        }
    }

此时,也许细心的朋友会问,因为没有使用锁,这个变量并不能真实反映队列的大小,也就不能确定它是不是关注的操作。没错,是不能真实反映,但是,我获取关注的操作的目的是用来指导我:该不该注册新的写操作,该变量的值变化就能提供正确的指导,所以,同样是正确的,只不过途径不同而已。理论上的分析和后来的项目正确运行都印证了该方法的正确性。

解决方案二

因为上面的方法额外加了一次lock-free级别的CAS操作,我心里总不太舒服,空余时间总在琢磨,真的就没有办法,在不增加额外lock-free级别CAS开支的情况下,知晓一个操作是不是关注的操作?

后来经分析,如果要知晓是不是关注的操作,跟两个数据有关,真实的头节点跟尾节点(不同于head跟tail,因为它们是滞后的,之前将它们包装成一个对象就是犯了该错误),ConcurrentLinkedQueue的实现中,这两个节点是没有竞争的,一个是更新item,一个是更新next,必需得让他们竞争同一个东西,才能解决该问题,于是我想到了一个办法,取出完成后,如果该节点的next为null,就将其用CAS置为一个特殊的值,若成功则认为是关注的操作;插入成功后,如果next被替换掉的值不是null而是这个特殊值,那么该插入也为关注的操作。这仅增加了一次wait-free级别的CAS操作(取出后的那次CAS),perfect!

因为ConcurrentLinkedQueue的很多变量、内部类都是私有的,没法通过继承来改造,没办法,只得自行实现。对于队列里使用的Node,实现的方式有很多种,可以使用AtomicReference、AtomicReferenceFieldUpdater来实现,如果你愿意的话,甚至是像ConcurrentLinkedQueue一样,用sun.misc.Unsafe来实现(注意:一般来说,sun包下的类是不推荐使用的),各有优缺点吧,所以我就不提供该队列的具体实现了,下面给出在ConcurrentLinkedQueue(版本:1.7.0_10)基础上进行的改造,供参考。注意,如果需要用到size等方法,因为特殊值的引入,影响了之前的判断逻辑,应重新编写。

   /**
    * @author trytocatch@163.com
    */
    private static final Node MARK_NODE = new Node(null);
    public boolean offer(E e) {
        checkNotNull(e);
        final Node newNode = new Node(e);
        for (Node t = tail, p = t;;) {
            Node q = p.next;
            if (q == null || q == MARK_NODE) {//修改1:加入条件:或q == MARK_NODE
                // p is last node
                if (p.casNext(q, newNode)) {//修改2:将null改为q
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (q == MARK_NODE)//修改3:
                        ;//此时为关注的操作(为了简化代码显示,仅注释)
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode); // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list. If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable. Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }
    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node h = head, p = h, q;;) {
                E item = p.item;
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    if (p.casNext(null,MARK_NODE))//修改1:
                        ;//此时为关注的操作
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

小结

设计非阻塞算法的关键在于,找出竞争点,如果获取的某个信息跟两个操作有关,那么应该让这两个操作竞争同一个东西,这样才能反应出它们的关系。

성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
MySQL은 sqlite와 어떻게 다릅니 까?MySQL은 sqlite와 어떻게 다릅니 까?Apr 24, 2025 am 12:12 AM

MySQL과 Sqlite의 주요 차이점은 설계 개념 및 사용 시나리오입니다. 1. MySQL은 대규모 응용 프로그램 및 엔터프라이즈 수준의 솔루션에 적합하며 고성능 및 동시성을 지원합니다. 2. SQLITE는 모바일 애플리케이션 및 데스크탑 소프트웨어에 적합하며 가볍고 내부질이 쉽습니다.

MySQL의 색인이란 무엇이며 성능을 어떻게 향상 시키는가?MySQL의 색인이란 무엇이며 성능을 어떻게 향상 시키는가?Apr 24, 2025 am 12:09 AM

MySQL의 인덱스는 데이터 검색 속도를 높이는 데 사용되는 데이터베이스 테이블에서 하나 이상의 열의 주문 구조입니다. 1) 인덱스는 스캔 한 데이터의 양을 줄임으로써 쿼리 속도를 향상시킵니다. 2) B-Tree Index는 균형 잡힌 트리 구조를 사용하여 범위 쿼리 및 정렬에 적합합니다. 3) CreateIndex 문을 사용하여 CreateIndexIdx_customer_idonorders (customer_id)와 같은 인덱스를 작성하십시오. 4) Composite Indexes는 CreateIndexIdx_customer_orderOders (Customer_id, Order_Date)와 같은 다중 열 쿼리를 최적화 할 수 있습니다. 5) 설명을 사용하여 쿼리 계획을 분석하고 피하십시오

MySQL에서 트랜잭션을 사용하여 데이터 일관성을 보장하는 방법을 설명하십시오.MySQL에서 트랜잭션을 사용하여 데이터 일관성을 보장하는 방법을 설명하십시오.Apr 24, 2025 am 12:09 AM

MySQL에서 트랜잭션을 사용하면 데이터 일관성이 보장됩니다. 1) STARTTRANSACTION을 통해 트랜잭션을 시작한 다음 SQL 작업을 실행하고 커밋 또는 롤백으로 제출하십시오. 2) SavePoint를 사용하여 부분 롤백을 허용하는 저장 지점을 설정하십시오. 3) 성능 최적화 제안에는 트랜잭션 시간 단축, 대규모 쿼리 방지 및 격리 수준을 합리적으로 사용하는 것이 포함됩니다.

MySQL을 통해 어떤 시나리오에서 PostgreSQL을 선택할 수 있습니까?MySQL을 통해 어떤 시나리오에서 PostgreSQL을 선택할 수 있습니까?Apr 24, 2025 am 12:07 AM

MySQL 대신 PostgreSQL을 선택한 시나리오에는 다음이 포함됩니다. 1) 복잡한 쿼리 및 고급 SQL 기능, 2) 엄격한 데이터 무결성 및 산 준수, 3) 고급 공간 기능이 필요하며 4) 큰 데이터 세트를 처리 할 때 고성능이 필요합니다. PostgreSQL은 이러한 측면에서 잘 수행되며 복잡한 데이터 처리 및 높은 데이터 무결성이 필요한 프로젝트에 적합합니다.

MySQL 데이터베이스를 어떻게 보호 할 수 있습니까?MySQL 데이터베이스를 어떻게 보호 할 수 있습니까?Apr 24, 2025 am 12:04 AM

MySQL 데이터베이스의 보안은 다음 조치를 통해 달성 할 수 있습니다. 1. 사용자 권한 관리 : CreateUser 및 Grant 명령을 통한 액세스 권한을 엄격히 제어합니다. 2. 암호화 된 전송 : 데이터 전송 보안을 보장하기 위해 SSL/TLS를 구성합니다. 3. 데이터베이스 백업 및 복구 : MySQLDump 또는 MySQLPump를 사용하여 정기적으로 백업 데이터를 사용하십시오. 4. 고급 보안 정책 : 방화벽을 사용하여 액세스를 제한하고 감사 로깅 작업을 가능하게합니다. 5. 성능 최적화 및 모범 사례 : 인덱싱 및 쿼리 최적화 및 정기 유지 보수를 통한 안전 및 성능을 모두 고려하십시오.

MySQL 성능을 모니터링하는 데 사용할 수있는 몇 가지 도구는 무엇입니까?MySQL 성능을 모니터링하는 데 사용할 수있는 몇 가지 도구는 무엇입니까?Apr 23, 2025 am 12:21 AM

MySQL 성능을 효과적으로 모니터링하는 방법은 무엇입니까? Mysqladmin, Showglobalstatus, Perconamonitoring and Management (PMM) 및 MySQL Enterprisemonitor와 같은 도구를 사용하십시오. 1. MySQLADMIN을 사용하여 연결 수를보십시오. 2. showglobalstatus를 사용하여 쿼리 번호를보십시오. 3.pmm은 자세한 성능 데이터 및 그래픽 인터페이스를 제공합니다. 4. MySQLENTERPRISOMITOR는 풍부한 모니터링 기능 및 경보 메커니즘을 제공합니다.

MySQL은 SQL Server와 어떻게 다릅니 까?MySQL은 SQL Server와 어떻게 다릅니 까?Apr 23, 2025 am 12:20 AM

MySQL과 SqlServer의 차이점은 1) MySQL은 오픈 소스이며 웹 및 임베디드 시스템에 적합합니다. 2) SQLServer는 Microsoft의 상용 제품이며 엔터프라이즈 수준 애플리케이션에 적합합니다. 스토리지 엔진의 두 가지, 성능 최적화 및 응용 시나리오에는 상당한 차이가 있습니다. 선택할 때는 프로젝트 규모와 향후 확장 성을 고려해야합니다.

MySQL을 통해 어떤 시나리오에서 SQL Server를 선택할 수 있습니까?MySQL을 통해 어떤 시나리오에서 SQL Server를 선택할 수 있습니까?Apr 23, 2025 am 12:20 AM

고 가용성, 고급 보안 및 우수한 통합이 필요한 엔터프라이즈 수준의 응용 프로그램 시나리오에서는 MySQL 대신 SQLServer를 선택해야합니다. 1) SQLServer는 고 가용성 및 고급 보안과 같은 엔터프라이즈 수준의 기능을 제공합니다. 2) VisualStudio 및 Powerbi와 같은 Microsoft Ecosystems와 밀접하게 통합되어 있습니다. 3) SQLSERVER는 성능 최적화에서 우수한 성능을 발휘하며 메모리 최적화 된 테이블 및 열 스토리지 인덱스를 지원합니다.

See all articles

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

Video Face Swap

Video Face Swap

완전히 무료인 AI 얼굴 교환 도구를 사용하여 모든 비디오의 얼굴을 쉽게 바꾸세요!

뜨거운 도구

Eclipse용 SAP NetWeaver 서버 어댑터

Eclipse용 SAP NetWeaver 서버 어댑터

Eclipse를 SAP NetWeaver 애플리케이션 서버와 통합합니다.

mPDF

mPDF

mPDF는 UTF-8로 인코딩된 HTML에서 PDF 파일을 생성할 수 있는 PHP 라이브러리입니다. 원저자인 Ian Back은 자신의 웹 사이트에서 "즉시" PDF 파일을 출력하고 다양한 언어를 처리하기 위해 mPDF를 작성했습니다. HTML2FPDF와 같은 원본 스크립트보다 유니코드 글꼴을 사용할 때 속도가 느리고 더 큰 파일을 생성하지만 CSS 스타일 등을 지원하고 많은 개선 사항이 있습니다. RTL(아랍어, 히브리어), CJK(중국어, 일본어, 한국어)를 포함한 거의 모든 언어를 지원합니다. 중첩된 블록 수준 요소(예: P, DIV)를 지원합니다.

DVWA

DVWA

DVWA(Damn Vulnerable Web App)는 매우 취약한 PHP/MySQL 웹 애플리케이션입니다. 주요 목표는 보안 전문가가 법적 환경에서 자신의 기술과 도구를 테스트하고, 웹 개발자가 웹 응용 프로그램 보안 프로세스를 더 잘 이해할 수 있도록 돕고, 교사/학생이 교실 환경 웹 응용 프로그램에서 가르치고 배울 수 있도록 돕는 것입니다. 보안. DVWA의 목표는 다양한 난이도의 간단하고 간단한 인터페이스를 통해 가장 일반적인 웹 취약점 중 일부를 연습하는 것입니다. 이 소프트웨어는

Atom Editor Mac 버전 다운로드

Atom Editor Mac 버전 다운로드

가장 인기 있는 오픈 소스 편집기

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경