Project address
Usage is similar to RocketMQ.
// producer
// Imports for the project's own classes (DefaultProducer, Message, SendResult,
// SendStatus) are omitted because their package names are not shown here.
import java.nio.charset.StandardCharsets;

public class ProducerExample {
    static final String BROKER = "localhost:8989";
    static final String TOPIC = "test-topic";

    public static void main(String[] args) {
        DefaultProducer producer = new DefaultProducer();
        producer.start();
        // Tell the producer which broker serves the topic.
        producer.addBroker(TOPIC, BROKER);
        for (int i = 0; i < 10; i++) {
            Message msg = new Message();
            msg.setTopic(TOPIC);
            // Encode the body explicitly as UTF-8.
            msg.setBody(("this is body" + i).getBytes(StandardCharsets.UTF_8));
            SendResult result = producer.send(msg);
            if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
                System.out.println("success");
            } else {
                System.out.println("error");
            }
        }
    }
}
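// consumer
// Imports for the project's own classes (DefaultConsumer, Message,
// MessageListenerConcurrently, ConsumeConcurrentlyStatus) are omitted
// because their package names are not shown here.
import java.nio.charset.StandardCharsets;

public class ConsumerExample {
    static final String BROKER = "localhost:8989";
    static final String TOPIC = "test-topic";

    public static void main(String[] args) {
        DefaultConsumer consumer = new DefaultConsumer();
        // The listener is invoked for each message that is delivered.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(Message msg) {
                // Decode with the same charset the producer used (UTF-8).
                System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        consumer.addBroker(TOPIC, BROKER);
        consumer.subscribe(TOPIC);
    }
}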
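Planned work
- Improve client-side channel management.
- Distributed brokers. This depends on client-side load balancing, with each topic stored in shards. Because there is no persistence yet, synchronous double writes will be used to keep the data consistent.
- Data persistence.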
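
The distributed-broker item in the list above can be illustrated with a small standalone sketch. It does not use this project's API: `InMemoryBroker`, `Shard`, `TopicRouter`, and the broker addresses other than `localhost:8989` are hypothetical names made up for illustration. It assumes round-robin selection as the client-side load-balancing policy and treats a send as successful only when both the primary and the replica acknowledge the write; the real design may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: none of these types come from the project.
class ShardedDualWriteSketch {

    /** Stand-in for a broker that keeps messages in memory only (no persistence). */
    static class InMemoryBroker {
        final String address;
        final List<byte[]> stored = new ArrayList<>();
        boolean available = true;

        InMemoryBroker(String address) {
            this.address = address;
        }

        /** Returns true when the write is acknowledged. */
        boolean write(byte[] body) {
            if (!available) {
                return false;
            }
            stored.add(body);
            return true;
        }
    }

    /** One shard of a topic: a primary broker plus a replica. */
    static class Shard {
        final InMemoryBroker primary;
        final InMemoryBroker replica;

        Shard(InMemoryBroker primary, InMemoryBroker replica) {
            this.primary = primary;
            this.replica = replica;
        }

        /** Synchronous double write: succeeds only if both copies are acknowledged. */
        boolean send(byte[] body) {
            if (!primary.write(body)) {
                return false;
            }
            if (!replica.write(body)) {
                // Undo the primary write so the two copies stay identical.
                // A networked implementation would need a real rollback or retry protocol.
                primary.stored.remove(primary.stored.size() - 1);
                return false;
            }
            return true;
        }
    }

    /** Client-side load balancing: round-robin over the shards of one topic. */
    static class TopicRouter {
        private final List<Shard> shards;
        private final AtomicInteger next = new AtomicInteger();

        TopicRouter(List<Shard> shards) {
            this.shards = shards;
        }

        Shard select() {
            int index = Math.floorMod(next.getAndIncrement(), shards.size());
            return shards.get(index);
        }
    }

    public static void main(String[] args) {
        Shard shard0 = new Shard(new InMemoryBroker("localhost:8989"), new InMemoryBroker("localhost:8990"));
        Shard shard1 = new Shard(new InMemoryBroker("localhost:8991"), new InMemoryBroker("localhost:8992"));
        TopicRouter router = new TopicRouter(Arrays.asList(shard0, shard1));

        for (int i = 0; i < 4; i++) {
            byte[] body = ("this is body" + i).getBytes(StandardCharsets.UTF_8);
            boolean ok = router.select().send(body);
            System.out.println(ok ? "success" : "error");
        }
    }
}
```

Selecting the shard on the client keeps the brokers unaware of each other, which matches the note that distribution depends on client-side load balancing. The cost of the synchronous double write is that a send fails whenever either copy is unavailable.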