flink sink 2 mysql demo
Let's first look at a custom-sink demo that writes data received from nc into MySQL.
import myflink.learn.model.Student;
import myflink.learn.sink.SinkToMySQL;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author wtx
* @Date 2019/1/23
*/
public class Flink2MysqlDemo {
    public static void main(String[] args) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // set up the data source
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
        // DataStream<Student> studentDataStream = text.map(new MapFunction<String, Student>() {
        //     @Override
        //     public Student map(String s) throws Exception {
        //         Student student = new Student();
        //         student.setName(s);
        //         student.setId(atomicInteger.addAndGet(1));
        //         return student;
        //     }
        // });
        DataStream<Student> studentDataStream = text.map((str) -> {
            Student student = new Student();
            student.setName(str);
            student.setId(atomicInteger.addAndGet(1));
            return student;
        });
        studentDataStream.addSink(new SinkToMySQL());
        env.execute();
    }
}
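To try the demo, start a local socket source first, e.g. nc -lk 9000 (assuming a Unix-like nc), then run the job; every line typed into nc becomes one Student row. The custom SinkToMySQL used above looks like this: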
import lombok.extern.slf4j.Slf4j;
import myflink.learn.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

@Slf4j
public class SinkToMySQL extends RichSinkFunction<Student> {
    private PreparedStatement ps;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into student(id, name) values(?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // close the statement first, then the connection, releasing the resources
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {
        // assemble the data and execute the insert
        ps.setInt(1, value.getId());
        ps.setString(2, value.getName());
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "");
        } catch (Exception e) {
            log.error("exception e:", e);
        }
        return con;
    }
}
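For reference, the Student model imported above is not shown in the post; judging from the getters and setters used, it is presumably a plain POJO along these lines (a sketch, field types assumed):

package myflink.learn.model;

// Assumed shape of the Student POJO used by the demo and the sink.
public class Student {
    private int id;
    private String name;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}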
Class structure of RichSinkFunction
As you can see, the custom sink extends RichSinkFunction. Let's look at the type hierarchy behind RichSinkFunction:
/**
 * The base interface for all user-defined functions.
 *
 * <p>This interface is empty in order to allow extending interfaces to
 * be SAM (single abstract method) interfaces that can be implemented via Java 8 lambdas.</p>
 */
@Public
public interface Function extends java.io.Serializable {
}

public interface SinkFunction<IN> extends Function, Serializable {

    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    @Public
    interface Context<T> {

        long currentProcessingTime();

        long currentWatermark();

        Long timestamp();
    }
}
The SinkFunction interface effectively has just one method, invoke(), which writes a value of type IN into the sink; Context is the context available while the value is written (processing time, current watermark, and the record's timestamp).
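In other words, the simplest possible sink only needs invoke(). As a quick illustration (a sketch only; PrintWithTimestampSink is a made-up name), a sink that prints each record together with the timestamp from its Context could look like:

// Minimal sketch: print each record with its event timestamp.
public class PrintWithTimestampSink<T> implements SinkFunction<T> {
    @Override
    public void invoke(T value, Context context) {
        // timestamp() may return null if the record carries no event timestamp
        System.out.println(value + " @ " + context.timestamp());
    }
}

RichSinkFunction itself extends AbstractRichFunction, which adds the open()/close() lifecycle: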
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {

    private transient RuntimeContext runtimeContext;

    @Override
    public void open(Configuration parameters) throws Exception {}

    @Override
    public void close() throws Exception {}
}
AbstractRichFunction only supplies empty default implementations of the lifecycle methods open() and close(). Filling them in is left to us, as SinkToMySQL above does for MySQL; in the same way we could write a sink class for Redis. Looking at flink-connector-redis, a RedisSink class already exists.
Let's first look at the simple usage: just replace new SinkToMySQL() with new RedisSink:
studentDataStream.addSink(new SinkToMySQL());
==>
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
studentDataStream.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisExampleMapper()));
public static final class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {

    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "flink");
    }

    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0;
    }

    public String getValueFromData(Tuple2<String, Integer> data) {
        return data.f1.toString();
    }
}
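One caveat: RedisExampleMapper works on Tuple2<String, Integer>, while the stream in the MySQL demo is a DataStream<Student>, so the snippet above would not compile against studentDataStream as-is. To reuse that stream, a mapper over Student is needed; a sketch (StudentRedisMapper is a made-up name):

public static final class StudentRedisMapper implements RedisMapper<Student> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        // store students in a Redis hash named "student"
        return new RedisCommandDescription(RedisCommand.HSET, "student");
    }

    @Override
    public String getKeyFromData(Student student) {
        return String.valueOf(student.getId());
    }

    @Override
    public String getValueFromData(Student student) {
        return student.getName();
    }
}

// usage:
// studentDataStream.addSink(new RedisSink<>(conf, new StudentRedisMapper()));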
Back to RedisSink: it, too, extends RichSinkFunction:
public class RedisSink<IN> extends RichSinkFunction<IN> {

    public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
        Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
        Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null");
        Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");

        this.flinkJedisConfigBase = flinkJedisConfigBase;
        this.redisSinkMapper = redisSinkMapper;
        RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
        this.redisCommand = redisCommandDescription.getCommand();
        this.additionalKey = redisCommandDescription.getAdditionalKey();
    }
}
A RedisSink is constructed from the conf and the redisSinkMapper; it then overrides invoke():
@Override
public void invoke(IN input) throws Exception {
    String key = redisSinkMapper.getKeyFromData(input);
    String value = redisSinkMapper.getValueFromData(input);
    switch (redisCommand) {
        case RPUSH:
            this.redisCommandsContainer.rpush(key, value);
            break;
        case LPUSH:
            this.redisCommandsContainer.lpush(key, value);
            break;
        case SADD:
            this.redisCommandsContainer.sadd(key, value);
            break;
        case SET:
            this.redisCommandsContainer.set(key, value);
            break;
        case PFADD:
            this.redisCommandsContainer.pfadd(key, value);
            break;
        case PUBLISH:
            this.redisCommandsContainer.publish(key, value);
            break;
        case ZADD:
            this.redisCommandsContainer.zadd(this.additionalKey, value, key);
            break;
        case HSET:
            this.redisCommandsContainer.hset(this.additionalKey, key, value);
            break;
        default:
            throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
    }
}
From the switch above we can see that the Flink-to-Redis sink currently supports only these eight basic Redis commands; if you need any other command, you still have to implement it yourself, for example as sketched below.
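For example, to issue a command the mapper API does not cover (say SETEX with a TTL), one option is to write another RichSinkFunction directly on top of Jedis; a minimal sketch, assuming a local Redis and the Jedis client on the classpath (RedisSetexSink is a made-up name):

public class RedisSetexSink extends RichSinkFunction<Student> {
    private transient JedisPool jedisPool;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedisPool = new JedisPool("127.0.0.1", 6379);
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {
        try (Jedis jedis = jedisPool.getResource()) {
            // SETEX is not one of the eight commands covered by RedisSink
            jedis.setex("student:" + value.getId(), 3600, value.getName());
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedisPool != null) {
            jedisPool.close();
        }
    }
}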
In addition, RedisSink overrides the open and close methods to set up and tear down the Redis connection.
The RedisSink open() method
Let's look at open():
@Override
public void open(Configuration parameters) throws Exception {
    this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
}
Here RedisCommandsContainerBuilder builds a redisCommandsContainer. RedisCommandsContainer is an interface ("The container for all available Redis commands."), and its methods map exactly to the switch (redisCommand) above:
void hset(String key, String hashField, String value);
void rpush(String listName, String value);
void lpush(String listName, String value);
void sadd(String setName, String value);
void publish(String channelName, String message);
void set(String key, String value);
void pfadd(String key, String element);
void zadd(String key, String score, String element);
void close() throws IOException;
Next, look at RedisCommandsContainerBuilder. It builds a redisCommandsContainer from a flinkJedisConfigBase. FlinkJedisConfigBase defines four properties commonly needed for a Redis connection: connectionTimeout, maxTotal, maxIdle, and minIdle.
public abstract class FlinkJedisConfigBase implements Serializable {
    private static final long serialVersionUID = 1L;

    protected final int maxTotal;
    protected final int maxIdle;
    protected final int minIdle;
    protected final int connectionTimeout;

    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
        Preconditions.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
        Preconditions.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
        Preconditions.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
        Preconditions.checkArgument(minIdle >= 0, "minIdle value can not be negative");

        this.connectionTimeout = connectionTimeout;
        this.maxTotal = maxTotal;
        this.maxIdle = maxIdle;
        this.minIdle = minIdle;
    }
}
Returning to RedisCommandsContainerBuilder, FlinkJedisConfigBase has three concrete implementations, corresponding to:
FlinkJedisPoolConfig: the Jedis connection-pool mode (single Redis instance)
FlinkJedisClusterConfig: Redis cluster mode
FlinkJedisSentinelConfig: Redis sentinel mode
public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase) {
    if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
        FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase;
        return RedisCommandsContainerBuilder.build(flinkJedisPoolConfig);
    } else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) {
        FlinkJedisClusterConfig flinkJedisClusterConfig = (FlinkJedisClusterConfig) flinkJedisConfigBase;
        return RedisCommandsContainerBuilder.build(flinkJedisClusterConfig);
    } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
        FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase;
        return RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig);
    } else {
        throw new IllegalArgumentException("Jedis configuration not found");
    }
}
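For reference, constructing each of the three config types looks roughly like this (hosts, ports and names are placeholders; the builders come from flink-connector-redis):

// Jedis connection pool (single Redis instance)
FlinkJedisPoolConfig poolConfig = new FlinkJedisPoolConfig.Builder()
        .setHost("127.0.0.1")
        .setPort(6379)
        .build();

// Redis cluster
Set<InetSocketAddress> nodes = new HashSet<>(
        Collections.singletonList(new InetSocketAddress("127.0.0.1", 7000)));
FlinkJedisClusterConfig clusterConfig = new FlinkJedisClusterConfig.Builder()
        .setNodes(nodes)
        .build();

// Redis sentinel
Set<String> sentinels = new HashSet<>(Collections.singletonList("127.0.0.1:26379"));
FlinkJedisSentinelConfig sentinelConfig = new FlinkJedisSentinelConfig.Builder()
        .setMasterName("mymaster")
        .setSentinels(sentinels)
        .build();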
For the Jedis pool mode, build() ultimately produces a RedisContainer constructed from a JedisPool:
public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
    Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null");

    GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
    genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
    genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
    genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());

    JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
        jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
        jedisPoolConfig.getDatabase());
    return new RedisContainer(jedisPool);
}
The RedisContainer here implements the RedisCommandsContainer interface above and, through the Jedis pool, provides the real implementations of its eight methods such as hset.
Similarly, the Redis cluster mode ultimately produces a RedisClusterContainer:
public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) {
    Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null");

    GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
    genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
    genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
    genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());

    JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
        jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
    return new RedisClusterContainer(jedisCluster);
}
Likewise, RedisClusterContainer implements the RedisCommandsContainer interface and implements the same eight methods such as hset through JedisCluster.
sink 2 kafka
Flink also ships a sink that writes to Kafka. First, replace SinkToMySQL with FlinkKafkaProducer:
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
text.addSink(new FlinkKafkaProducer<>("flink_2_kafka_demo", new SimpleStringSchema(),
properties));
Besides the properties Kafka needs, there is also a SimpleStringSchema. After SinkToMySQL and RedisSink above, it is natural to expect FlinkKafkaProducer to build on RichSinkFunction as well; looking at the source:
public class FlinkKafkaProducer<IN>
extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer.KafkaTransactionState, FlinkKafkaProducer.KafkaTransactionContext> {
}
The Kafka sink is more involved: in addition to writing messages to Kafka, TwoPhaseCommitSinkFunction also implements the two-phase commit protocol (essentially built on Kafka's transaction support; the next article will analyze it in detail).
Let's start with the constructor:
public FlinkKafkaProducer(
        String defaultTopicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
        FlinkKafkaProducer.Semantic semantic,
        int kafkaProducersPoolSize) {
    super(new FlinkKafkaProducer.TransactionStateSerializer(), new FlinkKafkaProducer.ContextStateSerializer());

    this.defaultTopicId = checkNotNull(defaultTopicId, "defaultTopicId is null");
    this.schema = checkNotNull(serializationSchema, "serializationSchema is null");
    this.producerConfig = checkNotNull(producerConfig, "producerConfig is null");
    this.flinkKafkaPartitioner = checkNotNull(customPartitioner, "customPartitioner is null").orElse(null);
    this.semantic = checkNotNull(semantic, "semantic is null");
    this.kafkaProducersPoolSize = kafkaProducersPoolSize;
    checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty");

    ClosureCleaner.clean(this.flinkKafkaPartitioner, true);
    ClosureCleaner.ensureSerializable(serializationSchema);

    // set the producer configuration properties for kafka record key value serializers.
    if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
        this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    } else {
        LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
    }

    if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
        this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    } else {
        LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
    }

    // eagerly ensure that bootstrap servers are set.
    if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
        throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
    }

    if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
        long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
        checkState(timeout < Integer.MAX_VALUE && timeout > 0, "timeout does not fit into 32 bit integer");
        this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
        LOG.warn("Property [{}] not specified. Setting it to {}", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
    }

    // Enable transactionTimeoutWarnings to avoid silent data loss
    // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
    // The KafkaProducer may not throw an exception if the transaction failed to commit
    if (semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
        final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
        final long transactionTimeout;
        if (object instanceof String && StringUtils.isNumeric((String) object)) {
            transactionTimeout = Long.parseLong((String) object);
        } else if (object instanceof Number) {
            transactionTimeout = ((Number) object).longValue();
        } else {
            throw new IllegalArgumentException(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG
                + " must be numeric, was " + object);
        }
        super.setTransactionTimeout(transactionTimeout);
        super.enableTransactionTimeoutWarnings(0.8);
    }

    this.topicPartitionsMap = new HashMap<>();
}
The constructor looks long, but it is mostly assigning fields:
defaultTopicId: the default Kafka topic to write to
serializationSchema: serializes each record into Kafka key/value bytes
producerConfig: the properties handed to the Kafka producer
customPartitioner: an optional FlinkKafkaPartitioner that picks the target partition
semantic: the delivery semantic (see the enum below)
kafkaProducersPoolSize: default KafkaProducers pool size
Among these is the enum Semantic:
public enum Semantic {
    EXACTLY_ONCE,
    AT_LEAST_ONCE,
    NONE
}
Semantic.EXACTLY_ONCE (exactly once): the Flink producer will write all messages in a Kafka transaction that will be committed to Kafka on a checkpoint.
Semantic.AT_LEAST_ONCE (at least once): the Flink producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint.
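The semantic is fixed when the producer is constructed. With the universal Kafka connector this can look roughly like the following sketch (assuming an overload taking a KeyedSerializationSchema and a Semantic is available in your connector version; for EXACTLY_ONCE, transaction.timeout.ms must stay below the broker's transaction.max.timeout.ms):

Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
// keep the transaction timeout below the broker's transaction.max.timeout.ms (15 min by default)
props.put("transaction.timeout.ms", "600000");

text.addSink(new FlinkKafkaProducer<>(
        "flink_2_kafka_demo",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));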
There is a lot of code in this class, so let's start with the two simpler methods, close and open; even these are noticeably longer than their MySQL and Redis counterparts.
In close(), the current transaction is fetched first; if it is not null it is flushed, to avoid exceptions when aborting a transaction that still has pending records. For the AT_LEAST_ONCE and NONE semantics the normal abort does not clean up the producer (because producers are reused), so currentTransaction.producer.close() has to be called manually. Finally, the producer of every pending transaction is closed quietly.
@Override
public void close() throws FlinkKafkaException {
    final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
    if (currentTransaction != null) {
        // to avoid exceptions on aborting transactions with some pending records
        flush(currentTransaction);

        // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
        // we need to close it manually
        switch (semantic) {
            case EXACTLY_ONCE:
                break;
            case AT_LEAST_ONCE:
            case NONE:
                currentTransaction.producer.close();
                break;
        }
    }
    try {
        super.close();
    }
    catch (Exception e) {
        asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
    }
    // make sure we propagate pending errors
    checkErroneous();
    pendingTransactions().forEach(transaction ->
        IOUtils.closeQuietly(transaction.getValue().producer)
    );
}
The open method is simpler: depending on logFailuresOnly it builds a different Callback, which Kafka invokes once each record send completes; with logFailuresOnly the failure is only logged, otherwise the exception is stored and rethrown later.
@Override
public void open(Configuration configuration) throws Exception {
    if (logFailuresOnly) {
        callback = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
                }
                acknowledgeMessage();
            }
        };
    }
    else {
        callback = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null && asyncException == null) {
                    asyncException = exception;
                }
                acknowledgeMessage();
            }
        };
    }

    super.open(configuration);
}
Now look at the invoke method; in the end it uses the producer inside the transaction to send the message to Kafka via transaction.producer.send(record, callback):
@Override
public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
    checkErroneous();

    byte[] serializedKey = schema.serializeKey(next);
    byte[] serializedValue = schema.serializeValue(next);
    String targetTopic = schema.getTargetTopic(next);
    if (targetTopic == null) {
        targetTopic = defaultTopicId;
    }

    Long timestamp = null;
    if (this.writeTimestampToKafka) {
        timestamp = context.timestamp();
    }

    ProducerRecord<byte[], byte[]> record;
    int[] partitions = topicPartitionsMap.get(targetTopic);
    if (null == partitions) {
        partitions = getPartitionsByTopic(targetTopic, transaction.producer);
        topicPartitionsMap.put(targetTopic, partitions);
    }
    if (flinkKafkaPartitioner != null) {
        record = new ProducerRecord<>(
            targetTopic,
            flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
            timestamp,
            serializedKey,
            serializedValue);
    } else {
        record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
    }
    /**
     * pendingRecords is an AtomicLong:
     * private final AtomicLong pendingRecords = new AtomicLong();
     */
    pendingRecords.incrementAndGet();
    transaction.producer.send(record, callback);
}
Inside invoke there is a flinkKafkaPartitioner:
public abstract class FlinkKafkaPartitioner<T> implements Serializable {

    public void open(int parallelInstanceId, int parallelInstances) {
        // overwrite this method if needed.
    }

    // Determine the id of the partition that the record should be written to
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
Flink currently ships only one concrete implementation, FlinkFixedPartitioner, which returns partitions[parallelInstanceId % partitions.length]:
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int parallelInstanceId; // set in open(parallelInstanceId, parallelInstances)

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }
}
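If the fixed subtask-to-partition mapping is not what you want, you can plug in your own partitioner through the customPartitioner constructor argument; a minimal sketch (KeyHashPartitioner is a made-up name) that routes records by the hash of their serialized key:

public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            // records without a key all go to the first listed partition
            return partitions[0];
        }
        // non-negative hash of the key bytes, mapped onto the available partitions
        return partitions[(java.util.Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
    }
}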
Summary
Flink implements sinks to different storage systems by extending RichSinkFunction; in most cases all you need to do is override its three methods: open, close, and invoke.
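As a template, a custom sink skeleton (a generic sketch, not tied to any particular storage system) therefore looks like:

public class MyCustomSink<IN> extends RichSinkFunction<IN> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // acquire the connection/client here, once per parallel subtask
    }

    @Override
    public void invoke(IN value, Context context) throws Exception {
        // write one record to the external system
    }

    @Override
    public void close() throws Exception {
        super.close();
        // release the connection/client here
    }
}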