Ignite實戰

1.概述

本篇部落格將對Ignite的基礎環境、集群快照、分散式計算、SQL查詢與處理、機器學習等內容進行介紹。

2.內容

2.1 什麼是Ignite?

在學習Ignite之前,我們先來了解一下什麼是Ignite?首先,Ignite是Apache開源的頂級項目之一。Ignite 記憶體數組組織框架是一個高性能、集成和分散式的記憶體計算和事務平台,用於大規模的數據集處理,比傳統的基於磁碟或快閃記憶體的技術具有更高的性能,同時他還為應用和不同的數據源之間提供高性能、分散式記憶體中數據組織管理的功能。

2.2 安裝環境要求

Apache Ignite官方在如下環境中進行了測試:

  • JDK:Oracle JDK8、11或17,Open JDK8、11或17,IBM JDK8、11或17;
  • OS:Linux(任何版本),Mac OS X(10.6及以上),Windows(XP及以上),Windows Server(2008及以上),Oracle Solaris;
  • 網路:沒有限制(建議10G甚至更快的網路頻寬);
  • 架構:x86,x64,SPARC,PowerPC。

支援Docker、DEB/RPM、Kubernetes、虛擬機等安裝模式。

2.3 Ignite啟動和停止

節點的類型有兩種:服務端節點和客戶端節點。服務端節點參與快取、計算的執行、流數據處理等。客戶端節點提供遠程接入服務端的能力,有完整的Ignite API支援,包括近快取、事務、計算、流處理、服務等。所有的節點默認都以服務端模式啟動,客戶端模式需要顯式指定。

1.啟動服務端節點

執行如下所示命令:

ignite.sh path/to/configuration.xml

2.啟動客戶端節點

執行如下Java程式碼片段:

IgniteConfiguration cfg = new IgniteConfiguration();

// 開啟客戶端模式
cfg.setClientMode(true);

// 啟動客戶端
Ignite ignite = Ignition.start(cfg);

3.停止服務節點

強制停止某個節點時,可能會導致數據丟失或數據不一致,甚至會使節點無法重啟。當節點沒有響應且無法正常關閉時,應將強制停止作為最後的手段。正常停止可以使節點完成關鍵操作並正確完成其生命周期,執行正常停止的正確過程如下:

  • 使用以下方法之一停止節點
    • 以編程方式調用Ignite.close();
    • 以編程方式調用System.exit();
    • 發送用戶中斷訊號。Ignite使用JVM關閉鉤子在JVM停止之前執行自定義邏輯。如果通過運行ignite.sh來啟動節點並且不將其與終端分離,則可以通過按下Ctrl+C來停止節點。
  • 從基準線拓撲中刪除該節點。如果啟用了基準線自動調整,則可以不執行此步驟。

從基準拓撲中刪除節點將在其餘節點上開始再平衡過程。如果計劃在停止後立即重啟該節點,則不必進行再平衡。在這種情況下,請勿從基準拓撲中刪除該節點。

2.4 集群快照

Ignite 提供了使用Ignite Persistence為部署創建完整集群快照的能力 。Ignite 快照包括持久在磁碟上的所有數據記錄的一致的集群範圍副本以及恢復過程所需的一些其他文件。快照結構類似於 Ignite Persistence 存儲目錄的布局,但有幾個例外。讓我們以這個快照為例來回顧一下結構:

work
└── snapshots
    └── backup23012020
        └── db
            ├── binary_meta
            │         ├── node1
            │         ├── node2
            │         └── node3
            ├── marshaller
            │         ├── node1
            │         ├── node2
            │         └── node3
            ├── node1
            │    └── my-sample-cache
            │        ├── cache_data.dat
            │        ├── part-3.bin
            │        ├── part-4.bin
            │        └── part-6.bin
            ├── node2
            │    └── my-sample-cache
            │        ├── cache_data.dat
            │        ├── part-1.bin
            │        ├── part-5.bin
            │        └── part-7.bin
            └── node3
                └── my-sample-cache
                    ├── cache_data.dat
                    ├── part-0.bin
                    └── part-2.bin
  • 快照位於該目錄下,並work\snapshots命名為Ignite 的工作目錄。backup23012020work
  • 快照是為 3 節點集群創建的,所有節點都在同一台機器上運行。在此示例中,節點被命名為node1、node2和node3,而在實踐中,名稱等於節點的 一致 ID。
  • my-sample-cache快照保留快取的副本。
  • 該文件夾將數據記錄的db副本保存在文件中。只要當前還原過程不需要預寫和檢查點,就不會將其添加到快照中。part-N.bincache_data.dat
  • binary_meta和目錄存儲元數據和特定於marshaller編組器的資訊。
注意:通常快照分布在整個集群中
前面的示例顯示了為在同一台物理機上運行的集群創建的快照。因此,整個快照位於一個位置。在實踐中,所有節點都將運行在不同的機器上,快照數據分布在集群中。每個節點保存一段快照,其中包含屬於該特定節點的數據。恢復過程解釋了如何在恢復過程中將所有段連接在一起。

2.4.1 配置

1.快照目錄

默認情況下,快照的一部分存儲在各個 Ignite 節點的工作目錄中,並使用 Ignite Persistence 保存數據、索引、WAL 和其他文件的相同存儲介質。由於快照可以消耗與持久性文件已經佔用的空間一樣多的空間,並且可以通過與 Ignite Persistence 常式共享磁碟 I/O 來影響應用程式的性能,因此建議將快照和持久性文件存儲在不同的媒體上。

2.快照執行池

默認情況下,快照執行緒池大小的值為4。減少快照創建過程中涉及的執行緒數會增加拍攝快照的總時間。但是,這會將磁碟負載保持在合理的範圍內。

2.4.2 創建快照

Ignite 提供了幾個用於創建快照的 API。

1.使用控制腳本

Ignite 提供了支援以下列出的與快照相關的命令的控制腳本:

# Create a cluster snapshot named "snapshot_09062021" in the background:
control.(sh|bat) --snapshot create snapshot_09062021

# Create a cluster snapshot named "snapshot_09062021" and wait for the entire operation to complete:
control.(sh|bat) --snapshot create snapshot_09062021 --sync

# Create a cluster snapshot named "snapshot_09062021" in the "/tmp/ignite/snapshots" folder (the full path to the snapshot files will be /tmp/ignite/snapshots/snapshot_09062021):
control.(sh|bat) --snapshot create snapshot_09062021 -dest /tmp/ignite/snapshots

# Cancel a running snapshot named "snapshot_09062021":
control.(sh|bat) --snapshot cancel snapshot_09062021

# Kill a running snapshot named "snapshot_09062021":
control.(sh|bat) --kill SNAPSHOT snapshot_09062021

2.使用JMX

使用該SnapshotMXBean介面通過 JMX 執行特定於快照的過程:

方法 描述
createSnapshot(String snpName) 創建快照

cancelSnapshot(String snpName)

取消節點上的快照已啟動其創建

3.使用Java API

此外,還可以在 Java 中以編程方式創建快照:

CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>("snapshot-cache");

try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(ccfg)) {
    cache.put(1, "Maxim");

    // Start snapshot operation.
    ignite.snapshot().createSnapshot("snapshot_02092020").get();
}
finally {
    ignite.destroyCache(ccfg.getName());
}

2.4.3 檢查快照一致性

通常所有集群節點都運行在不同的機器上,並且快照數據分布在整個集群中。每個節點都存儲自己的快照段,因此在某些情況下,可能需要在從快照恢復之前檢查快照的數據完整性和整個集群的數據一致性。

對於這種情況,Apache Ignite 提供了內置的快照一致性檢查命令,使您能夠驗證內部數據一致性,計算數據分區哈希和頁面校驗和,並在發現問題時列印結果。check 命令還將主分區的哈希值與相應的備份分區進行比較,並報告任何差異。

2.4.4 從快照恢復

快照可以在停止的集群上手動恢復,也可以在活動的集群上自動恢復。下面描述了這兩個過程,但是,最好只使用控制腳本中的恢復命令。

1.手動快照恢復過程

快照結構類似於 Ignite Native Persistence 的布局,因此對於手動快照還原,您必須僅在具有相同節點的相同集群consistentId和拍攝快照的相同拓撲上執行快照還原。如果您需要在不同的集群或不同的集群拓撲上恢復快照,請使用 自動快照恢復過程。

一般來說,停止集群,然後用快照中的數據替換持久化數據和其他文件,然後重新啟動節點。

詳細過程如下所示:

  • 停止要恢復的集群
  • 從檢查點$IGNITE_HOME/work/cp目錄中刪除所有文件
  • 在每個節點上執行以下操作:
    • {nodeId}從目錄中刪除與 相關的文件$IGNITE_HOME/work/db/binary_meta。
    • {nodeId}從目錄中刪除與 相關的文件$IGNITE_HOME/work/db/marshaller。
    • {nodeId}刪除與您的目錄下相關的文件和子目錄$IGNITE_HOME/work/db。db/{node_id}如果目錄不在 Ignite 目錄下,請單獨清理該目錄work。
    • 將屬於具有{node_id}快照的節點的文件複製到$IGNITE_HOME/work/目錄中。如果該db/{node_id}目錄不在 Ignitework目錄下,那麼您需要將數據文件複製到那裡。
  • 重啟集群

2.自動快照恢復過程

自動恢復過程允許用戶使用 Java API 或命令行腳本從活動集群上的快照恢復快取組。

目前,此過程有幾個限制,將在未來的版本中解決:

  • 僅當快照的所有部分都存在於集群中時,才能進行恢復。每個節點通過給定的快照名稱和一致的節點 ID 在配置的快照路徑中查找本地快照數據。
  • 恢復過程只能應用於用戶創建的快取組。
  • 要從快照恢復的快取組不得存在於集群中。如果它們存在,則用戶必須在開始此操作之前將它們銷毀。
  • 不允許並發還原操作。因此,如果一個操作已經開始,則只有在第一個操作完成後才能啟動另一個操作。

以下程式碼片段演示了如何從快照恢復單個快取組。

// Restore cache named "snapshot-cache" from the snapshot "snapshot_02092020".
ignite.snapshot().restoreSnapshot("snapshot_02092020", Collections.singleton("snapshot-cache")).get();

3.使用 CLI 控制還原操作

該control.sh|bat腳本提供了啟動和停止恢復操作的能力。

# Start restoring all user-created cache groups from the snapshot "snapshot_09062021" in the background.
control.(sh|bat) --snapshot restore snapshot_09062021 --start

# Start restoring all user-created cache groups from the snapshot "snapshot_09062021" and wait for the entire operation to complete.
control.(sh|bat) --snapshot restore snapshot_09062021 --start --sync

# Start restoring all user-created cache groups from the snapshot "snapshot_09062021" located in the "/tmp/ignite/snapshots" folder (the full path to the snapshot files should be /tmp/ignite/snapshots/snapshot_09062021):
control.(sh|bat) --snapshot restore snapshot_09062021 --src /tmp/ignite/snapshots

# Start restoring only "cache-group1" and "cache-group2" from the snapshot "snapshot_09062021" in the background.
control.(sh|bat) --snapshot restore snapshot_09062021 --start --groups cache-group1,cache-group2

# Cancel the restore operation for "snapshot_09062021".
control.(sh|bat) --snapshot restore snapshot_09062021 --cancel

2.4.5 一致性保證

在集群範圍內的並發操作以及與 Ignite 的持續更改方面,所有快照都是完全一致的。持久化數據、索引、模式、二進位元數據、編組器和節點上的其他文件。

集群範圍的快照一致性是通過觸發Partition-Map-Exchange 過程來實現的。通過這樣做,集群最終將到達所有先前啟動的事務都完成並暫停新事務的時間點。一旦發生這種情況,集群將啟動快照創建過程。PME 過程確保快照包括處於一致狀態的主備份和備份。

Ignite Persistence 文件與其快照副本之間的一致性是通過將原始文件複製到目標快照目錄並跟蹤所有並發正在進行的更改來實現的。跟蹤更改可能需要 Ignite Persistence 存儲介質上的額外空間(最多為存儲介質的 1 倍大小)。

2.5 分散式計算

Ignite 提供了一個 API,用於以平衡和容錯的方式在集群節點之間分配計算。您可以提交單個任務以供執行,也可以通過自動任務拆分來實現 MapReduce 模式。API 提供對作業分配策略的細粒度控制。

2.5.1 獲取計算介面

運行分散式計算的主要入口點是計算介面,它可以從Ignite.

Ignite ignite = Ignition.start();

IgniteCompute compute = ignite.compute();

2.5.2 指定計算的節點集

計算介面的每個實例都與執行任務的一組節點相關聯。不帶參數調用時,ignite.compute()返回與所有伺服器節點關聯的計算介面。要獲取特定節點子集的實例,請使用Ignite.compute(ClusterGroup group). 在以下示例中,計算介面僅綁定到遠程節點,即除運行此程式碼的節點之外的所有節點。

Ignite ignite = Ignition.start();

IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());

2.5.3 執行任務

Ignite 提供了三個介面,可以實現代表一個任務並通過計算介面執行:

  • IgniteRunnable— 其擴展java.lang.Runnable可用於實現沒有輸入參數且不返回結果的計算。
  • IgniteCallablejava.util.concurrent.Callable—返回特定值的擴展。
  • IgniteClosure— 接受參數並返回值的功能介面。

您可以執行一次任務(在其中一個節點上)或將其廣播到所有節點。

2.5.4 執行一個可運行的任務

要執行可運行的任務,請使用run(…​)計算介面的方法。任務被發送到與計算實例關聯的節點之一。

IgniteCompute compute = ignite.compute();

// Iterate through all words and print
// each word on a different cluster node.
for (String word : "Print words on different cluster nodes".split(" ")) {
    compute.run(() -> System.out.println(word));
}

2.5.5 執行可調用任務

要執行可調用任務,請使用call(…​)計算介面的方法。

Collection<IgniteCallable<Integer>> calls = new ArrayList<>();

// Iterate through all words in the sentence and create callable jobs.
for (String word : "How many characters".split(" "))
    calls.add(word::length);

// Execute the collection of callables on the cluster.
Collection<Integer> res = ignite.compute().call(calls);

// Add all the word lengths received from cluster nodes.
int total = res.stream().mapToInt(Integer::intValue).sum();

2.5.6 執行IgniteClosure

要執行IgniteClosure,請使用apply(…​)計算介面的方法。該方法接受任務和任務的輸入參數。IgniteClosure參數在執行時傳遞給給定的。

IgniteCompute compute = ignite.compute();

// Execute closure on all cluster nodes.
Collection<Integer> res = compute.apply(String::length, Arrays.asList("How many characters".split(" ")));

// Add all the word lengths received from cluster nodes.
int total = res.stream().mapToInt(Integer::intValue).sum();

2.5.7 廣播任務

該方法在與計算實例關聯的所有節點broadcast()上執行任務。

// Limit broadcast to remote nodes only.
IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());

// Print out hello message on remote nodes in the cluster group.
compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id()));

2.5.8 非同步執行

前幾節中描述的所有方法都有非同步對應物:

  • callAsync(…​)
  • runAsync(…​)
  • applyAsync(…​)
  • broadcastAsync(…​)

非同步方法返回一個IgniteFuture表示操作結果的值。在以下示例中,非同步執行一組可調用任務。

 

IgniteCompute compute = ignite.compute();

Collection<IgniteCallable<Integer>> calls = new ArrayList<>();

// Iterate through all words in the sentence and create callable jobs.
for (String word : "Count characters using a callable".split(" "))
    calls.add(word::length);

IgniteFuture<Collection<Integer>> future = compute.callAsync(calls);

future.listen(fut -> {
    // Total number of characters.
    int total = fut.get().stream().mapToInt(Integer::intValue).sum();

    System.out.println("Total number of characters: " + total);
});

2.5.9 執行超時任務

您可以設置任務執行的超時時間。如果任務沒有在給定的時間範圍內完成,它會被停止並取消該任務產生的所有作業。

要執行超時任務,請使用withTimeout(…​)計算介面的方法。該方法返回一個計算介面,該介面以時間限制的方式執行給它的第一個任務。後續任務沒有超時:您需要調用withTimeout(…​)每個應該有超時的任務。

IgniteCompute compute = ignite.compute();

compute.withTimeout(300_000).run(() -> {
    // your computation
    // ...
});

2.5.10 在本地節點上的作業之間共享狀態

在一個節點上執行的不同計算作業之間共享狀態通常很有用。為此,每個節點上都有一個共享的並發本地映射。

IgniteCluster cluster = ignite.cluster();

ConcurrentMap<String, Integer> nodeLocalMap = cluster.nodeLocalMap();

節點局部值類似於執行緒局部變數,因為這些值不分布並且僅保留在本地節點上。節點本地數據可用於在計算作業之間共享狀態。它也可以被部署的服務使用。

在以下示例中,作業每次在某個節點上執行時都會增加一個節點本地計數器。結果,每個節點上的節點本地計數器告訴我們作業在該節點上執行了多少次。

IgniteCallable<Long> job = new IgniteCallable<Long>() {
    @IgniteInstanceResource
    private Ignite ignite;

    @Override
    public Long call() {
        // Get a reference to node local.
        ConcurrentMap<String, AtomicLong> nodeLocalMap = ignite.cluster().nodeLocalMap();

        AtomicLong cntr = nodeLocalMap.get("counter");

        if (cntr == null) {
            AtomicLong old = nodeLocalMap.putIfAbsent("counter", cntr = new AtomicLong());

            if (old != null)
                cntr = old;
        }

        return cntr.incrementAndGet();
    }
};

2.5.11 從計算任務訪問數據

如果您的計算任務需要訪問存儲在快取中的數據,您可以通過以下實例來完成Ignite:

public class MyCallableTask implements IgniteCallable<Integer> {

    @IgniteInstanceResource
    private Ignite ignite;

    @Override
    public Integer call() throws Exception {

        IgniteCache<Long, Person> cache = ignite.cache("person");

        // Get the data you need
        Person person = cache.get(1L);

        // do with the data what you need to do

        return 1;
    }
}

請注意,上面顯示的示例可能不是最有效的方法。原因是key對應的person對象1可能位於與執行任務的節點不同的節點上。在這種情況下,對象是通過網路獲取的。這可以通過將任務與數據放在一起來避免。

注意:
如果要在IgniteCallable和IgniteRunnable任務中使用鍵和值對象,請確保鍵和值類部署在所有集群節點上。

2.6 SQL查詢與處理

Ignite 帶有符合 ANSI-99、水平可擴展和容錯的分散式 SQL 資料庫。根據用例,通過跨集群節點對數據進行分區或完全複製來提供分布。

作為 SQL 資料庫,Ignite 支援所有 DML 命令,包括 SELECT、UPDATE、INSERT 和 DELETE 查詢,並且還實現了與分散式系統相關的 DDL 命令子集。

您可以通過連接來自外部工具和應用程式的JDBC或ODBC驅動程式與 Ignite 進行交互,就像與任何其他啟用了 SQL 的存儲一樣。Java、.NET 和 C++ 開發人員可以利用本機 SQL API。

在內部,SQL 表與鍵值快取具有相同的數據結構。這意味著您可以更改數據的分區分布並利用親和力託管技術來獲得更好的性能。

Ignite 的默認 SQL 引擎使用 H2 資料庫來解析和優化查詢並生成執行計劃,但也可以啟用基於 Apache Calcite 的 SQL 引擎來執行查詢。

2.6.1 分散式查詢

針對分區表的查詢以分散式方式執行:

  • 查詢被解析並拆分為多個「map」查詢和一個「reduce」查詢。
  • 所有地圖查詢都在所需數據所在的所有節點上執行。
  • 所有節點都向查詢發起者提供本地執行的結果集,查詢發起者反過來會將提供的結果集合併到最終結果中。

您可以強制在本地處理查詢,即在存儲在執行查詢的節點上的數據子集上。

2.6.2 本地查詢

如果對複製表執行查詢,它將針對本地數據運行。

對分區表的查詢以分散式方式執行。但是,您可以強制對分區表執行本地查詢。

2.6.3 SQL架構

Ignite 有許多默認模式並支援創建自定義模式。

默認情況下有兩種可用的模式:

  • SYS 模式,其中包含許多帶有集群節點資訊的系統視圖。您不能在此架構中創建表。有關詳細資訊,請參閱系統視圖頁面。
  • PUBLIC 架構,在未指定架構時默認使用。

在以下情況下會創建自定義模式:

  • 您可以在集群配置中指定自定義模式。
  • Ignite 為通過其中一個編程介面或 XML 配置創建的每個快取創建一個模式

1.公共模式

每當需要並且未指定模式時,默認使用 PUBLIC 模式。例如,當您通過 JDBC 連接到集群而不顯式設置模式時,您將連接到 PUBLIC 模式。

2.自定義模式

可以通過 的sqlSchemas屬性設置自定義模式IgniteConfiguration。您可以在啟動集群之前在配置中指定模式列表,然後在運行時在這些模式中創建對象。

下面是一個帶有兩個自定義模式的配置示例。

IgniteConfiguration cfg = new IgniteConfiguration();

SqlConfiguration sqlCfg = new SqlConfiguration();

sqlCfg.setSqlSchemas("MY_SCHEMA", "MY_SECOND_SCHEMA" );

cfg.setSqlConfiguration(sqlCfg);

要通過例如 JDBC 驅動程式連接到特定模式,請在連接字元串中提供模式名稱:

jdbc:ignite:thin://127.0.0.1/MY_SCHEMA

3.快取和架構名稱

當您使用可查詢欄位創建快取時,您可以使用SQL API操作快取的數據。在 SQL 術語中,每個這樣的快取對應於一個單獨的模式,其名稱等於快取的名稱。

同樣,當您通過 DDL 語句創建表時,您可以通過 Ignite 支援的編程介面將其作為鍵值快取進行訪問。可以通過在語句部分提供CACHE_NAME參數來指定相應快取的名稱。WITHCREATE TABLE

CREATE TABLE City (
  ID INT(11),
  Name CHAR(35),
  CountryCode CHAR(3),
  District CHAR(20),
  Population INT(11),
  PRIMARY KEY (ID, CountryCode)
) WITH "backups=1, CACHE_NAME=City";

2.6.4 SQL索引

Ignite 自動為每個主鍵和親和鍵欄位創建索引。當您在值對象中的欄位上定義索引時,Ignite 會創建一個由索引欄位和快取的主鍵組成的複合索引。在 SQL 術語中,這意味著索引將由兩列組成:要索引的列和主鍵列。

1.使用註解配置索引

@QuerySqlField可以通過注釋從程式碼中配置索引以及可查詢欄位。在下面的示例中,Ignite SQL 引擎將為id和salary欄位創建索引。

public class Person implements Serializable {
    /** Indexed field. Will be visible to the SQL engine. */
    @QuerySqlField(index = true)
    private long id;

    /** Queryable field. Will be visible to the SQL engine. */
    @QuerySqlField
    private String name;

    /** Will NOT be visible to the SQL engine. */
    private int age;

    /**
     * Indexed field sorted in descending order. Will be visible to the SQL engine.
     */
    @QuerySqlField(index = true, descending = true)
    private float salary;
}

類型名用作 SQL 查詢中的表名。在這種情況下,我們的表名將是Person(模式名稱的使用和定義在模式部分中解釋)。

id和都是salary索引欄位。id將按升序(默認)和salary降序排序。

如果你不想索引一個欄位,但你仍然需要在 SQL 查詢中使用它,那麼該欄位必須在沒有index = true參數的情況下進行注釋。這樣的欄位稱為可查詢欄位。在上面的示例中,name被定義為可查詢欄位。

該age欄位既不可查詢也不是索引欄位,因此無法從 SQL 查詢中訪問。

定義索引欄位時,需要註冊索引類型。

2.索引嵌套對象

嵌套對象的欄位也可以使用注釋進行索引和查詢。例如,考慮一個Person將Address對象作為欄位的對象:

public class Person {
    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField(index = true)
    private long id;

    /** Queryable field. Will be visible for SQL engine. */
    @QuerySqlField
    private String name;

    /** Will NOT be visible for SQL engine. */
    private int age;

    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField(index = true)
    private Address address;
}

類的結構Address可能如下所示:

public class Address {
    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField (index = true)
    private String street;

    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField(index = true)
    private int zip;
}

在上面的示例中,@QuerySqlField(index = true)在類的所有欄位以及Address類中的Address對象上都指定了註解Person。

這使得執行如下 SQL 查詢成為可能:

QueryCursor<List<?>> cursor = personCache.query(new SqlFieldsQuery( "select * from Person where street = 'street1'"));

請注意,您不需要address.street在 SQL 查詢的 WHERE 子句中指定。這是因為Address類的欄位在表中被展平,Person這僅允許我們直接訪問Address查詢中的欄位。

3.註冊索引類型

定義索引和可查詢欄位後,必須在 SQL 引擎中註冊它們以及它們所屬的對象類型。

要指定應該索引哪些類型,請在方法中傳遞相應的鍵值對,CacheConfiguration.setIndexedTypes()如下例所示。

// Preparing configuration.
CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>();

// Registering indexed type.
ccfg.setIndexedTypes(Long.class, Person.class);

此方法只接受成對的類型:一個用於鍵類,另一個用於值類。基元作為盒裝類型傳遞。

4.組索引

要設置可以加速複雜條件查詢的多欄位索引,可以使用@QuerySqlField.Group註解。如果您希望一個欄位成為多個組的一部分,您可以添加多個@QuerySqlField.Group注釋。orderedGroups

例如,在Person下面的類中,我們有一個age屬於索引組的欄位,該age_salary_idx組以「0」的組順序和降序排序。此外,在同一個組中,我們有salary一個組順序為「3」和升序排序的欄位。此外,該欄位salary本身是一個單列索引(index = true除了orderedGroups聲明之外還指定了參數)。組order不必是特定的數字。只需要對特定組內的欄位進行排序。

public class Person implements Serializable {
    /** Indexed in a group index with "salary". */
    @QuerySqlField(orderedGroups = { @QuerySqlField.Group(name = "age_salary_idx", order = 0, descending = true) })

    private int age;

    /** Indexed separately and in a group index with "age". */
    @QuerySqlField(index = true, orderedGroups = { @QuerySqlField.Group(name = "age_salary_idx", order = 3) })
    private double salary;
}

5.使用查詢實體配置索引

索引和可查詢欄位也可以通過org.apache.ignite.cache.QueryEntity便於基於 Spring XML 的配置的類進行配置。

作為上面基於注釋的配置的一部分討論的所有概念也適用於QueryEntity基於方法。此外,其欄位配置了@QuerySqlField註解並註冊到CacheConfiguration.setIndexedTypes()方法的類型在內部轉換為查詢實體。

下面的示例展示了如何定義單個欄位索引、組索引和可查詢欄位。

 

CacheConfiguration<Long, Person> cache = new CacheConfiguration<Long, Person>("myCache");

QueryEntity queryEntity = new QueryEntity();

queryEntity.setKeyFieldName("id").setKeyType(Long.class.getName()).setValueType(Person.class.getName());

LinkedHashMap<String, String> fields = new LinkedHashMap<>();
fields.put("id", "java.lang.Long");
fields.put("name", "java.lang.String");
fields.put("salary", "java.lang.Long");

queryEntity.setFields(fields);

queryEntity.setIndexes(Arrays.asList(new QueryIndex("name"),
        new QueryIndex(Arrays.asList("id", "salary"), QueryIndexType.SORTED)));

cache.setQueryEntities(Arrays.asList(queryEntity));

在這種情況下,我們的表名將是Person(模式名稱的使用和定義在Schemas頁面上進行了解釋)。

定義好之後,就QueryEntity可以按如下方式執行 SQL 查詢:

SqlFieldsQuery qry = new SqlFieldsQuery("SELECT id, name FROM Person" + "WHERE id > 1500 LIMIT 10");

6.配置索引內聯大小

適當的索引內聯大小可以幫助加快對索引欄位的查詢。有關如何選擇合適的內聯大小的資訊,請參閱SQL 調優指南中的專門部分。

在大多數情況下,您只需為可變長度欄位(例如字元串或數組)上的索引設置內聯大小。默認值為 10。

您可以通過設置來更改默認值

  • 每個索引單獨的內聯大小,或
  • CacheConfiguration.sqlIndexMaxInlineSize給定快取中所有索引的屬性,或
  • IGNITE_MAX_INDEX_PAYLOAD_SIZE集群中所有索引的系統屬性

設置按上面列出的順序應用。

您還可以單獨為每個索引配置內聯大小,這將覆蓋默認值。要為用戶定義的索引設置索引內聯大小,請使用以下方法之一。在所有情況下,該值都以位元組為單位。

  • 使用註解時:
@QuerySqlField(index = true, inlineSize = 13)
private String country;
  • 使用時QueryEntity:
QueryIndex idx = new QueryIndex("country");
idx.setInlineSize(13);
queryEntity.setIndexes(Arrays.asList(idx));
  • 如果您使用該CREATE INDEX命令創建索引,則可以使用該INLINE_SIZE選項設置內聯大小:
create index country_idx on Person (country) INLINE_SIZE 13;

7.自定義鍵

如果您只對主鍵使用預定義的 SQL 數據類型,那麼您不需要對 SQL 模式配置執行額外的操作。這些數據類型由GridQueryProcessor.SQL_TYPES常量定義,如下所示。

預定義的 SQL 數據類型包括:

  • 所有原語及其包裝器,除了char和Character
  • String
  • BigDecimal
  • byte[]
  • java.util.Date, java.sql.Date,java.sql.Timestamp
  • java.util.UUID

但是,一旦您決定引入自定義複雜鍵並從 DML 語句中引用其欄位,您需要:

  • QueryEntity以與為值對象設置欄位相同的方式定義這些欄位。
  • 使用新的配置參數QueryEntity.setKeyFields(..)來區分鍵欄位和值欄位。

下面的示例顯示了如何執行此操作。

// Preparing cache configuration.
CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<Long, Person>("personCache");

// Creating the query entity.
QueryEntity entity = new QueryEntity("CustomKey", "Person");

// Listing all the queryable fields.
LinkedHashMap<String, String> fields = new LinkedHashMap<>();

fields.put("intKeyField", Integer.class.getName());
fields.put("strKeyField", String.class.getName());

fields.put("firstName", String.class.getName());
fields.put("lastName", String.class.getName());

entity.setFields(fields);

// Listing a subset of the fields that belong to the key.
Set<String> keyFlds = new HashSet<>();

keyFlds.add("intKeyField");
keyFlds.add("strKeyField");

entity.setKeyFields(keyFlds);

// End of new settings, nothing else here is DML related

entity.setIndexes(Collections.<QueryIndex>emptyList());

cacheCfg.setQueryEntities(Collections.singletonList(entity));

ignite.createCache(cacheCfg);

2.6.5 SQL API

除了使用 JDBC 驅動程式之外,Java 開發人員還可以使用 Ignite 的 SQL API 來查詢和修改存儲在 Ignite 中的數據。

該類SqlFieldsQuery是用於執行 SQL 語句和瀏覽結果的介面。SqlFieldsQuery通過IgniteCache.query(SqlFieldsQuery)返回查詢游標的方法執行。

1.配置可查詢欄位

如果要使用 SQL 語句查詢快取,則需要定義值對象的哪些欄位是可查詢的。可查詢欄位是 SQL 引擎可以「看到」和查詢的數據模型的欄位。

在 Java 中,可以通過兩種方式配置可查詢欄位:

  • 使用注釋
  • 通過定義查詢實體

要使特定欄位可查詢,​​請在值類定義中使用@QuerySqlField註解和調用來註解欄位CacheConfiguration.setIndexedTypes(…​)

class Person implements Serializable {
    /** Indexed field. Will be visible to the SQL engine. */
    @QuerySqlField(index = true)
    private long id;

    /** Queryable field. Will be visible to the SQL engine. */
    @QuerySqlField
    private String name;

    /** Will NOT be visible to the SQL engine. */
    private int age;

    /**
     * Indexed field sorted in descending order. Will be visible to the SQL engine.
     */
    @QuerySqlField(index = true, descending = true)
    private float salary;
}

public static void main(String[] args) {
    Ignite ignite = Ignition.start();
    CacheConfiguration<Long, Person> personCacheCfg = new CacheConfiguration<Long, Person>();
    personCacheCfg.setName("Person");

    personCacheCfg.setIndexedTypes(Long.class, Person.class);
    IgniteCache<Long, Person> cache = ignite.createCache(personCacheCfg);
}

確保調用CacheConfiguration.setIndexedTypes(…​)以讓 SQL 引擎知道帶注釋的欄位。

2.查詢實體

QueryEntity您可以使用該類定義可查詢欄位。查詢實體可以通過 XML 配置進行配置。

class Person implements Serializable {
    private long id;

    private String name;

    private int age;

    private float salary;
}

public static void main(String[] args) {
    Ignite ignite = Ignition.start();
    CacheConfiguration<Long, Person> personCacheCfg = new CacheConfiguration<Long, Person>();
    personCacheCfg.setName("Person");

    QueryEntity queryEntity = new QueryEntity(Long.class, Person.class)
            .addQueryField("id", Long.class.getName(), null).addQueryField("age", Integer.class.getName(), null)
            .addQueryField("salary", Float.class.getName(), null)
            .addQueryField("name", String.class.getName(), null);

    queryEntity.setIndexes(Arrays.asList(new QueryIndex("id"), new QueryIndex("salary", false)));

    personCacheCfg.setQueryEntities(Arrays.asList(queryEntity));

    IgniteCache<Long, Person> cache = ignite.createCache(personCacheCfg);
}

3.查詢

要在快取上執行選擇查詢,只需創建一個對象,SqlFieldsQuery將查詢字元串提供給構造函數並運行cache.query(…​)。請注意,在以下示例中,必須將 Person 快取配置為對 SQL 引擎可見。

IgniteCache<Long, Person> cache = ignite.cache("Person");

SqlFieldsQuery sql = new SqlFieldsQuery(
        "select concat(firstName, ' ', lastName) from Person");

// Iterate over the result set.
try (QueryCursor<List<?>> cursor = cache.query(sql)) {
    for (List<?> row : cursor)
        System.out.println("personName=" + row.get(0));
}

SqlFieldsQuery返回一個游標,該游標遍歷與 SQL 查詢匹配的結果。

4.本地執行

要強制本地執行查詢,請使用SqlFieldsQuery.setLocal(true). 在這種情況下,查詢是針對存儲在運行查詢的節點上的數據執行的。這意味著查詢的結果幾乎總是不完整的。僅當您確信自己了解此限制時才使用本地模式。

5.WHERE子句中的子查詢

SELECT在INSERTandMERGE語句中使用的查詢以及SELECT由UPDATEandDELETE操作生成的查詢以colocated 或 non-colocated 分散式模式分布和執行。

但是,如果有一個子查詢作為WHERE子句的一部分執行,則它只能在 colocated 模式下執行。

例如,讓我們考慮以下查詢:

DELETE FROM Person WHERE id IN
    (SELECT personId FROM Salary s WHERE s.amount > 2000);

SQL 引擎生成SELECT查詢以獲取要刪除的條目列表。該查詢在整個集群中分布和執行,如下所示:

SELECT _key, _val FROM Person WHERE id IN
    (SELECT personId FROM Salary s WHERE s.amount > 2000);

但是,IN子句 ( SELECT personId FROM Salary …​) 中的子查詢不會進一步分布,而是在節點上可用的本地數據集上執行。

6.插入、更新、刪除和合併

SqlFieldsQuery您可以執行其他 DML 命令以修改數據:

// 插入
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(
        new SqlFieldsQuery("INSERT INTO Person(id, firstName, lastName) VALUES(?, ?, ?)")
                .setArgs(1L, "John", "Smith"))
        .getAll();
// 更新
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(new SqlFieldsQuery("UPDATE Person set lastName = ? " + "WHERE id >= ?")
        .setArgs("Jones", 2L)).getAll();
// 刪除
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(new SqlFieldsQuery("DELETE FROM Person " + "WHERE id >= ?").setArgs(2L))
        .getAll();
// 合併
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(new SqlFieldsQuery("MERGE INTO Person(id, firstName, lastName)"
        + " values (1, 'John', 'Smith'), (5, 'Mary', 'Jones')")).getAll();

用於SqlFieldsQuery執行 DDL 語句時,必須調用getAll()從query(…​)方法返回的游標。

7.指定架構

默認情況下,執行的任何 SELECT 語句SqlFieldsQuery都將針對 PUBLIC 模式進行解析。但是,如果您要查詢的表在不同的架構中,您可以通過調用來指定架構SqlFieldsQuery.setSchema(…​)。在這種情況下,語句在給定的模式中執行。

SqlFieldsQuery sql = new SqlFieldsQuery("select name from City").setSchema("PERSON");

或者,您可以在語句中定義架構:

SqlFieldsQuery sql = new SqlFieldsQuery("select name from Person.City");

8.創建表

您可以將任何受支援的 DDL 語句傳遞到SqlFieldsQuery快取並在快取上執行,如下所示。

IgniteCache<Long, Person> cache = ignite
        .getOrCreateCache(new CacheConfiguration<Long, Person>().setName("Person"));

// Creating City table.
cache.query(new SqlFieldsQuery(
        "CREATE TABLE City (id int primary key, name varchar, region varchar)")).getAll();

在 SQL 模式方面,執行程式碼會創建以下表:

  • 「Person」模式中的表「Person」(如果之前沒有創建過)。
  • 「人員」模式中的表「城市」。

要查詢「City」表,請使用select * from Person.Cityor之類new SqlFieldsQuery(“select * from City”).setSchema(“PERSON”)的語句(注意大寫)。

9.取消查詢

有兩種方法可以取消長時間運行的查詢。

第一種方法是通過設置查詢執行超時來防止查詢失控。

SqlFieldsQuery query = new SqlFieldsQuery("SELECT * from Person");

// Setting query execution timeout
query.setTimeout(10_000, TimeUnit.SECONDS);

第二種方法是通過使用來停止查詢QueryCursor.close()。

SqlFieldsQuery query = new SqlFieldsQuery("SELECT * FROM Person");

// Executing the query
QueryCursor<List<?>> cursor = cache.query(query);

// Halting the query that might be still in progress.
cursor.close();

2.6.6 機器學習

Apache Ignite 機器學習 (ML) 是一組簡單、可擴展且高效的工具,無需昂貴的數據傳輸即可構建預測機器學習模型。

將機器和深度學習 (DL) 添加到 Apache Ignite 的基本原理非常簡單。今天的數據科學家必須處理阻礙 ML 被主流採用的兩個主要因素:

  • 首先,在不同的系統中訓練和部署模型(訓練結束後)。數據科學家必須等待 ETL 或其他一些數據傳輸過程才能將數據移動到 Apache Mahout 或 Apache Spark 等系統中以進行培訓。然後他們必須等待此過程完成並在生產環境中重新部署模型。整個過程可能需要數小時才能將數 TB 的數據從一個系統轉移到另一個系統。此外,訓練部分通常發生在舊數據集上。
  • 第二個因素與可擴展性有關。必須處理不再適合單個伺服器單元的數據集的 ML 和 DL 演算法正在不斷增長。這促使數據科學家提出複雜的解決方案,或者轉向分散式計算平台,如 Apache Spark 和 TensorFlow。然而,這些平台大多隻解決了模型訓練的一部分難題,這使得開發人員決定以後如何在生產中部署模型成為負擔。

 

 1.零 ETL 和大規模可擴展性

Ignite 機器學習依賴於 Ignite 以記憶體為中心的存儲,它為 ML 和 DL 任務帶來了巨大的可擴展性,並消除了 ETL 在不同系統之間施加的等待。例如,它允許用戶直接在 Ignite 集群中跨記憶體和磁碟存儲的數據上運行 ML/DL 訓練和推理。接下來,Ignite 提供了大量針對 Ignite 的並置分散式處理進行優化的 ML 和 DL 演算法。當針對大量數據集或增量針對傳入數據流運行時,這些實現提供記憶體速度和無限的水平可擴展性,而無需將數據移動到另一個存儲中。通過消除數據移動和較長的處理等待時間

2.容錯和持續學習

Apache Ignite 機器學習可以容忍節點故障。這意味著在學習過程中出現節點故障的情況下,所有的恢復過程對用戶都是透明的,學習過程不會中斷,我們會在類似於所有節點都正常工作的情況下得到結果。

3.演算法和適用性

3.1 分類

根據訓練集識別新觀察屬於哪個類別。

  • 適用性:垃圾郵件檢測、影像識別、信用評分、疾病識別。
  • 演算法: 邏輯回歸、線性 SVM(支援向量機)、k-NN 分類、樸素貝葉斯、決策樹、隨機森林、多層感知器、梯度提升、ANN(近似最近鄰)

3.2 回歸

對標量因變數 (y) 與一個或多個解釋變數或自變數 (x) 之間的關係進行建模。

  • 適用性:藥物反應、股票價格、超市收入。
  • 演算法:線性回歸、決策樹回歸、k-NN 回歸。

3.3 聚類

以這樣一種方式對一組對象進行分組,即同一組(稱為集群)中的對象彼此之間(在某種意義上)比其他組(集群)中的對象更相似。

  • 適用性:客戶細分、實驗結果分組、購物項目分組。
  • 演算法: K-Means 聚類、高斯混合 (GMM)。

3.4 推薦

構建推薦系統,它是資訊過濾系統的子類,旨在預測用戶對項目的「評分」或「偏好」。

  • 適用性: 影片和音樂服務的播放列表生成器,服務的產品推薦器
  • 演算法: 矩陣分解。

3.5 預處理

特徵提取和歸一化。

  • 適用性:轉換輸入數據(例如文本)以用於機器學習演算法,以提取我們需要適應的特徵,對輸入數據進行規範化。
  • 演算法: Apache Ignite ML 支援使用基於分區的數據集功能進行自定義預處理,並具有默認預處理器,例如規範化預處理器、one-hot-encoder、min-max 縮放器等。

3.總結

Ignite和Hadoop解決的是不同業務場景的問題,即使在一定程度上可能應用了類似的底層基礎技術。Ignite是一種多用途,和OLAP/ OLTP記憶體中數據結構相關的,而Hadoop僅僅是Ignite原生支援的諸多數據來源之一。

Spark是一個和Ignite類似的項目。但是Spark聚焦於OLAP,而Ignite憑藉強大的事務處理能力在混合型的OLTP/ OLAP場景中表現能力更好。特別是針對Hadoop,Ignite將為現有的MapReduce框架,Hive作業提供即插即用模式的加速,避免了推倒重來的做法,而Spark需要先做數據ETL,更適合開發新的分析應用。

4.結束語

這篇部落格就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,部落客出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買部落客的書進行學習,在此感謝大家的支援。關注下面公眾號,根據提示,可免費獲取書籍的教學影片。