时序数据库influxDB入门

InfluxDB官方文档/githab源码

Helm安装

1
helm upgrade --install my-influxdb influxdata/influxdb --set image.tag=2.5.1-alpine

访问:开放nodeport,访问ip:nodeport地址

基础概念

measurement:表

point::一行数据记录

time:时间戳,自带字段,单位纳秒

tags:有各种索引的属性,一般用于

fields:没有索引的属性,一般会实时变化,存经纬度、温度、等变化的数据

annotated CSV数据

csv数据格式
group false false true true false false true true true 组?
datatype string long dateTime:RFC3339 dateTime:RFC3339 dateTime:RFC3339 double string string string 数据类型
default mean 默认值?
result table _start _stop _time _value _field _measurement car 数据表头
0 2022-11-22T07:20:32.833674853Z 2022-11-22T08:20:32.833674853Z 2022-11-22T08:00:30Z 39.90786 lat gps 川A888888 point一行数据
注释 表? 查询的开始时间 查询的结束时间 数据的时间 field的值 field的key tag
csv源数据
1
2
3
4
5
6
7
8
#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string
#default,mean,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,car
,,0,2022-11-22T07:20:32.833674853Z,2022-11-22T08:20:32.833674853Z,2022-11-22T08:00:30Z,39.90786,lat,gps,川A888888
,,0,2022-11-22T07:20:32.833674853Z,2022-11-22T08:20:32.833674853Z,2022-11-22T08:12:40Z,39.91786,lat,gps,川A888888
,,1,2022-11-22T07:20:32.833674853Z,2022-11-22T08:20:32.833674853Z,2022-11-22T08:00:30Z,116.510958,lon,gps,川A888888
,,1,2022-11-22T07:20:32.833674853Z,2022-11-22T08:20:32.833674853Z,2022-11-22T08:12:40Z,116.510928,lon,gps,川A888888

line protocol

1
2
3
4
5
6
measurementName,tagKey=tagValue fieldKey="fieldValue" 1465839830100400200
---------------,--------------- --------------------- -------------------
Measurement tags set fields set timestamp
eg:
gps,car=川A888888 lat=39.90786,lon=116.510958 1669104020000000000
gps,car=川A888888 lat=39.91786,lon=116.510928 1669104759000000000

接入springboot

添加依赖

1
2
3
4
5
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.7.0</version>
</dependency>

连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
public class InfluxdbConfig {

@Value("${spring.influx.url:''}")
private String influxDbUrl;
@Value("${spring.influx.token:''}")
private String influxDbToken;
@Value("${spring.influx.org:''}")
private String influxDbOrg;
@Value("${spring.influx.buket:''}")
private String influxDbBuket;


@Bean
InfluxDBClient influxDBClient(){
InfluxDBClient influxDBClient= InfluxDBClientFactory.create(influxDbUrl,influxDbToken.toCharArray(),influxDbOrg,influxDbBuket);
influxDBClient.setLogLevel(LogLevel.BASIC);
return influxDBClient;
}

}

读写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Repository
public class GpsSeriesDao {
private static final Logger LOG = Logger.getLogger(GpsSeriesDao.class.getName());
/**
* 表(_measurement)
*/
private static final String TABLE = "gps";
@Autowired
InfluxDBClient influxDBClient;
@Autowired
InfluxdbConfig influxdbConfig;

/**
* 读数据
* @param carNum
* @param startTime
* @param endTime
* @return
*/
public List<GpsRecord> findTrackByCarNum(String carNum, String startTime, String endTime) {
String flux = "from(bucket: \"" + influxdbConfig.influxDbBuket + "\")\n" +
" |> range(start: " + startTime + ",stop:" + endTime + ")\n" +
" |> filter(fn: (r) => r[\"_measurement\"] == \"" + TABLE + "\")\n" +
" |> filter(fn: (r) => r[\"carNum\"] == \"" + carNum + "\")\n" +
" |> pivot(rowKey:[\"_time\"],columnKey: [\"_field\"],valueColumn: \"_value\") "+
" |> yield(name: \"mean\")";
LOG.info("query flux:\n" + flux);
QueryApi queryApi = influxDBClient.getQueryApi();
List<GpsRecord> gpsRecords = queryApi.query(flux, GpsRecord.class);
return gpsRecords;
}

/**
* 写数据
* @param gpsRecord
*/
public void writeGpsRecord(GpsRecord gpsRecord) {
String timestamp;
if (gpsRecord.getTime() != null) {
timestamp = gpsRecord.getTime().toEpochMilli() + "000000"; //13位转为19位时间戳,数据库才能识别
} else {
throw new BusinessException(ApiCode.PARAM_FORMAT_INCORR.getValue(), "gps time参数格式错误");
}
String data = TABLE + ",carNum=" + gpsRecord.getCarNum() + " lat=" + gpsRecord.getLat() + ",lon=" + gpsRecord.getLon() + " " + timestamp;
LOG.info("Line Protocol:\n" + data);
WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking();
writeApi.writeRecord(WritePrecision.NS, data);
}
}

常见influxdb flux语句

官方文档

行列转换语句-pivot

在influxdb当有多个field时,每个field查询出来会单独成一行数据,这是就需要添加

1
|> pivot(rowKey:["_time"],columnKey: ["_field"],valueColumn: "_value")

添加之后,就会把多个field换成一行。

常见集成架构

graph LR

A[mqtt/物联网设备] -->B(emq/mqtt服务端)
B --> c[telegraf/数据中转处理]
c --> d[infludb/入库]
d --> e[grafna/展示]

常见问题

  1. 时区问题,在界面上默认是0时区去查询,时间数据导入2022-11-22T08:51:00Z z代表0时区。

    解决:修改容器的时区,添加只读挂在卷

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    spec:
    volumes:
    - name: localtime
    hostPath:
    path: /etc/localtime
    type: ''
    containers:
    volumeMounts:
    - name: localtime
    readOnly: true
    mountPath: /etc/localtime

    添加之后,导入数据也要注意,切换为中国(+8)时区,也就是在时间RFC3339上,就行加8,也就是2022-11-22T08:51:00+08:00进行导入,但是查询时,界面上还是以0时区显示,不方便观察,可以在data explorer->customize->time format选择YYYY-MM-DD hh:mm:ss a ZZ

参考

Go - time.RFC3339 时间格式化

mqtt+emq+influxdb+grafana系统搭建傻瓜教程