Getting Started with InfluxDB, a Time-Series Database

InfluxDB official documentation / GitHub source

Helm Installation

```shell
helm upgrade --install my-influxdb influxdata/influxdb --set image.tag=2.5.1-alpine
```

Access: expose a NodePort on the service, then browse to `<node-ip>:<nodeport>`.

Basic Concepts

measurement: the equivalent of a table

point: a single data record (one row)

time: the timestamp, a built-in column, in nanoseconds

tags: indexed attributes, typically used as query filter conditions

fields: unindexed attributes that usually change in real time, e.g. latitude/longitude, temperature, and other varying values

Annotated CSV

CSV annotation rows:

- `#group`: whether each column is part of the group key (`true`/`false`)
- `#datatype`: the data type of each column (`string`, `long`, `dateTime:RFC3339`, `double`, ...)
- `#default`: the default value for empty cells (here `mean` for the `result` column)

Column meanings:

| Column | Meaning |
| --- | --- |
| `result` | result name |
| `table` | table index within the result |
| `_start` / `_stop` | start / end of the queried time range |
| `_time` | timestamp of the data point |
| `_value` | the field's value (e.g. `39.90786`) |
| `_field` | the field's key (e.g. `lat`) |
| `_measurement` | the measurement (e.g. `gps`) |
| `car` | a tag (e.g. `川A888888`) |

Each data row after the header is one point.
Raw CSV:
```csv
#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
#default,mean,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,car
,,0,2022-11-22T07:20:32.833674853Z,2022-11-22T08:20:32.833674853Z,2022-11-22T08:00:30Z,39.90786,lat,gps,川A888888
,,0,2022-11-22T07:20:32.833674853Z,2022-11-22T08:20:32.833674853Z,2022-11-22T08:12:40Z,39.91786,lat,gps,川A888888
,,1,2022-11-22T07:20:32.833674853Z,2022-11-22T08:20:32.833674853Z,2022-11-22T08:00:30Z,116.510958,lon,gps,川A888888
,,1,2022-11-22T07:20:32.833674853Z,2022-11-22T08:20:32.833674853Z,2022-11-22T08:12:40Z,116.510928,lon,gps,川A888888
```
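Since the annotation rows are just `#`-prefixed lines, a consumer can skip them mechanically. A minimal, illustrative Java sketch (this is not the official client's CSV reader, just a demonstration of the row structure):

```java
import java.util.ArrayList;
import java.util.List;

public class AnnotatedCsv {
    /** Returns only the data rows, dropping '#'-prefixed annotation lines and the header row. */
    static List<String[]> dataRows(String csv) {
        List<String[]> rows = new ArrayList<>();
        boolean headerSeen = false;
        for (String line : csv.split("\n")) {
            if (line.isEmpty() || line.startsWith("#")) continue; // skip annotation rows
            if (!headerSeen) { headerSeen = true; continue; }     // skip the header row
            rows.add(line.split(",", -1));
        }
        return rows;
    }

    public static void main(String[] args) {
        String csv = "#group,false,false,true\n" +
                     "#datatype,string,long,double\n" +
                     ",result,table,_value\n" +
                     ",,0,39.90786\n" +
                     ",,0,39.91786\n";
        List<String[]> rows = dataRows(csv);
        System.out.println(rows.size());    // 2 data rows
        System.out.println(rows.get(0)[3]); // 39.90786
    }
}
```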

Line Protocol

```
measurementName,tagKey=tagValue fieldKey="fieldValue" 1465839830100400200
---------------,--------------- --------------------- -------------------
   measurement       tag set          field set             timestamp

# e.g.:
gps,car=川A888888 lat=39.90786,lon=116.510958 1669104020000000000
gps,car=川A888888 lat=39.91786,lon=116.510928 1669104759000000000
```
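As a sanity check on the syntax above, a record can be assembled with plain string concatenation. This is a hedged sketch only: the official client's `Point` builder also handles escaping of special characters, which this does not.

```java
public class LineProtocol {
    /** Builds "gps,car=<tag> lat=<v1>,lon=<v2> <timestampNs>" -- no escaping performed. */
    static String gpsRecord(String car, double lat, double lon, long epochNanos) {
        return "gps,car=" + car + " lat=" + lat + ",lon=" + lon + " " + epochNanos;
    }

    public static void main(String[] args) {
        System.out.println(gpsRecord("川A888888", 39.90786, 116.510958, 1669104020000000000L));
        // gps,car=川A888888 lat=39.90786,lon=116.510958 1669104020000000000
    }
}
```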

Integration with Spring Boot

Add the dependency:

```xml
<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb-client-java</artifactId>
    <version>6.7.0</version>
</dependency>
```

Connection:

```java
import com.influxdb.LogLevel;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InfluxdbConfig {

    // Note: an empty default is written ":}", not ":''}" -- the latter injects two literal quote characters
    @Value("${spring.influx.url:}")
    private String influxDbUrl;
    @Value("${spring.influx.token:}")
    private String influxDbToken;
    @Value("${spring.influx.org:}")
    private String influxDbOrg;
    @Value("${spring.influx.buket:}")
    private String influxDbBuket;

    @Bean
    InfluxDBClient influxDBClient() {
        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(
                influxDbUrl, influxDbToken.toCharArray(), influxDbOrg, influxDbBuket);
        influxDBClient.setLogLevel(LogLevel.BASIC);
        return influxDBClient;
    }
}
```

Reading and Writing

(`GpsRecord`, `BusinessException`, and `ApiCode` are project-specific classes.)

```java
import java.util.List;
import java.util.logging.Logger;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository;

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;

@Repository
public class GpsSeriesDao {
    private static final Logger LOG = Logger.getLogger(GpsSeriesDao.class.getName());

    /** Measurement (_measurement) name. */
    private static final String TABLE = "gps";

    @Autowired
    InfluxDBClient influxDBClient;

    // Injected here directly, since InfluxdbConfig keeps its fields private
    @Value("${spring.influx.buket:}")
    private String influxDbBuket;

    /**
     * Read data: query one car's GPS track within a time range.
     */
    public List<GpsRecord> findTrackByCarNum(String carNum, String startTime, String endTime) {
        String flux = "from(bucket: \"" + influxDbBuket + "\")\n" +
                " |> range(start: " + startTime + ", stop: " + endTime + ")\n" +
                " |> filter(fn: (r) => r[\"_measurement\"] == \"" + TABLE + "\")\n" +
                " |> filter(fn: (r) => r[\"carNum\"] == \"" + carNum + "\")\n" +
                " |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n" +
                " |> yield(name: \"mean\")";
        LOG.info("query flux:\n" + flux);
        QueryApi queryApi = influxDBClient.getQueryApi();
        return queryApi.query(flux, GpsRecord.class);
    }

    /**
     * Write data: append one GPS point in line protocol.
     */
    public void writeGpsRecord(GpsRecord gpsRecord) {
        String timestamp;
        if (gpsRecord.getTime() != null) {
            // Pad the 13-digit millisecond timestamp to 19-digit nanoseconds, which the database expects
            timestamp = gpsRecord.getTime().toEpochMilli() + "000000";
        } else {
            throw new BusinessException(ApiCode.PARAM_FORMAT_INCORR.getValue(), "invalid gps time parameter");
        }
        String data = TABLE + ",carNum=" + gpsRecord.getCarNum()
                + " lat=" + gpsRecord.getLat() + ",lon=" + gpsRecord.getLon() + " " + timestamp;
        LOG.info("Line Protocol:\n" + data);
        WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
        writeApi.writeRecord(WritePrecision.NS, data);
    }
}
```
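The six appended zeros in `writeGpsRecord` are just a string-level way of multiplying epoch milliseconds by 1,000,000 to get nanoseconds. The equivalence can be checked directly:

```java
import java.time.Instant;

public class TimestampDemo {
    public static void main(String[] args) {
        Instant t = Instant.parse("2022-11-22T08:00:30Z");
        long nanos = t.toEpochMilli() * 1_000_000L; // 13-digit ms -> 19-digit ns
        System.out.println(nanos);                  // 1669104030000000000
    }
}
```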

Common InfluxDB Flux Statements

Official documentation

Row/column transposition: pivot

In InfluxDB, when a measurement has multiple fields, each field comes back as a separate row in the query result. To merge them, add:

```
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```

After adding it, the multiple field rows are combined into a single row per timestamp.
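To see what pivot does to the data shape, here is a plain-Java analogue (illustrative only; the real pivoting happens server-side in Flux): rows sharing a `_time` are grouped, and each `_field` becomes a column.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PivotDemo {
    /** rows are [time, field, value] triples; result maps time -> {field: value}. */
    static Map<String, Map<String, Double>> pivot(List<String[]> rows) {
        Map<String, Map<String, Double>> out = new LinkedHashMap<>();
        for (String[] r : rows) {
            out.computeIfAbsent(r[0], k -> new LinkedHashMap<>())
               .put(r[1], Double.parseDouble(r[2]));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[]{"08:00:30", "lat", "39.90786"},
                new String[]{"08:00:30", "lon", "116.510958"});
        System.out.println(pivot(rows)); // {08:00:30={lat=39.90786, lon=116.510958}}
    }
}
```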

Flux `in`-style query (contains)

```
|> filter(fn: (r) => contains(value: r["carNum"], set: ["01","02","03"]))
```

The corresponding Java code:

```java
String carNumsStr = carNums.stream().map(s -> "\"" + s + "\"").collect(Collectors.joining(","));
String flux = "from(bucket: \"" + influxdbConfig.influxDbBuket + "\")\n" +
        " |> range(start: -15d)\n" +
        " |> filter(fn: (r) => r[\"_measurement\"] == \"" + TABLE + "\")\n" +
        " |> filter(fn: (r) => contains(value: r[\"carNum\"], set: [" + carNumsStr + "]))\n" +
        " |> last()\n" +
        " |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n" +
        " |> yield(name: \"mean\")";
LOG.info("query flux:\n" + flux);
QueryApi queryApi = influxDBClient.getQueryApi();
List<GpsRecord> gpsRecords = queryApi.query(flux, GpsRecord.class);
```

last: fetch the most recent data

```
|> last()
```

Notes from reading the Flux official documentation

```
|> group(columns: ["host"], mode: "by")      # group by the host column
|> sort(columns: ["host", "_value"])         # sort
|> aggregateWindow(every: 20m, fn: mean)     # mean over every 20-minute window
|> map(fn: (r) => ({ r with _value: r._value * r._value }))  # transform: square each value
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")  # rows to columns
|> increase()                                # cumulative increase (sums non-negative deltas, so counter resets are handled)
|> movingAverage(n: 5)                       # mean of the current value and the previous four
|> timedMovingAverage(every: 2m, period: 4m) # every 2 minutes, the mean of the preceding 4-minute window
|> derivative(unit: 1m, nonNegative: true)   # per-minute rate of change, dropping negative results
|> histogram(
    column: "_value",
    upperBoundColumn: "le",
    countColumn: "_value",
    bins: [100.0, 200.0, 300.0, 400.0],
)                                            # cumulative counts of values within 100, 200, 300, 400
|> fill(usePrevious: true)                   # replace nulls with the previous value (the first point may stay null: it has no predecessor)
|> fill(value: 0.0)                          # replace nulls with 0.0
|> median()                                  # median
|> quantile(q: 0.99, method: "estimate_tdigest")  # 99th percentile
|> cumulativeSum()                           # running sum over the history
|> first()                                   # earliest point
|> last()                                    # latest point
|> filter(fn: (r) => exists r._value)        # drop rows whose _value is null
|> count()                                   # count rows
|> count(column: "lat")                      # count values in the lat column
```
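For intuition on `movingAverage(n: 5)` above, each output point is the mean of the current value and its four predecessors. A stdlib sketch of the same computation (assuming, as I understand Flux's behavior, that the first n-1 inputs produce no output):

```java
import java.util.ArrayList;
import java.util.List;

public class MovingAverage {
    /** Mean of each value together with its n-1 predecessors; the first n-1 inputs yield no output. */
    static List<Double> movingAverage(double[] values, int n) {
        List<Double> out = new ArrayList<>();
        for (int i = n - 1; i < values.length; i++) {
            double sum = 0;
            for (int j = i - n + 1; j <= i; j++) sum += values[j];
            out.add(sum / n);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(movingAverage(new double[]{1, 2, 3, 4, 5, 6}, 5));
        // [3.0, 4.0]
    }
}
```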


Common Practical Queries

```
# Map every value to 1, then sum per window: this counts the points in each interval
# (the granularity can be per minute, per day, per month, etc.)
from(bucket: "transport")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "gps")
  |> filter(fn: (r) => r["_field"] == "lat")
  |> filter(fn: (r) => r["carNum"] == "719135")
  |> map(fn: (r) => ({ r with _value: 1 }))
  |> aggregateWindow(every: 1m, fn: sum, createEmpty: false)
  |> yield(name: "mean")
```
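The map-to-1-then-sum trick is just bucketed counting. A hedged local equivalent, bucketing epoch-second timestamps into one-minute windows:

```java
import java.util.Map;
import java.util.TreeMap;

public class WindowCount {
    /** Counts points per 60-second window, keyed by the window's start second. */
    static Map<Long, Integer> countPerMinute(long[] epochSeconds) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : epochSeconds) {
            long windowStart = t - (t % 60);     // same idea as mapping _value to 1, then summing
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {0, 10, 59, 60, 61, 125};
        System.out.println(countPerMinute(ts)); // {0=3, 60=2, 120=1}
    }
}
```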

Common Integration Architecture

```mermaid
graph LR
    A[IoT device / MQTT client] --> B(EMQ / MQTT broker)
    B --> C[Telegraf / data relay & processing]
    C --> D[InfluxDB / storage]
    D --> E[Grafana / dashboards]
```

Common Issues

  1. Timezones: the UI queries in UTC (zone 0) by default, and imported timestamps such as `2022-11-22T08:51:00Z` use `Z` to mean UTC.

     Fix: change the container's timezone by adding a read-only hostPath volume mount:

    ```yaml
    spec:
      volumes:
        - name: localtime
          hostPath:
            path: /etc/localtime
            type: ''
      containers:
        - volumeMounts:        # merge into the existing container entry
            - name: localtime
              readOnly: true
              mountPath: /etc/localtime
    ```

    After this change, imported data must also switch to the China (+8) timezone: add 8 hours in the RFC 3339 timestamp, i.e. import `2022-11-22T08:51:00+08:00`. When querying, however, the UI still displays UTC, which is hard to read; in Data Explorer → Customize → Time Format you can choose `YYYY-MM-DD hh:mm:ss a ZZ`.
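The +08:00 conversion described above can be produced with `java.time` (a sketch; any RFC 3339-capable formatter works):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class Rfc3339Demo {
    public static void main(String[] args) {
        // The same instant, rendered in UTC (Z) and in China Standard Time (+08:00)
        Instant t = Instant.parse("2022-11-22T00:51:00Z");
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX");
        System.out.println(fmt.format(t.atOffset(ZoneOffset.UTC)));        // 2022-11-22T00:51:00Z
        System.out.println(fmt.format(t.atOffset(ZoneOffset.ofHours(8)))); // 2022-11-22T08:51:00+08:00
    }
}
```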

References

Go - time.RFC3339 time formatting

A beginner's guide to building an MQTT + EMQ + InfluxDB + Grafana stack