Preface
This article gives a brief introduction to join operations in Kafka Streams.
join
A join operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely.
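The matching rule behind a windowed join can be sketched in plain Java. This is a simplified in-memory model that only illustrates *which* pairs are joined. It is not how Kafka Streams implements the operator, which keeps recent records in window state stores. The class and method names (`WindowedJoinSketch`, `Rec`, `join`) are hypothetical, and treating the window bound as symmetric and inclusive is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of a windowed inner join (NOT the Kafka Streams
// implementation; the real operator buffers recent records in window state stores).
public class WindowedJoinSketch {

    // A keyed record with an event timestamp in milliseconds
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // Joins every left/right pair with equal keys whose timestamps are at most
    // windowMs apart (symmetric and inclusive in this sketch), producing "value1--value2"
    static List<String> join(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                if (l.key.equals(r.key) && Math.abs(l.ts - r.ts) <= windowMs) {
                    out.add(l.value + "--" + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = Arrays.asList(new Rec("a", "1,a", 0L), new Rec("b", "2,b", 0L));
        List<Rec> right = Arrays.asList(new Rec("a", "a,hello", 10_000L), new Rec("b", "b,world", 60_000L));
        // "b" does not match: its two records are 60 s apart, outside the 30 s window
        System.out.println(join(left, right, 30_000L)); // prints [1,a--a,hello]
    }
}
```

This sketch compares every pair, so it still holds all records in memory. A real streaming join only needs to retain records that are younger than the window, and that retention limit is what keeps its state bounded.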
Example
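import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner;

// Assumes the default key/value serdes are configured as String serdes
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> left = builder.stream("intpu-left");
KStream<String, String> right = builder.stream("intpu-right");
KStream<String, String> all = left
        // left values look like "1,a": the second field becomes the key
        .selectKey((key, value) -> value.split(",")[1])
        // right values look like "a,hello": the first field becomes the key
        .join(right.selectKey((key, value) -> value.split(",")[0]),
                new ValueJoiner<String, String, String>() {
                    @Override
                    public String apply(String value1, String value2) {
                        return value1 + "--" + value2;
                    }
                },
                // join only records whose timestamps are at most 30 s apart
                JoinWindows.of(30000));
all.print();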
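Because a join matches records by key, you usually need to re-key the streams first; in the example, selectKey turns the letter field into the key on both sides.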
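The two key extractors passed to selectKey can be tried out on their own in plain Java. The sketch below only reproduces the key computation; in Kafka Streams, selectKey also marks the stream as needing repartitioning, so records with the new key are routed through an internal topic before the join. The class and field names (`RekeySketch`, `LEFT_KEY`, `RIGHT_KEY`) are hypothetical.

```java
import java.util.function.Function;

// Hypothetical stand-ins for the two selectKey lambdas: they compute the new key
// from a record value, so both sides end up keyed by the same letter.
public class RekeySketch {

    // Left records look like "1,a": the join key is the second field
    static final Function<String, String> LEFT_KEY = value -> value.split(",")[1];

    // Right records look like "a,hello": the join key is the first field
    static final Function<String, String> RIGHT_KEY = value -> value.split(",")[0];

    public static void main(String[] args) {
        System.out.println(LEFT_KEY.apply("1,a"));      // prints a
        System.out.println(RIGHT_KEY.apply("a,hello")); // prints a
    }
}
```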
Testing
sh bin/kafka-topics.sh --create --topic intpu-left --replication-factor 1 --partitions 3 --zookeeper localhost:2181
sh bin/kafka-topics.sh --create --topic intpu-right --replication-factor 1 --partitions 3 --zookeeper localhost:2181
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-left
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-right
Sample input for the left topic (intpu-left):
1,a
2,b
3,c
3,c
4,d
1,a
2,b
3,c
1,a
2,b
3,c
4,e
5,h
6,f
7,g
Sample input for the right topic (intpu-right):
a,hello
b,world
c,hehehe
c,aaa
d,eee
a,cccc
b,aaaaaa
c,332435
a,dddd
b,2324
c,ddddd
e,23453
h,2222222
f,0o0o0o0
g,ssss
Sample output
[KSTREAM-MERGE-0000000014]: a , 1,a--a,dddd
[KSTREAM-MERGE-0000000014]: b , 2,b--b,2324
2017-10-17 22:17:34.578 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-10-17 22:17:34.578 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-17 22:17:34.585 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_1
Join types
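The example above uses an inner join; Kafka Streams also provides left join and outer join. If you need to keep track of records that found no match within the time window, you can use an outer join, store its results separately, and then filter them once more against the records that did get matched. The extra filtering step is needed because, in the version used here, an outer join can emit a result with null on one side as soon as a record arrives and then emit the real match later when the other side shows up, as happens for keys f and h in the output below.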
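The "outer join, then filter" approach can be sketched on already-collected results in plain Java. This is a hypothetical post-processing step, not a Kafka Streams API: it works on a list of outer-join results rather than a live stream, and it identifies records by key alone, which assumes one record per key (a real pipeline would need a finer identity, such as the original record value or offset). The names `OuterJoinFilterSketch`, `Joined` and `unmatched` are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical filter over collected outer-join results: keep only the
// null-padded results whose key never produced a full match.
public class OuterJoinFilterSketch {

    // One outer-join result; left or right is null when that side had no match (yet)
    static final class Joined {
        final String key;
        final String left;
        final String right;

        Joined(String key, String left, String right) {
            this.key = key;
            this.left = left;
            this.right = right;
        }

        boolean matched() {
            return left != null && right != null;
        }
    }

    // Simplification: a record's identity is its key alone
    static List<Joined> unmatched(List<Joined> results) {
        Set<String> matchedKeys = new HashSet<>();
        for (Joined j : results) {
            if (j.matched()) {
                matchedKeys.add(j.key);
            }
        }
        List<Joined> out = new ArrayList<>();
        for (Joined j : results) {
            if (!j.matched() && !matchedKeys.contains(j.key)) {
                out.add(j);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same key/value pairs as the outer-join sample output in this article
        List<Joined> results = Arrays.asList(
                new Joined("f", null, "f,ddddddd"),
                new Joined("f", "4,f", "f,ddddddd"),
                new Joined("g", "5,g", null),
                new Joined("h", "6,h", null),
                new Joined("h", "6,h", "h,ddddddd"));
        for (Joined j : unmatched(results)) {
            System.out.println(j.key + " , " + j.left + "--" + j.right); // prints g , 5,g--null
        }
    }
}
```

Collecting the matched keys in a first pass means a late match (as for f and h) cancels an earlier null-padded result regardless of the order in which they were emitted.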
Sample output (outer join)
[KSTREAM-MERGE-0000000014]: f , null--f,ddddddd
[KSTREAM-MERGE-0000000014]: f , 4,f--f,ddddddd
2017-10-17 22:31:12.530 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-10-17 22:31:12.530 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_0
2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_2
2017-10-17 22:31:12.533 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_1
2017-10-17 22:31:12.533 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_0
2017-10-17 22:31:12.539 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_2
2017-10-17 22:31:12.540 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_1
2017-10-17 22:31:12.541 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_2
[KSTREAM-MERGE-0000000014]: g , 5,g--null
[KSTREAM-MERGE-0000000014]: h , 6,h--null
[KSTREAM-MERGE-0000000014]: h , 6,h--h,ddddddd
Summary
Kafka Streams joins are well suited to matching records from different data sources in real time.