基于 Elasticsearch 构建千亿流量日志搜索平台实战

作者简介:
王拓,七牛云大数据高级工程师,硕士毕业于中国科学技术大学,曾就职于 PPTV 的广告团队,主要从事视频广告系统、ad exchange 交易平台、dsp 系统的设计和开发工作。于 2016 年加入七牛云,主要负责日志搜索云服务 logdb 的架构和开发工作。目前 logdb 承载公司每天近千亿、近百 TB 数据的日志增量。
今天的分享会从五个方向展开:一,背景;二,系统的设计目标;三,我们在做这个系统过程中遇到的挑战;四,如何应对;五,简单的总结。

介绍这个系统之前,先介绍一下我们整个七牛云 Pandora 大数据平台产品的形态图。


这个产品形态图最左边会有数据源,用户可以把数据导到 Pandora 大数据平台上,通过计算任务的加工,数据可以到一个消息队列。这时候用户可以选择把数据导到日志检索,如果用户有日志检索需求;


另外一条线可以把这个数据再导到新的计算任务,再到消息队列里面,这时候他可以选择将数据导到时序数据库,如果数据带有时间戳的特征。通过时序数据库可以对数据进行一些报警或者监控分析;


另外也可以把数据导到七牛云的对象存储中,有什么好处呢?对象存储其实是七牛的存储服务,这里的定位和 hdfs 类似 ,用户可以通过 Xspark 对已经存放在对象存储里的数据进行多维数据分析。



如图 5 所示是整个 Pandora 的系统架构图。最左边是接入层,我们可以通过 SDK 等方式把数据接入到 Pipeline 管道上,通过 transform 进行计算,这些 transform 就是刚刚对应的计算任务。然后可以通过 Export 导到指定的产品中。右边可以通过 Xspark 等对它进行分析。


今天我们要讨论的 Topic 是位于整个产品中的子产品,在 LogDB 的环节上。LogDB 是一个搜索云服务。我们对这个产品的定位是这样的,希望这个产品能够基于 Pandora 的数据做一些分析服务,希望对普通用户来讲可以在 5—10 分钟内可以完成整个数据的接入,同时我们希望每天可以搞定 MB 到 100TB 的日志增量。当然对于用户来说,最重要的是云服务要做到 0 运维、0 开发、低成本。


我们对系统最初的设计目标如下:


  • 搞定公有云的海量用户,要支持各种规格;

  • 支持单个用户日志规模从 MB —100TB 每天;

  • 查询秒级响应;

  • 系统必须要有可靠性,主要表现在两方面,一方面不能丢数据,另外一方面系统可用性必须在 99.9% 以上;


    要拥抱开源,可以适配 Kibana、grafana 等。我们希望以前使用 ELK 的用户可以用非常低的成本迁移到我们的上面,以前使用 ELK 的范式切过来,对你来说没有任何变化。
    有了设计目标之后,我们马上要做出一个多租户模型。

    我们一开始就认为单个 ES 集群搞不定海量 repo ,因为大家都知道 ES 在 Master 被选举之前是一个 P2P 的系统,但是当 Master 被选取后,它的管理本质上是 Master 和 slave 的模式。所以随着集群规模的增大,以及 repo 数量的增多,整个集群管理负担越来越大,对于 Master 要求会越来越高,最终会导致集群挂掉。


    因为我们团队都是比较有经验的 ES 工程师,所以我们一开始就提出
    产品必须是多集群的海量 Cluster 的模型。如果 Cluster 出现资源不够的情况,我们从运维的角度加一些新的 node 即可。另外如果某一个 Cluster 里面 repo 达到了一定数量,超过了 ES 本身系统资源的瓶颈,我们就会选择增加一个新的 Cluster 来搞定这个事情。

    当我们敲定了多租户模型之后很快迭代出了第一版系统,第一版系统架构图如图 7 所示,最上面一层会有 Portal、logical、SDK 等等。右边是 export,我们的数据从 export 过来,所以要跟 export 对接。


    在 API Server 这层我们做了一些服务的解藕,首先是一个 search 集群,一个索引集群,同时其他类型业务会被集中放在一个集群。我们的数据存在 Mongo 里面,下面是整个 ES 集群的集合,最左边需要有一个管理的服务,对 ES 集群进行维护,比如对 ES 进行优化等。
    我们第一版系统上线之后遇到什么问题?


    首先,因为打点的 QPS 非常高,导致我们的 Mongo 成为系统里最先达到瓶颈的一环,它的查询压力非常大。那么,怎么解决这个问题?其实,我们需要一套 Cache 系统,这样就可以把大量请求拦在 Mongo 之前来解决问题。不过还好,因为我们已经有了一套缓存系统,所以不需要花太长时间来解决这个问题。


    七牛的缓存系统是一个二级缓存系统,第一级指的是 Local Cache,第二级是指 Memory Cache。举个例子,当一个请求发出来,如果 Local Cache 没有命中,它会去 Memory Cache 找,如果 Memory Cache 也没有找到,这时候它会通过 qconf Master 到 Mongo 里找,如果 Mongo 找到之后,会把这个值回写到 Memory Cache 和 Local Cache ,那么第二次请求时直接在 Local Cache 会被拦掉。如果这个请求很不幸命中了另外一个API Server,实际上 Local Cache 是没有的,但是 Memory Cache 也是有的,所以直接去 Memory Cashe 就可以拿到值了,然后再回写 Local Cache。


    通过计算,我们发现,基本上 99% 的请求都会被整个 Cache 系统拦住,而这个系统里基本上有 80% 的请求会被 Local Cache 拦住,也就是说整个 Memory Cache 和 Mongo 的压力都是非常小的。所以我们马上迭代出了新一版的系统,即我们加入了一个 Qconf Cluster。


    这时候我们遇到的新问题是什么?第一个问题是 LAG,经常有用户反映这个系统查不到最近 10 分钟或者 1 小时的数据。那么,LAG 是怎么产生的?我们认为整个写点代码是有问题的,写点的整个效率有问题。所以,我们要对写点做一些优化,在开始优化前,我们首先要了解 Benchmark,对整个效率有一个正确的认识。首先,我们要正确知道 ES 真正的吞吐量在哪里,因为我们整体资源是够的。比如,我们有十台机器,但是为什么我们搞不定这些量,对用户产生 LAG 了呢?所以我们先对 ES 集群做一个 Benchmark。


    我们拿到一个具体的值,ES 集群做一个 Benchmark 要怎么做?官方说,首先你一个并发控制 batchSize,从 0 加到 1 万,当你的 batchSize 增加没有任何收益时,固定这个值,再去增加它的并发数。当并发数增加也没有收益的时候,固定的这两个值就代表了 ES 最佳的效率。
    那也就是说,其实 ES 打点的吞吐量其实取决于客户端,对于这种推送系统其实大部分都是这样。即我们对客户端程序要求比较高,当然我们的数据如果是直接从内存里面来的,没有问题,我们可以一直维持这个并发数。但事实上我们的数据时 export 打给我们的,它发的请求可能是零散的,可能一个 batch 有可能一个只有一条数据或者一万条数据,这种零散的数据打到 ES 之后,整个吞吐量会下滑。


    所以,我们其实在 ES 之前需要做一个数据传输系统,这套系统要搞定 export 打给我们数据零散的点,我们要把它包装好,以 ES 最佳的效率打给 ES,同时这套系统应该是多租户的,我们内部称这个系统叫 Producer。那么,这个系统应该怎么做?首先,我们对数据传输做一个简单的思考,很多人经常会说,你做一个下游系统,对我的服务应该是稳定的,不管我以什么样的姿势跟你交互。或者说传输的下游消费速度也是稳定的,很多时候我们也会认为,这个链路整体传输速度其实取决于上游和下游的影响。但事实上并非如此,对于上游或者下游的速度来讲,吞吐量核心取决于并发数和并发大小,而维持最大吞吐量的核心是要维持并发数和并发大小。另外,要维持整体吞吐量要考虑三方面因素:拉取效率、链路效率和吞吐效率。


    举个例子,我们要搞定从 Kafka 到 ES 每秒 10K 的流量。我们知道 Kafka 效率很高,可能三个 Patation 可以搞定这个事情。那么我们需要的并发是 3。它的整个batchSize 是 10K,这对 Kafka 是最友好的。但是对于 ES 来说,你要搞定每秒 10K 的流量,其实他的姿势应该是这样的,可能你的并发数是5,你的 batchSize 是 20K。


    对于一个正常的系统,我们的 batchSize 该如何调?肯定是以最慢的为准,调到 20×5 的并发数,但事实上即便这样也解决不了。因为链路本身也有效率的损耗,比如你数据是 JSON 格式,它首先要做 unMarshal,这个时间其实是 CPU 的开销。所以真正要搞定这个问题,你可能需要大概 20K×8 的姿势才能搞定这个问题。那么,如何解决这个问题?我们认为我们需要引进一个东西来对上游下游做解藕,我们称之为 Memory queue。

    所以,我们首先需要一个队列,而且是内存队列,因为我们知道数据从上游拉下来之后进入内存队列,它的效率是最高的。进入内存队列之后,我们需要一个 Source 和 sink,因为我们要动态调整上游的 Source 数量和它的 batchSize 、下游的 sink 数量和它的 batchSize。与此同时,我们要做一个事务,为什么呢?用 ES 的人都知道,当我们通过bulk接口打点的时候,一个请求里面有 2w 个点,ES 其实是非常不友好的,他会告诉你这 2 万个点里有一半失败一半成功。其实这对客户端的负担很重。所以我们希望这套系统带事务,这样对于客户端来说,你打给我的点可以保证要么都成功要么都失败。

    我们基本模型做好之后,马上迭代出了最基本的代码。因为七牛是 Go 的深度用户,所以我们很多系统都是用 Go 做的。在这个系统里,我们用 Go 的 channel 来搞定 memory queue。图 9 是 channel 的一个生产者模型,首先你要事务池里面拿到一个事务并开启,同时要把数据塞给这个事务,然后把事务提交,把数据塞给 memory query,当然可能会失败,要做回滚,回滚的时候是把数据从事务里面删掉,同时告诉用户这次整体是失败的。


    图 10 是 channel 的另外一端,是消费者的一个模型,首先你拿到事务,把数据从 channel 塞到事务里,然后尝试去下游打点,如果成功了你可以提交事务,如果失败了回滚事务。回滚是指把数据从事务回滚到 channal。

    有了这么一个基本模型之后,我们马上要做的事情就是要把这个模型付诸于实践,我们要先做一个单机版的 Producer。首先定义一个新的概念 task,task 由 source、sink 和 channel 组成。同时我们要构造一个新的概念 agent,agent 是对 task 进行管理。因为我们上游的 export 系统是通过 Http 协议来向我们发送数据的,所以我们需要一个 rest 的 source。有了这个基本系统后,我们还需要一个 checkpoint sink 来解决丢点的问题。因为我们的数据是在 memory queue 里面,可能会有丢点的情况。


    有了单机版的 Producer 后,马上我们要解决一个新的问题,解决多租户的 Producer,因为是我们公有云的厂商。那么如何解决呢?

    图 12 是多租户的 Producer 模型。首先我们要对 task 进行基本描述,我们要描述它channel、batchSize 的大小,还有它的并发度。描述好 task 后,我们需要描述第二个东西就是 Producer agent metrics,它代表 cpu 资源、磁盘资源、网络资源,我们需要 Producer 对自己的所在机器的状态能够进行实时描述。有了这两个基本点之后,我们马上要做分布式系统,做分布式系统,我们首先想到的思路是通过 ZooKeeper 来解决,但是我们觉得用 ZooKeeper 来解决我们问题有点太重了,我们的服务是可以接受最终一致性的。所以我们觉得可以换一种新的思路去解决。


    最终的思路如下:


    通过版本戳 + PULL 的模型搞定这个数据一致性问题。


    即通过定时到上游拉数据,由版本戳来判定这个数据是不是最新的,通过这种方式,整个系统的管理最终都会是一致的。


    那么这个架构就很明显了,我需要一个 Master 对整个集群进行协调,同时我需要一个Producer agent,它表达的是资源单位。Producer agent 会定期把自己的状态上报给 Master,Master 知道现在整个系统有多少个 task,它需要对 task 以及整体资源池作编排,我们叫它 rebalance。这样每个 agent 就知道我要搞定多少 task,每个 task 状态如何。


    因为假如我们的 agent 非常多,对 Master 的压力则非常大,所以用到了上文提到的 Qconf 集群来搞定 Cache 的问题。当然,我们相信任何自动化都是不靠谱的,总有 case 可能跑在已有的规则以外。所以这套系统刚开始设计的时候就增加了一个 admin 后台,我们可以对它进行人工干预。有了这套系统后,基本能保证我以最大的效率从 export 把数据导入到 ES。即我们通过这套系统搞定了 LAG。


    紧接着,我们马上会遇到一些新的挑战,这个挑战是什么?就是大量的查询超时。我们系统上线之后经常有用户抱怨,譬如有用户发现用在系统搜索十次,其中有八次都在超时,根本不可用。


    怎样解决这个问题?我们对线上数据做了一些采样分析,然后发现以下规律:


    用户的 Query 总是多样的,有可能一个 Query 扫的数据量只有一百万,也可能是一个亿、一百亿、一千亿。


    搜索的过程其实有点像 mapper-reduce 的过程,比如 1 个数据有 1 个 shard,它搞定了 1 个 G 的量,在搜索的过程中如果是 10 毫秒没问题。当数据增加的时候,可能需要 10 个搞定 10 个 G 的量,这时候每个的时间可能都是 10 毫秒,但是整体响应时间其实大于 10 毫秒,因为这个时候做 reduce 的节点(es 叫 coordinating node)要 merge 的数据量更大。也就是随着 shard 的增减,它的搜索体验会越来越差,即当客户尝试搜索 1 千亿条数据,它可能落在 100 个 shard 上,那么它的搜索响应时间是不可估算的。


    听起来这个问题很难去解,我们觉得问题是很复杂的,但是我们可以抓住其中一些主要的变量,那这是什么?我们首先还是要做 Benchmark,这样才能正确认识到瓶颈到底在哪里。
    譬如我们测了一个 shard ,他搞定数据,实际的 QPS 是多少。这时候 QPS 是很重要的关键点,我们可以控制 QPS。比如每个用户的 Query 是很复杂的,我可以控制 QPS 对限制某个用户对资源的掠夺,这样可以保证整个 shard 的 QPS 不会出现特别高的情况,是在可控范围之内。另外要对日志的 Query 进行优化,针对特定的 Query 调优会比对 es 参数的调优收到比显著的效果。


    所以我们的思路是:


    首先通过 QPS 控制整体的量,然后再做 Query 的优化,把一些大的 Query 拆成小的 Query。


    那么如何优化?首先看一下日志 Query 的特点,在我们看来日志 Query 天生带有时间的 Tag,另外它的排序是时间排序,不像一个广告系统的排序可能是相关度。有了这两点之后我们就可以展开了。


    举个例子,一个用户 Query 来了之后,它是带时间的,我们知道这个时间所需要的 Shard 大概在哪一天,这样我们可以避免无用的资源消耗,相当于是把大的 Query 拆成小的 Query,因为之前我们的 search是要扫所有的 shard。假如我确定它在第一天,这时我到对应的 shard 上搜即可。还有就是满足条件预判呢?比如,它对这次搜索的结果希望是反馈 200 条数据,那因为你是时间排序,很有可能你去搜索的时候第一个Shard 就已经有 200 条数据了,这时候我们可以提前返回。


    第二个思路是,我们要做一个执行计划的可控。那么,怎么去控制一个执行计划?我们刚才有讲到,搜索体验随着 Shard 的增多而增多。同时搜索 100 个 Shard ,它的体验非常差,所以我们要控制。经过测试发现 5 个 Shard 的搜索体验最好。


    我们的执行计划是这么做的,比如你要搜索最近一年的数据,那怎样去做呢?如果按照之前的姿势,你会把整个系统资源占掉,把系统拖垮,还拿不到数据。而现在我们在后台把这个做成串行的,先给你搜索 5 个 Shard,5 个 Shard 一直叠加,而用户看到的是一个进度条,通过这种方式,用户更容易接受,不会感到茫然。因为我们之前的搜索时间经常到 60 秒或者 60 秒开外,这时候用户会感到茫然,说我等了这么长时间你给我看一个 503。


    我们其实关于 Query Excecutor 这块的优化点其实蛮多,我挑了关键的三个点讲一下。搞定了查询的超时问题似乎大部分问题搞定了,但其实也不是这样,我们的挑战永远都存在。我们经常会在晚上 24 点被用户叫来,说你们的集群挂掉了,我们经常凌晨 2、3 点一起讨论到底什么原因导致的。


    关于 24 点挂掉的原因,用 ES 的人可能都清楚,ES 的自带的 TTL 机制其实很笨,效率非常差。那我们要避免要使用这种姿势,按天建立索引,所以不可避免在 24 点会有一些问题,比如到 24 点都要对它进行创建索引,而且这个创建索引非常扎堆,扎堆是指比如现在有 1000 用户,可能需要 2000 个 shard 搞定这个事情,也就是说在 24 点这一刻马上创建 2000 个 Shard,这个对于整个集群的压力是非常大的,因为整个任务会持续堵塞,会造成整个集群内存爆炸,最终导致 Master 挂掉,集群解散。


    这个问题解法其实非常简单,我们可以提前一天平滑创建。24 点的问题核心是扎堆创建,所以我们要平滑,我们可以提前一天的 3 点一个一个去创建索引,保证整个集群的压力是可控的。


    这个事情听下来还是有点玄乎,因为感觉都不太好控制,所以我们还是根据之前的解决方案,抓大头,只要搞定写点和查询即可。之前已经强调过写点的 batchSize ,查询的 QPS 我们都是有的。那我们可以考虑对 shard 进行切分,70% 的资源用来搞定写点,30% 的资源用来搞定查询。这时候整个 shard 都是一个可量化的指标,譬如,这个 Shard 可以搞定多少写点,这个 Shard 可以搞定多少查询。有了 shard 这个标准化概念后,我们要马上要搞定它。


    但搞定它之前,应该先看看现在面临的问题是什么样?我们的系统是基于 ES 构建的,但是它 rebalance 算法是有局限性的。用过 ES 的人都知道,它 rebalance 的考虑因子主要有以下几个方面:


    第一,对磁盘的使用率,第二,认为每一个 Shard 都是一样的,他不知道随着时间的推移每一个 Shard 能搞定的量是多少。按天的方式,第一天我们有 10 个节点 10 个Shard,每个节点搞定一个 Shard。但是经过一天的发展,每个节点上的 Shard 数据量量都不一样。这时候到了第二天,某几个节点的磁盘使用率可能非常低,这 10 个 Shard 有可能出现互相的堆积,集中在其中 3—4 台节点上。也就是说我们不能依靠 ES 搞定这个事情,我们需要自己动手。


    再看看另外一边,我们能够正确描述一个 Shard,因为 Shard 编排是双向的,首先你要表达清楚需要多少资源,然后才能根据我的现有资源进行安排。但是在另外一边,用户打点的量和他查询的 K8S ,其实都是弹性的,因为你不知道公有云用户什么时候来我们平台,你根本不知道他的流量需求、查询需求是多少。如果他是来查问题的。比如查一下日志,当然这种 QPS 非常低,大概每天几百个。他有可能用你的服务构建一个系统,这种系统写点可能比较低,查询比较高。所以现在首先 ES 不能帮我们搞定这个事情,我们需要自己动手搞。其次,用户流量不可预期。


    那么,怎么解这个问题?我们认为需要从四个方面考虑:
    首先是冷启动。对于一个新用户来到我们平台之后,我们对他其实是没有任何认知的,这时候我们需要给他一个冷启动的标配。比如,他来之后给他一个默认的标配,给他分一定量资源。


    另外要做流量预估,对于一个已经在我们平台上跑了很多天的用户,他的流量是可预期的。我们可以根据他历史上多少天对流量进行预估,这跟广告比较像,广告里面有一个库存的概念,你要提前预估库存才能进行售卖,同理,要预估流量来采购资源,保证系统平滑运行。这套系统现在是在跑在 XSpark 上的。


    另外需要一个实时扩容缩容的东西。什么意思呢?就是当某用户是在我们平台上跑了大概三个月,表现非常好,流量很稳定。但如果是一个电商用户,比如他 618 大促,这种我们算法搞不定,它跑在我们的算法之外。所以我们需要进行动态的扩容缩容,它不依赖于任何算法去做预测,只是根据现有的情况,实时增加容量或者减少容量,它是基于我们之前的 Pandora workflow 做的,我们也不需要做基建,只需要搞定这个系统就可以了。


    搞定了流量预估、动态扩缩容,我们应该不会存在热点了。这里面还有一个问题需要注意,就是稳定性的事情。就是我们的编排算法一定要稳定,这个稳定是什么意思?


    之前我们在介绍的 Producer 的时候大家也注意到,Producer 是一个轻状态的server,它的迁移成本非常低,而对于 ES Shard ,迁移成本非常高,比如这个 Shard 本来已经搞定了 100G 的量,这时候属于算法不稳定,每天算法跑完之后编排结果出来之后,需要对 Shard 进行大量挪动迁移,其实对整个集群压力非常大。因为你想 1 个 T 的 Shard 如果从这个磁盘拷到另外一个磁盘,大致也是需要好几个小时的。


    所以这个算法必须是稳定,必须能够以最小的 Shard 编排结果搞定热点问题。搞定了 Shard 编排系统,又搞定了 Producer 系统,还搞定了搜索的查询执行计划系统,看似我们的系统应该是完美了。

    因为我们团队 ES 工程师有经验的比较多,所以我们觉得可以用 ES 搞定这个问题。但是在项目选型之初,我们觉得不希望系统被 ES 绑定,所以我们的系统其实是跟ES 解耦的,包括 API 的一些接口定义都是跟 ES 无关的,也就是我们希望我们的系统后面能挂到更多的引擎,事实上我们现在在搞定之前一些比较头疼的问题之后,我们现在也在尝试一种新的可能,尝试让我们的系统可以挂在更多的引擎解决这个问题。


    我说完那么多,那么,我们到底搞定了多少事情?


    第一,支撑海量用户;第二,支撑每天 100T+、2000 亿+ 的流量;第三,没有 LAG;第四,查询可以保证秒级返回;第五,接近 0 运维,因为即便我们系统现在做的特别好,但是总有一些不在我们的现在系统能够做的范围之内,这个指的是编排等分配问题,所以有时候需要人工去干预一下;第六,现在系统能够做到可用性在 99.9%。


    最后再给大家做个预告,七牛云 Pandora 大数据平台内测邀请中,点击阅读原文即可参与。