A Kafka Streams join example
Published: 2019-06-25

This post gives a brief introduction to the join operation in Kafka Streams.

join

A join operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely.
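The windowing argument can be made concrete with a toy in-memory model of a windowed inner join in plain Java. This is a sketch, not the Kafka Streams API. The names `WindowedJoinSketch`, `Rec` and `join` are hypothetical. The sketch also assumes that events arrive in timestamp order, whereas Kafka Streams must also cope with out-of-order records.

The model matches equal keys whose timestamps are at most `windowMs` apart. It also evicts each buffered record once the record is too old to match anything, which is what keeps the state bounded.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a windowed stream-stream inner join (NOT the Kafka Streams API).
// Hypothetical names; assumes events arrive in non-decreasing timestamp order.
class WindowedJoinSketch {

    static final class Rec {
        final boolean left; // true = left stream, false = right stream
        final String key;   // join key
        final String value;
        final long ts;      // event time in milliseconds

        Rec(boolean left, String key, String value, long ts) {
            this.left = left;
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // Emits "leftValue--rightValue" for every pair with equal keys whose
    // timestamps are at most windowMs apart (inclusive).
    static List<String> join(List<Rec> events, long windowMs) {
        List<Rec> leftBuf = new ArrayList<>();
        List<Rec> rightBuf = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Rec r : events) {
            // Drop buffered records too old to match this or any later record;
            // without this step the buffers would grow without bound.
            long minTs = r.ts - windowMs;
            leftBuf.removeIf(x -> x.ts < minTs);
            rightBuf.removeIf(x -> x.ts < minTs);

            // Probe the opposite side's buffer for records with the same key.
            for (Rec o : r.left ? rightBuf : leftBuf) {
                if (o.key.equals(r.key)) {
                    out.add(r.left ? r.value + "--" + o.value : o.value + "--" + r.value);
                }
            }
            // Buffer the new record so later records from the other side can find it.
            (r.left ? leftBuf : rightBuf).add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> events = List.of(
                new Rec(true, "a", "1,a", 0),
                new Rec(false, "a", "a,hello", 10_000),
                new Rec(false, "b", "b,world", 20_000),
                new Rec(true, "b", "2,b", 60_000)); // 40 s after b,world: outside a 30 s window
        System.out.println(join(events, 30_000)); // [1,a--a,hello]
    }
}
```

Only `1,a` and `a,hello` are joined. `2,b` arrives 40 s after `b,world`, so with a 30 s window `b,world` has already been evicted. Kafka Streams keeps these buffers in windowed state stores rather than in in-memory lists, but the matching rule is the same idea.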

Example

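    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    // Pre-2.0 Kafka Streams API, as in the original; assumes String default serdes.
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> left = builder.stream("intpu-left");
    KStream<String, String> right = builder.stream("intpu-right");
    // Re-key left "1,a" -> "a" and right "a,hello" -> "a", then join within 30 s.
    KStream<String, String> all = left.selectKey((key, value) -> value.split(",")[1])
            .join(right.selectKey((key, value) -> value.split(",")[0]),
                    (value1, value2) -> value1 + "--" + value2,
                    JoinWindows.of(30000));
    all.print();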

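Because a join matches records by key, you usually need to re-key the streams first, as done here with selectKey. After a selectKey, Kafka Streams repartitions the re-keyed records through an internal topic before the join, so that records with the same new key end up in the same partition.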
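The two selectKey lambdas are plain string splitting, so their logic can be checked without a broker. The following is a hedged plain-Java stand-in, not the Kafka Streams API. `LEFT_KEY`, `RIGHT_KEY` and `safeKey` are hypothetical names, and values are assumed to be comma-separated strings like the test input.

One caveat applies: `value.split(",")[1]` throws `ArrayIndexOutOfBoundsException` on a value without a comma. Inside selectKey, such an exception would typically stop the stream thread, which is why a defensive variant is included.

```java
import java.util.function.BiFunction;

// Plain-Java stand-ins for the two selectKey lambdas (NOT the Kafka Streams API).
// Assumes values are comma-separated strings such as "1,a" and "a,hello".
class RekeySketch {

    // Left records look like "1,a": the join key is the second field.
    static final BiFunction<String, String, String> LEFT_KEY = (key, value) -> value.split(",")[1];

    // Right records look like "a,hello": the join key is the first field.
    static final BiFunction<String, String, String> RIGHT_KEY = (key, value) -> value.split(",")[0];

    // Hypothetical defensive variant: returns null instead of throwing when the field is missing.
    static String safeKey(String value, int field) {
        if (value == null) {
            return null;
        }
        String[] parts = value.split(",");
        return field < parts.length ? parts[field] : null;
    }

    public static void main(String[] args) {
        // The console producer sends these records with a null key, so the old key is ignored.
        for (String v : new String[] {"1,a", "2,b", "3,c"}) {
            System.out.println(LEFT_KEY.apply(null, v) + " <- " + v);
        }
        System.out.println(RIGHT_KEY.apply(null, "a,hello") + " <- a,hello");
        System.out.println(safeKey("abc", 1)); // null: there is no second field
    }
}
```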

Testing

    sh bin/kafka-topics.sh --create --topic intpu-left --replication-factor 1 --partitions 3 --zookeeper localhost:2181
    sh bin/kafka-topics.sh --create --topic intpu-right --replication-factor 1 --partitions 3 --zookeeper localhost:2181
    sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-left
    sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-right

Enter records like these into the left producer:

    1,a
    2,b
    3,c
    3,c
    4,d
    1,a
    2,b
    3,c
    1,a
    2,b
    3,c
    4,e
    5,h
    6,f
    7,g

Enter records like these into the right producer:

    a,hello
    b,world
    c,hehehe
    c,aaa
    d,eee
    a,cccc
    b,aaaaaa
    c,332435
    a,dddd
    b,2324
    c,ddddd
    e,23453
    h,2222222
    f,0o0o0o0
    g,ssss

Sample output

    [KSTREAM-MERGE-0000000014]: a , 1,a--a,dddd
    [KSTREAM-MERGE-0000000014]: b , 2,b--b,2324
    2017-10-17 22:17:34.578  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
    2017-10-17 22:17:34.578  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_0
    2017-10-17 22:17:34.585  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_1

Join types

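The example above uses an inner join; Kafka Streams also offers left join and outer join. To keep track of records that found no partner within the time window, you can use an outer join, store its null-padded results separately, and then filter them again against the records that were eventually matched. This second filtering pass is needed because, in the version used here, an outer join emits a null-padded result as soon as a record arrives without a partner. If the partner arrives later within the window, the join emits the matched result as well, as the f and h records in the output below show.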
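The post-filtering step can be sketched in plain Java over the outer-join results printed in this post. This is not a Kafka Streams API, and `UnmatchedFilterSketch` and `unmatched` are hypothetical names. The sketch holds everything in memory rather than in a state store. It also assumes that values never contain `--` and that a missing side prints as the literal `null`.

A null-padded result is dropped once a real match containing the same left (or right) value appears. What remains are the records that never matched.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// In-memory sketch of "store outer-join results, then filter against matched ones".
// Hypothetical names; assumes results look like "left--right", with "null" for a missing side.
class UnmatchedFilterSketch {

    // Returns the null-padded results that were never followed by a real match.
    static List<String> unmatched(List<String> results) {
        Set<String> matchedLeft = new HashSet<>();
        Set<String> matchedRight = new HashSet<>();
        List<String> padded = new ArrayList<>();
        for (String r : results) {
            String[] sides = r.split("--", 2);
            if (sides[0].equals("null") || sides[1].equals("null")) {
                padded.add(r);             // store the null-padded result separately
            } else {
                matchedLeft.add(sides[0]); // remember values that did find a partner
                matchedRight.add(sides[1]);
            }
        }
        List<String> out = new ArrayList<>();
        for (String r : padded) {
            String[] sides = r.split("--", 2);
            // "null--X" is superseded if X was later matched on the right side,
            // "Y--null" if Y was later matched on the left side.
            boolean superseded = sides[0].equals("null")
                    ? matchedRight.contains(sides[1])
                    : matchedLeft.contains(sides[0]);
            if (!superseded) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Joined values taken from the outer-join output in this post.
        List<String> results = List.of(
                "null--f,ddddddd", "4,f--f,ddddddd", "5,g--null", "6,h--null", "6,h--h,ddddddd");
        System.out.println(unmatched(results)); // [5,g--null]
    }
}
```

Only `5,g--null` survives, because f and h were matched later within the window. As far as I know, Kafka Streams 3.1 and later can hold back null-padded left/outer-join results until the join window has closed, which makes this kind of post-filter largely unnecessary on newer versions.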

Sample output (outer join)

    [KSTREAM-MERGE-0000000014]: f , null--f,ddddddd
    [KSTREAM-MERGE-0000000014]: f , 4,f--f,ddddddd
    2017-10-17 22:31:12.530  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
    2017-10-17 22:31:12.530  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_0
    2017-10-17 22:31:12.531  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_1
    2017-10-17 22:31:12.531  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 1_0
    2017-10-17 22:31:12.531  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_2
    2017-10-17 22:31:12.533  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 1_1
    2017-10-17 22:31:12.533  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 2_0
    2017-10-17 22:31:12.539  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 1_2
    2017-10-17 22:31:12.540  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 2_1
    2017-10-17 22:31:12.541  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 2_2
    [KSTREAM-MERGE-0000000014]: g , 5,g--null
    [KSTREAM-MERGE-0000000014]: h , 6,h--null
    [KSTREAM-MERGE-0000000014]: h , 6,h--h,ddddddd

Summary

Kafka Streams joins are well suited to matching records from different data sources in real time.

Reposted from: https://my.oschina.net/go4it/blog/1552165
