为什么选择Kafka

为什么选Kafka?鉴于弘大的数据量,须要将其做成分布式,这时须要将Q里面的数据分到许多机器上进行存储,除此之外还有分布式的打算需求。
同时须要支持多措辞,如Java、GO、php等,其余还有高可用的需求。

Kafka集群

瓜子phpDataPipeline丨瓜子二手车基于Kafka的构造化数据流 Vue.js

Realtime的Kafka集群通过Mirror Maker将数据全部同步到Analysis的Kafka集群。

Realtime的Kafka集群紧张卖力在线实时读写,Analysis卖力很多事情,诸如数据的导入导出,数据的多次流出给集群和网络硬盘带来了较大压力。
为担保线上的稳定性,要担保两边是隔开的。
其余关于Topic目前有五万多,每秒可能会有100多万的数据流入流出。

Kafka的用户利用问题

1.参数配置问题, Kafka有很多参数须要配置,常用的集群配置,延迟,主要性等,须要封装。

2. 开拓测试未便利,利用者常日会有这样的需求:我的数据写进去没,消费没,他写的数据长什么样子,构造化的数据还需自己写代码来解析,等等。
这些问题没有工具和平台来办理,会大大降落开拓效率。

3. Topic申请未便利,topic是不能开放自己创建的,我们曾在测试环境开放过Topic,创造一周内涨到了好几万,而且参数千奇百怪,有全用默认参数的,有根据文档,韶光先来10个9的,也有partition直接来100的。
工单办法对管理者很不友好,须要登上做事器敲命令,效率低下,且随意马虎出错。

4. 构造化数据查询未便利,瓜子的构造化利用的是AVRO, 序列化之后的数据很难查看原始数据。

5. 消费非常定位不便,比如消费的数据或者位置不对,如果想要回滚重新消费或跳过脏数据就面临各种问题。
从哪个offset开始重新消费呢?或者跳到之后的哪个offset呢?其余便是滚动重启了一个做事,结果创造消费的数据少了一批,很有可能是某一个隐蔽的consumer同时在用这个consumer group,但是找了一圈没找到哪个做事还没关掉。

6. 不知道下贱,如果写了生产者生产的Topic数据,却不知道有哪些consumer,如果要对Topic信息发生改变时,不知该关照谁,这是很繁芜的事情。
要么先上,下贱出问题了自己叫,要么犹豫不前,先网络下贱,当然实际情形一样平常是前者,常常鸡飞狗跳。

7. 运维繁芜,日常运维包括topic partition增加,帮助定位脏数据(由于他们不知道有脏数据),帮助打消配置问题等等。

办理方案:Kafka platform

为办理上述问题,瓜子上线了Kafka platform,紧张面向用户和管理两方面的功能。

面向用户包括:查看数据,理解消费情形,方便地添加监控报警,以及如果涌现问题后,快速查错的工具。

管理方面包括: 权限管理, 申请审批,还有一些常用操作。
比如,seek offset, 或是删掉一个Topic,对partitions进行扩容等。

1. 数据查询

可以通过offset查询对应offset的数据,也可以通过进入Kafka的大致韶光,查询那段内的数据,可以看到每条信息的partition,offset,入Kafka的韶光,AVRO的版本信息等。

2. 消费查询

通过下图显示的界面可以查看一条,理解哪些consumer group已经消费了,哪些没有消费。

同时可以查看它现在正在被哪个IP进行消费,这时我们可以方便地定位到有个consumer没有关闭,它是在哪台机器上,这些来自于我们的实践履历。
还可以看到每个consumer group的消费延迟情形,精确到条数,partition的延迟。
也可以看到partition总数,可以排查一些不均的问题。

下图为监控报警,可以理解Topic的流入、流出数据,每秒写入多少条,多大的size,每秒流出的情形。

报警是对Topic建一些流量报警,或是一些延迟报警,建好之后只须要订阅一下即可,非常方便。

瓜子构造化数据流

目前有许多利用场景,比如前端埋点,tracking日志,Mysql数据同步,操作日志,一些诸如做事之间的交流,基于SQL的streaming,APM的数据,还有基于SQL的ETL等,都可以通过构造化将其快速同步到大数据中做后续剖析。

我们是通过confluent供应的一整套办理方案来实现的。
个中最紧张的两个组件是:Schema Registry和Kafka Connect。
Schema Registry用于存储schema信息,Kafka connect用于数据转移。

目前,瓜子除日志部分外,90%以上为构造化。
为什么选择Avro?由于Avro速率快,并且跨措辞支持,所有的Schema AVSC都是用JSON做的,对JSON支持的特殊好,如果可能没人想为一个schema定义再学一门措辞吧。
而且通过JSON无需code generation。

但仅有avro还不足,我们在利用中会面临更多的问题,比如:

- 统一的schema中央,这与配置中央的必要性是一样的道理,没有统一的地方,口口相传,配置乱飞不是我们想看到的。

- 多版本的需求,schema是肯定会有更新需求的,也肯定有回滚需求,也会有兼容需求,以是多版本是须要知足的。

- 版本兼容性检讨,设想一下上游改了一个schema的列名,下贱写到hive的时候就蒙了,历史数据咋办啊,现在这列数据又怎么处理。
以是得有版本兼容,而且最好知足下贱所有组件的需求。

- schema得有注释,给人展示的schema最好能有给人读的注释,很多人昨天定义的enum,本日就忘了,这个事情很常见。

为办理这些问题,我们引入了confluence的Schema Registry。
Confluence的Schema registry,通过RESTful接口,供应了类似配置中央的能力,还有完善的UI,支持版本兼容性检讨,支持多版本等,完备知足了我们的需求。
而且自带HA,通过Kafka存储配置信息,担保同等性。

瓜子的实践

当然,仅有这些还不足,我们在实践中碰着了很多问题,比如schema注册不可能完备开放,历史见告我们完备的自由意味着混乱。
为在实践中利用好avro,我们前后改了两个方案,来担保schema可控。

1. 最初的方案

为实现统一管控,所有schema会通过git来管理,如果须要利用可以fork该git。
如果要做一个上线,更新或添加一个schema,可以通过提merge request提交给管理员。
管理员检讨没有问题后直接通过gitlab-ci自动注册,管理员只需完成确认的操作。

但这样会涌现一些问题,首先是上线流程太长,要上线或更新一个schema时,须要提交merge request,要等管理员收到邮件后才可查看,届时如果管理员创造schema写的不对,还需重新再走一次流程,中间可能花一天韶光。
且回滚繁芜,没有权限管理。
而且很多人会犯同样的缺点,客服表示相称的摧残浪费蹂躏韶光。

平台化办理方案

通过平台化办理方案,我们供应了一个类似于git的页面,可在上面直接提交schema,不才面直接点击校验,在评估新上线的schema是否有问题后,等待后台审批即可。
个中可以加诸如权限管理等一些功能。

为什么用到Kafka connect

Kafka connect专注copy数据,把一个数据从data source转到Kafka,再从Kafka转到其它地方。
它支持批和流,同时支持实时和批处理,比如5min同步一次。

其余,它支持多个别系之间相互copy,数据源可能是Mysql、SQL Server 也可能是Oracle 。
sink可以是Hbase、Hive等。
它自己定义了一套plugin接口,可以自己写很多数据源和不支持的sink。

并且他自己做到了分布式并行,支持完善的HA和load balance,还供应方便的RESTful 接口。

在没有Kafka connect之前,运维ETL非常麻烦。
拿canal来说,canal有server和client,都需手动支配,如果你有100台canal节点1000个数据库,想想看吧,管理员如何知道哪台机器上跑了哪些库表,新增的任务又放在哪台机来运行。

此外,如果Mysql修正了一个字段,还须要让程序员去机器上看一下那张表是如何修正的,相应的所有下贱都需相应的完成表构造修正之后, 才能跑起来,相应速率非常慢。

目前Kafka connect已经办理了这些问题。
其具备一个非常主要的特性,如果上游数据根据AVRO兼容性进行的修正,connect会不才游同样做一些兼容性的修正,自动变动下贱表构造,减轻了运维包袱。

我们来看看Kafka connect 的架构,Kafka connect会把所有信息存到Kafka 中,个中config topic存元数据,Stutas topic指当前哪些节点正在跑什么样的job,offset topic指当前比如某个Topic的某个partitions到底消费到哪条数据。

WorKer都是无状态的,在上面可以跑许多task,同样一个task1,可能对应5个partitions,如果只给它三个并发,它会分布在三台机器上。
如果一台机器挂了,这些job都会分配到其余两台机器,而且是实时同步的。

瓜子Plugins

瓜子对Kafka connect的很多plugins做了修正。

1. Maxwell

个中我们把canal通过maxwell更换,并且把maxwell做成了Kafka connect的plugin。

原生的Maxwell不支持AVRO,瓜子通过debezium思想对Maxwell进行了修正,使其支持avro格式,并用Mysql管理meta,并且支持Mysql的数据库切换。

2. HDFS

我们采取的是confluence公司的hdfs插件,但是其本身存在很多问题,比如写入hive的时候会把当做partition的列也写到主表数据中,虽然不影响hive的利用,但是影响presto读取hive,这里我们改了源码,去掉了主表中的这些列。

Hdfs在插件重启时会去hdfs中读取所有文件来确定从哪个offset连续,这里会有两个问题:耗时太长,切换集群时offset无法接续,我们也对他做了修正。

plugin写入hive时支持用Kafka的timestamp做分区,也支持用数据内的某些列做分区,但是不支持两者同时用,我们也修正了一下。

3. HBase

Hbase的plugin只支持最原始的导出,我们会有些分外的需求,比如对rowkey自定义一下,常日mysql主键是自增ID,hbase不推举用自增ID做rowkey,我们会有reverse的需求,还有多列联合做rowkey的需求等,这个我们也改了源码,支持通过配置自定义rowkey天生。

原始plugin不支持kerberos,而我们online hbase是带权限的,以是也改了一下

还有一些小功能,比如把所有类型都先转成string再存,支持delete,支持json等。

4. KUDU

我们对kudu的利用很多,kudu开源的plugin有些bug,我们创造后也fix了一下。

Kudu的数据来源都是mysql,但是常常会有mysql刷库的情形,这时量就会很大,kudu sink会有较大的延时,我们改了一下plugin,添加了自适应流量掌握,自动扩充成多线程处理,也会在流量小时,自动缩容。

瓜子数据库的Data Pipeline

瓜子的数据仓库完备是基于Kafka、AVRO的构造化数据来做的。
数据源非常多,须要将多个业务线的几千张表同步到数仓,对外供应做事。

全体数据仓库采取Lambda架构,分为T+1的存量处理和T+0.1的增量处理两个流程。

先说T+1的存量处理部分,目前瓜子将所有mysql表通过Maxwell插件放到Kafka中,再通过Kafka connect写到Hbase里,Hbase每天晚上做一次snapshot(快照),写到hive中,然后经由多轮ETL:DWB-->DWD-->DW-->DM,末了将DM层数据导入Kudu中,对外供应BI剖析做事,当然离线olap剖析还是通过presto直接访问Hive查询。

再说T+0.1的增量流程,同T+1一样,数据通过maxwell进入Kafka,这部分流程共用,然后增量数据会实时通过kudu的插件写入kudu中,再通过impala做ETL,天生数据对外供应T+0.1的查询,对外供应的查询是通过另一套impala来做的。
未来我们还会考虑通过Flink直接读取Kafka中数据来做实时ETL,提高实时性。

这是我们数仓架构的整体架构图

点击链接,免费试用DataPipeline↓↓↓