如何在YDB中消费KAFKA的数据

如何在YDB中消费KAFKA的数据

一、     通过Kafka实时导入数据

1.     Kafka配置的注意点

第一,kafka安装的好坏,直接影响kafka的稳定性,关于kafka的配置要点 请一定要参考改篇文章
http://ycloud.net.cn/file/201705/356.html
YDB中只用来消费kafka的数据如果想了解如何向kafka里生产数据,请参考kafka的官方
http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

第二:在ydb_site.yaml中添加如下的配置,并更改相关连接参数

ydb.reader.list: "default,filesplit,kafka_json"

#如果您要使用其他的消息中间件,可以自定义reader,默认为kafka的实现
ydb.reader.read.class.kafka_json: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

#如果您的数据格式是非标准的JSON格式,可以自定义parser,默认为按照JSON方式解析
ydb.reader.parser.class.kafka_json: "cn.net.ycloud.ydb.server.reader.JsonParser"

kafka.topic.kafka_json: "kafkaydb"
kafka.group.kafka_json: "kafkaydb_group"
bootstrap.servers.kafka_json: "192.168.3.2:6667"

 

2.     重启YDB,10分钟后会开始导入数据

3.     Kafka导入的数据格式如下

 

{"tablename":"ydbexample","ydbpartion":"20151005","data":{"indexnum":4,"label":"l_4","userage":14,"clickcount":4,"paymoney":4.132,"price":4.12,"content":"4 4 4 延云 ydb 延云  测试  中文分词 中华人民共和国 沈阳延云云计算技术有限公司","contentcjk":"4 4 4 延云 ydb 延云  测试  中文分词 中华人民共和国 沈阳延云云计算技术有限公司"}}

 

默认的Kafka导入数据只支持Json格式,如果需要支持其他格式,需要自己通过javaParser.

 

 

 

 

4.     Kafka模式实时导入数据,为什么会有重复数据

YDB能确保从Kafka消费到的数据0丢失,但是由于Kafka的实现机制,以下情况会导致出现重复数据

具体原理可以参考这篇文章

http://www.iteblog.com/pdf/1716

进程异常退出(主动kill或者进程BUG等原因导致)

Kafka采用commitoffset的方式提交数据,由于此时会存在数据已经消费,但是Kafkaoffset没有来得及提交,这样会导致数据重复。

KafkaRebalancing

由于进程退出,或者首次启动时,会产生一个新的consumer加入一个consumer group时,会有一个rebalance的操作,导致每一个consumerpartition的关系重新分配。也就会发生了rebalancing

如果一个消息已经被消费了,但是还没有提交offset,就开始了Rebalancing,这个时候会造成数据的重复。这个在Kafka里已经积累了一部分数据后的首次启动时最为明显。

如果仅仅发生了一个进程异常退出,但是没有导致Rebalancing,那么最多重复的数据条数就是这个进程还没有来得及提交的部分。

如果发生了Rebalancing(进程异常退出也会导致Rebalancing,那么则要按全部没有来得及提交的线程数来计算。

 

5.     Kafka模式对数据可靠性的几种配置

ydb.realtime.kafka.commit.intervel用来控制Kafkaoffset commit频率,每次commit也会导致binglog(wal)同步,所以该值一般要小于等于ydb.realtime.binlog.sync.intervel的频率

 

 

尽量减少进程重启 导致的数据重复的配置(每个线程32条重复数据)

ydb.realtime.kafka.commit.intervel: 32

ydb.realtime.binlog.sync.intervel: 1024

 

 

尽量增加吞吐量的配置, 可能有重复(每个线程1024条重复数据)

 

ydb.realtime.kafka.commit.intervel: 1024

ydb.realtime.binlog.sync.intervel: 2048

6.     多个Kafka Topic一起消费

ydb.reader.list: "default,filesplit,kafka_json,kafka_json2,kafka_json3"

 

##kafka_json##

ydb.reader.read.class.kafka_json: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

ydb.reader.parser.class.kafka_json: "cn.net.ycloud.ydb.server.reader.JsonParser"

kafka.topic.kafka_json: "a961"

kafka.group.kafka_json: "bn961n_groupv1_kafka_json"

bootstrap.servers.kafka_json: "192.168.3.2:6667"

 

##kafka_json2##

ydb.reader.read.class.kafka_json2: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

ydb.reader.parser.class.kafka_json2: "cn.net.ycloud.ydb.server.reader.JsonParser"

kafka.topic.kafka_json2: "b961"

kafka.group.kafka_json2: "bn961n_groupv1_kafka_json2"

bootstrap.servers.kafka_json2: "192.168.3.2:6667"

 

##kafka_json3##

ydb.reader.read.class.kafka_json3: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

ydb.reader.parser.class.kafka_json3: "cn.net.ycloud.ydb.server.reader.JsonParser"

kafka.topic.kafka_json3: "c961"

kafka.group.kafka_json3: "bn961n_groupv1_kafka_json3"

bootstrap.servers.kafka_json3: "192.168.3.2:6667"