scala - Spark Structured Streaming not restarting at Kafka offsets
We have a long-running Spark Structured Streaming query reading from Kafka, and we would like this query to pick up where it left off after a restart. However, we have set startingOffsets to "earliest", and what we see after restarts is that the query reads from the start of the Kafka topic again.
Our basic query looks like this:
```scala
val extract = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server:port")
  .option("subscribe", "topic")
  // Where to begin when the query has no checkpointed offsets yet
  .option("startingOffsets", "earliest")
  .load()

val query: StreamingQuery = extract
  .writeStream
  // Offsets and other progress for this query are recorded here
  .option("checkpointLocation", s"/tmp/checkpoint/kafka/")
  .foreach(writer)
  .start()
```
We see the checkpoint directory getting created correctly, along with the offsets we expect in the offset files.
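To double-check what a restarted query should resume from, the newest offset file can be read directly. Spark keeps these under `<checkpointLocation>/offsets`, one file per batch named by its batch id. The sketch below assumes the checkpoint sits on a locally readable filesystem (on HDFS, `hdfs dfs -cat` on the same files does the job) and that each file is plain text with a version header line followed by metadata and one JSON line per source; `OffsetLogInspector` is a hypothetical helper, not part of Spark.

```scala
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// Hypothetical helper: prints the newest batch file from a query's offset log.
object OffsetLogInspector {
  /** Returns the id and raw lines of the newest batch file in `<checkpointDir>/offsets`, if any. */
  def latestOffsets(checkpointDir: String): Option[(Long, List[String])] = {
    val offsetsDir = Paths.get(checkpointDir, "offsets")
    if (!Files.isDirectory(offsetsDir)) None
    else {
      val stream = Files.list(offsetsDir)
      val batchIds =
        try {
          stream.iterator().asScala
            .map(_.getFileName.toString)
            // Batch files are named by a number; skip checksum and temp files
            .filter(name => name.nonEmpty && name.forall(_.isDigit))
            .map(_.toLong)
            .toList
        } finally {
          stream.close()
        }
      if (batchIds.isEmpty) None
      else {
        val latest = batchIds.max
        val lines = Files.readAllLines(offsetsDir.resolve(latest.toString)).asScala.toList
        Some(latest -> lines)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = args.headOption.getOrElse("/tmp/checkpoint/kafka/")
    latestOffsets(dir) match {
      case Some((batch, lines)) =>
        println(s"latest batch: $batch")
        lines.foreach(println)
      case None =>
        println(s"no offset log found under $dir")
    }
  }
}
```

Running it before and after a restart shows whether the per-partition offsets the query should resume from are still there.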
When we restart, we see a message like:
```
25-07-2017 14:35:32 INFO ConsumerCoordinator:231 - Setting newly assigned partitions [kafkatopic-2, kafkatopic-1, kafkatopic-0, kafkatopic-3] for group spark-kafka-source-dedc01fb-c0a7-40ea-8358-a5081b961968--1396947302-driver
```
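The consumer group in this log line is generated by Spark rather than configured by us. Assuming Spark 2.1 builds it as `spark-kafka-source-<random UUID>-<hashCode of the source's checkpoint metadata path>-driver` (our reading of the Spark source, not something the documentation promises), the UUID changes on every run, but the trailing number should stay the same whenever the query restarts against the same checkpoint path. The hypothetical helper below extracts that number so the log lines from two runs can be compared.

```scala
import scala.util.Try

// Hypothetical helper; the group id layout it parses is an ASSUMPTION about Spark 2.1 internals.
object GroupIdInspector {
  private val Prefix = "spark-kafka-source-"
  private val Suffix = "-driver"
  private val UuidLength = 36 // canonical textual UUID, e.g. dedc01fb-c0a7-40ea-8358-a5081b961968

  /** Extracts the trailing Int hash from `spark-kafka-source-<UUID>-<hash>-driver`. */
  def pathHash(groupId: String): Option[Int] =
    if (!groupId.startsWith(Prefix) || !groupId.endsWith(Suffix)) None
    else {
      val rest = groupId.stripPrefix(Prefix).stripSuffix(Suffix)
      // Expect the UUID, then a '-' separator, then the (possibly negative) hash
      if (rest.length <= UuidLength + 1 || rest.charAt(UuidLength) != '-') None
      else Try(rest.substring(UuidLength + 1).toInt).toOption
    }

  def main(args: Array[String]): Unit = {
    val fromLog = "spark-kafka-source-dedc01fb-c0a7-40ea-8358-a5081b961968--1396947302-driver"
    println(pathHash(fromLog))
  }
}
```

If the extracted number differs between two runs, the restarted query is most likely pointing at a different checkpoint path than before.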
We are telling the query to start at "earliest", but the documentation says:
"This only applies when a new streaming query is started, and that resuming will always pick up from where the query left off."
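The documented rule quoted above can be restated as a small decision function. This is a simplified model written for illustration, not Spark's actual implementation: if the checkpoint already holds offsets, those win; only a query without checkpointed offsets falls back to the startingOffsets option. The names `StartingOffsetsModel`, `FromCheckpoint`, and `FromOption` are hypothetical.

```scala
// Simplified, hypothetical model of the documented startingOffsets rule.
object StartingOffsetsModel {
  /** Where a (re)started query begins reading. */
  sealed trait Start
  /** Resume from the per-partition offsets already recorded in the checkpoint. */
  final case class FromCheckpoint(offsets: Map[Int, Long]) extends Start
  /** New query: use the user-supplied startingOffsets value. */
  final case class FromOption(startingOffsets: String) extends Start

  def resolveStart(checkpointOffsets: Option[Map[Int, Long]], startingOffsets: String): Start =
    checkpointOffsets match {
      case Some(offsets) => FromCheckpoint(offsets)
      case None          => FromOption(startingOffsets)
    }

  def main(args: Array[String]): Unit = {
    println(resolveStart(None, "earliest"))
    println(resolveStart(Some(Map(0 -> 42L, 1 -> 17L)), "earliest"))
  }
}
```

Under this rule, a restart that re-reads the topic from the beginning suggests the restarted query did not find the earlier offsets in its checkpoint location.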
Shouldn't this mean that restarting our application results in the query resuming where it left off?
Setting "group.id" for Kafka is not allowed with Spark Structured Streaming. See this: "Note that the following Kafka params cannot be set and the Kafka source will throw an exception."
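Because Kafka consumer settings are passed to the source with a `kafka.` prefix, the restriction above can be checked before the query is even built. The sketch below is a hypothetical pre-flight helper, not part of Spark; the reserved key list is our reading of the Spark 2.1 Kafka integration guide and should be checked against the guide for the version in use. Spark itself compares option keys case-insensitively, so the helper does too.

```scala
// Hypothetical pre-flight check; the reserved key list is an ASSUMPTION taken from the docs.
object KafkaSourceOptionCheck {
  /** Consumer settings the Kafka source manages itself and rejects as "kafka.<key>" options. */
  val reservedConsumerKeys: Set[String] = Set(
    "group.id",
    "auto.offset.reset", // use the startingOffsets option instead
    "key.deserializer",
    "value.deserializer",
    "enable.auto.commit",
    "interceptor.classes"
  )

  /** Returns the supplied option keys that the Kafka source would refuse. */
  def rejectedOptions(options: Map[String, String]): Set[String] =
    options.keySet.filter { key =>
      val lower = key.toLowerCase
      lower.startsWith("kafka.") && reservedConsumerKeys.contains(lower.stripPrefix("kafka."))
    }

  def main(args: Array[String]): Unit = {
    val opts = Map(
      "kafka.bootstrap.servers" -> "server:port",
      "subscribe" -> "topic",
      "kafka.group.id" -> "my-group" // hypothetical group name
    )
    println(rejectedOptions(opts))
  }
}
```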
I tried adding a queryName as well, in case it was being used to identify the query across runs, but it did not have any effect.
We are using Spark 2.1 on YARN.
Any ideas on why this does not work or what we are doing wrong?
Update with logs: