《Spark/Tachyon:基于内存的分布式存储系统》-史鸣飞(英特尔亚太研发有限公司大数据软件部工程师)

史鸣飞:大家好,我是叫史鸣飞,来自英特尔公司,接下来我向大家介绍一下Tachyon。我事先想了解一下大家有没有听说过Tachyon,或者是对Tachyon有没有一些了解?对Spark呢?

 

首先做一个介绍,我来自英特尔的大数据团队,我们团队主要是致力于各种大数据的软件开发以及这些软件在工业界的推广和应用,我所在的团队主要负责Spark及其软件栈的开发和推广。我们是国内最早参加Spark开发和推广的团队,我们在2012年就加入了Spark社区。在Spark和相关的项目中间投入了大量的人力,长期以来我们在Spark及相关项目中有超过10位活跃的贡献者,在Spark项目中我们团队代码的贡献量是排名前3的。

 

下面是我今天演讲的内容的概要,首先我会向大家做一个Tachyon的介绍,包括为什么会出现Tachyon,Tachyon的基本架构,Tachyon和现有系统的集成以及Tachyon一些基本的原理,接下来我会向大家介绍我们在使用Tachyon方面的经验和一些应用的实例,最后会向大家介绍一下Tachyon的发展情况和英特尔在Tachyon上的工作。

 

Tachyon出现的背景是什么?首先内存为王这句话这两年很流行,大数据处理对速度的追求是无止境的。内存的速度和磁盘的速度不是一个数量级的,另一方面内存的价格越来越低,内存的容量越来越低大,这就让数据存在内存中间就有了可行性。伴随着这种趋势,大量的基于内存的计算框架也出来了,像Spark和SAP的HANA都是优秀的基于内存的计算框架,但是现有的计算框架还面临一些挑战。Tachyon的出现就是为了解决这些问题的,那么当前基于内存的大数据计算框架到底遇到了那些问题呢?接下来我就以Spark为例,向大家分析一下。

 

第一个问题是数据共享问题,一个集群可能运行着多个计算框架和多个应用,比如说一个集群上有可能运行着Spark,同时还运行着Hadoop,在现在的情况下他们两个之间的数据共享是通过HDFS的。也就是说如果一个Spark的应用结果的输出是另外一个MapReduce任务输入的话,中间结果就必须要通过写入和读取HDFS才能实现,大家知道HDFS的读写首先是一个磁盘的IO,另外由于它的备份策略,默认它有三份的备份,这样又会引入网络的IO,这是一个很低效的过程。第二个问题是缓存数据丢失的问题,像Spark这样的计算框架它的内存管理模块和计算执行器是在同一个JVM里面的,如果它的执行器发生一些异常导致执行出现错误,从而导致JVM退出的话,那么缓存在JVM堆空间中的数据就会同时丢失掉,这样的话就会导致缓存数据丢失。第三个问题是GC的问题,因为现在大部分的大数据的计算框架都是运行在JVM上的,GC的开销就是一个不可避免的问题。对于像Spark这样一个基于内存的计算框架而言,GC的问题尤其突出,它会把大量的数据缓存在JVM堆空间里面,这些数据是计算时需要用到的数据,GC是没办法清除掉的,每一次full GC都会对这些数据做一个全局的扫描,这是很耗时间的,而且随着计算时间的增长和堆内存数据的增加,GC的开销会越来越大。

  

解决方法是什么呢?我们首先分析导致这个问题的根本原因是什么,根本的原因就是现有的基于内存的计算框架缺乏脱离于JVM的内存管理模块。解决方法就是伴随着Spark而产生的基于内存的分布式存储系统Tachyon。Tachyon的设计思想主要有两个,第一个是基于内存的OffHeap的分布式存储,就是一定要把数据存储在JVM堆空间之外,这样的话能够避免GC。第二个是通过在存储层保存数据的Lineage实现容错,这是在Spark中引入的思想,Lineage记录了源数据以及源数据经过什么样的计算得到的当前数据,Tachyon将这些本来是在计算层才关心的信息放到了存储层。Tachyon仅仅保存一份数据在内存中,内存是很宝贵的资源。而HDFS它为了实现容错在磁盘上默认保存了三份,那么如果Tachyon某一个节点是没有这份数据的时候,它会通过网络去读。因为在远端的结点上数据也是在内存中间的,因此远端读取没有磁盘的IO只有网络的开销,所以说也是很高效的。当数据丢失的时候,Tachyon会根据数据的Lineage进行数据的恢复,这个过程有点像Spark中的数据重算,但是它比Spark走的更远。因为Spark中的重算是在程序运行时的操作,当程序运行的时候发现某个节点挂掉了,它会重新计算来管不着数据,问题是如果整个Job已经结束之后,数据再发生丢失就没有办法了,Tachyon可以解决这个问题。因为Tachyon在存储层存储了整个数据的依赖关系,包括了这个数据是由什么样的框架,通过什么样的执行过程生成的,当数据丢失的时候Tachyon会重新启动这些应用然后生新成这些数据,实现数据恢复。

 

这是Tachyon设计的目标,Tachyon在整个大数据处理软件栈中所处的位置,最下层是存储层,像HDFS、S3。在上层有Spark、H2O,Tachyon相当于是在存储层和计算层之间的cache层,Tachyon并不是要替代任何的存储系统,它的作用是加快计算层对存储层的访问速度。这张是Tachyon的基本架构,从这张图大家可以看到Tachyon和HDFS是很像的,有Master和Worker。Master用于管理整个集群中间所有数据的元数据,包括数据的大小、数据的位置。Worker用于管理每个节点上的内存数据,所有的内存数据是存储在Ramdisk上的,Ramdisk是把一段内存空间映射成一个block设备,Tachyon就可以以内存的速度去读写文件。Worker会定期与Master通讯,把Worker上的数据报告给Master,Master会根据Worker汇报的信息给Worker发送命令。在图的最左边还有Zookeeper,它会选择一个最可用的Master作为主节点。还有一个模块在这张图上没有的,那就是Client。它是提供应用程序的编程接口,应用程序通过Client向Tachyon中读写数据。

 

Tachyon中数据的容错有两种,一种是元数据的容错,就是Master节点上的数据容错,另外一种是内存数据容错,是Worker上的数据容错。元数据的容错和HDFS是很像的,通过日志实现的。Image存储了元数据,Editlog记录了最近对元数据的修改记录。而内存数据容错是Tachyon的特有的,例如:Fileset A通过一个Spark Job生成了Fileset B,Fileset C通过另一个Spark Job生成了Fileset D,同时File set C和File set D又通过一个Mapreduce Job生成了Fileset E,这样一个数据生成的过程会在Tachyon中被保存下来,如果Fileset E丢失了,而Fileset B和Fileset D都存在,那么Tachyon就会重启Mapreduce job通过Fileset B和Fileset D重新生成Fileset E,如果Fileset B和Fileset E也都不存在了,那么Tachyon就会重新起用Spark Job生成 FilesetB 和 Fileset D, 最后再启动Mapreduce Job由FilesetB和Fileset D生成Fileset E。

  

现在回顾一下我之前讲的现有的基于内存的计算框架所面临的三个问题,在有了Tachyon之后这个问题是怎么得到解决的?数据共享问题,Spark和Hadoop可以通过Tachyon去存储中间结果数据,如果MapReduce需要Spark的输出结果,可以直接读取Tachyon获得,而不需要访问HDFS。缓存数据丢失的问题,Spark可以将RDD缓存在Tachyon中,这样当Spark的应用Crash的时候这些缓存的数据是不会丢失的。第三个是GC的开销,这个显而易见的,由于数据在Tachyon中,GC不会管理这部分数据。

  

下面向大家介绍一下Tachyon怎么和现有的大数据处理框架集成的,首先是Mapreduce,MapReduce是没有和Tachyon做任何集成的,如果要在MapReduce里面使用Tachyon的话,就要把Tachyon当做外来的包或者是库来引用。第一种方法是把Tachyon的jar包放在Hadoop的Class path里面,第二种是放在Hadoop的Lib的目录里,第三种是作为应用程序的一部分,分发出去。另外还需要对Hadoop做一些配置,需要配置Tachyon文件系统,这样MapReduce就可以直接通过Tachyon加载和写入数据,使用方法和HDFS一样。Spark已经集成了Tachyon,如果在Spark中间使用Tachyon的话,只需要对Spark做一些简单的配置就可以了,在SparkConf里面配置Tachyon Master的URI,这样Spark就可以把所有的RDD的数据缓存在Tachyon中,通过设置RDD的Storagelevel为OFF_HEAP,Spark就会自动的把RDD放在Tachyon里面。如果Spark要通过Tachyon去加载和写入数据的话,就需要在像在Mapreduce中一样配置Tachyon的文件系统,这样的话Spark就可以像读写HDFS一样从Tachyon里面去读写数据。

  

下面我讲一下Tachyon基本的工作原理,首先是它的通讯机制,Tachyon使用thrift进行通讯,可以通过配置Master Client和Worker之间的接口自动生成。还有就是Herabeat通讯,是为了在Tachyon各组件之间保持连接关系,Master和Worker也会通过Heartbeeat交换信息,Worker会把自己节点上最近增加的数据或者是数据的改变提交给Master,Master会根据Worker提供的文件信息去修改Master上的元数据,Master也会返回给Worker一些信息。如果Worker提供给Master的文件信息的元数据在Master已经没有了,Master就会告诉Worker删掉该文件。如果Worker有一段时间没有和Master通讯,Master就会认为Worker已经和他断开连接,当Worker又和Master通讯的时候,Master就会告诉Worker需要重新注册,把节点上所有的文件信息重新发给Master。Worker也有自检措施,如果它检测到和Master通讯超时,会重新向Master注册。Client和Master Worker之间都是有连接的,Client发送给Master的Heartbeat,Master暂时没有处理。Client和Worker之间的通讯就是为了维持Client和Worker之间的关系,如果Worker检查到Client连接超时,Worker就会释放掉分配给Client的资源。

  

Tachyon里文件的组织方式,首先介绍Worker上的组织方式,Worker有两种文件系统,一种是Ramdisk,即内存文件系统,另一种是底层文件系统,最常用的是HDFS。在内存文件系统里面,文件是以Block的方式存储的。在底层文件系统上以整个文件的方式存储的。在内存文件系统上文件名是BlockID,而在底层文件系统里文件名是FileID。Tachyon的元数据的组织,大家可以看到这是一个树状的结构,每个节点都是一个Inode,Inode记录了文件的信息,所有的文件都从根节点开始的,根据路径的名字可以一步一步的找到。如果Inode代表一个目录,它会记录自己目录里面所有的子目录和文件,如果Inode代表一个文件,它会记录这个文件所有的Block,以及这个文件是不是在底层文件系统上有一个备份,以及备份的文件在的路径。

  

应用通过Tachyon Client读写数据,读数据时,Client向Master发送请求,从Master获取要读取的Block的信息,包括Block的ID和位置信息,拿到这个信息之后Client首先请求Worker lock该block,表明该block正在被访问,Lock之后Client就会读取文件,读取完了之后再要求Worker去unlock这个文件,最后还要求Worker更新该block的访问时间,这是因为在写数据的时候,如果空间不足Worker会根据访问时间做基于LRU的文件删除操作。如果这个文件并没有在本地Worker上,Client就会去Remote Worker上去读,Remote Worker在接受到请求之后会以通过网络将数据传回给Client。

  

Tachyon在读取数据的时候有两种读取方式,第一种是CACHE的方式,意思是说如果本地有数据那么直接读取,如果本地没有数据从远程读数取时,读完之后会在本地创建一个缓存的副本。这个策略的目的是说,用户认为数据会在接下来还会反复被使用,与其说从远程反复去读取数据还不如直接在本地创建一个副本,以节省开销,如果是从底层文件系统读呢?CACHE策略也会在本地的内存中创建一个副本,而No-cache的策略是只读一次,用户认为接下来不会再去访问这个文件了。

  

写文件时,Client向worker申请内存空间,Worker首先判断自己的内存空间还够不够,如果不够的话它就会根据某种特定的算法,在当前是LRU的算法,将最近没有被访问的block文件直接的删除掉,释放空间。在分配好内存之后就会告诉给Client分配成功,Client就会要将他要的数据写进本地的Ramdisk。Client写完之后会通知Worker去cache这个文件,cache的过程是从把数据从用户目录移动到数据目录,而Worker在cache完之后会发给Master新的block文件的信息。在写入方面也是有多种策略的,首先有一个MUST-CACHE,Client要求必须把文件写在内存中间,如果内存不够没办法写的话,Tachyon就会报错。而TRY-CACHE就是说尽可能的把数据写进内存中。而THROUGH是直接把文件写进底层的文件系统,不会写内存。而CACH而-THROUGH就是会保存两个拷贝,而ASYNC-THROUGH就是将文件写进本地内存直接就保存了,Tachyon从内存备份到底盘文件当中。前2种策略是对读做了优化,如果这个文件只是一个临时文件不需要做永久的存储,而且它在写之后可能马上会被读取甚至是反复的读取,要把它放在内存中间,并且由于它是一个临时文件不需要在底层文件做永久的存储。而THROUGH是只在底层的文件系统写入这个文件,如果这个文件是一个应用的输出结果,而且在写入之后在短时间之内是不会被访问的,那么就把这个文件直接写入到Tachyon管理的底层文件系统的空间中。如果这个结果在将来被使用的话,那可以根据用户的需要把文件新放进Tachyon的内存中实现快速的访问。而CACHE-THROUGH就兼顾了上面的两种,而异步的THROUGH不保证存储到底层文件系统,它是为了提高响应的时间、减少延迟,但是它和CACHE-THROUGH的最终效果是一样的。

  

Tachyon的用户接口,Tachyon提供了两种用户接口,第一种接口是命令行。这和HDFS的命令行接口很像,提供一些基本的文件系统操作命令,像cat、ls、mkdir、rm等,是为了方便用户对Tachyon内存中间的文件做一些最基本的操作。第二种接口是编程接口,在Tachyon中主要有两个为用户程序提供服务的接口,一个是TachyonFS,这是Tachyon提供的最基本的编程接口,这里面涵盖了所有Tachyon提供给用户程序的功能,其中包括像delete、mkdir、rename等,通过对这些基本的功能编程,可以实现文件系统的操作。另一个是TachyonFile,提供了一些更上层的接口。比如说获取一个文件的InStream 或者 OutStream。

  

我们在Tachyon上的使用经验,第一个是我们团队自己开发的日志数据处理的原型系统。在github上可以找到这个项目,它的名字叫Thunderain。流式的数据首先会被放在Kafka里面,它是一个消息队列,在Kafka里面的数据经过SparkStraming处理后会被写进Tachyon的In-memory Tables里面,因为这些数据是存储在In Memory Table里面,它的访问数据是很快的,所以可以支持在后台运行一些在线的分析或者是交互式的查询,对响应时间延迟比较敏感的应用。图下方的处理流程,Kafka中的数据还可以通过ETL的处理保存在HDFS里面,这些数据可以作为历史的数据,历史数据和Tachyon 内存数据可以做组合的查询。比如说一个视频网站,用户对视频点击的日志可以通过上面的流式处理,后台的应用人员就可以很快速的会去查询到,在最近的这段时间里面什么视频被播放的次数最多,哪些视频是最热门的,而历史数据的作用是,在线数据可以和历史数据做一个对比,就是说在前一天或者是上一个小时,什么视频是最热的。

 

另外一个Tachyon应用实例是OffHeap的存储,这是我们做过国内某视频网站的case,这个case的目的是做视频内容的推荐,这是一个图算法N度级联的问题。N度级联的意思是计算一个图中2个节点之间跳跃N次,经过N次的条约他们之间相关度是多少,就是说有N距离的相关度是多少,它的算法在这里。它的算法大概是这个意思,假定两个节点之间有M条长度都是N的路径,那么Weightk(X、Y)表示第K条路径的权重?N度级联要求的就是所有的M条路径的权重的合,每一条路径的权重等于整个路径上所有边的权重级,因为每条边的权重都是一个0到1之间的数,权重积必定也是0到1的浮点数,而且随着N的增大他们的相关度是减小的。在现实当中这个是可以在社交网络里面计算不同用户的相关性,电商网站的产品推荐等。在解决这个问题的时候,我们用了两种图的框架实现的,一种是bagel一种是Graphx。首先我讲一讲Bagel的实现, Spark在实现Bagel的时候,把每个superStep中生成的节点的新的数据和该节点发送给下一个superstep的各个节点的的消息都放在了Spark的RDD中,我们发现随着迭代次数的增加,GC的开销是相当大的,因为数据量是很大的,每次迭代都会把这个数据cache起来。我们的解决方法就是用Tachyon去缓存了这一部分的数据,相当于让Bagel运行在Tachyon上,这样的好处是能够解决大量GC的问题。第二种实现是GraphX,它的计算过程是这样子的,首先是收集阶段,它会收集每个节点和边的数据,然后生成发送给各个节点的消息,在计算阶段每个节点会根据收到的消息生成新的节点数据,在这一步就相当于是从一个旧的图生成了一个新的图。在这个阶段Spark其实也是把这一部分的数据给缓存起来了,我们在使用的时候也是把这个数据缓存在Tachyon上,然后缓解GC的问题。

 

第三个用户的case是远程数据访问,用户有多个集群,其中有一个集群是专门提供存储服务的,还有若干个计算集群,在这些集群上有可能运行的Spark或者Mapreduce。他们有这样的需求,在某个计算集群上,应用需要多次的去访问提供存储服务集群上的数据,这个开销是很大的,相当于在两个集群间做数据的读取。我们提供的解决方案就是使用Tachyon,应用只需要从远程提供存储服务的集群上读取一次数据,把数据缓存在本地Tachyon上,然后在Tachyon上对数据做反复的访问和计算,这样的话就可以节省大量的数据读取的开销。

  

Tachyon适用的场景,第一个场景计算中间结果需要在不同的应用和计算框架中间共享的时候,也就是说中间结果有可能会被不同的后台应用所使用。第二个场景是需要快速响应对延迟比较敏感,比如后台的用户有可能会做一些在线的查询,或者是一些交互式查询的时候,使用Tachyon其实可以起到提高响应和降低延迟的效果。第三个是内存数据量比较大,并且拥有长时间和迭代式的计算需求,我们之前做过一个用户的case里面使用了Tachyon之后,在性能上也提高了30%以上。第四个场景是需要多次访问大量的远程数据,Tachyon的作用就是可以把远程的数据放在本地做多次的访问,这样可以减少远程访问的开销。但是Tachyon也是有局限的,第一个局限是CPU的负载会增加,因为Tachyon中数据是以文件的方式存的,那么就会有序列化和反序列化的开销,这是一个很消耗CPU的工作。第二个局限是Tachyon暂时只能使用内存做存储空间,这个局限在接下来的版本就不会存在了,因为我们现在正在用其他的高速存储器像SSD去扩展Tachyon的存储空间。

  

Tachyon当前的发展状况,Tachyon是一个很新的项目,是2012年的夏天开始的,有5个发起者,最左边的是李浩源,他是Tachyon的作者,是个中国人,现在UC Berkeley读博士。Tachyon的现在最新的发行版本是0.5.0,主分支版本是0.6-SNAPSHOT,Tachyon在国内还很少有公司使用,国外已经有50多家公司在尝试使用了。现有的Spark和MapReduce的程序可以不经过修改直接在Tachyon上运行,Spark将Tachyon作为默认的Offheap的存储系统,上面的这些是对Tachyon有贡献的一些组织,在国内有两所大学在做Tachyon方面的工作,南京大学和清华大学。

 

最后讲讲英特尔的贡献,我们有3位贡献者,一共有100多个提交,这些提交里面,我们做了重要的功能组件,也做了提高可用性和易用性的工作,还有一些BUG的修补,接下来我向大家介绍我们做的一个功能组件:多层级的本地存储。它解决的问题就是我刚刚提到的,Tachyon现在只能使用内存作为它的存储空间。我们使用SSD、HDD去扩展Tachyon的存储空间,整个存储结构是一个金字塔的结构,最上层空间比较小但是速度很快,中间层和下层是速度慢一点但是空间很大,我们通过设计一些策略,将最热的数据放在最顶层,达到比较快的访问速度,暂时不太热的数据放在SSD或HDD上,当数据将来再次变热的时候可以重新把它给放回内存中间。这样的好处是当内存不够的时候可以把数据放在SSD或HDD,而不是将数据丢掉,同时对性能没有太大的影响。

 

最后希望大家能够尝试着使用Tachyon,因为从我了解的情况来看,国内的公司用Tachyon的很少,也希望大家能够加入Tachyon的社区,Tachyon的项目发布在github上,大家在使用的时候发现有任何的问题可以给我们提一些意见。

 

PPT:http://qiniuppt.qiniudn.com/shimingfei.pdf

视频:http://qiniu-opensource.qiniudn.com/ecug-2014-shimingfei.mp4