Kafka Connect common problems

ERROR WorkerSourceTask{id=mysql-source-binlog-336-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
        at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:252)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 3900896 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

Check the max.request.size value printed under "INFO ProducerConfig values" in the Kafka Connect log.

The max.request.size of the mysql-source-binlog-336-jobId-dbhistory producer had been changed, but max.request.size for the data producer had not, so the error persisted:

connector-producer-mysql-source-binlog-336-jobId-0

Reference: https://github.com/confluentinc/cp-docker-images/issues/445

Fixed by adding the following parameters to the source connector:

"database.history.producer.max.request.size": "157286400",
"max.request.size": "157286400"
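
For reference, a minimal sketch of where these two settings sit in a Debezium MySQL source connector config — hostname, credentials, broker list, and history topic below are placeholders, not values from the original job:

{
  "name": "mysql-source-binlog-336-jobId",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<mysql-host>",
    "database.port": "3306",
    "database.user": "<user>",
    "database.password": "<password>",
    "database.history.kafka.bootstrap.servers": "<broker:9092>",
    "database.history.kafka.topic": "dbhistory.mysql-source-binlog-336-jobId",
    "database.history.producer.max.request.size": "157286400",
    "max.request.size": "157286400"
  }
}

Note that the broker and topic also cap message size (message.max.bytes / max.message.bytes), so those limits may need raising to match.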

---------------------------------------------------------------------------------------------------------------------

{"code":500,"data":null,"msg":"java.lang.IllegalArgumentException: URI is not absolute"}

Wait for the next occurrence and analyze it through the logs at http://elk-ops.iqdnet.cn/ ; logs are only retained for 4 days, so there is currently nothing left to inspect.

---------------------------------------------------------------------------------------------------------------------

[2019-11-14 14:10:10,064] ERROR WorkerSourceTask{id=mysql-source-binlog-336-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:281)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:309)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic mysql-source-binlog-336-jobId.slow_log.slow_log :
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:83)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:281)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Key","namespace":"mysql_source_binlog_336_jobId.slow_log.slow_log","fields":[{"name":"id","type":"string"}],"connect.name":"mysql_source_binlog_336_jobId.slow_log.slow_log.Key"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while forwarding register schema request to the master; error code: 50003
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
        at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:131)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:281)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:281)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:309)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

No obvious error was found in the Schema Registry log, but the hostname turned out to be unreachable.
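
Error code 50003 is the registry's "failed to forward the request to the master" error, which matches the unreachable hostname: a follower registry node must be able to resolve and reach the master's advertised host. A quick check from each registry node — 10.50.6.54:8081 is the registry address that appears later in these notes, and the master hostname is whatever the registry advertises:

curl -s http://10.50.6.54:8081/subjects
getent hosts <master-advertised-hostname>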

---------------------------------------------------------------------------------------------------------------------

[2019-11-21 16:32:16,681] ERROR Error during binlog processing. Last offset stored = {ts_sec=1574325000, file=mysqld-bin.008559, pos=419190460, row=1, server_id=463306, event=3}, binlog reader near position = mysqld-bin.008559/430041542 (io.debezium.connector.mysql.BinlogReader:1054)
[2019-11-21 16:32:16,681] ERROR Failed due to error: Error processing binlog event (io.debezium.connector.mysql.BinlogReader:209)
org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-binlog-333-jobId:0 using brokers at null: truncate table mysql.slow_log
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:208)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:508)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1095)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:943)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-binlog-333-jobId:0 using brokers at null: truncate table mysql.slow_log
        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:361)
        at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:694)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:492)
        ... 5 more
Caused by: io.debezium.relational.history.DatabaseHistoryException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for dbhistory.mysql-source-binlog-333-jobId-0:120001 ms has passed since batch creation
        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:198)
        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:66)
        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:60)
        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:356)
        ... 7 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for dbhistory.mysql-source-binlog-333-jobId-0:120001 ms has passed since batch creation
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:188)
        ... 10 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for dbhistory.mysql-source-binlog-333-jobId-0:120001 ms has passed since batch creation

The root TimeoutException means the database-history producer could not deliver the DDL record to the dbhistory topic within its delivery timeout (~120 s), which usually points at broker or partition-leader availability rather than the connector itself.
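
If the brokers are merely slow rather than down, the history producer's tolerance can be raised via the same database.history.producer.* pass-through prefix used in the first item; the values here are illustrative only:

"database.history.producer.request.timeout.ms": "60000",
"database.history.producer.delivery.timeout.ms": "300000"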

---------------------------------------------------------------------------------------------------------------------

ERROR WorkerSourceTask{id=mysql-source-binlog-358-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: log event entry exceeded max_allowed_packet; Increase max_allowed_packet on master; the first event 'mysqld-bin.000060' at 910723425, the last event read from '/data/binlog/qdpp/mysqld-bin.000060' at 123, the last byte read from '/data/binlog/qdpp/mysqld-bin.000060' at 910723444. Error code: 1236; SQLSTATE: HY000.
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197)
        at io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1041)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: log event entry exceeded max_allowed_packet; Increase max_allowed_packet on master; the first event 'mysqld-bin.000060' at 910723425, the last event read from '/data/binlog/qdpp/mysqld-bin.000060' at 123, the last byte read from '/data/binlog/qdpp/mysqld-bin.000060' at 910723444.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:914)
        ... 3 more

A single transaction's binlog event exceeded the configured max_allowed_packet limit.

Note that error code 1236 is also what the server returns when the requested binlog no longer exists.
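
A minimal check and fix on the MySQL side — requires SUPER privilege, only new sessions pick up the global value, and it is the replication master that matters here:

SHOW VARIABLES LIKE 'max_allowed_packet';
SET GLOBAL max_allowed_packet = 1073741824; -- 1 GB, the protocol maximum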

---------------------------------------------------------------------------------------------------------------------

ERROR WorkerSinkTask{id=elasticsearch-sink-binlog-364-taskId-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:558)
org.apache.kafka.connect.errors.ConnectException: java.net.SocketTimeoutException: Read timed out
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.indexExists(JestElasticsearchClient.java:284)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:290)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:255)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:169)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Probably caused by an Elasticsearch restart; the task will not recover on its own and must be restarted manually.
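
Manual restart via the Connect REST API — the worker address is an assumption, the task-restart endpoint is standard:

curl -X POST http://<connect-worker>:8083/connectors/elasticsearch-sink-binlog-364-taskId/tasks/0/restart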

Reference (attributes this to idle connections being dropped): https://github.com/confluentinc/kafka-connect-elasticsearch/pull/349

Add the parameter max.connection.idle.time.ms to the sink.

---------------------------------------------------------------------------------------------------------------------

[2019-11-25 20:27:55,399] ERROR WorkerSourceTask{id=mysql-source-binlog-333-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-binlog-333-jobId:0 using brokers at null: SAVEPOINT `SAVEPOINT_1`
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:208)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:508)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1095)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:943)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-binlog-333-jobId:0 using brokers at null: SAVEPOINT `SAVEPOINT_1`
        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:361)
        at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:694)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:492)
        ... 5 more
Caused by: io.debezium.relational.history.DatabaseHistoryException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:198)
        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:66)
        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:60)
        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:356)
        ... 7 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:188)
        ... 10 more
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Caused by a leader switch when Kafka was restarted after a parameter change.

---------------------------------------------------------------------------------------------------------------------

Check partition status:

./bin/kafka-topics --bootstrap-server 10.37.251.101:9092 --topic mysql-source-binlog-324-jobId.longfor_mdm.cp_role_user_relation --describe

./bin/kafka-topics --bootstrap-server kafka-platform-01.qiandingyun.com:9092 --topic mysql-source-binlog-337-jobId.databus.ads_fenxiao_hehuoren_detail --describe

The partition count turned out to be 5.

Check offsets:

./bin/kafka-consumer-groups --bootstrap-server 10.37.251.101:9092 --describe --group mysql-source-binlog-324-jobId.longfor_mdm.cp_role_user_relation

---------------------------------------------------------------------------------------------------------------------

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409

An incompatible earlier schema version already exists for the subject; just delete the old version directly:

http://10.50.6.54:8081/subjects/mysql-source-binlog-336-jobId.slow_log.slow_log-value/versions/1
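
The notes record only the URL; the deletion itself is an HTTP DELETE against that versions endpoint:

curl -X DELETE http://10.50.6.54:8081/subjects/mysql-source-binlog-336-jobId.slow_log.slow_log-value/versions/1

Deleting a version relaxes the subject's compatibility history, so treat it as a last resort.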

---------------------------------------------------------------------------------------------------------------------

[2019-12-04 17:05:03,719] INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Retrying leaderEpoch request for partition _schemas-0 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)
[2019-12-04 17:05:04,721] WARN [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Error when sending leader epoch request for Map(_schemas-0 -> (currentLeaderEpoch=Optional[1], leaderEpoch=0)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 0 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)
        at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)
        at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)
        at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)
[2019-12-04 17:05:04,722] INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Retrying leaderEpoch request for partition _schemas-0 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)

---------------------------------------------------------------------------------------------------------------------

With a MySQL JDBC source and no timezone parameter on the task, TIME values earlier than 08:00 were presumably shifted by -8 hours into negative values, causing this error:

Caused by: org.apache.kafka.connect.errors.DataException: Kafka Connect Time type should not have any date fields set to non-zero values.
        at org.apache.kafka.connect.data.Time.fromLogical(Time.java:64)
        at io.confluent.connect.avro.AvroData$7.convert(AvroData.java:287)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:420)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:607)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:366)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:284)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more

Add timezone parameters to the source:

serverTimezone=Asia/Shanghai (on the JDBC connection URL)
"db.timezone": "Asia/Shanghai"

Afterwards, DATE values apparently had 8 hours subtracted, leaving non-zero time-of-day fields and causing the analogous error:

Caused by: org.apache.kafka.connect.errors.DataException: Kafka Connect Date type should not have any time fields set to non-zero values.
        at org.apache.kafka.connect.data.Date.fromLogical(Date.java:64)
        at io.confluent.connect.avro.AvroData$6.convert(AvroData.java:276)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:420)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:607)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:366)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:284)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more

---------------------------------------------------------------------------------------------------------------------

1.

After a Kafka restart, server.log kept logging WARN:

INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Retrying leaderEpoch request for partition _schemas2-0 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)
[2019-12-23 16:10:08,438] INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: java.io.IOException: Connection to 0 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
WARN [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Error when sending leader epoch request for Map(_schemas2-0 -> (currentLeaderEpoch=Optional[1], leaderEpoch=0)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 0 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)
        at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)
        at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)
        at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)

Checked the topic with:

./bin/kafka-topics --describe --zookeeper 10.37.251.222:2181 --topic _schemas2

Topic:_schemas2 PartitionCount:1        ReplicationFactor:2     Configs:cleanup.policy=compact
        Topic: _schemas2        Partition: 0    Leader: 0       Replicas: 1001,0        Isr: 0

First attempt: trigger a preferred-replica election so that the preferred replica becomes leader. Specify the partitions:

{
  "partitions": [
    {"topic": "_schemas2", "partition": 0}
  ]
}

./bin/kafka-preferred-replica-election.sh --bootstrap-server 10.37.251.101:9092 --path-to-json-file /tmp/kafka-preferred-replica-election.json

Second attempt: pin the leader at the replica level by reassigning the replica set. Execute with:

{
  "version": 1,
  "partitions": [
    {"topic": "_schemas2", "partition": 0, "replicas": [1001]}
  ]
}

./bin/kafka-reassign-partitions.sh --zookeeper 10.37.251.222:2181? --reassignment-json-file /tmp/reassign-plan.json --execute

Verify:

./bin/kafka-reassign-partitions.sh --zookeeper 10.37.251.222:2181 --reassignment-json-file /tmp/reassign-plan.json --verify

Check the topic again:

./bin/kafka-topics.sh --zookeeper 10.37.251.222:2181 --describe --topic _schemas2

Inspect ZooKeeper:

./bin/zkCli.sh -server 10.37.251.222:2181

Changed [1001,0] under /brokers/ids to [1001].

After that change, some topics' leader became -1, so leaders were set manually (see the set commands below).

Broker 0 existed because broker.id had at one point been misconfigured to 0; although the original config was restored and the broker restarted, this left behind the illusion of an extra broker. In hindsight, actually adding a broker-0 node alongside node 1 should also have worked.

After restarting Kafka the WARN no longer appeared, but Kafka had lost data and the sync tasks' destinations were missing records.

set /brokers/topics/_schemas2/partitions/0/state {"controller_epoch":2,"leader":1001,"version":1,"leader_epoch":6,"isr":[1001]}

set /brokers/topics/mysql-source-binlog-318-jobId.qdp_rosetta.rst_task_problem/partitions/1/state {"controller_epoch":2,"leader":1001,"version":1,"leader_epoch":1,"isr":[1001]}

With multiple replicas in the ISR, only the leader can be read from.

Comparing against the _schemas topic state in the QA environment, dev's replica count was only 2:

./bin/kafka-topics --describe --zookeeper 10.37.253.31:2181 --topic _schemas

Topic:_schemas  PartitionCount:1        ReplicationFactor:3     Configs:cleanup.policy=compact
        Topic: _schemas Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1

_schemas2 ended up losing data, with its leader unavailable.

2.

Follow-on problem:

After the Kafka restart, tasks began failing. _schemas does contain mysql-source-binlog-318-jobId.qdp_rosetta.rst_task_item-value, but under ID 421 — so fetching the schema by ID found nothing. Why was the wrong ID, 403, being used in the first place?

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mysql-source-binlog-318-jobId.qdp_rosetta.rst_task to Avro:
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 403
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
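
One likely explanation: each Confluent-Avro record embeds a 4-byte schema ID after the magic byte, so the 403 comes from the message itself — the ID it was registered under when written. If the _schemas topic lost records during the incident in item 1, that ID can no longer be resolved even though the subject now carries a newer ID (421). Quick checks against the registry (address as used earlier in these notes):

curl -s http://10.50.6.54:8081/schemas/ids/403
curl -s http://10.50.6.54:8081/subjects/mysql-source-binlog-318-jobId.qdp_rosetta.rst_task-value/versions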

---------------------------------------------------------------------------------------------------------------------

Records returned from poll() never showed up downstream; the Kafka Connect log reported a WARN:

[2019-12-20 09:54:20,153] WARN [Producer clientId=connector-producer-mysql-source-jdbc-333-jobId-0] Error while fetching metadata with correlation id 3 : {mysql-source-jdbc-333-jobId.devds.=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:1051)

---------------------------------------------------------------------------------------------------------------------

JDBC sink auto-create table problems:

The sink cannot pick up MySQL column length definitions and always applies defaults, and a YEAR value such as 2018 ends up mapped to a DATE (2018-01-01), which does not meet requirements.

---------------------------------------------------------------------------------------------------------------------

org.apache.kafka.connect.errors.ConnectException: query may not be combined with whole-table copying settings.

When the JDBC source SQL contains a JOIN, the "table.whitelist" property must not be used; with "table.whitelist" set, the schema for the target topic was never created, so no data reached Kafka and no error was reported.

In query mode, the topic.prefix property must specify the complete topic name, e.g. mysql-source-jdbc-443-jobId.devds.user.
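
A minimal query-mode fragment for illustration — the SQL and column are placeholders; note that topic.prefix carries the full topic name in this mode:

"query": "SELECT u.*, r.role_name FROM user u JOIN role r ON r.id = u.role_id",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "mysql-source-jdbc-443-jobId.devds.user"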

---------------------------------------------------------------------------------------------------------------------

Rows were missing at the destination and syncing stopped without raising an error:

[2020-01-08 15:43:11,124] ERROR WorkerSourceTask{id=mysql-source-binlog-353-jobId-0} Failed to flush, timed out while waiting for producer to flush outstanding 3 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:431)

[2020-01-08 15:43:11,124] ERROR WorkerSourceTask{id=mysql-source-binlog-353-jobId-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)

---------------------------------------------------------------------------------------------------------------------

[2020-01-09 14:11:12,933] ERROR WorkerSourceTask{id=mysql-source-binlog-360-jobId-0} Failed to flush, timed out while waiting for producer to flush outstanding 3050 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:431)

[2020-01-09 14:11:12,949] ERROR WorkerSourceTask{id=mysql-source-binlog-360-jobId-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)

---------------------------------------------------------------------------------------------------------------------

[2020-01-10 16:12:17,935] ERROR WorkerSinkTask{id=mysql-sink-binlog-485-taskId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.common.errors.WakeupException
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:490)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:693)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1454)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1412)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:332)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:360)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:431)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

---------------------------------------------------------------------------------------------------------------------

org.apache.kafka.connect.errors.ConnectException: PK mode for table 'users' is RECORD_KEY, but record key schema is missing

Misconfiguration: change "pk.mode": "record_key" to "pk.mode": "record_value".

---------------------------------------------------------------------------------------------------------------------

Failed due to error: Aborting snapshot due to error when last running 'SELECT * FROM `qding_brick`.`bd_person_addr`': Can''t call rollback when autocommit=true (io.debezium.connector.mysql.SnapshotReader:209)

---------------------------------------------------------------------------------------------------------------------

[2020-01-14 01:56:33,803] ERROR WorkerSinkTask{id=mysql-sink-binlog-501-taskId-0} Commit of offsets threw an unexpected exception for sequence number 4899: {mysql-source-binlog-371-jobId.qdp_rosetta.rst_task_item-0=OffsetAndMetadata{offset=17870259, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:259)
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.TimeoutException: The request timed out.
[2020-01-14 01:56:34,053] INFO [Consumer clientId=connector-consumer-mysql-sink-binlog-501-taskId-0, groupId=connect-mysql-sink-binlog-501-taskId] Discovered group coordinator 10.50.6.53:9092 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:728)

---------------------------------------------------------------------------------------------------------------------

When Debezium takes a snapshot it reads via JDBC, and tinyint(1) values greater than 0 come back as 1. Setting this parameter fixes it:

"database.tinyInt1isBit": "false"

---------------------------------------------------------------------------------------------------------------------

A slave with the same server_uuid/server_id as this slave has connected to the master

Cause unclear; restarting the task resolved it. Web searches suggest it is related to the MySQL server side (duplicate server IDs among replication clients).

---------------------------------------------------------------------------------------------------------------------

Caused by: io.debezium.text.ParsingException: no viable alternative at input

Simply skip any DDL that cannot be parsed: "database.history.skip.unparseable.ddl": "true". Per search results, upgrading to 1.1.0.Beta1 fixes the underlying parser bug.

---------------------------------------------------------------------------------------------------------------------

[2020-03-12 12:42:28,414] ERROR WorkerSinkTask{id=mysql-sink-binlog-536-taskId-0} Commit of offsets threw an unexpected exception for sequence number 56: {mysql-source-binlog-378-jobId.qdp_rosetta.rst_task_item-0=OffsetAndMetadata{offset=19974922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:259)

Tuning knobs to consider:

max.poll.records
max.poll.interval.ms
offset.flush.timeout.ms=10000

---------------------------------------------------------------------------------------------------------------------

No producer is available. Ensure that 'start()' is called before storing database history records.

---------------------------------------------------------------------------------------------------------------------

Failed to flush, timed out while waiting for producer to flush outstanding 14581 messages

---------------------------------------------------------------------------------------------------------------------

ERROR WorkerSinkTask{id=mysql-sink-binlog-498-taskId-0} Commit of offsets threw an unexpected exception for sequence number 604: {mysql-source-binlog-366-jobId.qdp_rosetta.rst_task-0=OffsetAndMetadata{offset=264911, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:259)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
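
One way to apply the message's own suggestion per connector is the consumer.override.* mechanism (requires connector.client.config.override.policy=All in the worker config; values are illustrative):

"consumer.override.max.poll.interval.ms": "600000",
"consumer.override.max.poll.records": "100"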
