Pandora 大数据平台演化和 APM 实践

编者按:2017 年大数据发展飞速,大数据也逐渐应用到各行各业中,越来越多的公司开始部署大数据战略。同时,大数据技术也使得商业发展的速度更快、效率更高。本期实践日让我们随着七牛资深大数据工程师崔文正一探大数据的魅力,以下是演讲内容的整理。

崔文正,资深大数据工程师。曾在百度大数据平台部门任技术负责人。于2016年加入七牛云,负责 Pandora大数据平台的建设工作。主要负责实时流数据处理平台、时序数据库,以及离线数据仓库等各项目的架构与开发工作。承载公司每天数千亿、近百TB增量的大数据业务。

|为什么有 Pandora


图 1
一个大数据工程师需要做哪些工作?如果要做大数据分析,那么首先就要收集大量的数据,要有类似如 Logstash 和 Telegraf 这样的数据收集器,以及 Flume 这样的高性能数据管道。数据收集起来之后就要考虑这些数据的存储问题。如果需要实时计算,就需要 Kafka 这样的分布式消息队列做存储。如果做离线分析,就需要搭建 Hadoop 集群等,将数据存储到HDFS 上。存储的问题解决了之后就要考虑如何进行分析、计算。无论是实时计算,还是离线计算,首要的就是搭建资源管理、调度平台如 Yarn 和 Mesos,并基于此选择合适的计算引擎和工具,比如 Hadoop MapReduce 或者 Spark。当然有了计算平台和计算结果,大数据分析的工作还远没有完成,大数据工程师还需要将数据可视化为生动的图表和动画,让业务人员可以更直观的发掘数据的价值,做出正确的判断。这所有的工具都搭建起来了,还需要做平台的监控和调优,要保证这套复杂的系统稳定、高效的运转也是要付出很大的努力的。

因此在大数据工程师想要做一件事情的时候,会首先面临着以下几个重要的问题:

  • 大数据组件必须配套成体系才能发挥
  • 搭建可靠的大数据分析平台很繁琐、困
  • 运维数据组件稳定性需要丰富的经验和很的投入
  • 存储和计算成本很高
    工程师需要搭建很多的技术组件,而且往往是从数据采集到数据的存储到数据的计算,到最后可视化都是缺一不可。但是搭建可靠的大数据平台,运维、存储和计算的成本很高。

|为什么做 Pandora

我们推出大数据产品 Pandora,就是想从存储到数据的可视化,可以提供给工程师一个完整的大数据分析工具集,可以简单地进行数据分析,挖掘数据的价值。致力于为大数据工程师提供以下最优服务:

  • 从存储到数据可视化,全栈的数据分析产品
  • 用户使用 Workflow 管理自己的数据流,只需极少的大数据背景
  • Workflow 实现可视化数据流监控,降低运维成本
  • 集成优秀社区组件,优化并能做的更好

|Pandora 是什么

图 2

图 2 是 Pandora 几个产品的截图。其中左边 是 Pandora 最核心的组件,叫工作流。工作流中起始节点代表了数据收集的源头节点,可以使用开放 API 和数据收集组件将数据导入数据源。其中不同的中间节点代表了不同的流式处理逻辑。比如计算任务包含了流式 SQL 和自定义计算两种计算方式,实时进行数据加工。又比如数据导出,可以将数据导出到多种 Pandora 下游产品如 LogDB、TSDB 或者七牛的对象存储。也可以将数据导出到用户自己的 HTTP 服务,或者 Mongodb 等数据库服务。

图 3


图 4

图 3 和图 4 分别是 Pandora 服务组件和技术组件的架构图。数据源可以是来自日志收集工具,也可以是客户 WEB 服务前后端埋点导入数据,也可以来自 IoT 设备的实时数据,也可以是来自七牛云存储的离线数据。针对于常见的文件、数据库、消息队列等数据源的数据收集场景,我们提供了一套灵活可靠的数据收集工具 Logkit,它可以根据客户端的数据产出情况进行同步或者异步,多并发发送。可以结合服务端的情况对数据进行重组和拆分,智能重试。保证在任何复杂的情境下数据尽可能的得到完整的导出。

Pandora 的实时管道 Pipeline 是基于 Kafka 的 Topic 的一层抽象。用户只需要关心数据仓库(Repo)的概念,关注数据仓库的元信息和实时数据。而 Pandora 会根据用户的数据量在指定配额内尝试进行分区数量调整来进行适配,其次也会在后台进行用户集群的纵向切分,以此实现分集群的升级和运维操作。

这些数据由各种工具或者组件导入到 Pandora 的实时流管道中之后,就可以利用 Spark Streaming 的流式 SQL 或者自定义的计算逻辑进行分析和处理。而后导出到多种数据下游,包括用于监控、预警的 TSDB,可以搜索定位日志的 LogDB,以及支持大规模离线分析的云存储。每个服务我们都提供了开放 API,支持用户对各种类型数据在 XSPARK 进行统一的分析。

如之前所述,Pandora 这套实时计算最核心的部分就在于,多 Kafka 集群能够容纳大量的实时数据,导出服务数据流动快达到秒级延迟的水平。TSDB 可以实时监控系统指标,LogDB 搜索定位具体时刻具体日志信息。KODO 对象存储可以做为 HDFS 的替代存储方案,可以存放大批量离线数据,使用 Spark 进行多种离线分析。

Pandora 的系统架构和功能特点

|Kafka 集群


图 5

接下来介绍一下 Pandora 的主要技术组件和它们的设计发展过程。由于时间原因,这一次我主要跟大家分享多Kafka集群和实时导出服务的技术演进。

首先 Kafka 集群是 Pandora 的核心组件,它的健康稳定是保证数据流稳定通畅的基础。我们在技术选型过程中没有选择直接使用 Kafka,而是用 Confluent,它的 Schema Registry 功能为我们提供了便利的 Schema 管理功能,Rest API 提供了语言无关的接口,考虑到 Pandora 多种语言混合的特点,Confluent 非常契合我们的需求。首先 Kafka 集群是 Pandora 的核心组件,它的健康稳定是保证数据流稳定通畅的基础。我们在技术选型过程中没有选择直接使用 Kafka,而是用 Confluent,它的 Schema Registry 功能为我们提供了便利的 Schema 管理功能,Rest API 提供了语言无关的接口,考虑到 Pandora 多种语言混合的特点,Confluent 非常契合我们的需求。

元数据的加载和校验对数据写入性能有着至关重要影响,尤其是在数据吞吐量达到了每秒上百万的数据量级。基于这个考虑,每个数据入口的元数据都必须要有多级缓存,减少加载元数据带来对数据导入的性能损耗。


图 6

为了应对这个最初原始的需求,就得到这样的 Kafka 集群平台(图 6)。

首先我们会使用 Meta Server 去管理 Repo 的元数据,Meta Server 收到创建 Repo的请求,通知调度系统在集群创建 Kafka Topic,将 Repo 和 Topic 的映射关系更新到qconf 缓存系统中。由 qconf 负责更新数据库和缓存系统。各种任务不会直接访问 Mongodb,而是通过访问qconf,获得数据路由信息,通过集中式缓存和本地缓存降低了加载元数据的消耗。

Points Gate 负责写入数据,Transform 负责读取、计算并且写回数据到Kafka,Export 从集群批量 Load 数据并导出至数据目的地。


图 7

在发展过程中,我们发现数据和分区的数量增长越来越快,最开始的时候数据每分钟数十万左右,在几周内,迅速增长到每分钟上亿条数据的量级。

我们面临的有两方面的挑战,第一个问题是,发现在数据量上涨的时候,Kafka 随着单集群的Partition 数量增大遇到了一些性能问题。第二个问题是,在演化过程中,SPARK 从 1.6 升到 2.1。SPARK 在 2.1 的版本可以支持流式和离线计算的统一,为了支持2.1的功能,需要有多个版本的 Kafka。


图 8

所有就有了两点重大的改造:

1、 我们引入了多 Kafka 集群的概念。首先改造了元数据管理,将数据仓库映射到指定集群的指定Topic。在集群路由的时候我们会有多种策略,比如说这个数据集群可能依赖于新版 SPARK,那么就会分配到 Kafka 0.10 集群。如果某集群需要维护,会将新数据迁移到新Kafka 集群,并且记录迁移 Offset。等待所有旧数据过期后进行集群维护升级。有如此纵向切分能力之后使得 Kafka 集群维护更加简单灵活。这个架构带来的好处是,每一个集群都可以维护在 10 个节点到 20 个节点之间,Partition 数量过多的时候可以及时进行多集群横向扩展。

2、将 Points Gate、Transfom、Export 进行一个容器化,节约成本,会在固定的时间扩容和缩容,节省下来的成本可以用于其他后台的应用。这个极大的符合七牛业务特点,在晚间会有明显数据请求高峰的特点。容器云平台动态扩容的能力让 Pandora 以最经济的方式承载大量的数据。

|Export 实时数据管道


图 9

在数据流上,实时数据管道我们也遇到很多挑战。导出任务面对着 Kafka 的上游,也面对着各种特点不一的下游产品。下游产品如时序数据库,吞吐量高,写入性能极佳,往往导出任务应对比较自如。但有些下游产品如 ES,在导入数据时候需要创建索引,这是一个比较耗费性能的工作。 Export 需要了解自己的所有的下游,可以调节并发数量,以及并发的大小,最大化他们的乘积。同时需要 Export 能根据下游的一些反馈,能够根据下游的反馈去做动态的调整。同时不同的导出任务需要资源不一样,如网络密集型可能需要网卡有优势的节点,如压缩任务密集的导出任务就需要 CPU 密集的计算型节点。


图 10

如何构建数据管道系统?这是最初的版本(图 10),有一个调度系统,每创建一个 topic 就会去 Zookeeper 写一个新导出的任务,Export Worker 负责去争抢任务。即 Export Worker 会监听 task 节点,如果有新增的任务,就争抢任务,谁争抢到就会进行导出的工作。这个架构是非常简单,同时里面是存在重大的隐患。实际上如果说有一个数据量非常大的任务,无论哪个实例争抢到,如果单机资源被这个任务用尽,那 Worker 可能再也无法及时导出数据。

这在我们的实践中是遇到这类问题的,我们公司存储团队的日志就有达到 40-50W/s 的数据量级,即便拿出最好的机器也会发现单机上网卡都打满,在没有进行业务拆表等调优手段的时候完全无法支撑业务。


图11

在图 10 上看来,从任务整体上看是分布式的,但是单个数据仓库消费不是分布式的,造成大任务容易达到单机吞吐量上限。我们对单个数据仓库消费如图 11 进行拆解,按照 Partition 级别进行调度和导出。增加了 Export Master 这样一个节点,它会根据 task 任务、每个属性信息、每秒的数据数量是多少、根据每一个属性计算权重生成 partition task,由 Master 分配给指定的Worker,监听到分配信息的 Worker 去领取某个 Parition 的任务,进行消费。这样不用 Partition 的数据分布在不同的机器上,如果有大型的数据仓库我们只要调节它的 Partition 数量就可以达到调节吞吐量的效果。
但是用这样的系统之后发现还是不满意,有很多的问题,例如 Export Worker 可能从 Confluent 去拉数据,这个时候会规定一个值,或根据流量去推算一个值去往下游去打,但这个时候下游可能无法承受这样量级的导入数据。当 ES 数据太大的时候,就会响应超时。在数据量小,并发量很高的时候,响应时间也会得到一个飙升。


图 12

所以针对这个问题,提出这样一个简单一个修复版本,拉到一个批数据,会并发打到目标数据源,这个数量和时间是不同的。这个做了之后,有一个非常显著的提升,下游能更均匀接收到数据。但是当这样做之后,又发现新的问题。在 Export Worker 并发去打一瞬间,可能有 5 个并发,这一瞬间接收到并发量非常大,导致这一波的数据的响应时间都是非常高的。而且需要把这一波数据全部打完才能去打一下个数据,等于实际上都在等待最慢的一个去执行完成。

从监控系统得知,响应的时间是波浪形。当一块打的时候响应时间很高,当消费完以后,响应时间会很快下来,这样会对系统造成一个持续的冲击。


图 13

所以说在这一点借鉴 Flume 的方式,不再让请求之间互相等待,而是构造 Transaction Queue 队列,Export Worker 对应 Source ,将这个数据持续并发打入 Transaction Queue 里面,然后可以指定并发大小。例如 ES 我们实验下来一般情况下 1 万的 Batch 大小比较合适,根据数据量调节 Sink 个数。
这样就会非常便利,就是往里面打数据,然后积累数据,但是这样就是有一个问题,Transaction Queue 是一个内存中一个队列,可能存在当我们停机或者重启服务、要更新服务时怎么办?因此会有一个 Disk Queue 磁盘队列,去缓存同步。当机器挂掉后,会从最近的 Snapshot 恢复导出的数据。
这样优化下来整个 Export 的拓展性就会变得非常强。为什么不直接用 Flume,因为系统需要做很多的配制,首先是有很多用户去做不同的事情,然后必须要有自动化的过程。这个过程需要服务能够上报流量,而且需要根据流量进行数据调节,而 Flume 是一个固定配置的数据管道,并不能灵活调节,所以我们并没有最终采用。
大家可以看到做这个简单的数据管道的服务,花了很多的心血在不断看系统哪里有问题,不断去优化,支持各种用户低延迟地观测到自己收集的数据。

TSDB

TSDB 关键技术点

  • Influxdb 的集群解决方案
  • 根据时序进行纵向集群切分
  • 根据 Tag 进行数据库表横向切分
  • 多备份,高可用 单机大吞吐量,可以达到 60w/s
  • 多租户用户多级别隔离

Grafana 标准监控接口支持


图 14

TSDB 是自研数据库(参考 Influxdb),提供集成解决方案。首先可以支持做备份集合。
在最初的集群版本时使用 TSM 底层引擎生成 group,然后根据时间序列,会跟这个进行时间上的区分,按照数据大小和时间区间切表。

但在实践的过程中发现 Influxdb 是一个很吃内存的数据库,实际上 Influxdb 聚合指标过高时,内存会发生暴涨的情况,达到单机无法承担的水平。因此不仅需要在时序上做纵向切分,同时需要根据 tag 实际情况做一个纵向切分,从而做到无限制的水平扩展。

当数据量达到一定程度,Influxdb 是无法继续运行的,但是 TSDB 实际上可以以通过增加资源继续进行扩展。

|LogDB

LogDB 关键技术点

  • 多 ES 集群解决方案:不同集群适配不同种类索引
  • 根据历史流量进行动态索引平衡、索引清理
  • 对ES的搜索进行分步控制、优化,提升效率
  • 支持 Kibana、Grafana 等 ES 社区常用工具


图 15

LogDB 是基于 ES 针对于日志所做的搜索引擎,我们也发现和 Kafka 集群类似,当Shard 过多的时候 ES 稳定性有很大的隐患。所以我们也用同样的思路,使用多 ES 集群,让不同的集群担负不同的职责,可能有一些集群是固定的大流量的数据,可能是大量小数据集合,可能是早晚高峰区别特别明显的集群,不同集群设置不同参数和配额去处理不同的事情。

如图 15 的架构图,LogDB 会根据历史流量进行索引的动态平衡和索引的清理,也会对 ES 的大搜索进行分步优化,保证搜索体验。

最近还有一个比较新的功能,就是定时做数据快照同步冷数据到云存储上。老的数据搜索会从快照恢复,速度虽然会慢一些但是会很大程度降低存储成本,节省客户的支出。

|XSpark

XSpark 关键技术点

  • 基于七牛云存储作为数据仓库
  • 基于容器云平台实现可动态伸缩的Spark集群
  • 集成数据可视化、监控、集群管理


图16

如图 16 XSpark 实际上是基于七牛云存储做的数据库,然后集成数据可视化监控的平台。Pandora 提供这样的部署,按照用户制订的规格,会进行一个调度。现在七牛大数据平台达到每分钟实时写入的数据增量达到上百 GB,每日增量数据数百 TB,每分钟写入的数据条目达到数亿,每日数千亿增量数据,由于七牛云存储的低成本帮我们减轻了很多存储上的压力,Pandora 实际上机器用不并不多,可能所有服务加上只需要 70-80 台,未来也会提供给后续接入客户。

基于Pandora的应用和实践

|直播APM实践


图 17


图 18


图 19

我们公司有一个 APM 团队,实际上底层的基础计算引擎是 Pandora的 平台做,包括两个事情,直播 APM 的实践以及系统 APM 实践。直播的客户可能比较关注的指标,一个是卡顿率,直播客户按照每分钟上报一卡顿数,通过卡顿率了解出问题的地方在哪。二是首开时间,即用户打开页面直到主播出现的时间。

对于我们 APM 的团队,如图 17 把数据导入各项存储,按照一些运营商最后导出到实时数据库,可以观察全国卡顿的热力图分布(图 18),也可以看到按照时序的卡顿的分布(图 19)。


图 20


图 21

超清首开时间会进行统计,图 20 上已经看到是有一些厂商表现不是很好。平台也提供对运营人员可以检索的日志(图 21),比如说每一个直播的主播的人数暴涨,或者是有一个奇怪的事情,会根据主播 ID,检索出以前的历史记录,如有特殊出现可能要及时掐断。

|系统APM实践


图 22

对于系统 APM,最关心的无非是 1、响应时间;2、调用次数能够查询具体发生细节。

Pandora 可以实时 APM 监控(图 22),比如说进行简单的服务,可以观察码设一条黄线或者红线,超过这个预值进行报警。可以知道每秒请求数,个数和一些详细信息,也可以根据区域去做一个区域图,那么一个区域的次数大概是多少等等。


图 23

Pandora 还可以对它进行整体的分析,比如说过去一个月,每一台服务器显示的时间是怎么样,可能会看到显示时间,以及可以相应的分级。比如设定一个正常的响应,那么就可以区分慢一点跟快一点的,这样可以做一个整体的分级展示。也可以观察某一个时间段数据的表现情况,是不是做数据的优化。


图 23

Pandora 还可看一些地域来源的报表图,实际这个图把数据放在七牛的云存储,然后直接在云存储上进行分析,然后直接出报表展示,还可以切换不同的图表的方式,而且同样这里面做一些交互式动态表。


图 24


图 25

作为一个数据工程师,定义一个动态表,这里面可以有几种值就去筛选数据,实际上数据工程师可以图表展示,不需要进入前端,极大节省数据工程师的投入,还可以用我们提供搜索的服务。

Q&A

现场嘉宾:你刚才说的整个架构经过多人的演进,在演进的过程,我们线上的系统肯定具备的性能,或者处理能力,存储量,还没有达到我们的用户需求,当我们新的解决方案,没有完全落地的时候,这个时候我们采取那些措施来保障我们的线下系统能够正常运行,另外一个问题是 Pandora 的引进计划?

崔文正:这个问题问的非常好,实际上我们会给用户提供一些业务上的解决方案,比如说把一个大的数据仓库,切分成一个小的数据仓库,按照原来的分布式方案去解决整个问题,其实用户发现实际上设计上是有一定不合理,实际上应该把数据仓库切分成小的仓库,方便查询和计算的。在优化以后他们也得到一些好处,我们也得到一些时间发展我们的系统。后期的发展在商业和用户的成本上降低我们要对不同的行业给出一些简单的解决方案,让我们原来的大数据的工程师,在用东西在把很快的系统对接上,这个是我们要做的事情。

想了解更多关于「Pandora」信息,可点击 https://www.qiniu.com/products/pandora 了解。