vortex是一款輕量級的分布式流式計算框架。vortex中文意為旋渦蚌父,代表著數(shù)據(jù)流不斷地流入這個旋渦然后被平穩(wěn)地輸出。
vortex屬于內(nèi)存計算型的流式框架毛萌,適用于高可用苟弛,高并發(fā),實時計算的業(yè)務(wù)場景阁将。
vortex是基于SpringBoot框架之上開發(fā)的膏秫,它依賴微服務(wù)分布式協(xié)作框架tridenter實現(xiàn)集群特性,vortex微服務(wù)內(nèi)嵌了獨立的TCP服務(wù)器(默認(rèn)通過Netty4實現(xiàn))做盅,vortex微服務(wù)集群中的應(yīng)用程序通過tridenter多播功能相互發(fā)現(xiàn)并建立長連接缤削,實現(xiàn)高可用,去中心化和負(fù)載均衡吹榴,使得整個Spring應(yīng)用程序集群具備實時計算的能力亭敢。
vortex項目一共包含3部分:
- vortex-common
vortex框架的agent端jar包 - vortex-spring-boot-starter
vortex框架的核心jar包,添加到SpringBoot應(yīng)用使其成為vortex服務(wù)端 - vortex-metrics
基于vortex的分布式時序計算框架图筹,它是vortex重要的獨立子項目
服務(wù)端安裝
<dependency>
<groupId>com.github.paganini2008.atlantis</groupId>
<artifactId>vortex-spring-boot-starter</artifactId>
<version>1.0-RC3</version>
</dependency>
agent端安裝
<dependency>
<groupId>com.github.paganini2008.atlantis</groupId>
<artifactId>vortex-common</artifactId>
<version>1.0-RC3</version>
</dependency>
目前基于vortex框架的開源項目有3個:
- 分布式微服務(wù)監(jiān)控系統(tǒng) Jellyfish
- 分布式時序計算框架 Vortex Metrics
- 分布式網(wǎng)絡(luò)爬蟲Greenfinger
如何在你的應(yīng)用中使用vortex的API
前面說過帅刀,vortex服務(wù)端接收數(shù)據(jù),vortex agent端發(fā)送數(shù)據(jù),vortex提供了HTTP和TCP兩種協(xié)議來接收和發(fā)送外部數(shù)據(jù)劝篷。
- vortex服務(wù)端要實現(xiàn)Handler接口實現(xiàn)定制哨鸭,比如:
@Slf4j
public class TestHandler implements Handler{
@Override
public void onData(Tuple tuple) {
log.info(tuple.toString());
}
}
- 而agent端通過TransportClient實現(xiàn)類來發(fā)送數(shù)據(jù)
下面以jellyfish中日志收集模塊為例民宿,參考源碼:
服務(wù)端:
public class Slf4jHandler implements Handler {
private static final String TOPIC_NAME = "slf4j";
@Autowired
private IdGenerator idGenerator;
@Autowired
private LogEntryService logEntryService;
@Value("${atlantis.framework.jellyfish.handler.interferedCharacter:}")
private String interferedCharacterRegex;
@Override
public void onData(Tuple tuple) {
LogEntry logEntry = new LogEntry();
logEntry.setId(idGenerator.generateId());
logEntry.setClusterName(tuple.getField("clusterName", String.class));
logEntry.setApplicationName(tuple.getField("applicationName", String.class));
logEntry.setHost(tuple.getField("host", String.class));
logEntry.setIdentifier(tuple.getField("identifier", String.class));
logEntry.setLoggerName(tuple.getField("loggerName", String.class));
logEntry.setMessage(tuple.getField("message", String.class));
logEntry.setLevel(tuple.getField("level", String.class));
logEntry.setReason(tuple.getField("reason", String.class));
logEntry.setMarker(tuple.getField("marker", String.class));
logEntry.setCreateTime(tuple.getField("timestamp", Long.class));
if (StringUtils.isNotBlank(interferedCharacterRegex)) {
logEntry.setMessage(logEntry.getMessage().replaceAll(interferedCharacterRegex, ""));
logEntry.setReason(logEntry.getReason().replaceAll(interferedCharacterRegex, ""));
}
logEntryService.bulkSaveLogEntries(logEntry);
}
@Override
public String getTopic() {
return TOPIC_NAME;
}
}
Agent端, 你需要自己實現(xiàn)一個Agent端, 向vortex服務(wù)端不斷發(fā)送數(shù)據(jù)娇妓,可參考jellyfish-slf4j的TransportClientAppenderBase.java源碼:
@Override
protected void append(ILoggingEvent eventObject) {
if (transportClient == null) {
return;
}
Tuple tuple = Tuple.newOne(GLOBAL_TOPIC_NAME);
tuple.setField("clusterName", clusterName);
tuple.setField("applicationName", applicationName);
tuple.setField("host", host);
tuple.setField("identifier", identifier);
tuple.setField("loggerName", eventObject.getLoggerName());
String msg = eventObject.getFormattedMessage();
tuple.setField("message", msg);
tuple.setField("level", eventObject.getLevel().toString());
String reason = ThrowableProxyUtil.asString(eventObject.getThrowableProxy());
tuple.setField("reason", reason);
tuple.setField("marker", eventObject.getMarker() != null ? eventObject.getMarker().getName() : "");
tuple.setField("timestamp", eventObject.getTimeStamp());
Map<String, String> mdc = eventObject.getMDCPropertyMap();
if (MapUtils.isNotEmpty(mdc)) {
tuple.append(mdc);
}
transportClient.write(tuple);
}
說明一下,vortex服務(wù)端和agent端的交互數(shù)據(jù)可以是Map, Tuple對象或json字符串活鹰,但最終都被包裝成Tuple對象
具體使用哈恰,可參考vortex的源碼: https://github.com/paganini2008/vortex.git