When Spark Streaming reads a message stream from Kafka (in Java), the messages are decoded as UTF-8 by default. If the source data is not UTF-8 encoded, the output is garbled (mojibake). The official Kafka documentation lists no parameter for the encoding, but one does exist in the Kafka source code. The source is as follows:
class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] {
  val encoding =
    if (props == null) "UTF8"
    else props.getString("serializer.encoding", "UTF8")
  ...
}
Based on the source above, setting serializer.encoding is all that is needed, and testing confirms that the parameter does take effect. (The StringDecoder used on the consumer side reads the same serializer.encoding property.) Below is the Spark Streaming code that reads from Kafka:
...
HashSet<String> _TopicsSet = new HashSet<String>(Arrays.asList("a1_topic", "a2_topic"));
Map<String, String> _KafkaParams = new HashMap<String, String>();
_KafkaParams.put("metadata.broker.list", "dn1:9092,nn0:9092,dn0:9092");
_KafkaParams.put("group.id", "groupid");
_KafkaParams.put("fetch.message.max.bytes", "5120000");
// Decode message bytes as GB18030 instead of the default UTF-8.
_KafkaParams.put("serializer.encoding", "gb18030");
JavaDStream<String> _MessagesLines = KafkaUtils.createDirectStream(
        _JavaStreamingContext,
        String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        _KafkaParams, _TopicsSet)
    .map(new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> tuple2) {
            // Keep only the message value; the key is not needed here.
            return tuple2._2();
        }
    });
...
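The effect of the encoding mismatch can be reproduced without Kafka at all. The following standalone sketch (not part of the streaming job; the class name EncodingDemo is made up for illustration) encodes a Chinese string as GB18030 bytes, the way a GB18030 source file would arrive in a Kafka message, then decodes them with both charsets. Only the matching charset recovers the original text, which is exactly what serializer.encoding controls in StringDecoder.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class EncodingDemo {
    public static void main(String[] args) {
        // The original text, as produced by a GB18030-encoded source file.
        String original = "中文测试";
        byte[] gbBytes = original.getBytes(Charset.forName("GB18030"));

        // Decoding with the default UTF-8 yields mojibake ...
        String wrong = new String(gbBytes, StandardCharsets.UTF_8);
        // ... while decoding with GB18030 restores the text.
        String right = new String(gbBytes, Charset.forName("GB18030"));

        System.out.println(original.equals(wrong));  // false
        System.out.println(original.equals(right));  // true
    }
}
```

This is the same logic as new String(bytes, encoding) inside Kafka's StringDecoder, with encoding taken from the serializer.encoding property.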