influxDB2.2

下載安裝

  1. 下載地址
  2. 下載後在解壓目錄中,輸入cmd執行exe文件
  3. 瀏覽器訪問localhost:8086
  4. 選擇快速開始,填寫用戶資訊,組織資訊

image.png

相關概念

InfluxDB是一個由InfluxData開發的開源時序型數據。它由Go寫成,著力於高性能地查詢與存儲時序型數據。InfluxDB被廣泛應用於存儲系統的監控數據,IoT行業的實時數據等場景。

名詞

  • bucket:相當於mysql中的資料庫
  • measurement:相當於mysql中的數據表
  • tag:標籤可以有多個,相當於索引
  • time:時間戳
  • field:欄位

數據操作

Line Protocol

image.png
選擇Enter Manually執行語句

語法

InfluxDB使用行協議寫入數據點。它是一種基於文本的格式,提供數據點的度量、標記集、欄位集和時間戳。
measurementName,tagKey=tagValue fieldKey=”fieldValue” 1465839830100400200
————— ————— ——————— ——————-
| | | |
Measurement Tag set Field set Timestamp
例:
myMeasurement,tag1=value1,tag2=value2 fieldKey=”fieldValue” 1556813561098000000
由換行符分隔的行 \ n表示InfluxDB中的單個點。線路協議對空格敏感。

Explore

image.png
可進行頁面上的篩選,點擊script Editor查看執行的語句
image.png
可切換數據呈現的樣式
image.png

查詢數據

  • 聲明資料庫:from(bucket:”example-bucket”)
  • 指定查詢範圍:|> range(start: -1h)
  • 設置篩選條件:|> filter(fn: (r) => r._measurement == “cpu” and r._field == “usage_system” and r.cpu == “cpu-total”)
  • 輸出結果:yield()
  • Flux自動假定在每個腳本的末尾有一個yield()函數,用於輸出和可視化數據。只有在同一個Flux查詢中包含多個查詢時,才需要顯式地調用yield()。每一組返回的數據都需要使用yield()函數命名。

完整語句:

from(bucket: "example-bucket")
    |> range(start: -15m)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> yield(name: "test")

java開發

引入依賴

<dependency>
  <groupId>com.influxdb</groupId>
  <artifactId>influxdb-client-java</artifactId>
  <version>3.1.0</version>
</dependency>

數據模型

@Data
@Accessors(chain = true)
@Measurement(name = "monitoring_data")
public class MonData {

  @Column(tag = true)
  private String pointName;

  @Column(tag = true)
  private String indexName;

  @Column private Double value;

  @Column(timestamp = true)
  private Instant time;
}

@InfluxColumn為自定義註解,用於拼接查詢語句構造map函數使用

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface InfluxColumn {
  String value();
}
@Data
public class MonDataDTO {

  // tag1名稱
  @InfluxColumn("pointName")
  private String pointName;

  // tag2名稱
  @InfluxColumn("indexName")
  private String indexName;

  // 時間片開始時間
  @InfluxColumn("_start")
  private String start;

  // 時間片結束時間
  @InfluxColumn("_stop")
  private String stop;

  // 數據產生時間
  @InfluxColumn("_time")
  private String time;

  // 值
  @InfluxColumn("_value")
  private String value;
}
@Data
public class SearchParams<T> {

  // 查詢時間範圍開始時間
  private String start;
  // 時間戳欄位排序規則,true:降序
  private Boolean sortRule = true;
  // 查詢時間範圍結束時間
  private String end;
  // 時間間隔
  private String every;
  // 篩選條件
  private List<String> filterList;
  // map構造的目標類對象
  private Class<? extends T> mapClazz;
}

功能類

@Repository
@Slf4j
public class InfluxRepository {

  @Autowired private WriteApi writeApi;
  @Autowired private QueryApi queryApi;
  @Autowired private InfluxdbConfigProp influxdbConfigProp;

  /**
   * 向influx寫入數據
   *
   * @param data 寫入數據實體
   */
  public <T> void writeData(T data) {
    writeApi.writeMeasurement(
        influxdbConfigProp.getBucket(), influxdbConfigProp.getOrg(), WritePrecision.MS, data);
  }

  /**
   * 查詢數據
   *
   * @param params 查詢參數
   */
  public <T> List<FluxTable> findMonitoringData(SearchParams<T> params) {

    StringBuffer queryBuffer = new StringBuffer();
    // BUCKET
    queryBuffer.append("from(bucket: \"");
    queryBuffer.append(influxdbConfigProp.getBucket());
    // 時間範圍條件
    queryBuffer.append("\") \n|> range(start: ");
    queryBuffer.append(params.getStart());
    queryBuffer.append(", stop: ");
    queryBuffer.append(params.getEnd());
    queryBuffer.append(")\n");
    List<String> filterList = params.getFilterList();
    if (!CollectionUtils.isEmpty(filterList)) {
      queryBuffer.append("  |> filter(fn: (r) => ");
      // 拼接查詢條件
      for (int i = 0; i < filterList.size(); i++) {
        String[] filters = filterList.get(i).split(">");
        queryBuffer.append("r[\"");
        queryBuffer.append(filters[0]);
        queryBuffer.append("\"]");
        queryBuffer.append(filters[1]);
        if (i < filterList.size() - 1) queryBuffer.append(" and ");
      }
      queryBuffer.append(")\n");
    }

    // aggregateWindow函數
    queryBuffer.append("  |> aggregateWindow(every: ");
    queryBuffer.append(params.getEvery());
    queryBuffer.append(",fn: first, createEmpty: true)\n");
    // 為查詢結果添加排序
    queryBuffer.append("  |> sort(columns: [\"_time\"], desc: ");
    queryBuffer.append(params.getSortRule().booleanValue());
    queryBuffer.append(")\n");
    // map函數語句拼接
    Class<? extends T> mapClazz = params.getMapClazz();
    if (!ObjectUtils.isEmpty(mapClazz)) {
      queryBuffer.append("  |> map(");
      queryBuffer.append(" fn:(r) => { \n");
      queryBuffer.append("    return {\n");
      Field[] fields = mapClazz.getDeclaredFields();
      // 目標實體欄位和influx查詢結果欄位的映射
      Map<String, String> fieldMap = new HashMap<>();
      for (Field field : fields) {
        InfluxColumn influxColumn = field.getAnnotation(InfluxColumn.class);
        if (influxColumn != null) {
          fieldMap.put(field.getName(), influxColumn.value());
        }
      }
      // 若有需要映射的欄位則構建語句
      if (!CollectionUtils.isEmpty(fieldMap)) {
        for (String key : fieldMap.keySet()) {
          queryBuffer.append(key);
          queryBuffer.append(": r[\"");
          queryBuffer.append(fieldMap.get(key));
          queryBuffer.append("\"],\n");
        }
        queryBuffer.append("}})\n");
      }
    }

    String influxQl = queryBuffer.toString();
    log.info("查詢語句, {}", influxQl);
    List<FluxTable> queryData = queryApi.query(influxQl, influxdbConfigProp.getOrg());
    return queryData;
  }
}
@Service
@Slf4j
public class InfluxQueryService {

  @Autowired private ObjectMapper objectMapper;

  @Autowired private InfluxRepository influxRepository;

  /**
   * 監測數據查詢
   *
   * @param start 起始範圍時間點
   * @param end 結束範圍時間點
   * @param every 時間片
   * @param filterList 篩選條件集合(集合內元素例:pointName>csd-001)
   * @param clazz 去除數據時map對象映射的類對象
   * @param sort 時間欄位排序規則
   */
  public <T> List<T> findMonitoringDataInFluxDB(
      String start,
      String end,
      String every,
      List<String> filterList,
      Class<? extends T> clazz,
      boolean sort) {
    // mainTag和 subTag需要特殊處理,將逗號替換成"|"正則表達
    filterList =
        filterList.stream()
            .map(filter -> StringUtils.replace(filter, ",", "|"))
            .collect(Collectors.toList());
    SearchParams<T> searchParams = new SearchParams<>();
    searchParams.setStart(start);
    searchParams.setEnd(end);
    searchParams.setEvery(every);
    searchParams.setFilterList(filterList);
    searchParams.setMapClazz(clazz);
    searchParams.setSortRule(sort);
    List<FluxTable> fluxTableList = influxRepository.findMonitoringData(searchParams);
    return mapFluxData(fluxTableList, clazz);
  }

  /**
   * 解析原始數據
   *
   * @param data 原始數據
   */
  public <T> List<T> mapFluxData(List<FluxTable> data, Class<? extends T> clazz) {
    List<T> result = new LinkedList<>();
    for (FluxTable ft : data) {
      List<FluxRecord> records = ft.getRecords();
      for (FluxRecord rc : records) {
        try {
          T originData =
              objectMapper.readValue(objectMapper.writeValueAsString(rc.getValues()), clazz);
          result.add(originData);
        } catch (JsonProcessingException e) {
          log.error("influx查詢數據轉換為DTO時解析出錯");
          throw new RuntimeException(e);
        }
      }
    }
    return result;
  }
}

業務Service構造查詢條件,並提供相應的:查詢結果實體 => 實體之間的轉換方法

/**
   * 設備指標監測值
   *
   * @param start 起始範圍時間點
   * @param end 結束範圍時間點
   * @param every 時間片
   * @param tagName 設備id
   */
  public List<MonDataDTO> getMonitoringData(
      String start, String end, String every, String tagName) {
    
    // 篩選條件
    List<String> filterList = new ArrayList<>();
    filterList.add("_measurement> == \"monitoring_data\"");
    filterList.add("tagName> =~/" + tagName + "/");
    // 處理時間參數
    String startDate;
    String endDate;
    LocalDate startLocalDate = LocalDate.parse(start).plusDays(-1);
    // 一天內的數據 (開始時間的前一天的23點,到結束時間的23點,時區原因查詢時時間減去8小時)
    // 跨天的數據(開始和結束時間減8小時)
    String endTime = (start.equals(end) ? "T15:00:00Z" : "T16:00:00Z");
    startDate = startLocalDate + endTime;
    endDate = end + endTime;
    List<MonDataDTO> dataInFluxDB =
        influxQueryService.findMonitoringDataInFluxDB(
            startDate, endDate, every, filterList, MonDataDTO.class, false);
    return dataInFluxDB;
  }

配置類

@Data
@ConfigurationProperties(prefix = "influxdb")
@Component
public class InfluxdbConfigProp {

  private String token;

  private String bucket;

  private String org;

  private String url;
}

@Configuration
public class InfluxdbConfig {

  @Autowired private InfluxdbConfigProp influxdbConfigProp;

  @Bean
  public InfluxDBClient influxDBClient() {
    InfluxDBClient influxClient =
        InfluxDBClientFactory.create(
            influxdbConfigProp.getUrl(), influxdbConfigProp.getToken().toCharArray());
    influxClient.setLogLevel(LogLevel.BASIC);
    return influxClient;
  }

  @Bean
  public WriteApi writeApi(InfluxDBClient influxDBClient) {
    WriteOptions writeOptions =
        WriteOptions.builder()
            .batchSize(5000)
            .flushInterval(1000)
            .bufferLimit(10000)
            .jitterInterval(1000)
            .retryInterval(5000)
            .build();
    return influxDBClient.getWriteApi(writeOptions);
  }

  @Bean
  public QueryApi queryApi(InfluxDBClient influxDBClient) {
    return influxDBClient.getQueryApi();
  }
}

常用函數

window()

使用window()函數根據時間界限對數據進行分組。window()傳遞的最常用參數是every,它定義了窗口之間的持續時間。也可以使用其他參數,但是對於本例,將基本數據集窗口化為一分鐘窗口。
dataSet |> window(every: 1m)

first()和last()

獲取查詢結果的第一條或最後一條

drop()

刪除查詢結果的指定列
|> drop(columns: [“host”])

sort()和limit()

排序和分頁
|> sort(columns: [“index”, “time”], desc: true)
|> limit(n: 10)
n參數為pageSize

timedMovingAverage()

對於表中的每一行,timedMovingAverage()返回當前值和上一個周期(持續時間)中所有行值的平均值。它以每個參數定義的頻率返回移動平均線。
|> timedMovingAverage(every: 1h, period: 1h)

aggregateWindow()

|> aggregateWindow(every: 1h, fn: first, createEmpty: true)
每一小時時間片的第一條記錄,空數據以null填充

map()

|> map( fn:(r) => {
    return { 
      code: r["code"],
      time: r["_time"],
      value: r["_value"],
      index: r["indexName"]
      }
   }
  )

注意事項

  • tag與tag之間用逗號分隔
  • field與field之間用逗號分隔
  • tag與field之間用空格分隔
  • tag都是string類型,不需要引號將value包裹
  • tag的值不能有空格
  • 寫入數據時,若tag和時間戳都相同的多條記錄,則最後只會保存一條