Tachyon帮助Spark变小时级任务到秒
发布日期:2016-4-6 16:4:37
Tachyon帮助Spark变小时级任务到秒 本文的作者是Gianmario Spacagna与Harry Powell,Barclays的数据科学家。 集群计算与阿里云大数据技术已经取得了很多进展,然而现在很多大数据应用使用的还是HDFS这一分布式分件系统。HDFS是一个基于磁盘的文件系统,将数据存储在磁盘上有一些问题,比如面临法律的监管,而且,由磁盘读写造成的延迟也比较高。要避免这些问题可将处理过的数据暂时放在内存中。Tachyon可以帮你让这些数据长期处于内存中并且还能在不同应用之间共享。 在巴克莱我们没有把数据存储在HDFS上,而是使用了RDMBS关系型数据库,我们还开发了一套让Spark从RDBMS直接读取数据的流程。对于数据库的schema我们作为读取数据的一方并不完全清楚,因此我们先读取为动态类型的Spark DataFrame,分析了数据结构与内容之后再转换为RDD。 这套流程有一个弊端。由于我们的数据集比较大,所以从RDBMS读取阿里云数据要花挺长时间。虽然按理说我们不应该频繁地读取数据,但Spark缓存的数据一崩溃一重启就丢了。这时候就得重新读取数据一次,这样的话来一次我们的系统就得挂一半个小时,一天重读个几次是很常见的。哪怕我们做完了数据的映射之后,运行Spark job也还得重新读取数据,加入新特性,改模型,测试的时候都得这么干。 所以我们找到了Tachyon。Tachyon现在已经改名为Alluxio,它是一个数据存储层,它让所有的Spark应用可直接通过文件API来读取数据。既方便与现有应用的集成又很简单。 现有架构的问题 如前所述,最主要的问题就是数据的加载。虽然Spark有缓存功能,但当我们重启context,更新依赖或者重新提交job的时候缓存的数据就丢失了,只有从数据库中重新加载这一个办法。 图1表示加载数据到6个Spark节点所需要花费的时间(以分钟计)。横坐标代表阿里云数据的行数,绿色代表是晚上六点数据库比较闲的时候,灰色代表是早晨十点数据库使用率比较高的时候,蓝色是下午两点数据库非常忙的时候。 图1 从图1我们可以看出加载数据的时间从几分钟到几小时不等。考虑到我们一天要重启很多次,所以光靠Spark的缓存肯定是不够的。我们想要达到的目标有下面三点:
这三点汇成一句话其实就是要一个内存存储系统。 Tachyon Tachyon不单位我们解决了数据存储的问题还将目前的部署速度提升到了一个新台阶。作为一种内存分布式文件系统,Tachyon可以存储任何文本格式或Parquet、Avro和Kryo等高效数据类型。我们还可以将Tachyon结合进Snappy或LZO等压缩算法来减少对内存的占用。 与Spark应用的集成非常简单,仅需调用DataFrame与RDD的加载存储API并指定路径URL和Tachyon协议即可。 我们存储原始数据的目的是快速地迭代探索式分析与测试。现在我们可以直接从原始数据来构建最简可行产品而不用在数据的处理上多花时间。图2是我们部署Tachyon之后的工作流程。 图2 橙色箭头代表我们将数据的中间结果存储到Tachyon,以方便以后读取。 Tachyon的配置 在巴克莱我们将Tachyon配置为与tmpfs文件系统配合(unix系统中的路径为/dev/shm),Tachyon主节点上的配置由下面五步组成: 1.修改tachyon-env.sh配置文件 export TACHYON_WORKER_MEMORY_SIZE=${TACHYON_WORKER_MEMORY_SIZE:-24GB} TACHYON_JAVA_OPTS中我们则保留默认配置: -Dtachyon.worker.tieredstore.level0. -Dtachyon.worker.tieredstore.level0.dirs.path=${TACHYON_RAM_FOLDER} -Dtachyon.worker.tieredstore.level0.dirs.quota=${TACHYON_WORKER_MEMORY_SIZE} 2.将配置复制到worker节点中 ./bin/tachyon copyDir ./conf/ 3.格式化Tachyon ./bin/tachyon format 4.部署Tachyon。注意NoMount选项,NoMount不需要root权限: ./bin/tachyon-start.sh all NoMount 图3是我们整个架构的示意图: 图3 Tachyon与Spark的结合使用 Tachyon中数据的读写很简单,因为它所提供的文件API同Java类似。往Tachyon中写DataFrame: dataframe.write.save("tachyon://master_ip:port/mydata/mydataframe.parquet") 从Tachyon中读取DataFrame: val dataframe: DataFrame = sqlContext.read.load("tachyon://master_ip:port/mydata/mydataframe.parquet") 写入RDD: rdd.saveAsObjectFile("tachyon://master_ip:port/mydata/myrdd.object") 读取RDD: val rdd: RDD[MyCaseClass] = sc.objectFile[MyCaseClass] ("tachyon://master_ip:port/mydata/myrdd.object") 这里要注意MyCaseClass的类型需与写入时一致,不然的话会出错。 效果 我们使用Spark、Scala、DataFrame、JDBC、Parquet、Kryo和Tachyon创建出了一套数据项目流程,它不仅具有扩展性好和速度快等优点,质量也足以直接部署到生产环境中。 Tachyon使我们能够直接读取原始数据而不必从数据库中加载。数据写入Tachyon之后也可以迅速开始分析,提高了工作的效率。 使用Tachyon将数据存储在内存中读写只需几秒钟,所以在我们的流程中扩展几乎不影响性能。迭代一次所需的时间从以前的几个小时降低到了现在的几秒钟。 未来展望 Tachyon自身还在发展,我们对Tachyon的使用也还在进一步的探索中,所以有一些限制是难免的。下面我们就列出了一些还有待提高的地方,如以下四点: 1.我们没有指定底层的存储设备所以Tachyon所能够存储的数据量受到所分配空间的限制。Tachyon提供了分层存储功能来解决这个问题。 2.设置JDBC驱动、分区策略和类映射还比较麻烦而且不够易用。 3.Spark和Tachyon共享内存,所以为了避免重复和过度的垃圾回收还需要做一些调整。 4.如果Tachyon worker挂了,数据就会丢失。因为我们没有使用分层存储所以Tachyon自己不会重新加载数据。对巴克莱来说还好但对于其他企业来说可能还需要进一步的改进。 总的来说我们还是很看好Tachyon的,它应该会对企业中的数据项目有所影响。
|