1. Introduction
The previous few articles covered using InfluxDB on Linux and Windows. Starting with this one, we look at using InfluxDB from Java. This article provides a utility class that wraps the InfluxDB Java API so you can start using it right away.
二穷绵、InfluxDB工具類
2.1 Importing the Dependency
Use Maven to pull in the following dependency:
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.10</version>
</dependency>
2.2 The Utility Class Code
package com.common.utils.influxdb;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service("tsdbService")
public class TsdbServiceImpl implements TsdbService{
private static final Logger logger = LoggerFactory.getLogger(TsdbServiceImpl.class);
private static final InfluxDB.ConsistencyLevel CONSISTENCY_LEVEL = InfluxDB.ConsistencyLevel.ANY;
private static final TimeUnit PRECISION = TimeUnit.SECONDS;
@Value("${tsdb.server.hosts}")
private String hosts;
@Value("${tsdb.server.port}")
private String port;
/**
* Username
*/
@Value("${tsdb.server.username}")
private String username;
/**
* Password
*/
@Value("${tsdb.server.password}")
private String password;
/**
* Database name
*/
@Value("${tsdb.server.database}")
private String database;
/**
* Default retention policy
*/
@Value("${tsdb.server.retentionpolicy}")
private String retentionPolicy;
/**
* Retention policy that continuous queries downsample into; the config key below is an assumption,
* since the original text uses extendPolicy without showing where it is defined
*/
@Value("${tsdb.server.extendpolicy}")
private String extendPolicy;
private InfluxDB influxDB;
@PostConstruct
public void init() {
List<String> serverAddressList = new ArrayList<>();
for (String host : hosts.split(",")) {
serverAddressList.add(String.format("http://%s:%s", host, port));
}
// InfluxDBFactory.connect expects a single HTTP URL, so connect to the first configured host here
influxDB = InfluxDBFactory.connect(serverAddressList.get(0), username, password);
try {
// If the specified database does not exist, create it along with a default retention policy
if (!this.databaseExist(database)) {
createDatabase(database);
createRetentionPolicy();
}
} catch (Exception e) {
// The server may sit behind a dynamic proxy that does not allow creating databases
logger.error("[TsdbService] error while initializing tsdb", e);
} finally {
influxDB.setRetentionPolicy(retentionPolicy);
}
influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
// Flush every 1000 Points, at least every 100ms
// bufferLimit is the maximum number of points kept in the retry buffer (retrying is only enabled when it is larger than actions)
// exceptionHandler represents a consumer function to handle asynchronous errors
// threadFactory represents the ThreadFactory instance to be used
influxDB.enableBatch(BatchOptions.DEFAULTS
.actions(1000)
.flushDuration(100)
.bufferLimit(10)
.exceptionHandler((points, e) -> {
List<Point> target = new ArrayList<>();
points.forEach(target::add);
String pointsDump = target.toString();
String msg = String.format("failed to write points:%s\n", pointsDump.substring(0, Math.min(pointsDump.length(), 10000)));
logger.error(msg, e);
})
.threadFactory(
Executors.defaultThreadFactory()
));
}
/**
* Check whether the connection is alive
*
* @return true if the server responds to ping
*/
public boolean ping() {
boolean isConnected = false;
Pong pong;
try {
pong = influxDB.ping();
if (pong != null) {
isConnected = true;
}
} catch (Exception e) {
logger.error("[TsdbService] failed to ping tsdb", e);
}
return isConnected;
}
@Override
public void createDatabase(String database) {
influxDB.query(new Query("CREATE DATABASE " + database, ""));
}
@Override
public void dropDatabase(String database) {
influxDB.query(new Query("DROP DATABASE " + database, ""));
}
@Override
public boolean databaseExist(String database) {
return influxDB.databaseExists(database);
}
@Override
public void createRetentionPolicy() {
String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
"default_policy", database, "90d", 3);
this.query(command);
}
@Override
public void createRetentionPolicy(String database, String policyName, String duration, int replication, Boolean isDefault) {
String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,
database, duration, replication);
if (isDefault) {
sql = sql.concat(" DEFAULT");
}
this.query(sql);
}
@Override
public void dropRetentionPolicy() {
this.dropRetentionPolicy(database, retentionPolicy);
}
@Override
public void dropRetentionPolicy(String database, String retentionPolicy) {
String sql = String.format("DROP RETENTION POLICY %s ON %s", retentionPolicy, database);
this.query(sql);
}
@Override
public void createContinuousQuery(String measurement) {
String cqName = String.format("cq_%s", measurement);
String originMeasurement = String.format("%s.%s.%s", database, retentionPolicy, measurement);
String cqMeasurement = String.format("%s.%s.%s_hour", database, extendPolicy, measurement);
String sql = String.format("CREATE CONTINUOUS QUERY \"%s\" ON %s RESAMPLE EVERY 1h FOR 2h BEGIN SELECT MEAN(*) INTO %s FROM %s GROUP BY time(1h),* FILL(none) END",
cqName, database, cqMeasurement, originMeasurement);
this.query(sql);
}
@Override
public boolean continuousQueryExists(String measurement) {
String cqName = String.format("cq_%s", measurement);
return continuousQueryExists(database, cqName);
}
@Override
public boolean continuousQueryExists(String database, String cqName) {
String sql = "SHOW CONTINUOUS QUERIES";
QueryResult result = query(sql);
List<QueryResult.Series> seriesList = result.getResults().get(0).getSeries();
if (seriesList != null) {
for (QueryResult.Series series : seriesList) {
if (database.equals(series.getName())) {
List<List<Object>> continuousQueryList = series.getValues();
if (continuousQueryList == null) {
return false;
} else {
for (List<Object> queryResult : continuousQueryList) {
if (cqName.equals(queryResult.get(0))) {
return true;
}
}
}
}
}
}
return false;
}
@Override
public void dropContinuousQuery(String databaseName, String cqName) {
String sql = String.format("DROP CONTINUOUS QUERY %s ON %s", cqName, databaseName);
query(sql);
}
@Override
public boolean measurementsExists(String measurement) {
return measurementsExists(database, measurement);
}
@Override
public boolean measurementsExists(String database, String measurement) {
String sql = String.format("SHOW MEASUREMENTS ON %s", database);
QueryResult result = query(sql);
if (result != null) {
List<QueryResult.Series> seriesList = result.getResults().get(0).getSeries();
if (seriesList != null) {
QueryResult.Series series = seriesList.get(0);
List<List<Object>> valueList = series.getValues();
for (List<Object> value : valueList) {
if (measurement.equals(value.get(0))) {
return true;
}
}
}
}
return false;
}
@Override
public QueryResult query(String command) {
return influxDB.query(new Query(command, database));
}
@Override
public QueryResult dataQuery(String command) {
return influxDB.query(new Query(command, database), TimeUnit.MILLISECONDS);
}
@Override
public void insert(Point point) {
// no default database is set on the client, so write to the configured database and retention policy
influxDB.write(database, retentionPolicy, point);
}
@Override
public void insert(String measurement, TimeUnit timeUnit, UniteMetricData data) {
timeUnit = timeUnit == null ? TimeUnit.MILLISECONDS : timeUnit;
Point point = pointBuilder(measurement, data.getTags(), data.getFields(), data.getTimestamp(), timeUnit);
influxDB.write(database, retentionPolicy, point);
}
@Override
public void batchInsert(BatchPoints batchPoints) {
influxDB.write(batchPoints);
}
@Override
public Point pointBuilder(String measurement,
Map<String, String> tags,
Map<String, Object> fields,
long time,
TimeUnit timeunit) {
Point point = Point.measurement(measurement).time(time, timeunit).tag(tags).fields(fields).build();
return point;
}
@Override
public BatchPoints buildBatchPoints() {
return this.batchPointsBuilder(database, CONSISTENCY_LEVEL, PRECISION);
}
@Override
public BatchPoints batchPointsBuilder(String database, InfluxDB.ConsistencyLevel level, TimeUnit precision) {
return batchPointsBuilder(database, level, precision, null);
}
@Override
public BatchPoints batchPointsBuilder(String database, InfluxDB.ConsistencyLevel level, TimeUnit precision, String retentionPolicy) {
return BatchPoints.database(database).consistency(level).precision(precision).retentionPolicy(retentionPolicy).build();
}
}
3. Usage Examples
3.0 Connectivity Test
/**
* TSDB connectivity check, run before all other tests
*/
@Before
public void connectionTest() {
boolean connected = tsdbService.ping();
assertTrue(connected);
}
3.1 Querying Data
InfluxDB can execute several statements in a single query request; separate them with semicolons. The example below only shows how to parse the result set for a single statement (a multi-statement sketch follows the walk-through).
@Resource
private TsdbService tsdbService;
// TSDB address in the self-test environment
private static final String MASTER_URL = "10.185.3.150:8091";
private static final String USERNAME = "root";
private static final String PASSWORD = "root";
private static final String DATABASE_NAME = "ncm_test_temp";
private static final String RP_NAME = "default_policy";
// Test measurement name, usually named after the namespace
private static final String MEASUREMENT_NAME = "NVS";
private static final Integer QUERY_LIMIT = 10;
/**
* Unit test for batch querying
*/
@Test
public void batchQueryTest() {
long beginTime = 1559613845000L;
long endTime = 1559621045000L;
Map<String, String> dimensionMap = new HashMap<>();
dimensionMap.put("tag1", "tag1");
// Query with a time range
String condition = TSDBUtil.getQueryCondition(dimensionMap, beginTime, endTime);
Object[] args = new Object[]{MEASUREMENT_NAME, condition, QUERY_LIMIT};
String command = String.format("SELECT * FROM %s WHERE %s ORDER BY time ASC LIMIT %d", args);
// Execute the query
QueryResult results = tsdbService.dataQuery(command);
if (results.getResults() == null) {
System.out.println("Data is empty");
return;
}
// results.getResults() holds one Result per statement when several statements are queried at once
for (QueryResult.Result result : results.getResults()) {
List<QueryResult.Series> series = result.getSeries();
// A statement that matches no data returns a null series list
if (series == null) {
continue;
}
for (QueryResult.Series serie : series) {
List<List<Object>> values = serie.getValues(); // field values
List<String> colums = serie.getColumns(); // column names
// Print the query result
System.out.println("colums:" + colums);
for (List<Object> value : values) {
System.out.println("value:" + value);
}
// Repackage the query result
List<Map<String, Object>> dataList = new LinkedList<>();
for (int i = 0; i < values.size(); ++i) {
Map<String, Object> dataMap = new HashMap<>(colums.size());
for (int j = 0; j < colums.size(); ++j) {
dataMap.put(colums.get(j), values.get(i).get(j));
}
dataList.add(dataMap);
}
// dataList can serve as the base format for the query data returned to callers
System.out.println(dataList);
}
}
/**
* Before normalization:
*
* colums:[time, field1, field2, tag1, tag2]
* value:[1.550599292E12, efs, 444444.0, tag1, tag2]
* value:[1.550595692E12, bcd, 333333.0, tag1, tag2]
* value:[1.550592092E12, abc, 123456.0, tag1, tag2]
*
* After normalization:
* [
* {
* tag1=tag1,
* field1=efs,
* time=1.550599292E12,
* field2=444444.0,
* tag2=tag2
* },
* {
* tag1=tag1,
* field1=bcd,
* time=1.550595692E12,
* field2=333333.0,
* tag2=tag2
* },
* {
* tag1=tag1,
* field1=abc,
* time=1.550592092E12,
* field2=123456.0,
* tag2=tag2
* }
* ]
*/
}
When reading the data back, guard against nulls: check results.getResults() first, and check each Result's getSeries() before drilling into it. results.getResults().get(0) corresponds to the first statement; iterating its series' value lists yields the target field of every record.
InfluxDB's result set is nested fairly deep, mainly because several statements can be issued in one request, which speeds up querying; this is one place where it differs from working with a relational database.
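For the multi-statement case, here is a minimal sketch reusing tsdbService and MEASUREMENT_NAME from the test above (the statements themselves are only illustrative): each semicolon-separated statement contributes one entry to getResults(), in the order it appears in the command.
// Two statements in one request, separated by a semicolon
String multiCommand = String.format("SELECT COUNT(field2) FROM %s; SELECT * FROM %s LIMIT 5",
MEASUREMENT_NAME, MEASUREMENT_NAME);
QueryResult multiResult = tsdbService.dataQuery(multiCommand);
// One Result per statement, in the same order as the command
for (int i = 0; i < multiResult.getResults().size(); i++) {
QueryResult.Result statementResult = multiResult.getResults().get(i);
if (statementResult.getSeries() == null) {
System.out.println("statement " + i + " returned no data");
continue;
}
for (QueryResult.Series series : statementResult.getSeries()) {
System.out.println("statement " + i + " columns: " + series.getColumns());
System.out.println("statement " + i + " values: " + series.getValues());
}
}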
3.2 Inserting a Single Record
An InfluxDB field's type is fixed by the type of the first value written to it; tags are always strings and are indexed, which speeds up lookups. Note that this example and the first two batch-write examples below use an InfluxDBConnection wrapper class rather than the TsdbService above; judging from the calls, its constructor takes a username, password, server address, database, and retention policy.
public static void main(String[] args) {
InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "admin", "1.1.1.1", "db-test", "hour");
Map<String, String> tags = new HashMap<String, String>();
tags.put("tag1", "tag value");
Map<String, Object> fields = new HashMap<String, Object>();
fields.put("field1", "a string value");
// Numeric field; an InfluxDB field's type is fixed by the first value written to it
fields.put("field2", 3.141592657);
// Timestamp in milliseconds
influxDBConnection.insert("measurement_name", tags, fields, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
Another test case, this time going through the TsdbService with a simple data-transfer object:
import java.io.Serializable;
import java.util.Map;

public class UniteMetricData implements Serializable {
private static final long serialVersionUID = 8968059029015805484L;
private Map<String, String> tags;
private Map<String, Object> fields;
private long timestamp;
public UniteMetricData(Map<String, String> tags, Map<String, Object> fields, long timestamp) {
this.tags = tags;
this.fields = fields;
this.timestamp = timestamp;
}
public Map<String, String> getTags() {
return tags;
}
public void setTags(Map<String, String> tags) {
this.tags = tags;
}
public Map<String, Object> getFields() {
return fields;
}
public void setFields(Map<String, Object> fields) {
this.fields = fields;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
}
@Test
public void writeTest() throws InterruptedException {
Map<String, String> tags = new HashMap<>();
tags.put("host", "ncm-test-01");
tags.put("projectId", "c57212bdec1345cd95107ef3109777");
Map<String, Object> fields = new HashMap<>();
fields.put("cpuUsage", 2.17);
UniteMetricData data = new UniteMetricData(tags, fields, System.currentTimeMillis());
tsdbService.insert(MEASUREMENT_NAME, TimeUnit.MILLISECONDS, data);
Thread.sleep(1000);
QueryResult queryResult = tsdbService.query(String.format("SELECT cpuUsage FROM %s", MEASUREMENT_NAME));
for (QueryResult.Result result : queryResult.getResults()) {
for (QueryResult.Series series : result.getSeries()) {
System.out.println(series.toString());
}
}
}
3.3 Several Ways to Batch-Write Data
Note: with the first two approaches, both points must go to the same database and carry the same tags. If their tags differ, put them into separate BatchPoints objects, because tags set on a BatchPoints builder are applied to every point in that batch; otherwise the written data can get mixed up.
3.3.1 Approach 1: assemble the data into BatchPoints, then insert batch by batch
public static void main(String[] args) {
InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "admin", "1.1.1.1", "db-test", "hour");
Map<String, String> tags = new HashMap<String, String>();
tags.put("tag1", "tag value");
Map<String, Object> fields1 = new HashMap<String, Object>();
fields1.put("field1", "abc");
// Numeric field; an InfluxDB field's type is fixed by the first value written to it
fields1.put("field2", 123456);
Map<String, Object> fields2 = new HashMap<String, Object>();
fields2.put("field1", "a string value");
fields2.put("field2", 3.141592657);
// One record each
Point point1 = influxDBConnection.pointBuilder("measurement_name", System.currentTimeMillis(), tags, fields1);
Point point2 = influxDBConnection.pointBuilder("measurement_name", System.currentTimeMillis(), tags, fields2);
// Add each record to its own BatchPoints
BatchPoints batchPoints1 = BatchPoints.database("db-test").tag("tag1", "tag value 1").retentionPolicy("hour")
.consistency(ConsistencyLevel.ALL).build();
BatchPoints batchPoints2 = BatchPoints.database("db-test").tag("tag2", "tag value 2").retentionPolicy("hour")
.consistency(ConsistencyLevel.ALL).build();
batchPoints1.point(point1);
batchPoints2.point(point2);
// Batch-insert the two records into the database
influxDBConnection.batchInsert(batchPoints1);
influxDBConnection.batchInsert(batchPoints2);
}
3.3.2 Approach 2: assemble BatchPoints, serialize them to line protocol, and insert in a single call
public static void main(String[] args) {
InfluxDBConnection influxDBConnection = new InfluxDBConnection("admin", "admin", "1.1.1.1", "db-test", "hour");
Map<String, String> tags1 = new HashMap<String, String>();
tags1.put("tag1", "tag value");
Map<String, String> tags2 = new HashMap<String, String>();
tags2.put("tag2", "tag value");
Map<String, Object> fields1 = new HashMap<String, Object>();
fields1.put("field1", "abc");
// Numeric field; an InfluxDB field's type is fixed by the first value written to it
fields1.put("field2", 123456);
Map<String, Object> fields2 = new HashMap<String, Object>();
fields2.put("field1", "a string value");
fields2.put("field2", 3.141592657);
// One record each
Point point1 = influxDBConnection.pointBuilder("measurement_name", System.currentTimeMillis(), tags1, fields1);
Point point2 = influxDBConnection.pointBuilder("measurement_name", System.currentTimeMillis(), tags2, fields2);
BatchPoints batchPoints1 = BatchPoints.database("db-test").tag("tag1", "tag value 1")
.retentionPolicy("hour").consistency(ConsistencyLevel.ALL).build();
// Add the first record to batchPoints1
batchPoints1.point(point1);
BatchPoints batchPoints2 = BatchPoints.database("db-test").tag("tag2", "tag value 2")
.retentionPolicy("hour").consistency(ConsistencyLevel.ALL).build();
// Add the second record to batchPoints2
batchPoints2.point(point2);
// Serialize each BatchPoints to line protocol so they can be written to the database in a single call, which speeds up writes
List<String> records = new ArrayList<String>();
records.add(batchPoints1.lineProtocol());
records.add(batchPoints2.lineProtocol());
// Batch-insert the two records into the database
influxDBConnection.batchInsert("db-test", "hour", ConsistencyLevel.ALL, records);
}
3.3.3 Approach 3: call the write method directly
@Test
public void batchWriteTest() {
Map<String, String> tags = new HashMap<>();
tags.put("tag1", "tag1");
tags.put("tag2", "tag2");
Map<String, Object> fields1 = new HashMap<>();
fields1.put("field1", "abc");
fields1.put("field2", 123456);
Map<String, Object> fields2 = new HashMap<>();
fields2.put("field1", "bcd");
fields2.put("field2", 333333);
Map<String, Object> fields3 = new HashMap<>();
fields3.put("field1", "efs");
fields3.put("field2", 444444);
// Fabricate 10,000 test records to insert
Random random = new Random();
List<UniteMetricData> recordList = new ArrayList<>(10000);
for (int i = 0; i < 10000; ++i) {
UniteMetricData data = new UniteMetricData(tags, fields1, 1559617445000L + random.nextLong() % 1000);
recordList.add(data);
}
long start = System.currentTimeMillis();
for (UniteMetricData data : recordList) {
tsdbService.insert(MEASUREMENT_NAME, TimeUnit.MILLISECONDS, data);
}
long end = System.currentTimeMillis();
System.out.println(String.format("Time used %d ms", end - start));
}
Because a local buffer is enabled when the connection is created (the enableBatch call in init()), influxdb-java writes asynchronously. In our tests the batch-write performance of this approach is no worse than the two approaches above, and there is no need to build BatchPoints by hand, so it is simpler to use. This is the approach we use in production and the one we recommend.
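One caveat with this buffered, asynchronous mode: points can sit in the client-side buffer for up to the configured flush interval, so flush or close the client before the application shuts down, or the tail of the data may be lost. A minimal sketch; the @PreDestroy hook below is our own addition to TsdbServiceImpl, not part of the original class, while flush() and close() are standard methods of the influxdb-java InfluxDB interface.
@PreDestroy
public void destroy() {
// Push any points still sitting in the batch buffer, then release the underlying client resources
influxDB.flush();
influxDB.close();
}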
3.4 Summary
This section showed how to wrap InfluxDB's basic read and write operations behind a Java API; hopefully the service class above saves you some effort the first time you integrate InfluxDB.