Restarting Kafka Connect S3 Sink Task Loses Position, Completely Rewrites Everything
After restarting a Kafka Connect S3 sink task, it restarted writing all the way from the beginning of the topic and wrote duplicate copies of older records. In other words, Kafka Connect seemed to lose its place.
As I understand it, Kafka Connect stores its current offset position information in the internal connect-offsets topic. That topic is empty, which I presume is part of the problem.
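One way to confirm that the topic really is empty is to dump it with the console consumer (a sketch; print.key just makes any keys visible):

/usr/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-offsets \
  --from-beginning --property print.key=true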
The other two internal topics, connect-statuses and connect-configs, are not empty. connect-statuses has 52 entries. connect-configs has 6 entries: three for each of the two sink connectors I have configured: connector-<name>, task-<name>-0, and commit-<name>.
I manually created the internal Kafka Connect topics as specified in the docs before running this:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
I can verify that the connect-offsets topic seems to have been created correctly:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets

Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3   Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
    Topic: connect-offsets  Partition: 1  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1
    Topic: connect-offsets  Partition: 2  Leader: 3  Replicas: 3,1,2  Isr: 3,1,2
    <snip>
This is a three-server cluster running Confluent Platform v3.2.1 with Kafka 0.10.2.1.
Is connect-offsets supposed to be empty? Why else would Kafka Connect restart at the beginning of the topic when restarting the task?
Update: in response to Randall Hauch's answer:
- The explanation regarding source connector offsets vs. sink connector offsets explains the empty connect-offsets topic. Thanks for the explanation!
- I'm not changing the connector name.
- If the connector is down for ~five days and restarted afterwards, is there any reason the connector's offset position would expire and reset? I see that __consumer_offsets has cleanup.policy=compact. auto.offset.reset should only take effect if there is no position in __consumer_offsets, right? (See the sketch just below for how I'm checking the committed position.)
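One way to check whether a committed position still exists is to describe the sink connector's consumer group, which as I understand it is named connect-<connector-name>. A sketch using the connector name from the config below:

/usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 \
  --describe --group connect-my-s3-sink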
I'm using the system defaults. The sink config JSON follows. I'm using a simple custom partitioner to partition on an Avro datetime field rather than wallclock time. That feature seems to have been added in Confluent v3.2.2, so I won't need the custom plugin for that functionality. I'm hoping to skip Confluent v3.2.2 and go straight to v3.3.0 when it's available.
{ "name": "my-s3-sink", "tasks.max": 1, "topics": "my-topic", "flush.size": 10000, "connector.class": "io.confluent.connect.s3.s3sinkconnector", "storage.class": "io.confluent.connect.s3.storage.s3storage", "format.class": "io.confluent.connect.s3.format.avro.avroformat", "schema.generator.class": "io.confluent.connect.storage.hive.schema.timebasedschemagenerator", "partitioner.class": "mycompany.partitioner.timefieldpartitioner", "s3.bucket.name": "my-bucket", "s3.region": "us-west-2", "partition.field.name": "timestamp", "locale": "us", "timezone": "utc", "path.format": "'year'=yyyy/'month'=mm/'day'=dd/'hour'=hh", "schema.compatibility": "none", "key.converter": "io.confluent.connect.avro.avroconverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.avroconverter", "value.converter.schema.registry.url": "http://localhost:8081" }
The default offset retention period for Kafka consumers is 24 hours (1440 minutes). If you stop the connector, and therefore make no new commits, for longer than 24 hours, the offsets expire and it starts over as a new consumer when you restart. You can modify the retention period for __consumer_offsets using the offsets.retention.minutes broker parameter.
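As a sketch of what that might look like (the seven-day value here is just an illustration, not a recommendation), offsets.retention.minutes is a broker-level setting, so it would go in each broker's server.properties:

# server.properties on each broker
# Default in Kafka 0.10.x is 1440 (24 hours); raise it so committed offsets
# survive a multi-day connector outage.
offsets.retention.minutes=10080

If I understand correctly, once the committed offset is gone, the Connect worker's sink consumers fall back to auto.offset.reset, which Connect defaults to "earliest" -- hence the re-read from the start of the topic. A worker-level override (connect-distributed.properties) would look like:

# consumer.* properties are passed through to sink task consumers
consumer.auto.offset.reset=earliest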