一、參數(shù)調(diào)優(yōu)
// batch.size: a batch is sent as soon as it reaches this size in bytes,
// even if linger.ms (below) has not elapsed yet.
properties.put("batch.size", 128 * 1024); // default is 16384 (16 KB); raised to 128 KB
properties.put("linger.ms", 10); // wait up to 10 ms for more records to fill the batch instead of sending each record immediately
// The two settings above define the batch commit policy:
// flush when the batch is full OR when the linger timeout fires, whichever comes first.
properties.put("buffer.memory", 128 * 1024 * 1024); // total producer buffer; default 33554432 (32 MB), raised to 128 MB
properties.put("max.request.size", 10 * 1024 * 1024); // max size of a single request to the Kafka broker; default 1 MB, raised to 10 MB
二、修改 sink 分區(qū)策略
自定義分區(qū)器
public class RandomKafkaPartitioner<T> extends FlinkKafkaPartitioner<T> {
private static final long serialVersionUID = -3785320239953858777L;
private static final ThreadLocalRandom ran = ThreadLocalRandom.current();
public RandomKafkaPartitioner() {
}
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
return partitions[this.ran.nextInt(1000000) % partitions.length];
}
public boolean equals(Object o) {
return this == o || o instanceof RandomKafkaPartitioner;
}
public int hashCode() {
return RandomKafkaPartitioner.class.hashCode();
}
}
創(chuàng)建 Sink 算子
// Build the Kafka 0.11 sink writing to topic "LOGWRITTER_OTHER_TEST":
// - SimpleStringSchema wrapped as a keyed schema: records are written as plain strings, no key
// - propertiesResult carries the tuned producer settings from section 1
// - the custom RandomKafkaPartitioner spreads records evenly across partitions
// - AT_LEAST_ONCE semantics: no data loss on failure, but duplicates are possible
// - last argument (10) is the Kafka producer pool size
//   NOTE(review): the producer pool is only used by EXACTLY_ONCE semantics — confirm it is intentional here
new FlinkKafkaProducer011<>("LOGWRITTER_OTHER_TEST",
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
propertiesResult,
Optional.of(new RandomKafkaPartitioner<>()),
FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE,
10)