Home  >  Article  >  Database  >  SQL on Hadoop系统的最新进展(1)

SQL on Hadoop系统的最新进展(1)

WBOY
WBOYOriginal
2016-06-07 16:32:261150browse

为什么非要把SQL放到Hadoop上? SQL易于使用。 那为什么非得基于Hadoop呢?the robust and scalable architecture of Hadoop 目前SQL on Hadoop产品主要有以下几种: Hive, Tez/Stinger, Impala, Shark/Spark,?Phoenix, Hawq/Greenplum, HadoopDB, Citusdata

为什么非要把SQL放到Hadoop上? SQL易于使用。
那为什么非得基于Hadoop呢?the robust and scalable architecture of Hadoop

目前SQL on Hadoop产品主要有以下几种:
Hive, Tez/Stinger, Impala, Shark/Spark,?Phoenix, Hawq/Greenplum, HadoopDB, Citusdata等。本文主要讨论Hive, Tez/Stinger, Impala, Shark以及传统开源数据仓库brighthouse的特点和最新进展;下一篇文章会讨论Phoenix, Hadapt/HadoopDB, Hawq/Greenplum。

在互联网企业中一般的基于Hadoop的数据仓库的数据来源主要有以下几个:
1,通过Flume/Scribe/Chukwa这样的日志收集和分析系统把来自Apache/nginx等Server cluster的日志收集到HDFS上,然后通过Hive创建Table时指定SerDe把非结构化的日志数据转化成结构化数据。
2,通过Sqoop这样的工具把用户和业务维度数据(一般存储在Oracle/MySQL中)定期导入Hive,那么OLTP数据就有了一个用于OLAP的副本了。
3,通过ETL工具从其他外部DW数据源里导入的数据。

目前所有的SQL on Hadoop产品其实都是在某个或者某些特定领域内适合的,没有silver bullet。像当年Oracle/Teradata这样的满足几乎所有企业级应用的产品在现阶段是不现实的。所以每一种SQL on Hadoop产品都在尽量满足某一类应用的特征。
典型需求:
1, ?interactive query (ms~3min)
2,data analyst, reporting query (3min~20min)
3,data mining, modeling and large ETL (20 min ~ hr ~ day)
4,机器学习需求(通过MapReduce/MPI/Spark等计算模型来满足)

Hive
Hive是目前互联网企业中处理大数据、构建数据仓库最常用的解决方案,甚至在很多公司部署了Hadoop集群不是为了跑原生MapReduce程序,而全用来跑Hive SQL的查询任务。

对于有很多data scientist和analyst的公司,会有很多相同table的查询需求。那么显然每个人都从hive中查数据速度既慢又浪费资源。我们在online的数据库系统部署的时候都会在DB前面部署Redis或者memcache用于缓存用户经常访问的数据。那么OLAP应用也可以参考类似的方法,把经常访问的数据放到内存组成的集群中供用户查询。
Facebook针对这一需求开发了Presto,一个把热数据放到内存中供SQL查询的系统。这个设计思路跟Impala和Stinger非常类似了。使用Presto进行简单查询只需要几百毫秒,即使是非常复杂的查询,也只需数分钟即可完成,它在内存中运行,并且不会向磁盘写入。Facebook有超过850名工程师每天用它来扫描超过320TB的数据,满足了80%的ad-hoc查询需求。

目前Hive的主要缺点:
1,data shuffle时网络瓶颈,Reduce要等Map结束才能开始,不能高效利用网络带宽
2,一般一个SQL都会解析成多个MR job,Hadoop每次Job输出都直接写HDFS,性能差
3,每次执行Job都要启动Task,花费很多时间,无法做到实时
4,由于把SQL转化成MapReduce job时,map,shuffle和reduce所负责执行的SQL功能不同。那么就有Map->MapReduce或者MapReduce->Reduce这样的需求。这样可以降低写HDFS的次数,从而提高性能。

目前Hive主要的改进(主要是体现在 hive 0.11版本上):

1,同一条hive sql解析出的多个MR任务的合并。
由Hive解析出来的MR jobs中有非常多的Map->MapReduce类型的job,可以考虑把这个过程合并成一个MRjob。https://issues.apache.org/jira/browse/HIVE-3952

2,Hive query optimizer(查询优化器是Hive需要持续不断优化的一个topic)

http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.0.0.2/ds_Hive/optimize-joins.html

  • Joins where one side fits in memory
  • Star schema join的改进,就是原来一个大表和多个小表在不同column匹配的条件下join需要解析成多个map join + MR job,现在可以合并成一个MR job

这个改进方向要做的就是用户不用给太多的hint,hive可以自己根据表的大小、行数等,自动选择最快的join的方法(小表能装进内存的话就用map join,Map join能和其他MR job合并的就合并)。这个思路跟cost-based query optimizer有点类似了,用户写出来的SQL在翻译成执行计划之前要计算那种执行方式效率更高。

3,ORCFile
ORCFile是一种列式存储的文件,对于分析型应用来说列存有非常大的优势。 http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.0.0.2/ds_Hive/orcfile.html
原来的RCFile中把每一列看成binary blob,没有任何语义,所以只能用通用的zlib,LZO,Snappy等压缩方法。
ORCFile能够获取每一列的类型(int还是string),那么就可以使用诸如dictionary encoding, bit packing, delta encoding, run-length encoding等轻量级的压缩技术。这种压缩技术的优势有两点:一是提高压缩率;二是能够起到过滤无关数据的效果
现在ORCFile中主要有三种编码:

  • bit编码,所有数据类型都可以用。Google’s protocol buffers and uses the high bit to represent whether this byte is not the last and the lower 7 bits to encode data
  • run-length encoding(行程长度压缩算法),int类型专用。
  • dictionary encoding,string类型专用。同时这个dictionary还能帮助过滤查询中的predicate条件。

Run length Encoding对某些列压缩会减少存储3-4个数量级,对内存提升也有2-3个数量级,Dictionary Encoding一般对磁盘空间减少大概20倍,对内存空间大概减少5倍,根据Google PowerDrill的实验,在常见的聚合查询中这些特殊的编码方式会对查询速度有2-3个数量级的提升.

Predicate Pushdown:原来的Hive是把所有的数据都读到内存中,然后再判断哪些是符合查询需求的。在ORCFile中数据以Stripe为单元读取到内存,那么ORCFile的RecordReader会根据Stripe的元数据(Index Data,常驻内存)判断该Stripe是否满足这个查询的需求,如果不满足直接略过不读,从而节省了IO。

关于ORCFile的压缩效果,使用情况和性能可以参考hortonworks的博客

http://hortonworks.com/blog/orcfile-in-hdp-2-better-compression-better-performance/

未来ORCFile还会支持轻量级索引,就是每一列中以1W行作为一组的最大值和最小值。

通过对ORCFile的上述分析,我想大家已经看到了brighthouse的影子了吧。都是把列数据相应的索引、统计数据、词典等放到内存中参与查询条件的过滤,如果不符合直接略过不读,大量节省IO。关于brighthouse大家可以参考下面的分析。

4,HiveServer2的Security和Concurrency特性

http://blog.cloudera.com/blog/2013/07/how-hiveserver2-brings-security-and-concurrency-to-apache-hive/

HiveServer2能够支持并发客户端(JDBC/ODBC)的访问。
Cloudera还搞了个Sentry用于Hadoop生态系统的的安全性和授权管理方面的工作。
这两个特点是企业级应用Hadoop/Hive主要关心的。

5,HCatalog Hadoop的统一元数据管理平台
目前Hive存储的表格元数据和HDFS存储的表格数据之间在schema上没有一致性保证,也就是得靠管理员来保证。目前Hive对列的改变只会修改 Hive 的元数据,而不会改变实际数据。比如你要添加一个column,那么你用hive命令行只是修改了了Hive元数据,没有修改HDFS上存储的格式。还得通过修改导入HDFS的程序来改变HDFS上存储的文件的格式。而且还要重启Hive解析服务,累坏了系统管理员。

  • Hadoop系统目前对表的处理是’schema on read’,有了HCatlog就可以做到EDW的’schema on write’。
  • HCatlog提供REST接口提供元数据服务,有利于不同平台(HDFS/HBase/Oracle/MySQL)上的不同数据(unstructured/semi-structured/structured)共享。能够把Hadoop和EDW结合起来使用。
  • HCatlog对用户解耦了schema和storage format。举个例子吧,在写MR任务的时候,目前是把所有的行数据都当成Text来处理,Text一点点解析出各个Column需要编程人员来控制。有个HCatlog之后编程人员就不用管这事了,直接告诉它是哪个Database->Table,然后schema可以通过查询HCatlog来获得。也省得数据存储格式发生变化之后,原来的程序不能用的情况发生。

6,Windowing and Analytics Functions的支持。

  • 支持Windowing functions :?lead, lag, first_value, last_value
  • 支持over clause : partition by, order by, window frame
  • 支持analytics functions : rank, row_number, dense_rank, cume_dist, percent_rank, ntile

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+WindowingAndAnalytics

https://issues.apache.org/jira/browse/HIVE-4197

Tez/Stinger(升级版的Hive)

Tez是一种新的基于YARN的DAG计算模型,主要是为了优化Hive而设计的。目前Tez/Stinger主要是Hortonworks在搞,他们希望以后把Hive SQL解析成能够在Tez上跑的DAG而不是MapReduce,从而解决计算实时性的问题。Tez的主要特点有:

  • 底层执行引擎不再使用MR,而是使用基于YARN的更加通用的DAG执行引擎
  • MR是高度抽象的Map和Reduce两个操作,而Tez则是在这两个操作的基础上提供了更丰富的接口。把Map具体到Input, Processor, Sort, Merge, Output,而Reduce也具体化成Input, Shuffle, Sort, Merge, Processor, Output。在MR程序里,编程人员只需编写对应的Processor逻辑,其他的是通过指定几种具体实现来完成的;而在Tez里面给我们更大的自由度。其实这个跟Spark有点类似了,都是提供更丰富的可操作单元给用户。
  • 传统的Reduce只能输出到HDFS,而Tez的Reduce Processor能够输出给下一个Reduce Processor作为输入。
  • Hot table也放到内存中cache起来
  • Tez service:预启动container和container重用,降低了每次Query执行计划生成之后Task启动的时间,从而提高实时性。
  • Tez本身只是YARN框架下得一个library,无需部署。只需指定mapreduce.framework.name=yarn-tez

http://dongxicheng.org/mapreduce-nextgen/apache-tez-newest-progress/

Tez/Stinger还有一个最重要的feature : Vectorized Query Execution (该feature在HDP 2.0 GA中会提供)

https://issues.apache.org/jira/browse/HIVE-4160

  • 目前Hive中一行一行的处理数据,然后调用lazy deserialization解析出该列的Java对象,显然会严重影响效率。
  • 多行数据同时读取并处理(基本的比较或者数值计算),降低了一行一行处理中过多的函数调用的次数,提高了CPU利用率和cache命中率
  • 需要实现基于向量的vectorized scan, filter, scalar aggregate, group-by-aggregate, hash join等基本操作单元。

未来工作方向:
Cost-based optimizer,基于统计选择执行策略,例如多表JOIN时按照怎样的顺序执行效率最高。
统计执行过程中每个中间表的Row/Column等数目,从而决定启动多少个MR执行

Impala
Impala可以看成是Google Dremel架构和MPP (Massively Parallel Processing)结构的混合体。

https://github.com/cloudera/impala

Dremel论文: http://research.google.com/pubs/pub36632.html

优点:

  • 目前支持两种类型的JOIN:broadcast join和partition join。对于大表JOIN时由于内存限制,装不下时就要dump部分数据到磁盘,那样就会比较慢
  • Impala各个任务之间传输数据采用的是push的方式(MR采用的是pull的方式),也就是上游任务计算完就会push到下游,这样能够分散网络压力,提高job执行效率。
  • Parquet列存格式,同时能够处理嵌套数据。通过嵌套数据以及扩展的SQL查询语义,在某些特定的场景上避开了JOIN从而解决了一部分性能的bottleneck。(Impala目前支持LZO Text, Sequence, Avro, RCFile, Parquet,?不支持ORCFile。但是未来支持多种存储文件格式是个趋势,所以Impala读取文件这部分功能也需要尽快插件化。)
  • Cloudera Manager 4.6以后会有slow query的分析功能
  • Runtime Code Generation http://blog.cloudera.com/blog/2013/02/inside-cloudera-impala-runtime-code-generation/
  • impala可以直接使用硬盘上的数据而不经过hdfs

缺点:

  • impala不会按照group by的列排序
  • 目前不支持UDF,impala 1.2即将支持Hive UDFs(Java写的)和Impala native UDFs and UDAs(接口类似PosgreSQL)
  • 不支持像Hive的Serializer/Deserializer,从而使得它做从非结构化到结构化数据的ETL工作比较麻烦。所以本质上讲Impala适合MR配合做ETL之后的查询工作。
  • 由于Impala的设计初衷是short query,所以不支持fault tolerance。如果参与查询的某个node出错,Impala将会丢弃本次查询。
  • 安全方面的支持还比较差。impalad之间传输的数据没有加密,不支持表或者列级别的授权。
  • 每个PlanFragment执行尽量并行化,但是有的时候并不是很容易。例如Hash Join需要等到其中一个表完全Scan结束才能开始。

虽然有这么多缺点,但是很多公司还是开始尝试Impala了。以百度为例,百度尝试把MySQL接入Impala的后端作为存储引擎,同时实现相应操作对应的PlanFragment,那么用户来的query还是按照原来的解析方法解析成各种PlanFragment,然后直接调度到对应的节点(HDFS DataNode/HBase RegionServer/MySQL)上执行。会把某些源数据或者中间数据放到MySQL中,用户的query涉及到使用这部分数据时直接去MySQL里面拿。

Shark/Spark

  • 由于数据能放到内存尽量放到内存,使用内存非常aggressive。优点是做JOIN时会比较快,缺点是占用内存太大,且自行管理内存,占用内存后不会释放。
  • 由于Shark借用了Hive的codebase,所以在SQL,SerDes,UDF支持方面和Hive是完全兼容的。
  • 支持fault tolerance

性能:
特别简单的select…where查询,shark性能的提升不明显。(因为hive也不怎么费时间)
但是如果查询比较复杂select…join…where…group by,hive的job数目会比较多,读写HDFS次数增多,时间自然会变长。当内存还足够大的时候shark性能是最好的,如果内存不够装下所有的数据时性能会下降,但还是会比Hive好很多。

SQL on Hadoop产品需要向传统数据仓库学习的地方
以开源数据仓库brighthouse(基于MySQL的数据仓库存储引擎)为例。
VLDB 2008 论文 >

brighthouse的SQL解析用的是MySQL的代码,开发了brighthouse专用的optimizer,executor以及storage engine
brighthouse的数据存储通过三层来组织:Data Pack, Data Pack Node, Knowledge Node

  • DP(Data Pack):brighthouse是列存储的,每个DP存储一列中64K个单元的数据。
  • DPN(Data Pack Node):DPN和DP是一对一的关系,DPN中记录每个DP数据对应的一些统计值(max,min,count,sum)
  • KN(Knowledge Node):DP的更详细的数据信息和DP之间关系的信息

KN又分为一下三个部分:

  • HISTs(Histograms):数值类型列的统计直方图,能够快速判断这个DP是否符合查询条件。
  • CMAPs(Character Maps):文本类型的位图,用于快速查找字符。(优化关键字like)
  • Pack-To-Pack:等值JOIN操作时产生的两个列(DP)之间关系的位图。

DPN和KN相当于DP的一些统计信息,占整个DP的1%的存储空间,所以可以轻松装入内存。他们是为了快速定位哪些DP是跟这个query相关(relevant)的,哪些是不相关(irrelevant)的,哪些是可能相关(suspect)的。从而减小IO读取的数据量,提高性能。

性能测试:http://www.fuchaoqun.com/tag/brighthouse/
从这个性能测试中可以看出:
1,压缩率:infobright比MyISAM/tar.gz的压缩率都要高很多
2,查询性能:跟建了索引的MyISAM表相比,查询速度也要快3-6倍

总之,大家都缺少的是:
1,workload management or query optimization
多个表的JOIN如何执行,例如3个表的JOIN会有6种执行策略,那么哪一种才是效率最高的呢。显然要通过计算每种执行顺序的开销来获得。在传统数据库或者数据仓库领域(Oracle/Teradata/PostgreSQL)都有非常好的查询优化器,而在分布式系统中该如何衡量这些指标(磁盘IO,网络带宽,内存)与最后查询效率之间的关系是个需要认真研究的问题。
2,关联子查询correlated sub-queries还是没有谁能够实现。
在TPC-H中又很多关联子查询的例子,但是现在的SQL on Hadoop产品都不支持。听Impala的人说,他们客户对这个的需求不是很强烈,大部分关联子查询可以转化成JOIN操作。但是目前的商业产品像Hawq/Greenplum都是支持关联子查询的。

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn