【译】日志:每个软件工程师都应该了解实时数据的统一抽象【二】
非常经典的一篇博文,由于原文太长,所以按照不同的 part 进行翻译,此文是 part two。
贴心的目录在这里:
【译】日志:每个软件工程师都应该了解实时数据的统一抽象【一】
这一部分通过提出问题,阐述问题的解结思路的方式展开。首先提出了数据集成的难题,包括应用系统中事件信息的大量涌现,难以处理;再加上多个系统之间的生产和消费。于是,Kafka 这种消息队列中间件就应运而生。
现在看来 Kafka 是大型系统的必要组件,合情合理;但是放在当时的场景,如何在多个冗杂的系统中,将消息的订阅和分发给解耦出来,确实也是不容易的。
Kafka 的详细文档见这里。
原文:The Log: What every software engineer should know about real-time data's unifying abstraction
发布时间:2013 年 12 月 16 日
第二部分:数据集成
让我先说一下我所说的“数据集成”是什么意思以及为什么我认为它很重要,然后我们将看看它与日志的关系。
数据集成正在使组织的所有数据在其所有服务和系统中可用。
“数据集成”这个短语并不常见,但我不知道更好的短语。更容易识别的术语ETL通常只涵盖数据集成的有限部分——填充关系数据仓库。但我所描述的大部分内容,可以被认为是 ETL 泛化以涵盖实时系统和处理流程。
在围绕大数据概念的所有令人窒息的兴趣和炒作中,您并没有听到太多关于数据集成的信息,但尽管如此,我相信“使数据可用”这个平凡的问题是组织可以关注的更有价值的事情之一.
数据的有效使用遵循一种马斯洛的需求层次。金字塔的基础涉及收集所有相关数据,能够将它们放在适用的处理环境中(无论是花哨的实时查询系统,还是只是文本文件和 python 脚本)。这些数据需要以统一的方式建模,以便于读取和处理。一旦满足了以统一方式收集数据的这些基本需求,就可以在基础设施上以各种方式处理这些数据——MapReduce、实时查询系统等。
值得注意的一点是:没有可靠和完整的数据流,Hadoop 集群只不过是一个非常昂贵且难以组装的空间加热器。一旦数据和处理可用,人们就可以将注意力转移到更好的数据模型和一致且易于理解的语义等更精细的问题上。最后,注意力可以转向更复杂的处理——更好的可视化、报告以及算法处理和预测。
根据我的经验,大多数组织在这个金字塔的底部都有巨大的漏洞——他们缺乏可靠的完整数据流——但希望直接跳到高级数据建模技术。这完全是倒退。
所以问题是,我们如何才能在组织的所有数据系统中建立可靠的数据流?
数据集成:两个难题
两个趋势使数据集成变得更加困难。
事件数据流水线
第一个趋势是事件数据的兴起。事件数据记录发生的事情,而不是事情本身。在 Web 系统中,这意味着用户活动记录,以及机器级事件和统计信息——用于可靠地操作和监控数据中心机器的价值。人们倾向于称其为“日志数据”,因为它通常被写入应用程序日志,但这将形式与功能混淆了。这些数据是现代网络的核心:毕竟,谷歌的财富,就是由建立在点击和印象(即事件)之上的关联渠道产生的。
而且这些东西不仅限于网络公司,只是网络公司已经完全数字化,所以它们更容易检测。长期以来,财务数据一直以事件为中心。RFID将这种跟踪添加到物理对象。我认为随着传统业务和活动的数字化,这种趋势将继续下去。
这种类型的事件数据记录了发生的事情,并且往往比传统数据库使用大几个数量级。这对处理提出了重大挑战。
专业数据系统的爆炸式增长
第二个趋势来自专业数据系统的爆炸式增长,这些系统在过去五年中变得流行并且通常免费提供。专业的数据系统应用在OLAP、搜索、简单 在线 存储、批处理、图形分析等 。
更多种类的更多数据,以及将这些数据输入更多系统的愿望,一起导致了巨大的数据集成问题。
日志结构的数据流
日志是处理系统间数据流的自然数据结构。原理很简单:
获取组织的所有数据并将其放入中央日志以进行实时订阅。
每个逻辑数据源都可以建模自己的日志。数据源可以是记录事件(例如点击或页面查看)的应用程序,也可以是接受修改的数据库表。每个订阅系统都尽可能快地从该日志中读取数据,将每条新记录应用到自己的存储中,并移动其在日志中的位置。订阅者可以是任何类型的数据系统——缓存、Hadoop、另一个站点中的另一个数据库、搜索系统等。
例如,日志概念为每个更改提供了一个逻辑时钟,所有订阅者都可以根据该时钟进行测量。这使得推测不同用户系统相对于彼此的状态要简单得多,因为每个用户系统都有一个他们已经读取到的“时间点”。
为了使这一点更具体,考虑一个简单的情况,其中有一个数据库和一组缓存服务器。该日志提供了一种方法,来同步所有这些系统的更新并推断每个系统的时间点。假设我们用日志条目 X 写了一条记录,然后需要从缓存中读取。如果我们想保证我们看不到陈旧的数据,我们只需要确保我们不会从任何尚未复制到 X 的缓存中读取。
日志还充当缓冲区,使数据生产与数据消费异步。这很重要,原因有很多,尤其是当有多个订阅者可能以不同的速率消费时。这意味着订阅系统可能会崩溃或停机进行维护,并在它恢复时赶上:订阅者以它控制的速度消费。像 Hadoop 或数据仓库这样的批处理系统可能只需要每小时或每天消费一次,而实时查询系统可能需要实时更新。原始数据源和日志不用关心各种数据目标系统,因此可以在不改变数据流的情况下添加和删除消费者系统。
特别重要的是:目标系统只知道日志,而不知道源系统的任何细节。消费者系统不需要关心数据是来自关系数据库管理系统、新型的键值存储,还是在没有任何类型的实时查询系统的情况下生成的。这似乎是一个小问题,但实际上很关键。
我在这里使用术语“日志”而不是“消息系统”或“发布订阅”,因为它更具体地描述了语义,并且更详细地描述了在实际实现中支持数据复制所需的内容。我发现“发布订阅”不仅仅意味着消息的间接寻址——如果你比较任何两个实现发布-订阅的消息传递系统,你会发现它们保证的东西非常不同,并且大多数模型在这个领域没有用处。您可以将日志视为一种具有持久性保证和强排序语义的消息传递系统。在分布式系统中,这种通信模型有时被称为原子广播(有点可怕) 。
值得强调的是,日志仍然只是基础设施。掌握数据流的故事还没有结束:故事的其余部分围绕元数据、模式、兼容性以及处理数据结构和演变的所有细节。但是,除非有一种可靠的、通用的方法来处理数据流的机制,否则语义细节是次要的。
在 LinkedIn
随着 LinkedIn 从集中式关系数据库转移到分布式系统的集合,我不得不面对这个数据集成问题的迅速出现。
如今,我们的主要数据系统包括:
其中每一个都是专门的分布式系统,在专业领域提供高级功能。
甚至在我来到这里之前,这种使用日志进行数据流的想法就已经在 LinkedIn 上流传。我们开发的最早的基础设施之一是一项名为databus的服务,它在我们早期的 Oracle 数据表上提供了一个日志缓存抽象,用来扩展对数据库更改的订阅,使得我们可以提供社交图和搜索索引。
我将提供一点历史背景。我自己在 2008 年左右开始参与这件事,当时我们已经发布了我们的键值存储。我的下一个项目是尝试启动一个有效的 Hadoop 环境,并将我们的一些推荐流程移到那里。由于在这方面经验不足,我们自然而然地预算了几周的时间来输入和输出数据,剩下的时间用于实现优质的预测算法。于是开始了漫长的征程。
我们最初计划只是从我们现有的 Oracle 数据仓库中抓取数据。第一个发现是,快速从 Oracle 中获取数据是一种黑魔法。更糟糕的是,数据仓库处理不适用于我们为 Hadoop 计划的批处理产品——大部分处理是不可逆转的,并且特定于正在完成的报告。我们最终避开了数据仓库,直接访问了源数据库和日志文件。最后,我们实现了另一个管道将数据加载到我们的键值存储中以提供结果。
这种平凡的数据复制最终成为原始开发的主要项目之一。更糟糕的是,任何时候任何管道出现问题,Hadoop 系统基本上都是无用的——在坏数据上运行优质的算法只会产生更多的坏数据。
尽管我们以相当通用的方式构建了程序,但每个新数据源都需要自定义配置来设置。它也被证明是大量错误和失败的根源。我们在 Hadoop 上实现的站点功能变得流行起来,我们发现自己拥有一长串感兴趣的工程师。每个用户都有一个他们想要集成的系统列表和一长串他们想要的新数据推送。
古希腊的 ETL。没有太大变化。
我慢慢明白了一些事情。
首先,我们建造的管道虽然有点乱,但实际上非常有价值。仅仅在新的处理系统 (Hadoop) 中提供数据的过程,就开启了许多可能性。以前很难对数据进行新的计算。许多新产品和分析只是将以前锁定在专门系统中的多条数据组合在一起。
其次,很明显,可靠的数据加载需要数据流的深入支持。如果我们收集了我们需要的所有结构,我们可以使 Hadoop 数据加载完全自动化,这样就无需手动添加新数据源或更爱处理模式——数据会神奇地出现在 HDFS 中,并且会自动为新生成 Hive 表提供适当列的数据源。
第三,我们的数据覆盖率仍然很低。也就是说,如果您查看 LinkedIn 拥有的在 Hadoop 中可用的数据的总体百分比,它仍然非常不完整。考虑到操作每个新数据源所需的工作量,完成工作并非易事。
我们一直在为每个数据源和目标构建自定义数据加载,这种方式显然是不可行的。我们有几十个数据系统和数据存储库。连接全部系统,将导致在每对系统之间都构建自定义管道,如下所示:
请注意,数据通常双向流动,因为许多系统(数据库、Hadoop)既是数据传输的源又是目的地。这意味着我们最终将为每个系统构建两条管道:一条用于实现数据输入,一条用于实现数据输出。
这显然需要一支军队来建造,而且永远无法运作。当我们接近完全连通性时,我们最终会得到类似 O() 的管道。
相反,我们需要这样的通用内容:
我们需要尽可能地将每个消费者与数据源隔离开来。理想情况下,它们应该与一个单一的数据存储库集成,使得它们可以访问所有内容。
这个想法是,添加一个新的数据系统——无论是数据源还是数据目标——都应该仅将其连接到单个数据流,而不是每个数据消费者。
这段经历使我专注于构建Kafka,将我们在消息传递系统中看到的内容与数据库和分布式系统内部流行的日志概念相结合。我们希望首先充当所有活动数据的中央管道,最终用于许多其他用途,包括 Hadoop 外的数据部署、监控数据等。
长期以来,Kafka 作为一种有点独特的基础设施产品(有些人会说很奇怪)——既不是数据库,也不是日志文件收集系统,也不是传统的消息传递系统。但最近亚马逊提供了一项与 Kafka 非常相似的服务,称为Kinesis。相似之处在于处理分区、保留数据的方式,以及 Kafka API 中高级和低级消费者之间相当奇怪的划分。我对此感到非常高兴。创建了良好的基础设施抽象的标志,是 AWS 将其作为服务提供!他们对此的看法似乎与我所描述的完全相似:它是连接他们所有分布式系统(DynamoDB、RedShift、S3 等)的管道,也是使用 EC2 进行分布式流处理的基础。
与 ETL 和数据仓库的关系
让我们稍微谈谈数据仓库。数据仓库是为了建立干净、集成数据的存储库,可以支持分析。这是一个好主意。对于那些不知道的人,数据仓库方法论包括:定期从源数据库中提取数据,将其转换成某种可理解的形式,最后将其存储到中央数据仓库中。对于数据密集型分析和处理来说,拥有包含所有数据干净副本的中心仓库,是一项非常宝贵的资产。在高层次上,无论你使用 Oracle、Teradata 还是 Hadoop 等传统数据仓库,这种方法都不会发生太大变化,尽管你可能会改变导入和修改的顺序。
存储干净的集成数据的仓库是一项非凡的财产,但获得它的机制有点过时了。
以数据为中心的组织的关键问题是,将干净的集成数据捆绑到数据仓库。数据仓库是一种批量查询基础设施,非常适合多种报告和专门分析,尤其是当查询涉及简单的计数、聚合和过滤时。但是,将批处理系统作为唯一的干净且完整的数据仓库,意味着数据无法用于需要实时推送的系统——实时处理、搜索索引、监控系统等。
在我看来,ETL 真的是两件事。首先,它是一个提取和数据清洗过程——本质上是释放锁定在组织中各种系统里的数据,并删除特定于系统的无意义信息。其次,为数据仓库查询重新构建数据(例如,调整为适合关系数据库的类型,强制转换为星形或雪花模式,或者分解为高性能列 格式等)。把这两件事混为一谈是个问题。干净且集成的数据存储库应当具有实时可用性,并且能够用于低延迟处理,以及其他实时存储系统中的索引。
我认为,这给数据仓库 ETL 在组织上更具可扩展性带来了额外好处。数据仓库团队的经典问题是,他们负责收集和清洗组织中其他团队生成的所有数据。激励措施是不一致的:数据生产者通常不太了解数据仓库中数据的使用情况,导致创建的数据难以使用,或需要大量难以扩展的转换才能变为可用形式。当然,中心团队也永远无法完全按照组织中其他部门的节奏进行扩展,因此导致数据覆盖范围总是参差不齐,数据流很脆弱,而且变化很慢。
更好的方法是拥有一个中心数据管道,即日志,以及一个定义明确的 API 用于添加数据。数据的生产方,应当保证提供干净、结构良好的数据。这意味着,作为系统设计和实施的一部分,他们必须考虑数据应该以良好地结构在中心管道中进行传输,包括输入和输出。添加新的存储系统对数据仓库团队没有影响,因为它们有一个中心集成点。数据仓库团队,只处理这些简单的问题:从中央日志中获取结构化数据推送,以及执行特定于其系统的转换。
当考虑采用传统数据仓库之外的其他数据系统时,关于组织可扩展性的这一点变得尤为重要。例如,假设一个人希望提供对组织的完整数据集的搜索功能。或者,假设你希望通过实时趋势图和告警,提供对数据流的亚秒级监控。在这两种情况下,传统数据仓库甚至 Hadoop 集群的基础架构都不合适。更糟糕的是,为支持数据库负载而构建的 ETL 处理管道可能无法为这些系统提供数据,这使得搭建这些基础设施的工作与采用数据仓库一样大。这并不是可行的方案,并且可能可以解释:为什么大多数组织没有这些能够方便地访问数据的功能。相比之下,如果组织已经构建了统一、结构良好的数据源,那么让任何新系统完全访问所有数据只需要一点连接到管道的集成工作。
这个结构还针对特定清理或转换提供了不同的选项:
它可以由数据生产者在将数据添加到公司范围的日志之前完成;
它可以作为对日志的实时转换来完成(这反过来会产生一个新的转换日志);
它可以作为转换到某些目标数据系统的过程中的一部分来完成。
最好的模型是在数据生产者将数据发布到日志之前进行清理。这意味着确保数据采用规范形式,并且不会保留来自特定代码或维护它的存储系统的任何遗留信息。这些细节最好由生产数据的团队处理,因为他们最了解自己的数据。在这个阶段应用的任何逻辑都应该是无损且可逆的。
任何能够实时完成的增值转换,都应该在原始日志生成之后再处理。这将包括事件数据的会话化,或添加其他普遍感兴趣的派生字段。原始日志仍然可用,但这种实时处理会生成包含增强数据的派生日志。
最后,只有特定于目标系统的聚合,才应该作为加载过程的一部分执行。这可能包括将数据转换为特定的星形或雪花模式,以便在数据仓库中进行分析和报告。因为这个阶段,可以最自然地映射到传统 ETL 过程,现在是在一组更干净、更统一的数据流上完成的,所以会大大简化。
日志文件和事件
让我们稍微谈谈这种架构的一个附带的好处:它支持解耦的、事件驱动的系统。
Web 行业中获取活动数据的典型方法是将其记录到文本文件中,然后可以将其提取到数据仓库或 Hadoop 中以进行聚合和查询。这样做的问题与所有批处理 ETL 的问题相同:它将数据流与数据仓库的能力和处理流程耦合在一起。
在 LinkedIn,我们以日志为中心的方式构建了我们的事件数据处理。我们使用 Kafka 作为中央的多订阅者事件日志。我们已经定义了数百个事件类型,每个事件类型都获取关于特定类型动作的独特属性。这涵盖了从页面浏览量、广告印象、搜索,到服务调用和应用程序异常的所有内容。
为了理解这样做的好处,想象一个简单的事件——在招聘页面上显示一个职位发布。招聘页面应仅包含显示职位所需的逻辑。但是,在一个相当动态的站点中,这很容易被与显示职位无关的附加逻辑所掩盖。例如,假设我们需要集成以下系统:
我们需要将此数据发送到 Hadoop 和数据仓库以进行离线处理;
我们需要对观看次数进行计数,以确保观看者没有尝试进行某种内容抓取;
我们需要聚合此视图,用以显示在职位发布者的分析页面;
我们需要记录视图,以确保我们正确地对该用户的任何职位推荐进行展示限制(我们不想一遍又一遍地显示相同的内容);
我们的推荐系统可能需要记录视图,以正确跟踪该工作的受欢迎程度;
Etc。
很快,展示职位的简单行为变得相当复杂。当我们添加其他显示职位的地方时——移动 app 等等——这个逻辑必须被延续,复杂性也会增加。更糟糕的是,我们需要交互的系统现在缠绕在一起——开发职位展示功能的人需要了解许多其他系统和功能,并确保它们被正确集成。这只是问题的玩具版本,任何真正的应用程序都会更复杂,而不是更少。
“事件驱动”风格提供了一种简化这一点的方法。职位推荐页面现在只展示职位,并记录职位的事实以及相关属性、查看器以及关于职位推荐的任何其他有用的事实。其他每个感兴趣的系统——推荐系统、安全系统、招聘海报分析系统和数据仓库——都只是订阅数据并进行处理。职位推荐代码不需要知道这些其他系统,并且如果添加了新的数据消费者也不需要更改。
构建可扩展的日志
当然,将发布者与订阅者分开并不是什么新鲜事。但是,如果您想保留提交日志,并把它作为实时日志用以记录消费者规模网站上发生的所有事情,同时支持多订阅者,那么可扩展性将是一个主要挑战。如果我们不能构建一个足够快速、廉价和可扩展的日志,并满足大规模使用,那么使用日志作为通用集成机制将永远只是一个不切实际的幻想。
系统人员通常认为分布式日志是一种缓慢、重量级的抽象(通常只将其与 Zookeeper 提供的“元数据”用途相关联)。但是,对于专注于记录大数据流的深思熟虑的实施,这不一定是真的。在 LinkedIn,我们目前每天通过 Kafka 运行超过 600 亿条不同的消息写入(如果算上数据中心之间镜像的写入量,则为数千亿条)。
我们在 Kafka 中使用了一些技巧来支持这种规模:
对日志进行分区;
通过批处理读取和写入来优化吞吐量;
避免不必要的数据复制。
为了允许水平扩展,我们将日志分成多个分区:
每个分区都是一个完全有序的日志,但分区之间没有全局排序(除了可能包含在消息中的一些壁钟时间)。消息到特定分区的分配由开发者控制,大多数用户选择通过某种键(例如用户 ID)进行分区。分区允许在没有分片之间协调的情况下发生日志追加,并允许系统的吞吐量与 Kafka 集群大小保持线性伸缩。
每个分区都通过可配置副本的数量进行复制,每个副本都有分区日志的相同副本。在任何时候,其中一个分区将充当领导者;如果领导者失败,其中一个副本将接管领导者。
缺乏跨分区的全局顺序是一个限制,但我们还没有发现它是一个主要限制。事实上,与日志的交互通常来自成百上千个不同的进程,因此谈论它们的行为的总顺序是没有意义的。相反,我们提供的保证是每个分区都保持顺序,并且 Kafka 保证:来自单个发送者追加到特定分区的日志,将按照它们发送的顺序进行交付。
像文件系统一样,日志很容易针对线性读写模式进行优化。日志可以将小的读取和写入组合成更大的高吞吐量操作。Kafka 积极地追求这种优化。在发送数据、写入磁盘、服务器之间的复制、向消费者传输数据以及确认已提交的数据时,从客户端到服务器会进行批处理。
最后,Kafka 使用一种简单的二进制格式,在内存日志、磁盘日志和网络数据传输之间进行维护。这使我们能够利用许多优化,包括零拷贝数据传输。
这些优化的累积效应,使得你通常能够以磁盘或网络支持的速率写入和读取数据,即使在维护大大超出内存的数据集时也是如此。
这篇文章主要不是关于 Kafka 的,所以我不会详细介绍。您可以在此处阅读有关 LinkedIn 方法的更详细概述,并在此处阅读对 Kafka 设计的全面概述。
评论