「事件驅動架構」使用GoldenGate創建從Oracle到Kafka的CDC事件流
- 2019 年 12 月 10 日
- 筆記
我們通過GoldenGate技術在Oracle DB和Kafka代理之間創建集成,該技術實時發布Kafka中的CDC事件流。
Oracle在其Oracle GoldenGate for Big Data套件中提供了一個Kafka連接處理程式,用於將CDC(更改數據捕獲)事件流推送到Apache Kafka集群。
因此,對於給定的Oracle資料庫,成功完成的業務事務中的任何DML操作(插入、更新、刪除)都將轉換為實時發布的Kafka消息。
這種集成對於這類用例非常有趣和有用:
- 如果遺留的單片應用程式使用Oracle資料庫作為單一數據源,那麼應該可以通過監視相關表的更改來創建實時更新事件流。換句話說,我們可以實現來自遺留應用程式的數據管道,而無需更改它們。
- 我們需要承認只有在資料庫事務成功完成時才會發布Kafka消息。為了賦予這個特性,我們可以(始終以事務的方式)在一個由GoldenGate特別監視的表中編寫Kafka消息,通過它的Kafka連接處理程式,將發布一個「插入」事件來存儲原始的Kafka消息。
在本文中,我們將逐步說明如何通過GoldenGate技術實現PoC(概念驗證)來測試Oracle資料庫與Kafka之間的集成。
PoC的先決條件
我們將安裝所有的東西在一個本地虛擬機,所以你需要:
- 安裝Oracle VirtualBox(我在Oracle VirtualBox 5.2.20上測試過)
- 16 gb的RAM。
- 大約75GB的磁碟空間空閑。
- 最後但並非最不重要的是:了解vi。
PoC架構
本指南將創建一個單一的虛擬機有:
- Oracle資料庫12c:要監視的表存儲在其中。
- Oracle GoldenGate 12c(經典版本):將應用於監視表的業務事務實時提取,以中間日誌格式(trail log)存儲,並將其輸送到另一個GoldenGate(用於大數據)實例管理的遠程日誌。
- Oracle GoldenGate for Big Data 12c:pumped的業務事務並將其複製到Kafka消息中。
- Apache Zookeeper/Apache Kafka實例:在這裡發布Kafka消息中轉換的業務事務。
換句話說,在某些Oracle表上應用的任何插入、更新和刪除操作都將生成Kafka消息的CDC事件流,該事件流將在單個Kafka主題中發布。
下面是我們將要創建的架構和實時數據流:

步驟1/12:啟動Oracle資料庫
您可以自由地安裝Oracle資料庫和Oracle GoldenGate手動。但幸運的是……)Oracle共享了一些虛擬機,這些虛擬機已經安裝了所有的東西,可以隨時進行開發。
Oracle虛擬機可以在這裡下載,你需要一個免費的Oracle帳戶來獲得它們。
我使用了Oracle Big Data Lite虛擬機(ver)。4.11),它包含了很多Oracle產品,包括:
- Oracle資料庫12c第一版企業版(12.1.0.2)
- Oracle GoldenGate 12c (12.3.0.1.2)
從上述下載頁面獲取所有7-zip文件(約22GB),提取VM映像文件BigDataLite411。在Oracle VirtualBox中雙擊文件,打開導入嚮導。完成導入過程後,一個名為BigDataLite-4.11的VM將可用。

啟動BigDataLite-4.11並使用以下憑證登錄:
- 用戶:oracle
- 密碼:welcome1
一個舒適的Linux桌面環境將會出現。
雙擊桌面上的「開始/停止服務」圖標,然後:
- 檢查第一項ORCL (Oracle資料庫12c)。
- 不要檢查所有其他的東西(對PoC無用且有害)。
- 按回車確認選擇。

最後,Oracle資料庫將啟動。
當您重新啟動虛擬機時,Oracle資料庫將自動啟動。
與下載的虛擬機有關的其他有用資訊:
- Oracle主文件夾($ORACLE_HOME)是/u01/app/ Oracle /product/12.1.0.2/dbhome_1
- GoldenGate (classic)安裝在/u01/ogg中
- SQL Developer安裝在/u01/sqldeveloper中。您可以從上面工具欄中的圖標啟動SQL Developer。
- Oracle資料庫是作為多租戶容器資料庫(CDB)安裝的。
- Oracle資料庫監聽埠是1521
- 根容器的Oracle SID是cdb
- PDB(可插拔資料庫)的Oracle SID是orcl
- 所有Oracle資料庫用戶(SYS、SYSTEM等)的密碼都是welcome1
- 連接到PDB資料庫的tnsname別名是ORCL(參見$ORACLE_HOME/network/admin/tnsnames)。ora文件內容)。
- Java主文件夾($JAVA_HOME)是/usr/java/latest
- $JAVA_HOME中安裝的Java開發工具包是JDK8更新151。
步驟2/12:在Oracle中啟用歸檔日誌
我們需要在Oracle中啟用歸檔日誌來使用GoldenGate (classic)。
從VM的Linux shell中啟動SQL Plus作為SYS:
sqlplus sys/welcome1 as sysdba
然後從SQL + shell運行這個命令列表(我建議一次啟動一個):
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;ALTER DATABASE FORCE LOGGING;ALTER SYSTEM SWITCH LOGFILE;ALTER SYSTEM SET ENABLE_GOLDENGATE_REPLICATION=TRUE;SHUTDOWN IMMEDIATE;STARTUP MOUNT;ALTER DATABASE ARCHIVELOG;ALTER DATABASE OPEN;
然後檢查存檔日誌是否成功啟用:
ARCHIVE LOG LIST;
輸出應該是這樣的:
Database log mode Archive ModeAutomatic archival EnabledArchive destination USE_DB_RECOVERY_FILE_DESTOldest online log sequence 527Next log sequence to archive 529Current log sequence 529

步驟3/12:創建一個ggadmin用戶
需要為GoldenGate (classic)創建一個特殊的Oracle管理員用戶。
同樣,從VM的Linux shell中打開SQL Plus:
sqlplus sys/welcome1作為sysdba
並通過運行這個腳本創建ggadmin用戶:
ALTER SESSION SET "_ORACLE_SCRIPT"=TRUE; CREATE USER ggadmin IDENTIFIED BY ggadmin;GRANT CREATE SESSION, CONNECT, RESOURCE, ALTER SYSTEM TO ggadmin;EXEC DBMS_GOLDENGATE_AUTH.GRANT_ADMIN_PRIVILEGE(grantee=>'ggadmin', privilege_type=>'CAPTURE', grant_optional_privileges=>'*');GRANT SELECT ANY DICTIONARY TO ggadmin;GRANT UNLIMITED TABLESPACE TO ggadmin;
步驟4/12 -創建ESHOP模式
我們將創建一個模式(ESHOP),其中只有兩個表(CUSTOMER_ORDER和CUSTOMER_ORDER_ITEM),用於生成要推送到Kafka中的CDC事件流。
使用SQL Plus(或者,如果您願意,也可以使用SQL Developer)連接orcl作為SID的Oracle PDB:
sqlplus sys/welcome1@ORCL as sysdba
運行這個腳本:
— init session ALTER SESSION SET "_ORACLE_SCRIPT"=TRUE; — create tablespace for eshop CREATE TABLESPACE eshop_tbs DATAFILE 'eshop_tbs.dat' SIZE 10M AUTOEXTEND ON;CREATE TEMPORARY TABLESPACE eshop_tbs_temp TEMPFILE 'eshop_tbs_temp.dat' SIZE 5M AUTOEXTEND ON; — create user schema eshop, please note that the password is eshopCREATE USER ESHOP IDENTIFIED BY eshop DEFAULT TABLESPACE eshop_tbs TEMPORARY TABLESPACE eshop_tbs_temp; — grant eshop user permissionsGRANT CREATE SESSION TO ESHOP;GRANT CREATE TABLE TO ESHOP;GRANT UNLIMITED TABLESPACE TO ESHOP;GRANT RESOURCE TO ESHOP;GRANT CONNECT TO ESHOP;GRANT CREATE VIEW TO ESHOP; — create eshop sequencesCREATE SEQUENCE ESHOP.CUSTOMER_ORDER_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE;CREATE SEQUENCE ESHOP.CUSTOMER_ORDER_ITEM_SEQ START WITH 1 INCREMENT BY 1 NOCACHE NOCYCLE; — create eshop tablesCREATE TABLE ESHOP.CUSTOMER_ORDER ( ID NUMBER(19) PRIMARY KEY, CODE VARCHAR2(10), CREATED DATE, STATUS VARCHAR2(32), UPDATE_TIME TIMESTAMP); CREATE TABLE ESHOP.CUSTOMER_ORDER_ITEM ( ID NUMBER(19) PRIMARY KEY, ID_CUSTOMER_ORDER NUMBER(19), DESCRIPTION VARCHAR2(255), QUANTITY NUMBER(3), CONSTRAINT FK_CUSTOMER_ORDER FOREIGN KEY (ID_CUSTOMER_ORDER) REFERENCES ESHOP.CUSTOMER_ORDER (ID));
步驟5/12:初始化GoldenGate Classic
現在是時候在BigDataListe-4.11虛擬機中安裝GoldenGate (classic)實例了。
從Linux shell運行:
cd /u01/ogg./ggsci
GoldenGate CLI(命令行介面)將啟動:
Oracle GoldenGate Command Interpreter for OracleVersion 12.2.0.1.0 OGGCORE_12.2.0.1.0_PLATFORMS_151101.1925.2_FBOLinux, x64, 64bit (optimized), Oracle 12c on Nov 11 2015 03:53:23Operating system character set identified as UTF-8. Copyright (C) 1995, 2015, Oracle and/or its affiliates. All rights reserved. GGSCI (bigdatalite.localdomain) 1>
從GoldenGate CLI啟動經理與以下命令:
start mgr
它將引導GoldenGate的主控制器進程(監聽埠7810)。
現在創建一個憑據庫來存儲ggadmin用戶憑據(並使用具有相同名稱的別名來引用它們):
add credentialstorealter credentialstore add user ggadmin password ggadmin alias ggadmin
現在,通過使用剛才創建的ggadmin別名連接到Oracle資料庫,並啟用對存儲在名為orcl的PDB中的eshop模式的附加日誌:
dblogin useridalias ggadminadd schematrandata orcl.eshop

步驟6/12:製作金門果提取物
在此步驟中,我們將創建一個GoldenGate摘要,此過程將監視Oracle archive重做日誌,以捕獲與ESHOP表相關的資料庫事務,並將此SQL修改流寫入另一個名為trail log的日誌文件中。
從GoldenGate CLI運行:
edit params exteshop
該命令將打開一個引用新空文件的vi實例。在vi編輯器中放入以下內容:
EXTRACT exteshopUSERIDALIAS ggadminEXTTRAIL ./dirdat/aaTABLE orcl.eshop.*;
保存內容並退出vi,以便返回GoldenGate CLI。
保存的內容將存儲在/u01/ogg/dirprm/exteshop中。人口、難民和移民事務局文件。您也可以在外部編輯它的內容,而不需要再次從GoldenGate CLI運行「edit params exteshop」命令。
現在在Oracle中註冊提取過程,從GoldenGate CLI運行以下命令:
dblogin useridalias ggadminregister extract exteshop database container (orcl)
最後一個命令的輸出應該是這樣的:
OGG-02003 Extract EXTESHOP successfully registered with database at SCN 13624423.
使用所示的SCN號來完成提取配置。從GoldenGate CLI:
add extract exteshop, integrated tranlog, scn 13624423add exttrail ./dirdat/aa, extract exteshop
現在我們可以啟動名為exteshop的GoldenGate提取過程:
start exteshop
你可以使用以下命令中的on來檢查進程的狀態:
info exteshopview report exteshop
驗證提取過程是否正常工作以完成此步驟。從Linux shell運行以下命令,用SQL Plus(或SQL Developer)連接到ESHOP模式:
sqlplus eshop / eshop@ORCL
創建一個模擬客戶訂單:
INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA01', SYSDATE, 'DRAFT', SYSTIMESTAMP); INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Toy Story', 1); COMMIT;
最後,從GoldenGate CLI跑出來:
stats exteshop
並驗證前面的插入操作是否已計算在內。下面是stats命令輸出的一個小示例:
Extracting from ORCL.ESHOP.CUSTOMER_ORDER to ORCL.ESHOP.CUSTOMER_ORDER: *** Total statistics since 2019-05-29 09:18:12 ***Total inserts 1.00Total updates 0.00Total deletes 0.00Total discards 0.00Total operations 1.00
檢查提取過程是否正常工作的另一種方法是檢查GoldenGate跟蹤日誌文件的時間戳。在Linux shell中運行「ls -l /u01/ogg/dirdat/」,並驗證以「aa」開頭的文件的時間戳已經更改。

步驟7/12:安裝並運行Apache Kafka
從VM的桌面環境中打開Firefox並下載Apache Kafka(我使用的是kafka_2.11-2.1.1.tgz)。
現在,打開一個Linux shell並重置CLASSPATH環境變數(在BigDataLite-4.11虛擬機中設置的當前值會在Kafka中產生衝突):
declare -x CLASSPATH=""
從同一個Linux shell中,解壓縮壓縮包,啟動ZooKeeper和Kafka:
cdtar zxvf Downloads/kafka_2.11-2.1.1.tgzcd kafka_2.11-2.1.1./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties./bin/kafka-server-start.sh -daemon config/server.properties
你可以通過啟動「echo stats | nc localhost 2181」來檢查ZooKeeper是否正常:
[oracle@bigdatalite ~]$ echo stats | nc localhost 2181Zookeeper version: 3.4.5-cdh5.13.1–1, built on 11/09/2017 16:28 GMTClients: /127.0.0.1:34997[1](queued=0,recved=7663,sent=7664) /0:0:0:0:0:0:0:1:17701[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/25Received: 8186Sent: 8194Connections: 2Outstanding: 0Zxid: 0x3fMode: standaloneNode count: 25
您可以檢查Kafka是否與「echo dump | nc localhost 2181 | grep代理」(一個字元串/brokers/ids/0應該出現)
[oracle@bigdatalite ~]$ echo dump | nc localhost 2181 | grep brokers/brokers/ids/0
用於PoC的BigDataLite-4.11虛擬機已經在啟動虛擬機時啟動了一個較老的ZooKeeper實例。因此,請確保禁用了步驟1中描述的所有服務。 此外,當您打開一個新的Linux shell時,請注意在啟動ZooKeeper和Kafka之前總是要重置CLASSPATH環境變數,這一點在步驟開始時已經解釋過了。

步驟8/12:為大數據安裝GoldenGate
同樣,從這個頁面下載Oracle GoldenGate for Big Data 12c只需要使用VM中安裝的Firefox瀏覽器(我在Linux x86-64上使用Oracle GoldenGate for Big Data 12.3.2.1.1)。請注意,您需要一個(免費)Oracle帳戶來獲得它。
安裝很容易,只是爆炸壓縮包內的下載:
cd ~/Downloadsunzip OGG_BigData_Linux_x64_12.3.2.1.1.zipcd ..mkdir ogg-bd-poccd ogg-bd-poctar xvf ../Downloads/OGG_BigData_Linux_x64_12.3.2.1.1.tar
就這樣,GoldenGate for Big Data 12c被安裝在/home/oracle/ogg-bd-poc文件夾中。
同樣,BigDataLite-4.11虛擬機已經在/u01/ogg-bd文件夾中安裝了用於大數據的GoldenGate。但它是一個較舊的版本,連接Kafka的選項較少。
步驟9/12:啟動GoldenGate for Big Data Manager
打開大數據大門
cd ~/ogg-bd-poc./ggsci
需要更改管理器埠,否則之前啟動的與GoldenGate (classic)管理器的衝突將被引發。
因此,從大數據的GoldenGate來看,CLI運行:
create subdirsedit params mgr
一個vi實例將開始,只是寫這個內容:
PORT 27801
然後保存內容,退出vi,返回CLI,我們終於可以啟動GoldenGate for Big Data manager監聽埠27081:

步驟10/12:創建數據泵(Data Pump)
現在,我們需要創建在GoldenGate世界中被稱為數據泵的東西。數據泵是一個提取過程,它監視一個跟蹤日誌,並(實時地)將任何更改推到另一個由不同的(通常是遠程的)GoldenGate實例管理的跟蹤日誌。
對於這個PoC,由GoldenGate (classic)管理的trail log aa將被泵送至GoldenGate管理的trail log bb進行大數據處理。
因此,如果您關閉它,請回到來自Linux shell的GoldenGate(經典)CLI:
cd /u01/ogg./ggsci
來自GoldenGate(經典)CLI:
edit params pmpeshop
並在vi中加入以下內容:
EXTRACT pmpeshopUSERIDALIAS ggadminSETENV (ORACLE_SID='orcl')– GoldenGate for Big Data address/port:RMTHOST localhost, MGRPORT 27801RMTTRAIL ./dirdat/bbPASSTHRU– The "tokens" part it is useful for writing in the Kafka messages– the Transaction ID and the database Change Serial NumberTABLE orcl.eshop.*, tokens(txid = @GETENV('TRANSACTION', 'XID'), csn = @GETENV('TRANSACTION', 'CSN'));
保存內容並退出vi。
正如已經解釋的提取器,保存的內容將存儲在/u01/ogg/dirprm/pmpeshop中。人口、難民和移民事務局文件。
現在我們要註冊並啟動數據泵,從GoldenGate CLI:
dblogin useridalias ggadminadd extract pmpeshop, exttrailsource ./dirdat/aa begin nowadd rmttrail ./dirdat/bb extract pmpeshopstart pmpeshop
通過從CLI運行以下命令之一來檢查數據泵的狀態:
info pmpeshopview report pmpeshop
你甚至可以在金門大數據的dirdat文件夾中查看trail log bb是否已經創建:
[oracle@bigdatalite dirdat]$ ls -l ~/ogg-bd-poc/dirdattotal 0-rw-r—–. 1 oracle oinstall 0 May 30 13:22 bb000000000[oracle@bigdatalite dirdat]$
那檢查泵送過程呢?來自Linux shell:
sqlplus eshop/eshop@ORCL
執行這個SQL腳本創建一個新的模擬客戶訂單:
INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA02', SYSDATE, 'SHIPPING', SYSTIMESTAMP); INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Inside Out', 1); COMMIT;
現在從GoldenGate(經典)CLI運行:
stats pmpeshop
用於檢查插入操作是否正確計數(在輸出的一部分下面):
GGSCI (bigdatalite.localdomain as ggadmin@cdb/CDB$ROOT) 11> stats pmpeshop Sending STATS request to EXTRACT PMPESHOP … Start of Statistics at 2019-05-30 14:49:00. Output to ./dirdat/bb: Extracting from ORCL.ESHOP.CUSTOMER_ORDER to ORCL.ESHOP.CUSTOMER_ORDER: *** Total statistics since 2019-05-30 14:01:56 ***Total inserts 1.00Total updates 0.00Total deletes 0.00Total discards 0.00Total operations 1.00
此外,您還可以驗證GoldenGate中存儲的用於測試泵過程的大數據的跟蹤日誌的時間戳。事務提交後,從Linux shell運行:「ln -l ~/og -bd-poc/dirdat」,並檢查最後一個以「bb」作為前綴的文件的時間戳。

步驟11/12:將事務發布到Kafka
最後,我們將在GoldenGate中為BigData創建一個副本流程,以便在Kafka主題中發布泵出的業務事務。replicat將從trail日誌bb讀取事務中的插入、更新和刪除操作,並將它們轉換為JSON編碼的Kafka消息。
因此,創建一個名為eshop_kafkaconnect的文件。文件夾/home/oracle/ogg-bd- pocd /dirprm中的屬性包含以下內容:
# File: /home/oracle/ogg-bd-poc/dirprm/eshop_kafkaconnect.properties# ———————————————————– # address/port of the Kafka brokerbootstrap.servers=localhost:9092acks=1 #JSON Converter Settingskey.converter=org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable=falsevalue.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=false #Adjust for performancebuffer.memory=33554432batch.size=16384linger.ms=0 # This property fix a start-up error as explained by Oracle Support here:# https://support.oracle.com/knowledge/Middleware/2455697_1.htmlconverter.type=key
在同一個文件夾中,創建一個名為eshop_kc的文件。具有以下內容的道具:
# File: /home/oracle/ogg-bd-poc/dirprm/eshop_kc.props# —————————————————gg.handlerlist=kafkaconnect #The handler propertiesgg.handler.kafkaconnect.type=kafkaconnectgg.handler.kafkaconnect.kafkaProducerConfigFile=eshop_kafkaconnect.propertiesgg.handler.kafkaconnect.mode=tx #The following selects the topic name based only on the schema namegg.handler.kafkaconnect.topicMappingTemplate=CDC-${schemaName} #The following selects the message key using the concatenated primary keysgg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys} #The formatter propertiesgg.handler.kafkaconnect.messageFormatting=opgg.handler.kafkaconnect.insertOpKey=Igg.handler.kafkaconnect.updateOpKey=Ugg.handler.kafkaconnect.deleteOpKey=Dgg.handler.kafkaconnect.truncateOpKey=Tgg.handler.kafkaconnect.treatAllColumnsAsStrings=falsegg.handler.kafkaconnect.iso8601Format=falsegg.handler.kafkaconnect.pkUpdateHandling=abendgg.handler.kafkaconnect.includeTableName=truegg.handler.kafkaconnect.includeOpType=truegg.handler.kafkaconnect.includeOpTimestamp=truegg.handler.kafkaconnect.includeCurrentTimestamp=truegg.handler.kafkaconnect.includePosition=truegg.handler.kafkaconnect.includePrimaryKeys=truegg.handler.kafkaconnect.includeTokens=true goldengate.userexit.writers=javawriterjavawriter.stats.display=TRUEjavawriter.stats.full=TRUE gg.log=log4jgg.log.level=INFO gg.report.time=30sec # Apache Kafka Classpath# Put the path of the "libs" folder inside the Kafka home pathgg.classpath=/home/oracle/kafka_2.11-2.1.1/libs/* javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm
如果關閉,重啟大數據CLI的GoldenGate:
cd ~/ogg-bd-poc./ggsci
and start to create a replicat from the CLI with:
edit params repeshop
in vi put this content:
REPLICAT repeshopTARGETDB LIBFILE libggjava.so SET property=dirprm/eshop_kc.propsGROUPTRANSOPS 1000MAP orcl.eshop.*, TARGET orcl.eshop.*;
然後保存內容並退出vi。現在將replicat與trail log bb關聯,並使用以下命令啟動replicat進程,以便從GoldenGate啟動大數據CLI:
add replicat repeshop, exttrail ./dirdat/bbstart repeshop
Check that the replicat is live and kicking with one of these commands:
info repeshopview report repeshop
Now, connect to the ESHOP schema from another Linux shell:
sqlplus eshop/eshop@ORCL
and commit something:
INSERT INTO CUSTOMER_ORDER (ID, CODE, CREATED, STATUS, UPDATE_TIME)VALUES (CUSTOMER_ORDER_SEQ.NEXTVAL, 'AAAA03', SYSDATE, 'DELIVERED', SYSTIMESTAMP); INSERT INTO CUSTOMER_ORDER_ITEM (ID, ID_CUSTOMER_ORDER, DESCRIPTION, QUANTITY)VALUES (CUSTOMER_ORDER_ITEM_SEQ.NEXTVAL, CUSTOMER_ORDER_SEQ.CURRVAL, 'Cars 3', 2); COMMIT;
From the GoldenGate for Big Data CLI, check that the INSERT operation was counted for the replicat process by running:
stats repeshop
And (hurrah!) we can have a look inside Kafka, as the Linux shell checks that the topic named CDC-ESHOP was created:
cd ~/kafka_2.11-2.1.1/bin./kafka-topics.sh –list –zookeeper localhost:2181
and from the same folder run the following command for showing the CDC events stored in the topic:
./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic CDC-ESHOP –from-beginning
You should see something like:
[oracle@bigdatalite kafka_2.11-2.1.1]$ ./bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic CDC-ESHOP –from-beginning {"table":"ORCL.ESHOP.CUSTOMER_ORDER","op_type":"I","op_ts":"2019-05-31 04:24:34.000327","current_ts":"2019-05-31 04:24:39.637000","pos":"00000000020000003830","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11.0,"CODE":"AAAA03","CREATED":"2019-05-31 04:24:34","STATUS":"DELIVERED","UPDATE_TIME":"2019-05-31 04:24:34.929950000"}}{"table":"ORCL.ESHOP.CUSTOMER_ORDER_ITEM","op_type":"I","op_ts":"2019-05-31 04:24:34.000327","current_ts":"2019-05-31 04:24:39.650000","pos":"00000000020000004074","primary_keys":["ID"],"tokens":{"txid":"9.32.6726","csn":"13906131"},"before":null,"after":{"ID":11.0,"ID_CUSTOMER_ORDER":11.0,"DESCRIPTION":"Cars 3","QUANTITY":2}}
For a better output, install jq:
sudo yum -y install jq./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic CDC-ESHOP –from-beginning | jq .
and here is how will appear the JSON events:
{ "table": "ORCL.ESHOP.CUSTOMER_ORDER", "op_type": "I", "op_ts": "2019-05-31 04:24:34.000327", "current_ts": "2019-05-31 04:24:39.637000", "pos": "00000000020000003830", "primary_keys": [ "ID" ], "tokens": { "txid": "9.32.6726", "csn": "13906131" }, "before": null, "after": { "ID": 11, "CODE": "AAAA03", "CREATED": "2019-05-31 04:24:34", "STATUS": "DELIVERED", "UPDATE_TIME": "2019-05-31 04:24:34.929950000" }}{ "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM", "op_type": "I", "op_ts": "2019-05-31 04:24:34.000327", "current_ts": "2019-05-31 04:24:39.650000", "pos": "00000000020000004074", "primary_keys": [ "ID" ], "tokens": { "txid": "9.32.6726", "csn": "13906131" }, "before": null, "after": { "ID": 11, "ID_CUSTOMER_ORDER": 11, "DESCRIPTION": "Cars 3", "QUANTITY": 2 }}
現在打開Kafka -console-consumer.sh進程,並在ESHOP上執行其他一些資料庫事務,以便實時列印發送給Kafka的CDC事件流。
以下是一些用於更新和刪除操作的JSON事件示例:
// Generated with: UPDATE CUSTOMER_ORDER SET STATUS='DELIVERED' WHERE ID=8; { "table": "ORCL.ESHOP.CUSTOMER_ORDER", "op_type": "U", "op_ts": "2019-05-31 06:22:07.000245", "current_ts": "2019-05-31 06:22:11.233000", "pos": "00000000020000004234", "primary_keys": [ "ID" ], "tokens": { "txid": "14.6.2656", "csn": "13913689" }, "before": { "ID": 8, "CODE": null, "CREATED": null, "STATUS": "SHIPPING", "UPDATE_TIME": null }, "after": { "ID": 8, "CODE": null, "CREATED": null, "STATUS": "DELIVERED", "UPDATE_TIME": null }} // Generated with: DELETE CUSTOMER_ORDER_ITEM WHERE ID=3;{ "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM", "op_type": "D", "op_ts": "2019-05-31 06:25:59.000916", "current_ts": "2019-05-31 06:26:04.910000", "pos": "00000000020000004432", "primary_keys": [ "ID" ], "tokens": { "txid": "14.24.2651", "csn": "13913846" }, "before": { "ID": 3, "ID_CUSTOMER_ORDER": 1, "DESCRIPTION": "Toy Story", "QUANTITY": 1 }, "after": null}
恭喜你!你完成了PoC:

步驟12/12:使用PoC
GoldenGate中提供的Kafka Connect處理程式有很多有用的選項,可以根據需要訂製集成。點擊這裡查看官方文件。
例如,您可以選擇為CDC流中涉及的每個表創建不同的主題,只需在eshop_kc.props中編輯此屬性:
gg.handler.kafkaconnect.topicMappingTemplate=CDC-${schemaName}-${tableName}
更改後重新啟動replicat,從GoldenGate for Big Data CLI:
stop repeshopstart repeshop
您可以在「~/og -bd-poc/AdapterExamples/big-data/kafka_connect」文件夾中找到其他配置示例。
結論
在本文中,我們通過GoldenGate技術在Oracle資料庫和Kafka代理之間創建了一個完整的集成。CDC事件流以Kafka實時發布。
為了簡單起見,我們使用了一個已經全部安裝的虛擬機,但是您可以在不同的主機上免費安裝用於大數據的GoldenGate和Kafka。
請在評論中告訴我您對這種集成的潛力(或限制)的看法。
原文:https://dzone.com/articles/creates-a-cdc-stream-from-oracle-database-to-kafka
本文:https://pub.intelligentx.net/node/839