Home >Database >Mysql Tutorial >国际搜索离线系统优化之一 全局排序优化

国际搜索离线系统优化之一 全局排序优化

WBOY
WBOYOriginal
2016-06-07 16:34:021375browse

总觉得阶段性的总结是个好习惯,很多自己做的事情,如果不及时总结一下,过一段时间就忘记了,当要用到时,又需要花费较多的时间去重新熟悉。于是决定抽点时间总结一下以前对国际搜索离线系统做的一些优化(这里说的国际搜索,主要指AE、SC和SC店铺,AE即Ali

总觉得阶段性的总结是个好习惯,很多自己做的事情,如果不及时总结一下,过一段时间就忘记了,当要用到时,又需要花费较多的时间去重新熟悉。于是决定抽点时间总结一下以前对国际搜索离线系统做的一些优化(这里说的国际搜索,主要指AE、SC和SC店铺,AE即AliExpress,SC即Sourcing,这些优化对这几个应用都是通用的),不仅起到一个备忘的作用,如果能给读者带来一些启发,想必也是极好的。

既然是搜索离线系统相关,我们就先看一下国际搜索全量流程的几个主要环节,如图1所示。

全量流程

图1. 全量流程

1)dump,将数据从数据库读出来,写入hbase,只有做大全量的时候才会全量dump数据库,一般情况下每天只需跑一次小全量,数据库中数据的更新会以增量的方式更新hbase。

2)join,读取hbase,做多表join,生成一条条doc,一条doc包含了一条产品的全部字段。

3)global sort,即全局排序,按产品全局分global_score对产品进行全局排序,生成的单个文件内部并不要求有序。

4)abuild,读取全局排序后生成的文件,构建索引,生成的索引会存储在HDFS上。

5)dispatch,将索引从HDFS上分发到对应的search机器上。

6)switch,切换索引、程序、配置和算法词典,新索引上线,对外提供服务。

这次先总结一下全局排序优化,任何项目或需求都有相应的背景,我们的离线计算中为何要做全局排序?

说到这个,又引出了分层检索,早些时候,国际站搜索引擎对外提供服务时,在处理每个搜索请求时,都会查询所有的segment,但其实对于每个请求,都只需返回一定数量的结果集,因此,查询所有的segment并非必要,只会带来性能上的损失。于是,分层检索就在千呼万唤中出来了。

何谓分层检索,顾名思义,就是只查询一定数量的segment,当结果集够了就不再继续查询,这对搜索引擎查询性能的优化是显而易见的。

但这里存在一个问题,就是对于卖家发布的产品,质量是良莠不齐的,我们需要把质量好的优先搜索出来,所以前面segment的产品质量要高于后面的segment,否则一些质量高的展品就没有展示机会了。比如,我们有3个segment,seg_1, seg_2, seg_3,那么seg_1中的产品质量就要比seg_2中的产品质量高,seg_2中的产品质量要比seg_3中的产品质量高,在每个segment内部并不做要求。

判断产品质量好坏的标准是什么呢?我们引入了一个全局分global_score,每条产品的global_score都是离线计算好的,以此作为分层检索的依据。

如图1所示,在搜索引擎的离线计算中,有个多表join的环节,在多表join的过程中会有一些业务逻辑的计算,global_score就是在这个阶段计算出来的。有了global_score,我们就可以对产品做全局排序了。假如排序之后我们生成3个文件,part_1, part_2, part_3,就要求part_1中每条doc的global_score要高于part_2中的每条doc,part_2之于part_3亦如此,但每个part内部并不要求有序。在后面建索引的过程中,会有一个保序逻辑,以此保证多个segment之间的有序。

全局排序怎么做呢?由于数据量大,我们各个应用的离线计算任务基本上都是运行在hadoop集群上的,全局排序亦如此。要达到上述的效果,即各个partition之间是按global_score有序的,我们采用的方案是:首先对数据进行采样,按global_score进行分区,将定义分区的键写入_partitions文件,再实现自定义的TotalOrderPartitioner(这里实现自定义的TotalOrderPartitioner是为了在输出的单个文件内部将同一家公司的产品聚合在一起,即按company_id聚合,从而大大提高输出文件的压缩比,显著缩短了后面abuild构建索引的运行时间),进行全局排序。采样的核心思想是只查看一小部分键,获得键的近似分布,并由此构建分区。

这里有必要先提一下列的概念,由于单台search能承载的索引量有限,所以数据量大时,需要对数据进行分列,使所有数据尽量均匀分布到不同的列上。比如SC有19列,采用的做法就是根据product_id % 19将全部数据分布到19列上。在做多表join的之后,数据的分列就已经做好了。因此全局排序是对多列的数据分别进行全局排序。

在分层检索项目上线到SC BT集群(预发布环境)时,全局排序需要80min才能运行完成,经分析,大部分的时间耗在采样上面。看了代码,发现每列的全局排序都对应一个job,SC有19列数据,就跑19个job分别对每列数据进行全局排序。排序之前先采样,采样器是在客户端运行的,因此,限制分片的下载数量以加速采样器的运行就显得尤为重要。在优化之前的代码实现中,每个job都是读取对应列的数据,自己独立采样的,而且多个job是串行采样。因此,一个可行的优化方案就是多个job并行采样,但由于我们的产品数据是分列存储的,每一列的数据量也足够大。比如SC现在3.6亿的数据量,单列的数据就接近2千万,因此其实每一列产品global_score的分布是基本一致的,所以,我们是否可以只对一列数据进行采样,然后所有job都共享这一个样本呢?这样就不仅能大大缩短采样时间,而且也不会引入并行的复杂性。答案是可行的。

简单的说,全局排序优化的基本思想,就是根据数据的分布特点,使多列数据的多个全局排序job共享同一个样本。

下面我们来看一下优化后的代码实现:

Vector vecRunningJob = new Vector(build_num);
Vector vecJobClient = new Vector(build_num);
for (int j = 0; j 
<p>其中build_num表示列数,从上面的代码可以看出,对每列数据都会调用makeJob方法,然后提交任务进行全局排序。注意这里调用makeJob方法和提交任务是串行的,不过任务提交后是并行跑的。</p>
<p><span style="font-size: 13px;">?我们再看一下makeJob方法的实现:</span></p>
<pre class="brush:php;toolbar:false">private static JobConf makeJob(JobConf basejob, String inputPath,
        Vector vecInPutFile, String outPutPath, String aggregateField) throws Exception {
    JobConf conf = new JobConf(basejob);
    conf.setJarByClass(DCSortMain.class);
    for (int i = 0; i 
<p>可见,在做好相关设置后,makeJob中会调用sample方法进行采样,也就是说,其实针对每一列的makeJob都会调用sample方法。</p>
<p>再来看看sample方法的实现:</p>
<pre class="brush:php;toolbar:false">private static void sample(JobConf conf, String inputPath) throws IOException, URISyntaxException {
    int jobIndex = 0;
    Path partitionFile = new Path(inputPath, jobIndex + "_partitions");
    conf.setPartitionerClass(MyTotalOrderPartitioner.class);
    conf.set("total.order.partitioner.natural.order", "false");
    MyTotalOrderPartitioner.setPartitionFile(conf, partitionFile);
    if (!sampleDone) {
        LOG.info("sample start ...");
        MyInputSampler.Sampler sampler =
            new MyInputSampler.RandomSampler(1, 20000, 10);
        MyInputSampler.writePartitionFile(conf, sampler);
        LOG.info("sample end ...");
        sampleDone = true;
    }
    // Add to DistributedCache
    URI partitionUri = new URI(partitionFile.toString() + "#" + jobIndex + "_partitions");
    DistributedCache.addCacheFile(partitionUri, conf);
    DistributedCache.createSymlink(conf);
} 

可以看出,我们引入了一个布尔变量sampleDone对采样进行了控制,只在第1次调用makeJob方法时才执行采样操作,后面的创建的job都不再进行采样,而是与第1个job共享同一个_partitions文件,载入到自己使用的分布式缓存中,供后面的全局排序使用。sampleDone定义如下:

private static boolean sampleDone = false; 

顺便提一下采样操作,hadoop内置的采样器有3个:

1)RandomSampler,以指定的采样率均匀地从一个数据集中选择样本;

2)SplitSampler,只采样一个分片中的前n个记录;

3)IntervalSampler,以一定的间隔定期从划分中选择键,对于已排好序的数据来说是一个更好的选择。

RandomSampler是优秀的通用采样器,我们最终也是选择RandomSampler,因为虽然使用另外两个采用器,采样时间更短,但最终数据分布却很不均匀,只有RandomSampler才能达到预期效果。同时,我们将采样率设置为1,最大样本数设置为20000,最大分区设置为10。最大样本数和最大分区只需满足其一,即停止采样。可以通过调整RandomSampler的这些参数达到不同的采样效果。

优化版本上线SC BT之后,全局排序的运行时间从80min降到了30min,缩短了50min。正式环境由于hadoop集群更加强大,全局排序的运行时间更短。

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn