ERROR WorkerSourceTask{id=mysql-source-binlog-336-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
        at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:252)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 3900896 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Check the Kafka Connect log for the max.request.size value printed in the INFO ProducerConfig values block.
The max.request.size of the database-history producer (mysql-source-binlog-336-jobId-dbhistory) had been changed, but the one for the data producer
(connector-producer-mysql-source-binlog-336-jobId-0) had not, so the error persisted.
Reference:
https://github.com/confluentinc/cp-docker-images/issues/445
Adding these parameters
"database.history.producer.max.request.size": "157286400",
"max.request.size": "157286400"
to the source connector config resolved it.
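For context, a minimal sketch of a Debezium MySQL source config with both overrides in place (connection details below are placeholders, not the original job's values). Note that the broker-side max.message.bytes of the target topics must also admit messages of this size.
{
  "name": "mysql-source-binlog-336-jobId",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-host",
    "database.port": "3306",
    "database.user": "repl",
    "database.password": "******",
    "database.server.name": "mysql-source-binlog-336-jobId",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.mysql-source-binlog-336-jobId",
    "database.history.producer.max.request.size": "157286400",
    "max.request.size": "157286400"
  }
}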
---------------------------------------------------------------------------------------------------------------------
{"code":500,"data":null,"msg":"java.lang.IllegalArgumentException: URI is not absolute"}
等待下次再次出現(xiàn)通過(guò) http://elk-ops.iqdnet.cn/ 查看日志分析诞帐,當(dāng)前日志只保留4天庙曙,無(wú)法查看
---------------------------------------------------------------------------------------------------------------------
[2019-11-14 14:10:10,064] ERROR WorkerSourceTask{id=mysql-source-binlog-336-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:281)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:309)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic mysql-source-binlog-336-jobId.slow_log.slow_log :
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:83)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:281)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Key","namespace":"mysql_source_binlog_336_jobId.slow_log.slow_log","fields":[{"name":"id","type":"string"}],"connect.name":"mysql_source_binlog_336_jobId.slow_log.slow_log.Key"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while forwarding register schema request to the master; error code: 50003
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
        at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:131)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:281)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:281)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:309)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Checked the Schema Registry logs and found no obvious error, but noticed the master's hostname was unreachable from this node.
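Two quick probes from the Connect host can narrow this down (the registry URL is the one used later in these notes; the -key subject name is inferred from the topic): list subjects to confirm reachability, then replay the registration that failed.
curl -s http://10.50.6.54:8081/subjects
curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema":"{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"mysql_source_binlog_336_jobId.slow_log.slow_log\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"}' \
  http://10.50.6.54:8081/subjects/mysql-source-binlog-336-jobId.slow_log.slow_log-key/versions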
---------------------------------------------------------------------------------------------------------------------
[2019-11-21 16:32:16,681] ERROR Error during binlog processing. Last offset stored = {ts_sec=1574325000, file=mysqld-bin.008559, pos=419190460, row=1, server_id=463306, event=3}, binlog reader near position = mysqld-bin.008559/430041542 (io.debezium.connector.mysql.BinlogReader:1054)
[2019-11-21 16:32:16,681] ERROR Failed due to error: Error processing binlog event (io.debezium.connector.mysql.BinlogReader:209)
org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-binlog-333-jobId:0 using brokers at null: truncate table mysql.slow_log
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:208)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:508)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1095)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:943)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-binlog-333-jobId:0 using brokers at null: truncate table mysql.slow_log
        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:361)
        at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:694)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:492)
        ... 5 more
Caused by: io.debezium.relational.history.DatabaseHistoryException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for dbhistory.mysql-source-binlog-333-jobId-0:120001 ms has passed since batch creation
        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:198)
        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:66)
        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:60)
        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:356)
        ... 7 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for dbhistory.mysql-source-binlog-333-jobId-0:120001 ms has passed since batch creation
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:188)
        ... 10 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for dbhistory.mysql-source-binlog-333-jobId-0:120001 ms has passed since batch creation
---------------------------------------------------------------------------------------------------------------------
ERROR WorkerSourceTask{id=mysql-source-binlog-358-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: log event entry exceeded max_allowed_packet; Increase max_allowed_packet on master; the first event 'mysqld-bin.000060' at 910723425, the last event read from '/data/binlog/qdpp/mysqld-bin.000060' at 123, the last byte read from '/data/binlog/qdpp/mysqld-bin.000060' at 910723444. Error code: 1236; SQLSTATE: HY000.
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197)
        at io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1041)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: log event entry exceeded max_allowed_packet; Increase max_allowed_packet on master; the first event 'mysqld-bin.000060' at 910723425, the last event read from '/data/binlog/qdpp/mysqld-bin.000060' at 123, the last byte read from '/data/binlog/qdpp/mysqld-bin.000060' at 910723444.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:914)
        ... 3 more
A single transaction's binlog event exceeded the max_allowed_packet limit configured on the master.
Error code 1236 can also mean the requested binlog file no longer exists on the master.
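A minimal sketch for checking and raising the limit on the master (the 1 GiB value is illustrative and assumes sufficient privileges; SET GLOBAL only affects new sessions):
SHOW VARIABLES LIKE 'max_allowed_packet';
SET GLOBAL max_allowed_packet = 1073741824;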
---------------------------------------------------------------------------------------------------------------------
ERROR WorkerSinkTask{id=elasticsearch-sink-binlog-364-taskId-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:558)
org.apache.kafka.connect.errors.ConnectException: java.net.SocketTimeoutException: Read timed out
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.indexExists(JestElasticsearchClient.java:284)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:290)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:255)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:169)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Possibly caused by an Elasticsearch restart; the task does not recover on its own and must be restarted manually.
Reference (describes the client dropping connections after long idle periods):
https://github.com/confluentinc/kafka-connect-elasticsearch/pull/349
Add the parameter:
max.connection.idle.time.ms
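A sketch of the manual recovery plus the new sink setting (the worker URL and the 60 s value are assumptions):
# Restart the failed task through the Connect REST API:
curl -s -X POST http://localhost:8083/connectors/elasticsearch-sink-binlog-364-taskId/tasks/0/restart
# In the sink connector config:
"max.connection.idle.time.ms": "60000"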
---------------------------------------------------------------------------------------------------------------------
[2019-11-25 20:27:55,399] ERROR WorkerSourceTask{id=mysql-source-binlog-333-jobId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-binlog-333-jobId:0 using brokers at null: SAVEPOINT `SAVEPOINT_1`
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:208)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:508)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1095)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:943)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error recording the DDL statement(s) in the database history Kafka topic dbhistory.mysql-source-binlog-333-jobId:0 using brokers at null: SAVEPOINT `SAVEPOINT_1`
        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:361)
        at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:694)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:492)
        ... 5 more
Caused by: io.debezium.relational.history.DatabaseHistoryException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:198)
        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:66)
        at io.debezium.relational.history.AbstractDatabaseHistory.record(AbstractDatabaseHistory.java:60)
        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:356)
        ... 7 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
        at io.debezium.relational.history.KafkaDatabaseHistory.storeRecord(KafkaDatabaseHistory.java:188)
        ... 10 more
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
Caused by a Kafka restart after a configuration change; the partition leader switched during the restart.
---------------------------------------------------------------------------------------------------------------------
Check partition status:
./bin/kafka-topics --bootstrap-server 10.37.251.101:9092 --topic mysql-source-binlog-324-jobId.longfor_mdm.cp_role_user_relation --describe
./bin/kafka-topics --bootstrap-server kafka-platform-01.qiandingyun.com:9092 --topic mysql-source-binlog-337-jobId.databus.ads_fenxiao_hehuoren_detail --describe
Found the partition count was 5.
Check offsets:
./bin/kafka-consumer-groups --bootstrap-server 10.37.251.101:9092 --describe --group mysql-source-binlog-324-jobId.longfor_mdm.cp_role_user_relation
---------------------------------------------------------------------------------------------------------------------
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
An incompatible schema version already existed for the subject; delete the old version directly:
http://10.50.6.54:8081/subjects/mysql-source-binlog-336-jobId.slow_log.slow_log-value/versions/1
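The deletion is an HTTP DELETE against that URL (standard Schema Registry API):
curl -s -X DELETE http://10.50.6.54:8081/subjects/mysql-source-binlog-336-jobId.slow_log.slow_log-value/versions/1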
---------------------------------------------------------------------------------------------------------------------
[2019-12-04 17:05:03,719] INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Retrying leaderEpoch request for partition _schemas-0 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)
[2019-12-04 17:05:04,721] WARN [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Error when sending leader epoch request for Map(_schemas-0 -> (currentLeaderEpoch=Optional[1], leaderEpoch=0)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 0 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)
        at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)
        at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)
        at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)
[2019-12-04 17:05:04,722] INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Retrying leaderEpoch request for partition _schemas-0 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)
---------------------------------------------------------------------------------------------------------------------
With a JDBC MySQL source and no timezone parameter on the task, TIME values earlier than 08:00 apparently went negative after the -8 hour shift, causing this error:
Caused by: org.apache.kafka.connect.errors.DataException: Kafka Connect Time type should not have any date fields set to non-zero values.
        at org.apache.kafka.connect.data.Time.fromLogical(Time.java:64)
        at io.confluent.connect.avro.AvroData$7.convert(AvroData.java:287)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:420)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:607)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:366)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:284)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more
Add timezone parameters to the source: append serverTimezone=Asia/Shanghai to the JDBC connection URL and set
"db.timezone": "Asia/Shanghai"
Afterwards, DATE values apparently got shifted by -8 hours, leaving non-zero time fields and producing the converse error:
Caused by: org.apache.kafka.connect.errors.DataException: Kafka Connect Date type should not have any time fields set to non-zero values.
        at org.apache.kafka.connect.data.Date.fromLogical(Date.java:64)
        at io.confluent.connect.avro.AvroData$6.convert(AvroData.java:276)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:420)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:607)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:366)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:284)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more
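A sketch of both timezone settings together in the JDBC source config (host and database are placeholders):
"connection.url": "jdbc:mysql://mysql-host:3306/devds?serverTimezone=Asia/Shanghai",
"db.timezone": "Asia/Shanghai"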
---------------------------------------------------------------------------------------------------------------------
1.
After a Kafka restart, server.log kept reporting this WARN:
INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Retrying leaderEpoch request for partition _schemas2-0 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)
[2019-12-23 16:10:08,438] INFO [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: java.io.IOException: Connection to 0 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
WARN [ReplicaFetcher replicaId=1001, leaderId=0, fetcherId=0] Error when sending leader epoch request for Map(_schemas2-0 -> (currentLeaderEpoch=Optional[1], leaderEpoch=0)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 0 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)
        at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)
        at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)
        at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)
Via the command:
./bin/kafka-topics --describe --zookeeper 10.37.251.222:2181 --topic _schemas2
Topic:_schemas2 PartitionCount:1        ReplicationFactor:2     Configs:cleanup.policy=compact
        Topic: _schemas2        Partition: 0    Leader: 0       Replicas: 1001,0        Isr: 0
First, trigger a preferred-replica election so the preferred replica becomes the leader.
JSON file listing the partitions to run the election for:
{
  "partitions": [
    {"topic":"_schemas2","partition":0}
  ]
}
./bin/kafka-preferred-replica-election.sh --bootstrap-server 10.37.251.101:9092 --path-to-json-file /tmp/kafka-preferred-replica-election.json
Then force the leader at the replica level by reassigning the partition's replicas.
Execute:
{
  "version": 1,
  "partitions": [
    {"topic":"_schemas2","partition":0,"replicas":[1001]}
  ]
}
./bin/kafka-reassign-partitions.sh --zookeeper 10.37.251.222:2181 --reassignment-json-file /tmp/reassign-plan.json --execute
Verify:
./bin/kafka-reassign-partitions.sh --zookeeper 10.37.251.222:2181 --reassignment-json-file /tmp/reassign-plan.json --verify
Check again:
./bin/kafka-topics.sh --zookeeper 10.37.251.222:2181 --describe --topic _schemas2
Inspect ZooKeeper:
./bin/zkCli.sh -server 10.37.251.222:2181
Changed [1001,0] under /brokers/ids to [1001].
After that change some topics' leader became -1, so the leader had to be set manually (the set commands below).
Broker 0 existed because broker.id had at one point been configured as 0; although the config was reverted and the broker restarted, it left behind the illusion of an extra broker. In hindsight, standing up real broker 0 and broker 1 nodes would probably also have worked.
After restarting Kafka the WARN stopped, but Kafka had lost data and the sync destinations were missing rows.
set /brokers/topics/_schemas2/partitions/0/state {"controller_epoch":2,"leader":1001,"version":1,"leader_epoch":6,"isr":[1001]}
set /brokers/topics/mysql-source-binlog-318-jobId.qdp_rosetta.rst_task_problem/partitions/1/state {"controller_epoch":2,"leader":1001,"version":1,"leader_epoch":1,"isr":[1001]}
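For reference, the state znode can be read back in the same zkCli session to confirm the change (read-only):
get /brokers/topics/_schemas2/partitions/0/state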
Even with multiple replicas in the ISR, only the leader is readable.
Comparing with the QA environment's _schemas topic state showed that dev's replication factor was only 2, whereas QA's is 3:
./bin/kafka-topics --describe --zookeeper 10.37.253.31:2181 --topic _schemas
Topic:_schemas  PartitionCount:1        ReplicationFactor:3     Configs:cleanup.policy=compact
        Topic: _schemas Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1
_schemas2 lost data and its leader became unavailable.
2.
Follow-on problem this caused:
After the Kafka restart, tasks started failing. _schemas does contain mysql-source-binlog-318-jobId.qdp_rosetta.rst_task_item-value, but its ID is 421, i.e. the lookup by ID came back empty. Why the lookup used the wrong ID, 403, is unexplained.
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mysql-source-binlog-318-jobId.qdp_rosetta.rst_task to Avro:
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 403
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
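To confirm which IDs actually exist, the registry can be queried directly (registry URL from earlier in these notes):
curl -s http://10.50.6.54:8081/schemas/ids/421
curl -s http://10.50.6.54:8081/schemas/ids/403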
---------------------------------------------------------------------------------------------------------------------
poll() returned but no data came through; the Kafka Connect log showed this WARN:
[2019-12-20 09:54:20,153] WARN [Producer clientId=connector-producer-mysql-source-jdbc-333-jobId-0] Error while fetching metadata with correlation id 3 : {mysql-source-jdbc-333-jobId.devds.=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:1051)
---------------------------------------------------------------------------------------------------------------------
JDBC sink auto table creation problems:
It cannot pick up the declared length of MySQL column types and always creates columns with default lengths. Also, a YEAR value such as 2018 ends up mapped to a DATE (2018-01-01), which does not meet the target's requirements.
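A workaround (our assumption, not something verified in these notes) is to pre-create the target table with the exact types and lengths, and turn off auto-creation in the sink config:
"auto.create": "false",
"auto.evolve": "false"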
---------------------------------------------------------------------------------------------------------------------
org.apache.kafka.connect.errors.ConnectException: query may not be combined with whole-table copying settings.
If the SQL in a JDBC source contains a JOIN, the "table.whitelist" property must not be used (query mode may not be combined with whole-table copying settings). Using "table.whitelist" here meant the topic's schema was never created, so no data reached Kafka and no error was reported.
The topic.prefix property must then specify the complete topic name, e.g. mysql-source-jdbc-443-jobId.devds.user.
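A minimal query-mode sketch (the query body and mode are illustrative assumptions):
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "bulk",
"query": "SELECT u.id, u.name, r.role_name FROM user u JOIN user_role r ON u.id = r.user_id",
"topic.prefix": "mysql-source-jdbc-443-jobId.devds.user"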
---------------------------------------------------------------------------------------------------------------------
Synced data came up short; the sync stopped without reporting an error:
[2020-01-08 15:43:11,124] ERROR WorkerSourceTask{id=mysql-source-binlog-353-jobId-0} Failed to flush, timed out while waiting for producer to flush outstanding 3 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:431)
[2020-01-08 15:43:11,124] ERROR WorkerSourceTask{id=mysql-source-binlog-353-jobId-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)
---------------------------------------------------------------------------------------------------------------------
[2020-01-09 14:11:12,933] ERROR WorkerSourceTask{id=mysql-source-binlog-360-jobId-0} Failed to flush, timed out while waiting for producer to flush outstanding 3050 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:431)
[2020-01-09 14:11:12,949] ERROR WorkerSourceTask{id=mysql-source-binlog-360-jobId-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)
---------------------------------------------------------------------------------------------------------------------
[2020-01-10 16:12:17,935] ERROR WorkerSinkTask{id=mysql-sink-binlog-485-taskId-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.common.errors.WakeupException
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:490)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:693)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1454)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1412)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:332)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:360)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:431)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
---------------------------------------------------------------------------------------------------------------------
org.apache.kafka.connect.errors.ConnectException: PK mode for table 'users' is RECORD_KEY, but record key schema is missing
Configuration error:
changed "pk.mode": "record_key" to "pk.mode": "record_value".
---------------------------------------------------------------------------------------------------------------------
Failed due to error: Aborting snapshot due to error when last running 'SELECT * FROM `qding_brick`.`bd_person_addr`': Can''t call rollback when autocommit=true (io.debezium.connector.mysql.SnapshotReader:209)
---------------------------------------------------------------------------------------------------------------------
[2020-01-14 01:56:33,803] ERROR WorkerSinkTask{id=mysql-sink-binlog-501-taskId-0} Commit of offsets threw an unexpected exception for sequence number 4899: {mysql-source-binlog-371-jobId.qdp_rosetta.rst_task_item-0=OffsetAndMetadata{offset=17870259, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:259)
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.TimeoutException: The request timed out.
[2020-01-14 01:56:34,053] INFO [Consumer clientId=connector-consumer-mysql-sink-binlog-501-taskId-0, groupId=connect-mysql-sink-binlog-501-taskId] Discovered group coordinator 10.50.6.53:9092 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:728)
---------------------------------------------------------------------------------------------------------------------
During snapshots Debezium reads via JDBC, and because tinyint(1) is treated as BIT, any value greater than 0 is returned as 1.
Setting this parameter fixes it:
"database.tinyInt1isBit": "false"
---------------------------------------------------------------------------------------------------------------------
A slave with the same server_uuid/server_id as this slave has connected to the master
Cause unclear; restarting the task resolved it. Google results suggest it is related to the MySQL service seeing two replication clients with the same identity.
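An inference from the error text (not confirmed in these notes): each Debezium connector replicating from the same master must use a distinct server id, e.g.:
"database.server.id": "184054"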
---------------------------------------------------------------------------------------------------------------------
Caused by: io.debezium.text.ParsingException: no viable alternative at input
Simply discard DDL that cannot be parsed: "database.history.skip.unparseable.ddl": "true"
Search results suggest upgrading to 1.1.0.Beta1 fixes this parser bug.
---------------------------------------------------------------------------------------------------------------------
[2020-03-12 12:42:28,414] ERROR WorkerSinkTask{id=mysql-sink-binlog-536-taskId-0} Commit of offsets threw an unexpected exception for sequence number 56: {mysql-source-binlog-378-jobId.qdp_rosetta.rst_task_item-0=OffsetAndMetadata{offset=19974922, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:259)
Parameters adjusted in response:
max.poll.records
max.poll.interval.ms
offset.flush.timeout.ms=10000
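A sketch of where these land (values are illustrative assumptions; the consumer.* prefix in the worker properties applies to all sink tasks on that worker):
# connect-distributed.properties
offset.flush.timeout.ms=10000
consumer.max.poll.records=500
consumer.max.poll.interval.ms=600000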
---------------------------------------------------------------------------------------------------------------------
No producer is available. Ensure that 'start()' is called before storing database history records.
---------------------------------------------------------------------------------------------------------------------
Failed to flush, timed out while waiting for producer to flush outstanding 14581 messages
---------------------------------------------------------------------------------------------------------------------
ERROR WorkerSinkTask{id=mysql-sink-binlog-498-taskId-0} Commit of offsets threw an unexpected exception for sequence number 604: {mysql-source-binlog-366-jobId.qdp_rosetta.rst_task-0=OffsetAndMetadata{offset=264911, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:259)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.