10分鐘教你寫一個資料庫

今天教大家藉助一款框架快速實現一個資料庫,這個框架就是Calcite,下面會帶大家通過兩個例子快速教會大家怎麼實現,一個是可以通過 SQL 語句的方式可以直接查詢文件內容,第二個是模擬 Mysql 查詢功能,以及最後告訴大家怎麼實現 SQL 查詢 Kafka 數據。

Calcite

Calcite 是一個用於優化異構數據源的查詢處理的可插拔基礎框架(他是一個框架),可以將任意數據(Any data, Anywhere)DML 轉換成基於 SQL 的 DML 引擎,並且我們可以選擇性的使用它的部分功能。

Calcite能幹什麼

  1. 使用 SQL 訪問記憶體中某個數據

  2. 使用 SQL 訪問某個文件的數據

  3. 跨數據源的數據訪問、聚合、排序等(例如 Mysql 和 Redis 數據源中的數據進行join)

當我們需要自建一個資料庫的時候,數據可以為任何格式的,比如text、word、xml、mysql、es、csv、第三方介面數據等等,我們只有數據,我們想讓這些數據支援 SQL 形式動態增刪改查。

另外,像Hive、Drill、Flink、Phoenix 和 Storm 等項目中,數據處理系統都是使用 Calcite 來做 SQL 解析和查詢優化,當然,還有部分用來構建自己的 JDBC driver。

名詞解釋

Token

就是將標準 SQL(可以理解為Mysql)關鍵詞以及關鍵詞之間的字元串截取出來,每一個token,會被封裝為一個SqlNodeSqlNode會衍生很多子類,比如Select會被封裝為SqlSelect,當前 SqlNode 也能反解析為 SQL 文本。

RelDataTypeField

某個欄位的名稱和類型資訊

RelDataType

多個 RelDataTypeField 組成了 RelDataType,可以理解為數據行

Table

一個完整的表的資訊

Schema

所有元數據的組合,可以理解為一組 Table 或者庫的概念

開始使用

1. 引入包

<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <!-- 目前最新版本 2022-09-10日更新-->
    <version>1.32.0</version>
</dependency>

2. 創建model.json文件和表結構csv

model.json 裡面主要描述或者說告訴 Calcite 如何創建 Schema,也就是告訴框架怎麼創建出庫。

{
"version": "1.0",//忽略
"defaultSchema": "CSV",//設置默認的schema
"schemas": [//可定義多個schema
        {
          "name": "CSV",//相當於namespace和上面的defaultSchema的值對應
          "type": "custom",//寫死
          "factory": "csv.CsvSchemaFactory",//factory的類名必須是你自己實現的factory的包的全路徑
          "operand": { //這裡可以傳遞自定義參數,最終會以map的形式傳遞給factory的operand參數
          "directory": "csv"//directory代表calcite會在resources下面的csv目錄下面讀取所有的csv文件,factory創建的Schema會吧這些文件全部構建成Table,可以理解為讀取數據文件的根目錄,當然key的名稱也不一定非得用directory,你可以隨意指定
                }
        }
      ]
}

接下來還需要定義一個 csv 文件,用來定義表結構。

NAME:string,MONEY:string
aixiaoxian,10000萬
xiaobai,10000萬
adong,10000萬
maomao,10000萬
xixi,10000萬
zizi,10000萬
wuwu,10000萬
kuku,10000萬

整個項目的結構大概就是這樣:

3. 實現Schema的工廠類

在上述文件中指定的包路徑下去編寫 CsvSchemaFactory 類,實現 SchemaFactory 介面,並且實現裡面唯一的方法 create 方法,創建Schema(庫)。

public class CsvSchemaFactory implements SchemaFactory {
    /**
     * parentSchema 父節點,一般為root
     * name 為model.json中定義的名字
     * operand 為model.json中定於的數據,這裡可以傳遞自定義參數
     *
     * @param parentSchema Parent schema
     * @param name         Name of this schema
     * @param operand      The "operand" JSON property
     * @return
     */
    @Override
    public Schema create(SchemaPlus parentSchema, String name,
                         Map<String, Object> operand) {
        final String directory = (String) operand.get("directory");
        File directoryFile = new File(directory);
        return new CsvSchema(directoryFile, "scannable");
    }
}

4. 自定義Schma類

有了 SchemaFactory,接下來需要自定義 Schema 類。

自定義的 Schema 需要實現 Schema 介面,但是直接實現要實現的方法太多,我們去實現官方的 AbstractSchema 類,這樣就只需要實現一個方法就行(如果有其他訂製化需求可以實現原生介面)。

核心的邏輯就是createTableMap方法,用於創建出 Table 表。

他會掃描指定的Resource下面的所有 csv 文件,將每個文件映射成Table對象,最終以map形式返回,Schema介面的其他幾個方法會用到這個對象。

		//實現這一個方法就行了
    @Override
    protected Map<String, Table> getTableMap() {
        if (tableMap == null) {
            tableMap = createTableMap();
        }
        return tableMap;
    }
		private Map<String, Table> createTableMap() {
        // Look for files in the directory ending in ".csv"
        final Source baseSource = Sources.of(directoryFile);
        //會自動過濾掉非指定文件後綴的文件,我這裡寫的csv
        File[] files = directoryFile.listFiles((dir, name) -> {
            final String nameSansGz = trim(name, ".gz");
            return nameSansGz.endsWith(".csv");
        });
        if (files == null) {
            System.out.println("directory " + directoryFile + " not found");
            files = new File[0];
        }
        // Build a map from table name to table; each file becomes a table.
        final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
        for (File file : files) {
            Source source = Sources.of(file);
            final Source sourceSansCsv = source.trimOrNull(".csv");
            if (sourceSansCsv != null) {
                final Table table = createTable(source);
                builder.put(sourceSansCsv.relative(baseSource).path(), table);
            }
        }
        return builder.build();
    }

5. 自定義 Table

Schema 有了,並且數據文件 csv 也映射成 Table 了,一個 csv 文件對應一個 Table

接下來我們去自定義 Table,自定義 Table 的核心是我們要定義欄位的類型和名稱,以及如何讀取 csv文件。

  1. 先獲取數據類型和名稱,即單表結構,從csv文件頭中獲取(當前文件頭需要我們自己定義,包括規則我們也可以訂製化)。
/**
 * Base class for table that reads CSV files.
 */
public abstract class CsvTable extends AbstractTable {
    protected final Source source;
    protected final @Nullable RelProtoDataType protoRowType;
    private @Nullable RelDataType rowType;
    private @Nullable List<RelDataType> fieldTypes;
​
    /**
     * Creates a CsvTable.
     */
    CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
        this.source = source;
        this.protoRowType = protoRowType;
    }
		/**
		* 創建一個CsvTable,繼承AbstractTable,需要實現裡面的getRowType方法,此方法就是獲取當前的表結構。
			Table的類型有很多種,比如還有視圖類型,AbstractTable類中幫我們默認實現了Table介面的一些方法,比如getJdbcTableType			方法,默認為Table類型,如果有其他訂製化需求可直接實現Table介面。
			和AbstractSchema很像
		*/
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        if (protoRowType != null) {
            return protoRowType.apply(typeFactory);
        }
        if (rowType == null) {
            rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
                    null);
        }
        return rowType;
    }
​
    /**
     * Returns the field types of this CSV table.
     */
    public List<RelDataType> getFieldTypes(RelDataTypeFactory typeFactory) {
        if (fieldTypes == null) {
            fieldTypes = new ArrayList<>();
            CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
                    fieldTypes);
        }
        return fieldTypes;
    }
  
   public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
                                            Source source, @Nullable List<RelDataType> fieldTypes) {
        final List<RelDataType> types = new ArrayList<>();
        final List<String> names = new ArrayList<>();
        try (CSVReader reader = openCsv(source)) {
            String[] strings = reader.readNext();
            if (strings == null) {
                strings = new String[]{"EmptyFileHasNoColumns:boolean"};
            }
            for (String string : strings) {
                final String name;
                final RelDataType fieldType;
                //就是簡單的讀取字元串冒號前面是名稱,冒號後面是類型
                final int colon = string.indexOf(':');
                if (colon >= 0) {
                    name = string.substring(0, colon);
                    String typeString = string.substring(colon + 1);
                    Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
                    if (decimalMatcher.matches()) {
                        int precision = Integer.parseInt(decimalMatcher.group(1));
                        int scale = Integer.parseInt(decimalMatcher.group(2));
                        fieldType = parseDecimalSqlType(typeFactory, precision, scale);
                    } else {
                        switch (typeString) {
                            case "string":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                            case "boolean":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
                                break;
                            case "byte":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
                                break;
                            case "char":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
                                break;
                            case "short":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
                                break;
                            case "int":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
                                break;
                            case "long":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
                                break;
                            case "float":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
                                break;
                            case "double":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
                                break;
                            case "date":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
                                break;
                            case "timestamp":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
                                break;
                            case "time":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
                                break;
                            default:
                                LOGGER.warn(
                                        "Found unknown type: {} in file: {} for column: {}. Will assume the type of "
                                                + "column is string.",
                                        typeString, source.path(), name);
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                        }
                    }
                } else {
                    //  如果沒定義,默認都是String類型,欄位名稱也是string
                    name = string;
                    fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
                }
                names.add(name);
                types.add(fieldType);
                if (fieldTypes != null) {
                    fieldTypes.add(fieldType);
                }
            }
        } catch (IOException e) {
            // ignore
        }
        if (names.isEmpty()) {
            names.add("line");
            types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
        }
        return typeFactory.createStructType(Pair.zip(names, types));
    }
}
  1. 獲取文件中的數據,上面把Table的表結構欄位名稱和類型都獲取到了以後,就剩最後一步了,獲取文件中的數據。我們需要自定義一個類,實現 ScannableTable 介面,並且實現裡面唯一的方法 scan 方法,其實本質上就是讀文件,然後把文件的每一行的數據和上述獲取的 fileType 進行匹配。
@Override
    public Enumerable<Object[]> scan(DataContext root) {
        JavaTypeFactory typeFactory = root.getTypeFactory();
        final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);
        final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
        final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
        return new AbstractEnumerable<@Nullable Object[]>() {
            @Override
            public Enumerator<@Nullable Object[]> enumerator() {
                //返回我們自定義的讀取數據的類
                return new CsvEnumerator<>(source, cancelFlag, false, null,
                        CsvEnumerator.arrayConverter(fieldTypes, fields, false));
            }
        };
    }
 
 
 public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
                         @Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) {
        this.cancelFlag = cancelFlag;
        this.rowConverter = rowConverter;
        this.filterValues = filterValues == null ? null
                : ImmutableNullableList.copyOf(filterValues);
        try {
 
            this.reader = openCsv(source);
            //跳過第一行,因為第一行是定義類型和名稱的
            this.reader.readNext(); // skip header row
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
//CsvEnumerator必須實現calcit自己的迭代器,裡面有current、moveNext方法,current是返回當前游標所在的數據記錄,moveNext是將游標指向下一個記錄,官網中自己定義了一個類型轉換器,是將csv文件中的數據轉換成文件頭指定的類型,這個需要我們自己來實現
     @Override
    public E current() {
        return castNonNull(current);
    }
 
    @Override
    public boolean moveNext() {
        try {
            outer:
            for (; ; ) {
                if (cancelFlag.get()) {
                    return false;
                }
                final String[] strings = reader.readNext();
                if (strings == null) {
                    current = null;
                    reader.close();
                    return false;
                }
                if (filterValues != null) {
                    for (int i = 0; i < strings.length; i++) {
                        String filterValue = filterValues.get(i);
                        if (filterValue != null) {
                            if (!filterValue.equals(strings[i])) {
                                continue outer;
                            }
                        }
                    }
                }
                current = rowConverter.convertRow(strings);
                return true;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
 
        protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
            if (fieldType == null || string == null) {
                return string;
            }
            switch (fieldType.getSqlTypeName()) {
                case BOOLEAN:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Boolean.parseBoolean(string);
                case TINYINT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Byte.parseByte(string);
                case SMALLINT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Short.parseShort(string);
                case INTEGER:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Integer.parseInt(string);
                case BIGINT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Long.parseLong(string);
                case FLOAT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Float.parseFloat(string);
                case DOUBLE:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Double.parseDouble(string);
                case DECIMAL:
                    if (string.length() == 0) {
                        return null;
                    }
                    return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
                case DATE:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_DATE.parse(string);
                        return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
                    } catch (ParseException e) {
                        return null;
                    }
                case TIME:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_TIME.parse(string);
                        return (int) date.getTime();
                    } catch (ParseException e) {
                        return null;
                    }
                case TIMESTAMP:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_TIMESTAMP.parse(string);
                        return date.getTime();
                    } catch (ParseException e) {
                        return null;
                    }
                case VARCHAR:
                default:
                    return string;
            }
        }

6. 最後

至此我們需要準備的東西:庫、表名稱、欄位名稱、欄位類型都有了,接下來我們去寫我們的 SQL 語句查詢我們的數據文件。

創建好幾個測試的數據文件,例如上面項目結構中我創建 2 個 csv 文件USERINFO.csvASSET.csv,然後創建測試類。

這樣跑起來,就可以通過 SQL 語句的方式直接查詢數據了。

public class Test {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            print(statement.executeQuery("select * from asset "));
 
            print(statement.executeQuery(" select * from userinfo "));
 
            print(statement.executeQuery(" select age from userinfo where name ='aixiaoxian' "));
 
            print(statement.executeQuery(" select * from userinfo where age >60 "));
 
            print(statement.executeQuery(" select * from userinfo where name like 'a%' "));
        } finally {
            connection.close();
        }
    }
 
    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
    }
}

查詢結果:

這裡在測試的時候踩到2個坑,大家如果自己實驗的時候可以避免下。

  1. Calcite 默認會把你的 SQL 語句中的表名和類名全部轉換為大寫,因為默認的 csv(其他文件也一樣)文件的名稱就是表名,除非你自定義規則,所以你的文件名要寫成大寫。

  2. Calcite 有一些默認的關鍵字不能用作表名,不然會查詢失敗,比如我剛開始定的user.csv就一直查不出來,改成USERINFO就可以了,這點和Mysql 的內置關鍵字差不多,也可以通過個性化配置去改。

演示Mysql

  1. 首先,還是先準備 Calcite 需要的東西:庫、表名稱、欄位名稱、欄位類型。

如果數據源使用Mysql的話,這些都不用我們去 JAVA 服務中去定義,直接在 Mysql 客戶端創建好,這裡直接創建兩張表用於測試,就和我們的csv文件一樣。

CREATE TABLE `USERINFO1` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `AGE` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

CREATE TABLE `ASSET` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `MONEY` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
  1. 上述 csv 案例中的 SchemaFactory 以及 Schema 這些都不需要創建,因為 Calcite 默認提供了 Mysql 的 Adapter適配器。

  2. 其實,上述兩步都不需要做,我們真正要做的是,告訴 Calcite 你的 JDBC 的連接資訊就行了,也是在 model.json 文件中定義。

{
  "version": "1.0",
  "defaultSchema": "Demo",
  "schemas": [
    {
      "name": "Demo",
      "type": "custom",
    //  這裡是calcite默認的SchemaFactory,裡面的流程和我們上述自己定義的相同,下面會簡單看看源碼。
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "operand": {
        //  我用的是mysql8以上版本,所以這裡注意包的名稱
        "jdbcDriver": "com.mysql.cj.jdbc.Driver",
        "jdbcUrl": "jdbc:mysql://localhost:3306/irving",
        "jdbcUser": "root",
        "jdbcPassword": "123456"
      }
    }
  ]
}
  1. 在項目中引入 Mysql 的驅動包
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.30</version>
</dependency>
  1. 寫好測試類,這樣直接就相當於完成了所有的功能了。
public class TestMysql {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            statement.executeUpdate(" insert into  userinfo1 values ('xxx',12) ");
            print(statement.executeQuery("select * from asset "));
 
            print(statement.executeQuery(" select * from userinfo1 "));
 
            print(statement.executeQuery(" select age from userinfo1 where name ='aixiaoxian' "));
 
            print(statement.executeQuery(" select * from userinfo1 where age >60 "));
 
            print(statement.executeQuery(" select * from userinfo1 where name like 'a%' "));
        } finally {
            connection.close();
        }
 
    }
 
    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
    }
}

查詢結果:

Mysql實現原理

上述我們在 model.json 文件中指定了org.apache.calcite.adapter.jdbc.JdbcSchema$Factory類,可以看下這個類的程式碼。

這個類是把 FactorySchema 寫在了一起,其實就是調用schemafactory類的create方法創建一個 schema 出來,和我們上面自定義的流程是一樣的。

其中JdbcSchema類也是 Schema 的子類,所以也會實現getTable方法(這個我們上述也實現了,我們當時是獲取表結構和表的欄位類型以及名稱,是從csv文件頭中讀文件的),JdbcSchema的實現是通過連接 Mysql 服務端查詢元數據資訊,再將這些資訊封裝成 Calcite需要的對象格式。

這裡同樣要注意 csv方式的2個注意點,大小寫和關鍵字問題。

public static JdbcSchema create(
      SchemaPlus parentSchema,
      String name,
      Map<String, Object> operand) {
    DataSource dataSource;
    try {
      final String dataSourceName = (String) operand.get("dataSource");
      if (dataSourceName != null) {
        dataSource =
            AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
      } else {
        //會走在這裡來,這裡就是我們在model.json中指定的jdbc的連接資訊,最終會創建一個datasource
        final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
        final String jdbcDriver = (String) operand.get("jdbcDriver");
        final String jdbcUser = (String) operand.get("jdbcUser");
        final String jdbcPassword = (String) operand.get("jdbcPassword");
        dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
      }
    } catch (Exception e) {
      throw new RuntimeException("Error while reading dataSource", e);
    }
    String jdbcCatalog = (String) operand.get("jdbcCatalog");
    String jdbcSchema = (String) operand.get("jdbcSchema");
    String sqlDialectFactory = (String) operand.get("sqlDialectFactory");
 
    if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
      return JdbcSchema.create(
          parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
    } else {
      SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(
          SqlDialectFactory.class, sqlDialectFactory);
      return JdbcSchema.create(
          parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
    }
  }
 
  @Override public @Nullable Table getTable(String name) {
    return getTableMap(false).get(name);
  }
 
  private synchronized ImmutableMap<String, JdbcTable> getTableMap(
      boolean force) {
    if (force || tableMap == null) {
      tableMap = computeTables();
    }
    return tableMap;
  }
 
  private ImmutableMap<String, JdbcTable> computeTables() {
    Connection connection = null;
    ResultSet resultSet = null;
    try {
      connection = dataSource.getConnection();
      final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
      final String catalog = catalogSchema.left;
      final String schema = catalogSchema.right;
      final Iterable<MetaImpl.MetaTable> tableDefs;
      Foo threadMetadata = THREAD_METADATA.get();
      if (threadMetadata != null) {
        tableDefs = threadMetadata.apply(catalog, schema);
      } else {
        final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
        //  獲取元數據
        final DatabaseMetaData metaData = connection.getMetaData();
        resultSet = metaData.getTables(catalog, schema, null, null);
        while (resultSet.next()) {
        //獲取庫名,表明等資訊
          final String catalogName = resultSet.getString(1);
          final String schemaName = resultSet.getString(2);
          final String tableName = resultSet.getString(3);
          final String tableTypeName = resultSet.getString(4);
          tableDefList.add(
              new MetaImpl.MetaTable(catalogName, schemaName, tableName,
                  tableTypeName));
        }
        tableDefs = tableDefList;
      }
 
      final ImmutableMap.Builder<String, JdbcTable> builder =
          ImmutableMap.builder();
      for (MetaImpl.MetaTable tableDef : tableDefs) {
        final String tableTypeName2 =
            tableDef.tableType == null
            ? null
            : tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');
        final TableType tableType =
            Util.enumVal(TableType.OTHER, tableTypeName2);
        if (tableType == TableType.OTHER  && tableTypeName2 != null) {
          System.out.println("Unknown table type: " + tableTypeName2);
        }
        //  最終封裝成JdbcTable對象
        final JdbcTable table =
            new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
                tableDef.tableName, tableType);
        builder.put(tableDef.tableName, table);
      }
      return builder.build();
    } catch (SQLException e) {
      throw new RuntimeException(
          "Exception while reading tables", e);
    } finally {
      close(connection, null, resultSet);
    }
  }

SQL執行流程

OK,到這裡基本上兩個簡單的案例已經演示好了,最後補充一下整個Calcite架構和整個 SQL 的執行流程。

整個流程如下:SQL解析(Parser)=> SQL校驗(Validator)=> SQL查詢優化(optimizer)=> SQL生成 => SQL執行

SQL Parser

所有的 SQL 語句在執行前都需要經歷 SQL 解析器解析,解析器的工作內容就是將 SQL 中的 Token 解析成抽象語法樹,每個樹的節點都是一個 SqlNode,這個過程其實就是 Sql Text => SqlNode 的過程。

我們前面的 Demo 沒有自定義 Parser,是因為 Calcite 採用了自己默認的 Parser(SqlParserImpl)。

SqlNode

SqlNode是整個解析中的核心,比如圖中你可以發現,對於每個比如selectfromwhere關鍵字之後的內容其實都是一個SqlNode

parserConfig方法主要是設置 SqlParserFactory 的參數,比如我們上面所說的我本地測試的時候踩的大小寫的坑,就可以在這裡設置。

直接調用setCaseSensitive=false即不會將 SQL 語句中的表名列名轉為大寫,下面是默認的,其他的參數可以按需配置。

SQL Validator

SQL 語句先經過 Parser,然後經過語法驗證器,注意 Parser 並不會驗證語法的正確性。

其實 Parser 只會驗證 SQL 關鍵詞的位置是否正確,我們上述2個 Parser 的例子中都沒有創建 schematable 這些,但是如果這樣寫,那就會報錯,這個錯誤就是 parser 檢測後拋出來的(ParseLocationErrorTest)。

真正的校驗在 validator 中,會去驗證查詢的表名是否存在,查詢的欄位是否存在,類型是否匹配,這個過程比較複雜,默認的 validatorSqlValidatorImpl

查詢優化

比如關係代數,比如什麼投影、笛卡爾積這些,Calcite提供了很多內部的優化器,也可以實現自己的優化器。

適配器

Calcite 是不包含存儲層的,所以提供一種適配器的機制來訪問外部的數據存儲或者存儲引擎。

最後,進階

官網裡面寫了未來會支援Kafka適配器到公共Api中,到時候使用起來就和上述集成Mysql一樣方便,但是現在還沒有支援,我這裡給大家提供個自己實現的方式,這樣就可以通過 SQL 的方式直接查詢 Kafka 中的 Topic 數據等資訊。

這裡我們內部集成實現了KSQL的能力,查詢結果是OK的。

還是像上述步驟一樣,我們需要準備庫、表名稱、欄位名稱、欄位類型、數據源(多出來的地方)。

  1. 自定義Sql解析,之前我們都沒有自定義解析,這裡需要自定義解析,是因為我需要動態解析sqlwhere條件裡面的partation
  • 配置解析器,就是之前案例中提到的配置大小寫之類的
  • 創建解析器,使用的默認SqlParseImpl
  • 開始解析,生成AST,我們可以基於生成的SqlNode做一些業務相關的校驗和參數解析
  1. 適配器獲取數據源
   public class KafkaConsumerAdapter {
       public static List<KafkaResult> executor(KafkaSqlInfo kafkaSql) {
           Properties props = new Properties();
           props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
           List<TopicPartition> topics = new ArrayList<>();
           for (Integer partition : kafkaSql.getPartition()) {
               TopicPartition tp = new TopicPartition(kafkaSql.getTableName(), partition);
               topics.add(tp);
           }
           consumer.assign(topics);
           for (TopicPartition tp : topics) {
               Map<TopicPartition, Long> offsets = consumer.endOffsets(Collections.singleton(tp));
               long position = 500;
               if (offsets.get(tp).longValue() > position) {
                   consumer.seek(tp, offsets.get(tp).longValue() - 500);
               } else {
                   consumer.seek(tp, 0);
               }
           }
           List<KafkaResult> results = new ArrayList<>();
           boolean flag = true;
           while (flag) {
               ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
               for (ConsumerRecord<String, String> record : records) {
                   //轉成我定義的對象集合
                   KafkaResult result = new KafkaResult();
                   result.setPartition(record.partition());
                   result.setOffset(record.offset());
                   result.setMsg(record.value());
                   result.setKey(record.key());
                   results.add(result);
               }
               if (!records.isEmpty()) {
                   flag = false;
               }
           }
           consumer.close();
           return results;
       }
   
   }
  1. 執行查詢,就可以得到我們想要的效果了。
   public class TestKafka {
       public static void main(String[] args) throws Exception {
           KafkaService kafkaService = new KafkaService();
           //把解析到的參數放在我自己定義的kafkaSqlInfo對象中
           KafkaSqlInfo sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
           //適配器獲取數據源,主要是從上述的sqlInfo對象中去poll數據
           List<KafkaResult> results = KafkaConsumerAdapter.executor(sqlInfo);
           //執行查詢
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
   
           sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like '%account%'  limit 1000 ");
           results = KafkaConsumerAdapter.executor(sqlInfo);
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
   
   
           sqlInfo = kafkaService.parseSql("select count(*) AS addad  from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
           results = KafkaConsumerAdapter.executor(sqlInfo);
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
       }
   
       private static void query(String tableName, List<KafkaResult> results,
                                 String sql) throws Exception {
           //創建model.json,設置我的SchemaFactory,設置庫名
           String model = createTempJson();
           //設置我的表結構,表名稱和表欄位名以及類型
           KafkaTableSchema.generateSchema(tableName, results);
           Properties info = new Properties();
           info.setProperty("lex", Lex.JAVA.toString());
           Connection connection = DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);
           Statement st = connection.createStatement();
           //執行
           ResultSet result = st.executeQuery(sql);
           ResultSetMetaData rsmd = result.getMetaData();
           List<Map<String, Object>> ret = new ArrayList<>();
           while (result.next()) {
               Map<String, Object> map = new LinkedHashMap<>();
               for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                   map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
               }
               ret.add(map);
           }
           result.close();
           st.close();
           connection.close();
       }
   
       private static void print(ResultSet resultSet) throws SQLException {
           final ResultSetMetaData metaData = resultSet.getMetaData();
           final int columnCount = metaData.getColumnCount();
           while (resultSet.next()) {
               for (int i = 1; ; i++) {
                   System.out.print(resultSet.getString(i));
                   if (i < columnCount) {
                       System.out.print(", ");
                   } else {
                       System.out.println();
                       break;
                   }
               }
           }
       }
   
       private static String createTempJson() throws IOException {
           JSONObject object = new JSONObject();
           object.put("version", "1.0");
           object.put("defaultSchema", "QAKAFKA");
           JSONArray array = new JSONArray();
           JSONObject tmp = new JSONObject();
           tmp.put("name", "QAKAFKA");
           tmp.put("type", "custom");
           tmp.put("factory", "kafka.KafkaSchemaFactory");
           array.add(tmp);
           object.put("schemas", array);
           return object.toJSONString();
       }
   }
  • 生成臨時的model.json,之前是基於文件,現在基於text字元串,mode=inline模式
  • 設置我的表結構、表名稱、欄位名、欄位類型等,並放置在記憶體中,同時將適配器查詢出來的數據也放進去table裡面
  • 獲取連接,執行查詢,完美!