Kafka Streams - Explain the reason why KTable and its associated Store only get updated every 30 seconds
I have a simple KTable definition that generates a store:
KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
table.print();
I publish messages to ORDERS_TOPIC, but the store isn't updated until 30 seconds have passed. This is the log, which contains a message about committing because the 30000ms commit interval has elapsed:
2017-07-25 23:53:15.465 debug 17540 --- [ streamthread-1] o.a.k.c.consumer.internals.fetcher : sending fetch partitions [orders-0] broker exprf026.suministrador:9092 (id: 0 rack: null)
2017-07-25 23:53:15.567 info 17540 --- [ streamthread-1] o.a.k.s.p.internals.streamthread : stream-thread [streamthread-1] committing tasks because commit interval 30000ms has elapsed
2017-07-25 23:53:15.567 info 17540 --- [ streamthread-1] o.a.k.s.p.internals.streamthread : stream-thread [streamthread-1] committing task streamtask 0_0
2017-07-25 23:53:15.567 debug 17540 --- [ streamthread-1] o.a.k.s.processor.internals.streamtask : task [0_0] committing state
2017-07-25 23:53:15.567 debug 17540 --- [ streamthread-1] o.a.k.s.p.i.processorstatemanager : task [0_0] flushing stores registered in state manager
f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec {"uid":"string","productid":0,"orderid":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"orderplaced","state":"pending_product_reservation"}
[ktable-source-0000000001]: f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec , ({"uid":"string","productid":0,"orderid":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"orderplaced","state":"pending_product_reservation"}<-null)
2017-07-25 23:53:15.569 debug 17540 --- [ streamthread-1] o.a.k.s.state.internals.threadcache : thread order-service-streams-16941f70-87b3-45f4-88de-309e4fd22748-streamthread-1 cache stats on flush: #puts=1, #gets=1, #evicts=0, #flushes=1
2017-07-25 23:53:15.576 debug 17540 --- [ streamthread-1] o.a.k.s.p.internals.recordcollectorimpl : task [0_0] flushing producer
I found that the property that controls this is commit.interval.ms:
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
Why is it set to 30000ms by default (that sounds like a long time), and what are the implications of changing it to 10ms?
If, instead of a KTable, I work with a KStream...
KStream<String, JsonNode> kStream = kStreamBuilder.stream(ORDERS_TOPIC);
kStream.print();
...I can see the messages right away, without having to wait 30000ms. Why the difference?
It's related to memory management, in particular to the KTable caches: http://docs.confluent.io/current/streams/developer-guide.html#memory-management
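A KTable is updated all the time, and if you use "Interactive Queries" to access the underlying state store, you can see each update immediately. However, the KTable cache buffers the updates to reduce downstream load, and each time a commit is triggered, the cache needs to be flushed downstream to avoid data loss in case of failure. That is why print(), which sits downstream of the cache, only shows output at commit time. If your cache size is small, you might also see downstream records earlier, whenever a key gets evicted from the cache.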
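The buffering behavior described above can be pictured with a small toy model. This is not Kafka Streams code: the class name BufferingCacheSketch, the entry-count size limit, and the oldest-first eviction order are all assumptions made for illustration (the real cache is sized in bytes). It only shows the idea that repeated updates to one key collapse into a single downstream record, which is forwarded on commit or on eviction.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only, NOT the Kafka Streams implementation.
// It buffers the latest value per key and forwards it downstream on commit or eviction.
class BufferingCacheSketch {
    private final int maxEntries; // hypothetical limit, counted in entries rather than bytes
    private final Map<String, String> dirty = new LinkedHashMap<>();
    private final List<String> downstream = new ArrayList<>();

    BufferingCacheSketch(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    // A new update overwrites any value already buffered for the same key.
    void put(String key, String value) {
        dirty.remove(key);      // re-insert so this key becomes the newest entry
        dirty.put(key, value);
        if (dirty.size() > maxEntries) {
            // Cache is full: evict the oldest key and forward it right away.
            String oldest = dirty.keySet().iterator().next();
            forward(oldest, dirty.remove(oldest));
        }
    }

    // Called when the commit interval elapses: everything buffered is flushed downstream.
    void commit() {
        for (Map.Entry<String, String> entry : dirty.entrySet()) {
            forward(entry.getKey(), entry.getValue());
        }
        dirty.clear();
    }

    private void forward(String key, String value) {
        downstream.add(key + "=" + value);
    }

    List<String> downstream() {
        return downstream;
    }

    public static void main(String[] args) {
        BufferingCacheSketch cache = new BufferingCacheSketch(2);
        cache.put("order-1", "PENDING");
        cache.put("order-1", "RESERVED");
        System.out.println(cache.downstream()); // prints [] because nothing was committed yet
        cache.commit();
        System.out.println(cache.downstream()); // prints [order-1=RESERVED]
    }
}
```

In this sketch, a KStream would correspond to having no cache at all, so every record reaches the downstream list as soon as it arrives.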
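About the commit interval: in general, the commit interval is set to a relatively large value to reduce the commit load on the brokers. Lowering it to something like 10ms means committing far more often, which increases that load.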
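As a rough sketch of the two knobs involved, the snippet below builds a plain java.util.Properties object using the string keys commit.interval.ms and cache.max.bytes.buffering (the cache size setting described in the memory management docs linked above). The class name, the method name, and the 100ms value are assumptions chosen for illustration; a real application also needs application.id, bootstrap.servers, and serdes, and newer Kafka Streams releases may use different key names.

```java
import java.util.Properties;

// Illustrative configuration sketch; names and values here are assumptions, not recommendations.
class StreamsLatencyConfigSketch {

    static Properties lowLatencyProps() {
        Properties props = new Properties();
        // Commit, and therefore flush the caches, more often than the 30000 ms default.
        // The trade-off is more commit traffic to the brokers.
        props.put("commit.interval.ms", 100);
        // A cache size of 0 disables record caching, so every KTable update
        // is forwarded downstream without waiting for a commit.
        props.put("cache.max.bytes.buffering", 0);
        return props;
    }

    public static void main(String[] args) {
        // These properties would be passed to the Kafka Streams application at startup.
        System.out.println(lowLatencyProps());
    }
}
```

Disabling the cache gives up the reduction in downstream load that the cache provides, so it is usually worth trying a moderately lower commit interval first.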