KafkaSpout
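1. SpoutConfig extends KafkaConfig, so the basic properties of the KafkaSpout can be set through SpoutConfig.
Setting spoutConfig.forceFromStart to false keeps the spout from consuming from the initial position of the Kafka topic, which avoids processing the same data again.
2. The Config.TOPOLOGY_MAX_SPOUT_PENDING setting caps the number of un-acked tuples per spout task, so it can be used to throttle Kafka consumption.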
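The two settings above might be wired together roughly as follows. This is only a sketch, assuming the Storm 0.9.x package layout (backtype.storm.*, storm.kafka.*) in which forceFromStart is still a public field; the ZooKeeper address, topic, ZK root, consumer id, and the pending limit of 1000 are placeholder values, not values from the original setup.

```java
import backtype.storm.Config;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaSpoutSetup {
    // Registers a KafkaSpout on the builder and returns the topology Config.
    public static Config configure(TopologyBuilder builder) {
        // Placeholder ZooKeeper address, topic, ZK root and consumer id (assumptions).
        ZkHosts hosts = new ZkHosts("localhost:2181");
        SpoutConfig spoutConfig =
                new SpoutConfig(hosts, "test-topic", "/kafka-spout", "demo-consumer");
        // Decode each Kafka message as a string.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // false: resume from the offset stored in ZooKeeper instead of
        // starting over from the configured start offset.
        spoutConfig.forceFromStart = false;
        builder.setSpout("spout1", new KafkaSpout(spoutConfig), 2);

        Config conf = new Config();
        // Equivalent to conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1000):
        // each spout task stops emitting once 1000 tuples are un-acked.
        conf.setMaxSpoutPending(1000);
        return conf;
    }
}
```

The pending limit only has an effect for tuples that are tracked for acking, which is the case for tuples emitted by KafkaSpout.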
EsBolt
1) A NullPointerException occurred when sending data to ES:
at org.codehaus.jackson.util.TextBuffer.findBuffer(TextBuffer.java:207)
refer
2) User-defined esIndex
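// bolt1 reads from an upstream spout (assumed here to be registered as "spout1")
builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout1");
// testBolt reads from bolt1, which supplies the esIndex field
builder.setBolt("testBolt", new EsBolt("{esIndex}/" + "test", conf), 2).shuffleGrouping("bolt1");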
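Define esIndex in Bolt1 (the {esIndex} placeholder is resolved from the field of that name in each tuple):
import java.text.SimpleDateFormat;
import java.util.Date;

public class TimeBasedIndexNameBuilder {
    // Builds a daily index name of the form <prefix>_yyyy-MM-dd.
    public static String build(String indexPrefix, Date collectTime) {
        return indexPrefix + "_" + new SimpleDateFormat("yyyy-MM-dd").format(collectTime);
    }
}
String esIndex = TimeBasedIndexNameBuilder.build("agentX", new Date());
With this setup, EsBolt receives agentX_2016-10-26/test (for a run on 2016-10-26), and the ES server creates index: agentX_2016-10-26, type: test.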
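To make the resolution step concrete, the sketch below rebuilds the index name for a fixed date and substitutes it into the "{esIndex}/test" pattern. The resolve helper is hypothetical and only mimics the substitution for illustration; it is not the code EsBolt actually runs.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Map;

class IndexPatternDemo {
    // Same naming rule as TimeBasedIndexNameBuilder.build in the text.
    static String buildIndexName(String indexPrefix, Date collectTime) {
        return indexPrefix + "_" + new SimpleDateFormat("yyyy-MM-dd").format(collectTime);
    }

    // Hypothetical helper: replaces every {field} placeholder in the pattern
    // with the value of the tuple field of the same name.
    static String resolve(String pattern, Map<String, String> tupleFields) {
        String resolved = pattern;
        for (Map.Entry<String, String> field : tupleFields.entrySet()) {
            resolved = resolved.replace("{" + field.getKey() + "}", field.getValue());
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Fixed collection date so the result is reproducible.
        Date collectTime = new GregorianCalendar(2016, Calendar.OCTOBER, 26).getTime();
        Map<String, String> tupleFields = new HashMap<>();
        tupleFields.put("esIndex", buildIndexName("agentX", collectTime));
        // Prints the resolved <index>/<type> target for this tuple.
        System.out.println(resolve("{esIndex}/test", tupleFields));
    }
}
```

Because the index name is computed per tuple, documents collected on different days end up in different daily indices without redeploying the topology.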
Bolt subscribing to multiple spouts
Suppose a bolt needs to receive the streams from both spout1 and spout2. It can call getSourceComponent to determine which spout a tuple came from and then process it accordingly.
// spout1
builder.setSpout("spout1", new Spout1(spoutConfig), 2);
// spout2
builder.setSpout("spout2", new Spout2(), 2);
builder.setBolt("bolt1", new Bolt1(), 2).allGrouping("spout1").shuffleGrouping("spout2");
Part of the Bolt1 code is shown below:
@Override
public void execute(Tuple input) {
    // The tuple came from spout1
    if (input.getSourceComponent().equals("spout1")) {
        ...
    } else {
        ...
    }
}