Heim >Datenbank >MySQL-Tutorial >非阻塞同步算法实战(三)-LatestResultsProvider

非阻塞同步算法实战(三)-LatestResultsProvider

WBOY
WBOYOriginal
2016-06-07 16:32:251230Durchsuche

感谢trytocatch投递本文。 前言 阅读本文前,需要读者对happens-before比较熟悉,了解非阻塞同步的一些基本概念。本文主要为happens-before法则的灵活运用,和一些解决问题的小技巧,分析问题的方式。 背景介绍 原始需求为:本人当时在编写一个正则替换工具

感谢trytocatch投递本文。

前言

阅读本文前,需要读者对happens-before比较熟悉,了解非阻塞同步的一些基本概念。本文主要为happens-before法则的灵活运用,和一些解决问题的小技巧,分析问题的方式。

背景介绍

原始需求为:本人当时在编写一个正则替换工具,里面会动态地显示所有的匹配结果(包括替换预览),文本、正则表达式、参数,这些数据的其中一项发生了变化,结果就应该被更新,为了提供友好的交互体验,数据变化时,应该是发起一个异步请求,由另一个独立的线程来完成运算,完成后通知UI更新结果。由于是动态显示,所以提交会非常频繁。

需求描述

需要这样一个工具类,允许用户频繁地提交数据(本文之后以“submit”表示该操作)和更新结果(本文之后以“update”表示该操作),submit时,如果当前有进行中的运算,则应该取消,使用新参数执行新的运算;update时,如果当前没有进行中的运算(处于阻塞状态),并且当前结果不是最新的,则唤醒该线程,使用当前的新数据,执行新的运算。此处之所以分为submit和update两个方法,是为了支持手动更新,即点击更新按钮时,才更新结果。

此外,出于练手的原因,也出于编写一个功能全面,更实用的工具的目的,我还加入了一些额外的需求:

1、引入多线程场景,update和submit均可由多个线程同时发起,该工具类应设计成线程安全的。

2、允许延迟执行运算,如果延时内执行submit,仅重新计算延时。如果运算不方便取消,在短时间频繁submit的场景下,延时会是一个很好的应对办法。

3、允许设置一个最大延迟时间,作为延迟开启运算的补充。当长时间频繁submit时,会形成这样的局面,一直未进入运算环节,新结果计算不出来,上一次计算结果却是很早以前的。如果需要显示一个较新但不是最新的结果,最大延迟时间将会很有用。

4、提供主动取消方法,主动取消正在进行的运算。

5、update时,允许等待运算完成,同时也可设置超时时间。当主动取消、超时、完成了当前或更(更加的意思)新的数据对应的运算时,结束等待。

需求交待完了,有兴趣有精力的读者,可以先试着思考下怎么实现。

问题分析

该工具应该维护一个状态字段,这样才能在发起某个操作时,根据所处的状态作出正确的动作,如:如果当前不处于停止状态(或者主动取消状态,原因见下文),执行update就不需要唤醒运算线程。简单分析可知,至少应该有这样几种状态:

1、停止状态:当前没有运算任务,线程进入阻塞状态,主动取消和运算完成后,进入该状态

2、延迟状态:设置了延迟开启运算时,进入运算前,处于该状态

3、运算状态:正在执行运算

4、主动取消状态:当发起主动取消时,进入该状态

5、新任务状态:当时有新的运算任务时,进入该状态,然后重新进入运算状态

延迟

再来看一下延迟,如果延迟500毫秒,就每次sleep(500),那么期间再submit怎么办?将它唤醒然后重新sleep(500)吗?显然不行,成本太大了。

我有一个小技巧:将500分成多个合适的等份,使用一个计数器,每次sleep一个等份,计数器加1,如果发起submit,仅把计数器置0即可,虽然看起来线程的状态切换变多了,但应对频繁重置时,它更稳定。虽然时间上会上下波动一个等份,但此处并不需要多么精确。

现在还面临这样一个问题,如何知道当前是处于延迟状态并计数器置0?取出状态值进行判断,然后置0,这方法显然不行,因为置0的时候,可能状态已经变了,所以你无法知道该操作是否生效了。

我想到的办法是,再引入一个延迟重置状态。如果处于该状态,则下一次计数器加1时,将计数器重置,状态变更是可以知道成功与否的。

状态变更

有些状态的变更是有条件的,比如说当前处于取消状态,就不能把它转为运算状态,运算状态只能由新任务状态、延迟状态(延迟完成后执行运算)或延迟重置状态转入。这种场景正好跟CAS一致,所以,使用一个AtomicInteger来表示状态。

分析下各状态之间的转换,可以得出下面的状态变更图:

状态变更图

蓝色的a(bcd)|(e)f线路为停止状态下,发起一次update,运算完重新回到停止的过程,开启延迟时是bcd,否则是e。

红色的线j表示超过了最大延迟时间,退出延迟,进入运算状态(也可以是d)。

绿色的线ghi(包括a)表示:如果发起了submit或update,状态应该怎么改变。如果处于延迟重置、新任务则不需要进行任何操作;如果处于延迟状态,则转为延迟重置即可;如果处于运算状态,则可能使用了旧参数,应该转为新任务;如果为主动取消或停止状态,并且是调用update方法,则转为新任务,并且可能处于阻塞状态,应该唤醒该线程。

黑色的线l表示,可在任意状态下发起主动取消,进入该状态。然后通知等待线程后,转入停止状态,对应紫色的k,如果在停止状态下发起主动取消,则仅转为主动取消状态,不会通知等待线程。所以当线程阻塞时,可能处于停止状态或者主动取消状态。

顺序问题

上面已经分析到,当submit时,应该把延迟转为延迟重置、或运算转为新任务,这两个尝试的顺序是不是也有讲究呢?

是的,因为正常执行流程a(bcd)|(e)f中,运算状态在延迟状态之后,假如先尝试运算转为新任务,可能此时为延迟状态,故失败,再尝试延迟转为延迟重置时,状态在这期间从刚才的延迟转为了运算,故两次尝试都失败了,本应该重置延迟的,却什么也没干,这是错误的。而将两次尝试顺序调换一下,只要状态为延迟或运算,那么两次状态转换尝试中,一定有一次会成功。

之后的代码中还有多处类似的顺序细节。

解决方案

下面给出完整的代码,除去等待运算完成那部分,其它地方均为wait-free级别的实现。

calculateResult是具体执行运算的方法;上文中的submit对应代码里的updateParametersVersion方法,上文中的update对应剩余几个update方法。

updateAndWait方法中,使用了上一篇中讲到的BoundlessCyclicBarrier,其维护的版本号就是参数的版本号ParametersVersion。

/**
 * @author trytocatch@163.com
 * @date 2013-2-2
 */
public abstract class LatestResultsProvider {
    /** update return value */
    public static final int UPDATE_FAILED = -1;
    public static final int UPDATE_NO_NEED_TO_UPDATE = 0;
    public static final int UPDATE_SUCCESS = 1;
    public static final int UPDATE_COMMITTED = 2;
    /** update return value */
    /** work states*/
    private static final int WS_OFF = 0;
    private static final int WS_NEW_TASK = 1;
    private static final int WS_WORKING = 2;
    private static final int WS_DELAYING = 3;
    private static final int WS_DELAY_RESET = 4;
    private static final int WS_CANCELED = 5;
    /** work states*/
    private final AtomicInteger workState;
    private int sleepPeriod = 30;
    private final AtomicInteger parametersVersion;
    private volatile int updateDelay;// updateDelay>=0
    private volatile int delayUpperLimit;
    private final BoundlessCyclicBarrier barrier;
    private Thread workThread;
    /**
     *
     * @param updateDelay unit: millisecond
     * @param delayUpperLimit limit the sum of the delay, disabled
     * while delayUpperLimit 0 ? WS_DELAY_RESET : WS_WORKING)) {
                            if (workState.compareAndSet(WS_CANCELED, WS_OFF)) {
                                barrier.cancel();
                            }
                            LockSupport.park();
                            interrupted();
                        }
                        if (workState.get() == WS_DELAY_RESET) {
                            int delaySum = 0;
                            for (;;) {
                                if (workState.compareAndSet(WS_DELAY_RESET,
                                        WS_DELAYING)) {
                                    sleepCount = (updateDelay + sleepPeriod - 1)
                                            / sleepPeriod;
                                }
                                sleep(sleepPeriod);
                                if (--sleepCount = 0) {
                                    delaySum += sleepPeriod;
                                    if (delaySum >= delayUpperLimit) {
                                        if (!workState.compareAndSet(
                                                WS_DELAYING, WS_WORKING))
                                            workState.compareAndSet(
                                                    WS_DELAY_RESET, WS_WORKING);
                                        break;
                                    }
                                }
                                if (workState.get() != WS_DELAYING
                                        && workState.get() != WS_DELAY_RESET)
                                    break;
                            }
                        }
                        if (isWorking()) {
                            int workingVersion = parametersVersion.get();
                            try {
                                calculateResult();
                                if (workState.compareAndSet(WS_WORKING, WS_OFF))
                                    barrier.nextCycle(workingVersion);
                            } catch (Throwable t) {
                                t.printStackTrace();
                                workState.set(WS_CANCELED);
                            }
                        }
                    } catch (InterruptedException e) {
                        workState.compareAndSet(WS_DELAYING, WS_CANCELED);
                        workState.compareAndSet(WS_DELAY_RESET, WS_CANCELED);
                    }
                }// for(;;)
            }// run()
        };
        workThread.setDaemon(true);
        workThread.start();
    }
    public int getUpdateDelay() {
        return updateDelay;
    }
    /**
     * @param updateDelay
     *            delay time. unit: millisecond
     */
    public void setUpdateDelay(int updateDelay) {
        this.updateDelay = updateDelay 
<p>代码中,我直接在构造方法里开启了新的线程,一般来说,是不推荐这样做的,但在此处,除非在构造还未完成时就执行update方法,否则不会引发什么问题。</p>
<p>最后,附上该正则替换工具的介绍和下载地址:http://www.cnblogs.com/trytocatch/p/RegexReplacer.html</p>
<h2>小结</h2>
<p>状态变更非常适合使用非阻塞算法,并且还能够达到wait-free级别。限于篇幅,有些没讲到的细节,请读者借助代码来理解吧,如有疑问,欢迎回复讨论。</p>
<h2>系列总结</h2>
<p>本实战系列就到此结束了,简单总结下。</p>
<p>非阻塞同步相对于锁同步而言,由代码块,转为了点,是另一种思考方式。</p>
<p>有时,无法做到一步完成,也许可以分成两步完成,同样可以解决问题,ConcurrentLinkedQueue就是这么做的。</p>
<p>如果需要维护多个数据之间的某种一致关系,则可以将它们封装到一个类中,更新时采用更新该类对象的引用的方式。</p>
<p>众所周知,锁同步算法是难以测试的,非阻塞同步算法更加难以测试,我个人认为,其正确性主要靠慎密的推敲和论证。</p>
<p>非阻塞同步算法比锁同步算法要显得更复杂些,如果对性能要求不高,对非阻塞算法掌握得还不太熟练,建议不要使用非阻塞算法,锁同步算法要简洁得多,也更容易维护,如上面所说的,两条看似没有顺序的语句,调换下顺序,可能就会引发BUG。</p>

    <p class="copyright">
        原文地址:非阻塞同步算法实战(三)-LatestResultsProvider, 感谢原作者分享。
    </p>
    
    


Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn