Flink默認提供了很多開箱即用的連接器,比如與Kafka、RabbitMQ础淤、HDFS、ElasticSearch等對接的連接器电禀。還有一些不那么常用的連接器則由Apache Bahir項目(官網(wǎng)很簡陋,見這里)來提供笤休,其中就包含Redis Sink尖飞。這個項目的文檔有點缺乏,本文先記錄一下用法店雅。
引入如下Maven依賴政基。目前bahir-flink項目比較停滯,最新版本是1.1-SNAPSHOT底洗。
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_${scala.bin.version}</artifactId>
<version>${bahir.version}</version>
<scope>compile</scope>
</dependency>
以最常見的單機Redis情景來討論腋么,該插件提供的核心類有三個,分別是:
- FlinkJedisPoolConfig類:Jedis連接池的相關參數(shù)亥揖;
- RedisMapper接口:從用戶數(shù)據(jù)中提取鍵值,并構成Redis命令的映射器,需要用戶自己實現(xiàn)费变;
- RedisSink類:根據(jù)構建好的FlinkJedisPoolConfig和RedisMapper將流數(shù)據(jù)寫入Redis摧扇。
先生成一個FlinkJedisPoolConfig實例。
// 這個叫ParameterUtil的類是自己寫的挚歧,專門用來讀有占位符的屬性文件扛稽,看官勿誤會
// 當然也可以直接寫明文,不過放在屬性文件里方便管理滑负,還能按Maven profile作區(qū)分
Properties redisProps = ParameterUtil.getFromResourceFile("redis.properties");
FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost(redisProps.getProperty("redis.host"))
.setPort(NumberUtils.createInteger(redisProps.getProperty("redis.port")))
.setPassword(redisProps.getProperty("redis.pass", ""))
.setDatabase(NumberUtils.createInteger(redisProps.getProperty("redis.db")))
.build();
接下來就寫一個RedisMapper的實現(xiàn)類在张,它負責將窗口統(tǒng)計出來的PV和UV數(shù)據(jù)以JSON形式表示。一點都不難矮慕。
public static final class RedisWindowPvUvMapper
implements RedisMapper<WindowedPvUvResult> {
// 被統(tǒng)計的對象類別帮匾,當參數(shù)傳進來
private String itemType;
public RedisStringMapper(String itemType) {
this.itemType = itemType;
}
// 指定命令,這里要寫字符串痴鳄,所以是set
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET);
}
// 從POJO構造key
@Override
public String getKeyFromData(WindowedPvUvResult result) {
StringBuilder builder = new StringBuilder("flink:log_pvuv:");
builder.append(result.getWindowEnd());
builder.append("_");
builder.append(itemType);
builder.append("_");
builder.append(result.getItemId());
return builder.toString();
}
// 從POJO構造value
@Override
public String getValueFromData(WindowedPvUvResult result) {
return new JSONObject()
.fluentPut("pv", result.getPv())
.fluentPut("uv", result.getUv())
.toJSONString();
}
}
最后就可以構造RedisSink了瘟斜。
dataStream.addSink(new RedisSink<>(jedisPoolConfig, new RedisWindowPvUvMapper("partner")));
這個Redis連接器簡單易用,但是有兩個地方差強人意:一是不支持設定key的過期時間(TTL)痪寻,二是不支持流水線(pipeline)螺句。在窗口比較稀疏、寫入量沒那么大的情況下橡类,流水線是可有可無的蛇尚,但過期時間還是很重要,所以下面要稍微改造一下顾画。
將項目代碼clone到本地取劫,找到flink-connector-redis項目中的RedisCommand枚舉,加上setex命令亲雪。
SETEX(RedisDataType.STRING),
然后來到RedisCommandsContainer接口勇凭,它其中定義的都是具體的命令邏輯,加上setex()方法的定義义辕。
void setex(String key, int seconds, String value);
RedisCommandsContainer接口有兩個實現(xiàn)類:針對單機的RedisContainer和針對集群的RedisClusterContainer虾标,寫入setex()方法的具體實現(xiàn)。
// RedisContainer.setex()
@Override
public void setex(final String key, final int seconds, final String value) {
Jedis jedis = null;
try {
jedis = getInstance();
jedis.setex(key, seconds, value);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Cannot send Redis message with command SETEX to key {} error message {}",
key, e.getMessage());
}
throw e;
} finally {
releaseInstance(jedis);
}
}
// RedisClusterContainer.setex()
@Override
public void setex(final String key, final int seconds, final String value) {
try {
jedisCluster.setex(key, seconds, value);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Cannot send Redis message with command SETEX to key {} error message {}",
key, e.getMessage());
}
throw e;
}
}
有了方法的具體實現(xiàn)灌砖,那么如何接收TTL的參數(shù)呢璧函?回到上面提到過的RedisMapper接口,在其中加上一個獲取TTL秒數(shù)的方法聲明基显。為了方便蘸吓,還可以用default語法提供一個默認值。
default int getExpireSeconds(T data) {
return 0;
}
萬事俱備只欠東風撩幽,來到RedisSink.invoke()方法(之前已經(jīng)講過库继,RichSinkFunction的子類必須實現(xiàn)這個方法)箩艺,加上我們之前寫的東西就可以了,如下宪萄。
@Override
public void invoke(IN input, Context context) throws Exception {
String key = redisSinkMapper.getKeyFromData(input);
String value = redisSinkMapper.getValueFromData(input);
// 取得過期時間
int expireSec = redisSinkMapper.getExpireSeconds(input);
Optional<String> optAdditionalKey = redisSinkMapper.getAdditionalKey(input);
switch (redisCommand) {
case RPUSH:
this.redisCommandsContainer.rpush(key, value);
break;
case LPUSH:
this.redisCommandsContainer.lpush(key, value);
break;
case SADD:
this.redisCommandsContainer.sadd(key, value);
break;
case SET:
this.redisCommandsContainer.set(key, value);
break;
// 新寫的setex邏輯
case SETEX:
if (expireSec > 0) {
this.redisCommandsContainer.setex(key, expireSec, value);
}
break;
case PFADD:
// ...以下原樣艺谆,略去
}
}
用Maven重新構建、打包并發(fā)布到倉庫就可以用了拜英。在實際應用時静汤,如果需要設定TTL,用戶邏輯中的RedisMapper就可以這樣寫:
public static final class RedisStringMapperWithTTL
implements RedisMapper<WindowedPvUvResult> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SETEX);
}
@Override
public String getKeyFromData(WindowedPvUvResult result) {
// ...
}
@Override
public String getValueFromData(WindowedPvUvResult result) {
// ...
}
@Override
public int getExpireSeconds(WindowedPvUvResult data) {
return 3 * 24 * 60 * 60; // 3天
}
}
}
so easy~