重要的參數(shù)
partition.duration.ms
connector會將數(shù)據(jù)按時間劃分文件夾峰伙,比如這個參數(shù)設為1800000
扣囊,意味著每半小時為一個topic新開一個文件夾。
partitioner.class & path.format
默認值partition.class=TimeBasedPartitioner.class & path.format=YYYY/MM/dd/HH:mm:ss
在這個默認值下爽锥,文件路徑大概會像這樣如下字符串绪颖,由時間+topic+partition+offset組成:
/topics/{topicName}+3+0000000000.json
timezone
默認使用UTC+0時間锚贱,沒有特殊需求盡量不要管動它
timestamp.extractor & timestamp.field
如何獲取記錄的時間的配置,有以下幾種配置:
- Record: 默認值吆寨,使用Kafka原本的消息時間
- Wallclock:connector處理到這條消息的系統(tǒng)當前時間
- RecordField:使用Record內(nèi)某個字段作為時間戳赏淌,配合timestamp.field使用。不建議此方法啄清,容易因為沒有相關(guān)字段直接拋出ConnectorException異常
- 其它:自提供一個TimestampExtractor的實現(xiàn)類六水,需要給出類的全名。
flush.size
& rotate.interval.ms
最多多少條記錄提交一次文件辣卒,以及最多多長時間提交一次文件掷贾,二者滿足其一就會創(chuàng)建一個新的s3文件出來。
s3.part.retries
& s3.retry.backoff.ms
寫入s3的失敗重試次數(shù)荣茫,以及每次失敗后的等待時長想帅。
其它參數(shù)
aws.access.key.id
: s3用戶名
aws.secret.access.key
:s3密碼
store.url
: 存儲的s3地址
s3.bucket.name
: 存儲的s3 bucket
tasks.max
:最多幾個task
format.class
:數(shù)據(jù)以什么形式寫入s3,有JsonFormat \ ByteArrayFormat \ AvroFormat等啡莉,可以通過實現(xiàn)io.confluent.connect.storage.format.Format
接口來自定義想要的格式和內(nèi)容港准。