Restarting Kafka Connect S3 Sink Task Loses Position, Completely Rewrites Everything


After restarting a Kafka Connect S3 sink task, it restarted writing all the way from the beginning of the topic and wrote duplicate copies of older records. In other words, Kafka Connect seemed to lose its place.

So, I imagine that Kafka Connect stores current offset position information in the internal connect-offsets topic. That topic is empty, which I presume is part of the problem.

The other two internal topics, connect-statuses and connect-configs, are not empty. connect-statuses has 52 entries. connect-configs has 6 entries, three for each of the two sink connectors I have configured: connector-<name>, task-<name>-0, commit-<name>.

I manually created the internal Kafka Connect topics as specified in the docs before running this:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact

I can verify that the connect-offsets topic seems to have been created correctly:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3 Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
  <snip>

This is a 3 server cluster running Confluent Platform v3.2.1, which runs Kafka 0.10.2.1.

Is connect-offsets supposed to be empty? Why else would Kafka Connect restart at the beginning of the topic when restarting a task?

Update: In response to Randall Hauch's answer.

  • The explanation regarding source connector offsets vs. sink connector offsets explains the empty connect-offsets. Thanks for the explanation!
  • I'm not changing the connector name.
  • If the connector is down for ~five days and restarted afterwards, is there any reason the connector offset position would expire and reset? I see that __consumer_offsets has cleanup.policy=compact.
  • auto.offset.reset should only take effect if there is no position in __consumer_offsets, right?

I'm using system defaults. My sink config JSON is as follows. I'm using a simple custom partitioner to partition on an Avro datetime field rather than wallclock time. That feature seems to have been added in Confluent v3.2.2, so I won't need a custom plugin for that functionality. I'm hoping to skip Confluent v3.2.2 and go straight to v3.3.0 when it is available.
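
The last two points come down to what is currently committed for the sink's consumer group. Kafka Connect sink tasks consume under a group named connect-<connector name>, so one way to check is to describe that group. A minimal sketch, assuming the connector name my-s3-sink from the sink config and a broker at localhost:9092 (the broker address is an assumption, since it is not given above):

```shell
#!/bin/sh
# Sketch only: builds the describe command for the sink's consumer group.
connector_name="my-s3-sink"            # "name" from the sink config
group_id="connect-${connector_name}"   # sink tasks consume under connect-<name>
bootstrap="localhost:9092"             # assumption: broker address not stated above

# On Kafka 0.10.x the tool may also need the --new-consumer flag.
cmd="/usr/bin/kafka-consumer-groups --bootstrap-server ${bootstrap} --describe --group ${group_id}"
echo "$cmd"

# Run it only where the Confluent command line tools are installed.
if [ -x /usr/bin/kafka-consumer-groups ]; then
  $cmd
fi
```

If the output lists no committed offsets for my-topic, the sink has no stored position, and auto.offset.reset decides where it starts reading.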

{
  "name": "my-s3-sink",

  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,

  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",

  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",

  "partition.field.name": "timestamp",

  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",

  "schema.compatibility": "NONE",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}

The default offset retention period for Kafka consumers is 24 hours (1440 minutes). If you stop a connector, and therefore make no new commits, for longer than 24 hours, your offsets will expire and you will start over as a new consumer when you restart. You can modify the retention period on the __consumer_offsets topic using the offsets.retention.minutes broker parameter.
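
As a rough worked check of that reasoning against the roughly five-day outage described in the question, the sketch below compares the downtime with the retention window. Both numbers are taken from the text above; the variable names are only illustrative.

```shell
#!/bin/sh
# Compare connector downtime with the broker's offset retention window.
retention_minutes=1440   # offsets.retention.minutes default quoted above (24 hours)
downtime_days=5          # connector was stopped for about five days

downtime_minutes=$((downtime_days * 24 * 60))

if [ "$downtime_minutes" -gt "$retention_minutes" ]; then
  verdict="expired"      # committed offsets are gone; consumer starts as new
else
  verdict="retained"     # committed offsets are still available on restart
fi

echo "downtime=${downtime_minutes}m retention=${retention_minutes}m offsets=${verdict}"
```

With these values the downtime is 7200 minutes, well past the 1440-minute window. A retention of at least 7200 minutes (for example, offsets.retention.minutes=10080 for one week) would have been needed to keep the position across that outage.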

