10分鐘教你寫一個資料庫
今天教大家藉助一款框架快速實現一個資料庫,這個框架就是Calcite
,下面會帶大家通過兩個例子快速教會大家怎麼實現,一個是可以通過 SQL 語句的方式可以直接查詢文件內容,第二個是模擬 Mysql 查詢功能,以及最後告訴大家怎麼實現 SQL 查詢 Kafka 數據。
Calcite
Calcite
是一個用於優化異構數據源的查詢處理的可插拔基礎框架(他是一個框架),可以將任意數據(Any data, Anywhere)DML 轉換成基於 SQL 的 DML 引擎,並且我們可以選擇性的使用它的部分功能。
Calcite能幹什麼
-
使用 SQL 訪問記憶體中某個數據
-
使用 SQL 訪問某個文件的數據
-
跨數據源的數據訪問、聚合、排序等(例如 Mysql 和 Redis 數據源中的數據進行join)
當我們需要自建一個資料庫的時候,數據可以為任何格式的,比如text、word、xml、mysql、es、csv、第三方介面數據等等,我們只有數據,我們想讓這些數據支援 SQL 形式動態增刪改查。
另外,像Hive、Drill、Flink、Phoenix 和 Storm 等項目中,數據處理系統都是使用 Calcite 來做 SQL 解析和查詢優化,當然,還有部分用來構建自己的 JDBC driver。
名詞解釋
Token
就是將標準 SQL(可以理解為Mysql)關鍵詞以及關鍵詞之間的字元串截取出來,每一個token
,會被封裝為一個SqlNode
,SqlNode
會衍生很多子類,比如Select
會被封裝為SqlSelect
,當前 SqlNode
也能反解析為 SQL 文本。
RelDataTypeField
某個欄位的名稱和類型資訊
RelDataType
多個 RelDataTypeField 組成了 RelDataType,可以理解為數據行
Table
一個完整的表的資訊
Schema
所有元數據的組合,可以理解為一組 Table 或者庫的概念
開始使用
1. 引入包
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<!-- 目前最新版本 2022-09-10日更新-->
<version>1.32.0</version>
</dependency>
2. 創建model.json文件和表結構csv
model.json 裡面主要描述或者說告訴 Calcite
如何創建 Schema
,也就是告訴框架怎麼創建出庫。
{
"version": "1.0",//忽略
"defaultSchema": "CSV",//設置默認的schema
"schemas": [//可定義多個schema
{
"name": "CSV",//相當於namespace和上面的defaultSchema的值對應
"type": "custom",//寫死
"factory": "csv.CsvSchemaFactory",//factory的類名必須是你自己實現的factory的包的全路徑
"operand": { //這裡可以傳遞自定義參數,最終會以map的形式傳遞給factory的operand參數
"directory": "csv"//directory代表calcite會在resources下面的csv目錄下面讀取所有的csv文件,factory創建的Schema會吧這些文件全部構建成Table,可以理解為讀取數據文件的根目錄,當然key的名稱也不一定非得用directory,你可以隨意指定
}
}
]
}
接下來還需要定義一個 csv
文件,用來定義表結構。
NAME:string,MONEY:string
aixiaoxian,10000萬
xiaobai,10000萬
adong,10000萬
maomao,10000萬
xixi,10000萬
zizi,10000萬
wuwu,10000萬
kuku,10000萬
整個項目的結構大概就是這樣:
3. 實現Schema的工廠類
在上述文件中指定的包路徑下去編寫 CsvSchemaFactory
類,實現 SchemaFactory
介面,並且實現裡面唯一的方法 create
方法,創建Schema
(庫)。
public class CsvSchemaFactory implements SchemaFactory {
/**
* parentSchema 父節點,一般為root
* name 為model.json中定義的名字
* operand 為model.json中定於的數據,這裡可以傳遞自定義參數
*
* @param parentSchema Parent schema
* @param name Name of this schema
* @param operand The "operand" JSON property
* @return
*/
@Override
public Schema create(SchemaPlus parentSchema, String name,
Map<String, Object> operand) {
final String directory = (String) operand.get("directory");
File directoryFile = new File(directory);
return new CsvSchema(directoryFile, "scannable");
}
}
4. 自定義Schma類
有了 SchemaFactory
,接下來需要自定義 Schema
類。
自定義的 Schema
需要實現 Schema
介面,但是直接實現要實現的方法太多,我們去實現官方的 AbstractSchema
類,這樣就只需要實現一個方法就行(如果有其他訂製化需求可以實現原生介面)。
核心的邏輯就是createTableMap
方法,用於創建出 Table
表。
他會掃描指定的Resource
下面的所有 csv
文件,將每個文件映射成Table
對象,最終以map
形式返回,Schema
介面的其他幾個方法會用到這個對象。
//實現這一個方法就行了
@Override
protected Map<String, Table> getTableMap() {
if (tableMap == null) {
tableMap = createTableMap();
}
return tableMap;
}
private Map<String, Table> createTableMap() {
// Look for files in the directory ending in ".csv"
final Source baseSource = Sources.of(directoryFile);
//會自動過濾掉非指定文件後綴的文件,我這裡寫的csv
File[] files = directoryFile.listFiles((dir, name) -> {
final String nameSansGz = trim(name, ".gz");
return nameSansGz.endsWith(".csv");
});
if (files == null) {
System.out.println("directory " + directoryFile + " not found");
files = new File[0];
}
// Build a map from table name to table; each file becomes a table.
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
for (File file : files) {
Source source = Sources.of(file);
final Source sourceSansCsv = source.trimOrNull(".csv");
if (sourceSansCsv != null) {
final Table table = createTable(source);
builder.put(sourceSansCsv.relative(baseSource).path(), table);
}
}
return builder.build();
}
5. 自定義 Table
Schema
有了,並且數據文件 csv
也映射成 Table
了,一個 csv
文件對應一個 Table
。
接下來我們去自定義 Table
,自定義 Table
的核心是我們要定義欄位的類型和名稱,以及如何讀取 csv
文件。
- 先獲取數據類型和名稱,即單表結構,從
csv
文件頭中獲取(當前文件頭需要我們自己定義,包括規則我們也可以訂製化)。
/**
* Base class for table that reads CSV files.
*/
public abstract class CsvTable extends AbstractTable {
protected final Source source;
protected final @Nullable RelProtoDataType protoRowType;
private @Nullable RelDataType rowType;
private @Nullable List<RelDataType> fieldTypes;
/**
* Creates a CsvTable.
*/
CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
this.source = source;
this.protoRowType = protoRowType;
}
/**
* 創建一個CsvTable,繼承AbstractTable,需要實現裡面的getRowType方法,此方法就是獲取當前的表結構。
Table的類型有很多種,比如還有視圖類型,AbstractTable類中幫我們默認實現了Table介面的一些方法,比如getJdbcTableType 方法,默認為Table類型,如果有其他訂製化需求可直接實現Table介面。
和AbstractSchema很像
*/
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
if (protoRowType != null) {
return protoRowType.apply(typeFactory);
}
if (rowType == null) {
rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
null);
}
return rowType;
}
/**
* Returns the field types of this CSV table.
*/
public List<RelDataType> getFieldTypes(RelDataTypeFactory typeFactory) {
if (fieldTypes == null) {
fieldTypes = new ArrayList<>();
CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
fieldTypes);
}
return fieldTypes;
}
public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
Source source, @Nullable List<RelDataType> fieldTypes) {
final List<RelDataType> types = new ArrayList<>();
final List<String> names = new ArrayList<>();
try (CSVReader reader = openCsv(source)) {
String[] strings = reader.readNext();
if (strings == null) {
strings = new String[]{"EmptyFileHasNoColumns:boolean"};
}
for (String string : strings) {
final String name;
final RelDataType fieldType;
//就是簡單的讀取字元串冒號前面是名稱,冒號後面是類型
final int colon = string.indexOf(':');
if (colon >= 0) {
name = string.substring(0, colon);
String typeString = string.substring(colon + 1);
Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
if (decimalMatcher.matches()) {
int precision = Integer.parseInt(decimalMatcher.group(1));
int scale = Integer.parseInt(decimalMatcher.group(2));
fieldType = parseDecimalSqlType(typeFactory, precision, scale);
} else {
switch (typeString) {
case "string":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break;
case "boolean":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
break;
case "byte":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
break;
case "char":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
break;
case "short":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
break;
case "int":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
break;
case "long":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
break;
case "float":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
break;
case "double":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
break;
case "date":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
break;
case "timestamp":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
break;
case "time":
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
break;
default:
LOGGER.warn(
"Found unknown type: {} in file: {} for column: {}. Will assume the type of "
+ "column is string.",
typeString, source.path(), name);
fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
break;
}
}
} else {
// 如果沒定義,默認都是String類型,欄位名稱也是string
name = string;
fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
}
names.add(name);
types.add(fieldType);
if (fieldTypes != null) {
fieldTypes.add(fieldType);
}
}
} catch (IOException e) {
// ignore
}
if (names.isEmpty()) {
names.add("line");
types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
}
return typeFactory.createStructType(Pair.zip(names, types));
}
}
- 獲取文件中的數據,上面把
Table
的表結構欄位名稱和類型都獲取到了以後,就剩最後一步了,獲取文件中的數據。我們需要自定義一個類,實現ScannableTable
介面,並且實現裡面唯一的方法scan
方法,其實本質上就是讀文件,然後把文件的每一行的數據和上述獲取的fileType
進行匹配。
@Override
public Enumerable<Object[]> scan(DataContext root) {
JavaTypeFactory typeFactory = root.getTypeFactory();
final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);
final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<@Nullable Object[]>() {
@Override
public Enumerator<@Nullable Object[]> enumerator() {
//返回我們自定義的讀取數據的類
return new CsvEnumerator<>(source, cancelFlag, false, null,
CsvEnumerator.arrayConverter(fieldTypes, fields, false));
}
};
}
public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
@Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) {
this.cancelFlag = cancelFlag;
this.rowConverter = rowConverter;
this.filterValues = filterValues == null ? null
: ImmutableNullableList.copyOf(filterValues);
try {
this.reader = openCsv(source);
//跳過第一行,因為第一行是定義類型和名稱的
this.reader.readNext(); // skip header row
} catch (IOException e) {
throw new RuntimeException(e);
}
}
//CsvEnumerator必須實現calcit自己的迭代器,裡面有current、moveNext方法,current是返回當前游標所在的數據記錄,moveNext是將游標指向下一個記錄,官網中自己定義了一個類型轉換器,是將csv文件中的數據轉換成文件頭指定的類型,這個需要我們自己來實現
@Override
public E current() {
return castNonNull(current);
}
@Override
public boolean moveNext() {
try {
outer:
for (; ; ) {
if (cancelFlag.get()) {
return false;
}
final String[] strings = reader.readNext();
if (strings == null) {
current = null;
reader.close();
return false;
}
if (filterValues != null) {
for (int i = 0; i < strings.length; i++) {
String filterValue = filterValues.get(i);
if (filterValue != null) {
if (!filterValue.equals(strings[i])) {
continue outer;
}
}
}
}
current = rowConverter.convertRow(strings);
return true;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
if (fieldType == null || string == null) {
return string;
}
switch (fieldType.getSqlTypeName()) {
case BOOLEAN:
if (string.length() == 0) {
return null;
}
return Boolean.parseBoolean(string);
case TINYINT:
if (string.length() == 0) {
return null;
}
return Byte.parseByte(string);
case SMALLINT:
if (string.length() == 0) {
return null;
}
return Short.parseShort(string);
case INTEGER:
if (string.length() == 0) {
return null;
}
return Integer.parseInt(string);
case BIGINT:
if (string.length() == 0) {
return null;
}
return Long.parseLong(string);
case FLOAT:
if (string.length() == 0) {
return null;
}
return Float.parseFloat(string);
case DOUBLE:
if (string.length() == 0) {
return null;
}
return Double.parseDouble(string);
case DECIMAL:
if (string.length() == 0) {
return null;
}
return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
case DATE:
if (string.length() == 0) {
return null;
}
try {
Date date = TIME_FORMAT_DATE.parse(string);
return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
} catch (ParseException e) {
return null;
}
case TIME:
if (string.length() == 0) {
return null;
}
try {
Date date = TIME_FORMAT_TIME.parse(string);
return (int) date.getTime();
} catch (ParseException e) {
return null;
}
case TIMESTAMP:
if (string.length() == 0) {
return null;
}
try {
Date date = TIME_FORMAT_TIMESTAMP.parse(string);
return date.getTime();
} catch (ParseException e) {
return null;
}
case VARCHAR:
default:
return string;
}
}
6. 最後
至此我們需要準備的東西:庫、表名稱、欄位名稱、欄位類型都有了,接下來我們去寫我們的 SQL 語句查詢我們的數據文件。
創建好幾個測試的數據文件,例如上面項目結構中我創建 2 個 csv 文件USERINFO.csv
、ASSET.csv
,然後創建測試類。
這樣跑起來,就可以通過 SQL 語句的方式直接查詢數據了。
public class Test {
public static void main(String[] args) throws SQLException {
Connection connection = null;
Statement statement = null;
try {
Properties info = new Properties();
info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
connection = DriverManager.getConnection("jdbc:calcite:", info);
statement = connection.createStatement();
print(statement.executeQuery("select * from asset "));
print(statement.executeQuery(" select * from userinfo "));
print(statement.executeQuery(" select age from userinfo where name ='aixiaoxian' "));
print(statement.executeQuery(" select * from userinfo where age >60 "));
print(statement.executeQuery(" select * from userinfo where name like 'a%' "));
} finally {
connection.close();
}
}
private static void print(ResultSet resultSet) throws SQLException {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
for (int i = 1; ; i++) {
System.out.print(resultSet.getString(i));
if (i < columnCount) {
System.out.print(", ");
} else {
System.out.println();
break;
}
}
}
}
}
查詢結果:
這裡在測試的時候踩到2個坑,大家如果自己實驗的時候可以避免下。
-
Calcite
默認會把你的 SQL 語句中的表名和類名全部轉換為大寫,因為默認的 csv(其他文件也一樣)文件的名稱就是表名,除非你自定義規則,所以你的文件名要寫成大寫。 -
Calcite
有一些默認的關鍵字不能用作表名,不然會查詢失敗,比如我剛開始定的user.csv
就一直查不出來,改成USERINFO
就可以了,這點和Mysql
的內置關鍵字差不多,也可以通過個性化配置去改。
演示Mysql
- 首先,還是先準備
Calcite
需要的東西:庫、表名稱、欄位名稱、欄位類型。
如果數據源使用Mysql
的話,這些都不用我們去 JAVA 服務中去定義,直接在 Mysql 客戶端創建好,這裡直接創建兩張表用於測試,就和我們的csv
文件一樣。
CREATE TABLE `USERINFO1` (
`NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
`AGE` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
CREATE TABLE `ASSET` (
`NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
`MONEY` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
-
上述
csv
案例中的SchemaFactory
以及Schema
這些都不需要創建,因為Calcite
默認提供了 Mysql 的 Adapter適配器。 -
其實,上述兩步都不需要做,我們真正要做的是,告訴
Calcite
你的 JDBC 的連接資訊就行了,也是在model.json
文件中定義。
{
"version": "1.0",
"defaultSchema": "Demo",
"schemas": [
{
"name": "Demo",
"type": "custom",
// 這裡是calcite默認的SchemaFactory,裡面的流程和我們上述自己定義的相同,下面會簡單看看源碼。
"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
"operand": {
// 我用的是mysql8以上版本,所以這裡注意包的名稱
"jdbcDriver": "com.mysql.cj.jdbc.Driver",
"jdbcUrl": "jdbc:mysql://localhost:3306/irving",
"jdbcUser": "root",
"jdbcPassword": "123456"
}
}
]
}
- 在項目中引入 Mysql 的驅動包
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
- 寫好測試類,這樣直接就相當於完成了所有的功能了。
public class TestMysql {
public static void main(String[] args) throws SQLException {
Connection connection = null;
Statement statement = null;
try {
Properties info = new Properties();
info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());
connection = DriverManager.getConnection("jdbc:calcite:", info);
statement = connection.createStatement();
statement.executeUpdate(" insert into userinfo1 values ('xxx',12) ");
print(statement.executeQuery("select * from asset "));
print(statement.executeQuery(" select * from userinfo1 "));
print(statement.executeQuery(" select age from userinfo1 where name ='aixiaoxian' "));
print(statement.executeQuery(" select * from userinfo1 where age >60 "));
print(statement.executeQuery(" select * from userinfo1 where name like 'a%' "));
} finally {
connection.close();
}
}
private static void print(ResultSet resultSet) throws SQLException {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
for (int i = 1; ; i++) {
System.out.print(resultSet.getString(i));
if (i < columnCount) {
System.out.print(", ");
} else {
System.out.println();
break;
}
}
}
}
}
查詢結果:
Mysql實現原理
上述我們在 model.json
文件中指定了org.apache.calcite.adapter.jdbc.JdbcSchema$Factory
類,可以看下這個類的程式碼。
這個類是把 Factory
和 Schema
寫在了一起,其實就是調用schemafactory
類的create
方法創建一個 schema
出來,和我們上面自定義的流程是一樣的。
其中JdbcSchema
類也是 Schema
的子類,所以也會實現getTable
方法(這個我們上述也實現了,我們當時是獲取表結構和表的欄位類型以及名稱,是從csv文件頭中讀文件的),JdbcSchema
的實現是通過連接 Mysql 服務端查詢元數據資訊,再將這些資訊封裝成 Calcite
需要的對象格式。
這裡同樣要注意 csv
方式的2個注意點,大小寫和關鍵字問題。
public static JdbcSchema create(
SchemaPlus parentSchema,
String name,
Map<String, Object> operand) {
DataSource dataSource;
try {
final String dataSourceName = (String) operand.get("dataSource");
if (dataSourceName != null) {
dataSource =
AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
} else {
//會走在這裡來,這裡就是我們在model.json中指定的jdbc的連接資訊,最終會創建一個datasource
final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
final String jdbcDriver = (String) operand.get("jdbcDriver");
final String jdbcUser = (String) operand.get("jdbcUser");
final String jdbcPassword = (String) operand.get("jdbcPassword");
dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
}
} catch (Exception e) {
throw new RuntimeException("Error while reading dataSource", e);
}
String jdbcCatalog = (String) operand.get("jdbcCatalog");
String jdbcSchema = (String) operand.get("jdbcSchema");
String sqlDialectFactory = (String) operand.get("sqlDialectFactory");
if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
return JdbcSchema.create(
parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
} else {
SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(
SqlDialectFactory.class, sqlDialectFactory);
return JdbcSchema.create(
parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
}
}
@Override public @Nullable Table getTable(String name) {
return getTableMap(false).get(name);
}
private synchronized ImmutableMap<String, JdbcTable> getTableMap(
boolean force) {
if (force || tableMap == null) {
tableMap = computeTables();
}
return tableMap;
}
private ImmutableMap<String, JdbcTable> computeTables() {
Connection connection = null;
ResultSet resultSet = null;
try {
connection = dataSource.getConnection();
final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
final String catalog = catalogSchema.left;
final String schema = catalogSchema.right;
final Iterable<MetaImpl.MetaTable> tableDefs;
Foo threadMetadata = THREAD_METADATA.get();
if (threadMetadata != null) {
tableDefs = threadMetadata.apply(catalog, schema);
} else {
final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>();
// 獲取元數據
final DatabaseMetaData metaData = connection.getMetaData();
resultSet = metaData.getTables(catalog, schema, null, null);
while (resultSet.next()) {
//獲取庫名,表明等資訊
final String catalogName = resultSet.getString(1);
final String schemaName = resultSet.getString(2);
final String tableName = resultSet.getString(3);
final String tableTypeName = resultSet.getString(4);
tableDefList.add(
new MetaImpl.MetaTable(catalogName, schemaName, tableName,
tableTypeName));
}
tableDefs = tableDefList;
}
final ImmutableMap.Builder<String, JdbcTable> builder =
ImmutableMap.builder();
for (MetaImpl.MetaTable tableDef : tableDefs) {
final String tableTypeName2 =
tableDef.tableType == null
? null
: tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');
final TableType tableType =
Util.enumVal(TableType.OTHER, tableTypeName2);
if (tableType == TableType.OTHER && tableTypeName2 != null) {
System.out.println("Unknown table type: " + tableTypeName2);
}
// 最終封裝成JdbcTable對象
final JdbcTable table =
new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
tableDef.tableName, tableType);
builder.put(tableDef.tableName, table);
}
return builder.build();
} catch (SQLException e) {
throw new RuntimeException(
"Exception while reading tables", e);
} finally {
close(connection, null, resultSet);
}
}
SQL執行流程
OK,到這裡基本上兩個簡單的案例已經演示好了,最後補充一下整個Calcite
架構和整個 SQL 的執行流程。
整個流程如下:SQL解析(Parser)=> SQL校驗(Validator)=> SQL查詢優化(optimizer)=> SQL生成 => SQL執行
SQL Parser
所有的 SQL 語句在執行前都需要經歷 SQL 解析器解析,解析器的工作內容就是將 SQL 中的 Token 解析成抽象語法樹,每個樹的節點都是一個 SqlNode,這個過程其實就是 Sql Text => SqlNode 的過程。
我們前面的 Demo 沒有自定義 Parser,是因為 Calcite 採用了自己默認的 Parser(SqlParserImpl)。
SqlNode
SqlNode
是整個解析中的核心,比如圖中你可以發現,對於每個比如select
、from
、where
關鍵字之後的內容其實都是一個SqlNode
。
parserConfig
方法主要是設置 SqlParserFactory 的參數,比如我們上面所說的我本地測試的時候踩的大小寫的坑,就可以在這裡設置。
直接調用setCaseSensitive=false
即不會將 SQL 語句中的表名列名轉為大寫,下面是默認的,其他的參數可以按需配置。
SQL Validator
SQL 語句先經過 Parser,然後經過語法驗證器,注意 Parser 並不會驗證語法的正確性。
其實 Parser 只會驗證 SQL 關鍵詞的位置是否正確,我們上述2個 Parser 的例子中都沒有創建 schema
和 table
這些,但是如果這樣寫,那就會報錯,這個錯誤就是 parser
檢測後拋出來的(ParseLocationErrorTest)。
真正的校驗在 validator
中,會去驗證查詢的表名是否存在,查詢的欄位是否存在,類型是否匹配,這個過程比較複雜,默認的 validator
是SqlValidatorImpl
。
查詢優化
比如關係代數,比如什麼投影、笛卡爾積這些,Calcite
提供了很多內部的優化器,也可以實現自己的優化器。
適配器
Calcite
是不包含存儲層的,所以提供一種適配器的機制來訪問外部的數據存儲或者存儲引擎。
最後,進階
官網裡面寫了未來會支援Kafka
適配器到公共Api
中,到時候使用起來就和上述集成Mysql
一樣方便,但是現在還沒有支援,我這裡給大家提供個自己實現的方式,這樣就可以通過 SQL 的方式直接查詢 Kafka 中的 Topic 數據等資訊。
這裡我們內部集成實現了KSQL
的能力,查詢結果是OK的。
還是像上述步驟一樣,我們需要準備庫、表名稱、欄位名稱、欄位類型、數據源(多出來的地方)。
- 自定義
Sql
解析,之前我們都沒有自定義解析,這裡需要自定義解析,是因為我需要動態解析sql
的where
條件裡面的partation
。
- 配置解析器,就是之前案例中提到的配置大小寫之類的
- 創建解析器,使用的默認
SqlParseImpl
- 開始解析,生成
AST
,我們可以基於生成的SqlNode
做一些業務相關的校驗和參數解析
- 適配器獲取數據源
public class KafkaConsumerAdapter {
public static List<KafkaResult> executor(KafkaSqlInfo kafkaSql) {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<TopicPartition> topics = new ArrayList<>();
for (Integer partition : kafkaSql.getPartition()) {
TopicPartition tp = new TopicPartition(kafkaSql.getTableName(), partition);
topics.add(tp);
}
consumer.assign(topics);
for (TopicPartition tp : topics) {
Map<TopicPartition, Long> offsets = consumer.endOffsets(Collections.singleton(tp));
long position = 500;
if (offsets.get(tp).longValue() > position) {
consumer.seek(tp, offsets.get(tp).longValue() - 500);
} else {
consumer.seek(tp, 0);
}
}
List<KafkaResult> results = new ArrayList<>();
boolean flag = true;
while (flag) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//轉成我定義的對象集合
KafkaResult result = new KafkaResult();
result.setPartition(record.partition());
result.setOffset(record.offset());
result.setMsg(record.value());
result.setKey(record.key());
results.add(result);
}
if (!records.isEmpty()) {
flag = false;
}
}
consumer.close();
return results;
}
}
- 執行查詢,就可以得到我們想要的效果了。
public class TestKafka {
public static void main(String[] args) throws Exception {
KafkaService kafkaService = new KafkaService();
//把解析到的參數放在我自己定義的kafkaSqlInfo對象中
KafkaSqlInfo sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
//適配器獲取數據源,主要是從上述的sqlInfo對象中去poll數據
List<KafkaResult> results = KafkaConsumerAdapter.executor(sqlInfo);
//執行查詢
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like '%account%' limit 1000 ");
results = KafkaConsumerAdapter.executor(sqlInfo);
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
sqlInfo = kafkaService.parseSql("select count(*) AS addad from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
results = KafkaConsumerAdapter.executor(sqlInfo);
query(sqlInfo.getTableName(), results, sqlInfo.getSql());
}
private static void query(String tableName, List<KafkaResult> results,
String sql) throws Exception {
//創建model.json,設置我的SchemaFactory,設置庫名
String model = createTempJson();
//設置我的表結構,表名稱和表欄位名以及類型
KafkaTableSchema.generateSchema(tableName, results);
Properties info = new Properties();
info.setProperty("lex", Lex.JAVA.toString());
Connection connection = DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);
Statement st = connection.createStatement();
//執行
ResultSet result = st.executeQuery(sql);
ResultSetMetaData rsmd = result.getMetaData();
List<Map<String, Object>> ret = new ArrayList<>();
while (result.next()) {
Map<String, Object> map = new LinkedHashMap<>();
for (int i = 1; i <= rsmd.getColumnCount(); i++) {
map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
}
ret.add(map);
}
result.close();
st.close();
connection.close();
}
private static void print(ResultSet resultSet) throws SQLException {
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
while (resultSet.next()) {
for (int i = 1; ; i++) {
System.out.print(resultSet.getString(i));
if (i < columnCount) {
System.out.print(", ");
} else {
System.out.println();
break;
}
}
}
}
private static String createTempJson() throws IOException {
JSONObject object = new JSONObject();
object.put("version", "1.0");
object.put("defaultSchema", "QAKAFKA");
JSONArray array = new JSONArray();
JSONObject tmp = new JSONObject();
tmp.put("name", "QAKAFKA");
tmp.put("type", "custom");
tmp.put("factory", "kafka.KafkaSchemaFactory");
array.add(tmp);
object.put("schemas", array);
return object.toJSONString();
}
}
- 生成臨時的
model.json
,之前是基於文件,現在基於text
字元串,mode=inline
模式 - 設置我的表結構、表名稱、欄位名、欄位類型等,並放置在記憶體中,同時將適配器查詢出來的數據也放進去
table
裡面 - 獲取連接,執行查詢,完美!