Quick Study: Advanced Flume, Custom MySQLSource

  • February 19, 2020
  • Notes

Chapter 5 Advanced Flume: Custom MySQLSource

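5.1 About Custom Sources

A Source is the component that receives data into a Flume Agent. Source components can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy. Flume already ships with many source types, but they do not always meet the needs of real-world development, in which case we have to implement a custom Source.

For example, to monitor MySQL in real time and ship its data to HDFS or another storage framework, we need to implement our own MySQLSource. Flume provides an interface for custom sources.

Official documentation: https://flume.apache.org/FlumeDeveloperGuide.html#source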

5.2 Components of the Custom MySQLSource

5.3 Steps for Building the Custom MySQLSource

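According to the official documentation, a custom MySQLSource must extend the AbstractSource class and implement the Configurable and PollableSource interfaces. The methods to implement are:

  • getBackOffSleepIncrement(): not used for now
  • getMaxBackOffSleepInterval(): not used for now
  • configure(Context context): initializes the context
  • process(): fetches the data, wraps it in Events, and writes them to the Channel; this method is called in a loop. Fetching data from MySQL involves fairly complex logic, so we define a dedicated class, SQLSourceHelper, to handle the interaction with MySQL.
  • stop(): closes the related resources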
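The two back-off methods are left unused in this chapter, but it helps to know what they control. As far as we understand Flume's polling runner, after process() returns BACKOFF it sleeps for the number of consecutive back-offs times the increment, capped at the maximum interval. The sketch below only reproduces that arithmetic; BackoffSketch and sleepMillis are hypothetical names, not part of Flume.

```java
/** Sketch of the pause a polling runner could apply after process() returns BACKOFF. */
final class BackoffSketch {

    /**
     * Sleep time after the n-th consecutive BACKOFF: it grows linearly with
     * the increment and never exceeds the maximum interval.
     */
    static long sleepMillis(long consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        // Hypothetical settings: 1000 ms increment, 3000 ms cap.
        for (long n = 1; n <= 4; n++) {
            System.out.println(n + " consecutive backoffs: " + sleepMillis(n, 1000, 3000) + " ms");
        }
    }
}
```

With both methods returning 0, as in this chapter, the computed pause is always 0, so the only delay between polls is the one the source applies itself.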

5.4 Implementation

5.4.1 Add the POM Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
</dependencies>

5.4.2 Add the Configuration Files

Add jdbc.properties and log4j.properties to the classpath.

jdbc.properties:

dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop102:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8
dbUser=root
dbPassword=000000
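The helper class in section 5.4.3 reads these keys with java.util.Properties. The following is a minimal sketch of that lookup; to stay self-contained it parses the text from a string rather than from the classpath, and JdbcPropertiesSketch and parse are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class JdbcPropertiesSketch {

    /** Parses properties-format text; the tutorial's helper loads jdbc.properties from the classpath instead. */
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "dbDriver=com.mysql.jdbc.Driver",
                "dbUrl=jdbc:mysql://hadoop102:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8",
                "dbUser=root",
                "dbPassword=000000");
        Properties p = parse(text);
        // Only the first '=' separates key and value, so the URL keeps its query string.
        System.out.println(p.getProperty("dbUrl"));
        System.out.println(p.getProperty("dbUser"));
    }
}
```

A key that is missing from the file comes back as null, which is why the helper validates its mandatory settings before using them.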

log4j.properties:

#--------console-----------
log4j.rootLogger=info,myconsole,myfile
log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
#log4j.appender.myconsole.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n

#log4j.rootLogger=error,myfile
log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myfile.File=/tmp/flume.log
log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
log4j.appender.myfile.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n

5.4.3 SQLSourceHelper

  1. Field descriptions:
  2. Method descriptions:
  3. Code analysis
  4. Implementation:
package com.atguigu.source;

import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException; // assumed: the original listing does not show its imports
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class SQLSourceHelper {

    private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);

    private int runQueryDelay,       // interval between two queries, in milliseconds
            startFrom,               // starting id
            currentIndex,            // current id
            maxRow;                  // maximum number of rows per query

    private String table,            // table to read from
            columnsToSelect,         // columns requested by the user
            customQuery,             // query supplied by the user
            query,                   // query that is actually executed
            defaultCharsetResultSet; // character set

    // context, used to read the configuration
    private Context context;

    // default values; they can be overridden in the Flume job configuration file
    private static final int DEFAULT_QUERY_DELAY = 10000;
    private static final int DEFAULT_START_VALUE = 0;
    private static final int DEFAULT_MAX_ROWS = 2000;
    private static final String DEFAULT_COLUMNS_SELECT = "*";
    private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";

    private static Connection conn = null;
    private static String connectionURL, connectionUserName, connectionPassword;

    // load the static resources
    static {
        Properties p = new Properties();
        try {
            p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
            connectionURL = p.getProperty("dbUrl");
            connectionUserName = p.getProperty("dbUser");
            connectionPassword = p.getProperty("dbPassword");
            Class.forName(p.getProperty("dbDriver"));
        } catch (IOException | ClassNotFoundException e) {
            LOG.error(e.toString());
        }
    }

    // open a JDBC connection; returns null if the connection cannot be opened
    private static Connection initConnection(String url, String user, String pw) {
        try {
            return DriverManager.getConnection(url, user, pw);
        } catch (SQLException e) {
            LOG.error(e.toString());
        }
        return null;
    }

    // constructor
    SQLSourceHelper(Context context) throws ParseException {
        // initialize the context
        this.context = context;

        // parameters with defaults: read from the Flume job configuration, fall back to the default
        this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
        this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
        this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
        this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);

        // parameters without defaults: read from the Flume job configuration
        this.table = context.getString("table");
        this.customQuery = context.getString("custom.query");
        connectionURL = context.getString("connection.url");
        connectionUserName = context.getString("connection.user");
        connectionPassword = context.getString("connection.password");

        // validate the configuration before connecting: a parameter without a
        // default that was not set raises an exception
        checkMandatoryProperties();

        conn = initConnection(connectionURL, connectionUserName, connectionPassword);

        // read the current id
        currentIndex = getStatusDBIndex(startFrom);

        // build the query
        query = buildQuery();
    }

    // validate the configuration (table and database connection parameters)
    private void checkMandatoryProperties() {
        if (table == null) {
            throw new ConfigurationException("property table not set");
        }
        if (connectionURL == null) {
            throw new ConfigurationException("connection.url property not set");
        }
        if (connectionUserName == null) {
            throw new ConfigurationException("connection.user property not set");
        }
        if (connectionPassword == null) {
            throw new ConfigurationException("connection.password property not set");
        }
    }

    // build the SQL statement
    private String buildQuery() {
        // read the current id
        currentIndex = getStatusDBIndex(startFrom);
        LOG.info(currentIndex + "");

        String sql;
        if (customQuery == null) {
            sql = "SELECT " + columnsToSelect + " FROM " + table;
        } else {
            sql = customQuery;
        }

        // use id as the offset
        if (!sql.toLowerCase().contains("where")) {
            return sql + " where id>" + currentIndex;
        } else {
            // a custom query with its own WHERE clause is expected to end with the
            // offset value; replace those trailing digits with the current id
            return sql.replaceFirst("\\d+\\s*$", "") + currentIndex;
        }
    }

    // run the query; returns an empty list if the query fails
    List<List<Object>> executeQuery() {
        // holds the result rows
        List<List<Object>> results = new ArrayList<>();

        // the SQL is rebuilt on every call because the id changes; the result is kept
        // in query so that customQuery still holds what the user configured
        query = buildQuery();

        try (PreparedStatement ps = conn.prepareStatement(query);
             ResultSet result = ps.executeQuery()) {

            int columnCount = result.getMetaData().getColumnCount();

            while (result.next()) {
                // holds one row (several columns)
                List<Object> row = new ArrayList<>();
                for (int i = 1; i <= columnCount; i++) {
                    row.add(result.getObject(i));
                }
                results.add(row);
            }

            LOG.info("execSql:" + query + "\nresultSize:" + results.size());
            return results;
        } catch (SQLException e) {
            LOG.error(e.toString());
            // reconnect
            conn = initConnection(connectionURL, connectionUserName, connectionPassword);
        }

        return new ArrayList<>();
    }

    // convert the result set to strings: each row is a list, and each list becomes one string
    List<String> getAllRows(List<List<Object>> queryResult) {
        List<String> allRows = new ArrayList<>();

        if (queryResult == null || queryResult.isEmpty())
            return allRows;

        for (List<Object> rawRow : queryResult) {
            StringBuilder row = new StringBuilder();
            for (Object value : rawRow) {
                if (value != null) {
                    row.append(value.toString());
                }
                row.append(",");
            }
            allRows.add(row.toString());
        }

        return allRows;
    }

    // update the offset metadata; called after every returned result set. The offset of
    // each query must be recorded so that an interrupted job can resume; id is the offset
    void updateOffset2DB(int size) {
        // source_tab is the key: insert if absent, update if present (one record per source table).
        // The new offset is the offset used by the last query plus the number of rows it returned,
        // which assumes that ids are consecutive.
        String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"
                + this.table
                + "','" + (currentIndex + size)
                + "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";

        LOG.info("updateStatus Sql:" + sql);
        execSql(sql);
    }

    // execute an SQL statement
    private void execSql(String sql) {
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            LOG.info("exec::" + sql);
            ps.execute();
        } catch (SQLException e) {
            LOG.error(e.toString());
        }
    }

    // read the offset of the current id
    private Integer getStatusDBIndex(int startFrom) {
        // look up the current id in the flume_meta table
        String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");

        if (dbIndex != null) {
            return Integer.parseInt(dbIndex);
        }

        // no record means this is the first query or the table holds no data yet:
        // return the initial value that was passed in
        return startFrom;
    }

    // run a query that returns a single value (the current id)
    private String queryOne(String sql) {
        try (PreparedStatement ps = conn.prepareStatement(sql);
             ResultSet result = ps.executeQuery()) {
            if (result.next()) {
                return result.getString(1);
            }
        } catch (SQLException e) {
            LOG.error(e.toString());
        }

        return null;
    }

    // close the related resources
    void close() {
        try {
            if (conn != null) {
                conn.close();
            }
        } catch (SQLException e) {
            LOG.error(e.toString());
        }
    }

    int getCurrentIndex() {
        return currentIndex;
    }

    void setCurrentIndex(int newValue) {
        currentIndex = newValue;
    }

    int getRunQueryDelay() {
        return runQueryDelay;
    }

    String getQuery() {
        return query;
    }

    String getConnectionURL() {
        return connectionURL;
    }

    private boolean isCustomQuerySet() {
        return (customQuery != null);
    }

    Context getContext() {
        return context;
    }

    public String getConnectionUserName() {
        return connectionUserName;
    }

    public String getConnectionPassword() {
        return connectionPassword;
    }

    String getDefaultCharsetResultSet() {
        return defaultCharsetResultSet;
    }
}
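The helper derives its offset from the number of rows it has read, which matches the last id only when ids are consecutive. A minimal sketch of an alternative, assuming the first selected column is a numeric id, is to remember the largest id actually seen in each batch. OffsetSketch, buildQuery, and nextOffset are hypothetical names used only for illustration; they are not part of the tutorial's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OffsetSketch {

    /** Builds the incremental query for the given offset, ordered so the last row has the largest id. */
    static String buildQuery(String columns, String table, long lastId) {
        return "SELECT " + columns + " FROM " + table + " WHERE id > " + lastId + " ORDER BY id";
    }

    /**
     * Returns the largest id in the batch, or the previous offset when the batch is empty.
     * Assumes the first column of every row is the numeric id.
     */
    static long nextOffset(List<List<Object>> rows, long previousOffset) {
        long max = previousOffset;
        for (List<Object> row : rows) {
            long id = ((Number) row.get(0)).longValue();
            max = Math.max(max, id);
        }
        return max;
    }

    public static void main(String[] args) {
        List<List<Object>> rows = new ArrayList<>();
        rows.add(Arrays.<Object>asList(5, "zhangsan"));
        rows.add(Arrays.<Object>asList(9, "lisi")); // ids 6 to 8 were deleted, leaving a gap

        long offset = nextOffset(rows, 4);
        System.out.println(buildQuery("*", "student", offset));
    }
}
```

Because the stored value is an id rather than a count, gaps left by deleted rows do not cause records to be skipped or re-read.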

5.4.4 MySQLSource

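Implementation:

package com.atguigu.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException; // assumed: the original listing does not show its imports
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class SQLSource extends AbstractSource implements Configurable, PollableSource {

    // logger
    private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);

    // the SQL helper
    private SQLSourceHelper sqlSourceHelper;

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    @Override
    public void configure(Context context) {
        try {
            // initialize the helper
            sqlSourceHelper = new SQLSourceHelper(context);
        } catch (ParseException e) {
            LOG.error("Error initializing SQLSourceHelper", e);
        }
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // query the table
            List<List<Object>> result = sqlSourceHelper.executeQuery();

            // holds the events
            List<Event> events = new ArrayList<>();

            // holds the event headers
            HashMap<String, String> header = new HashMap<>();

            // if rows were returned, wrap them in events
            if (result != null && !result.isEmpty()) {
                List<String> allRows = sqlSourceHelper.getAllRows(result);

                for (String row : allRows) {
                    Event event = new SimpleEvent();
                    event.setBody(row.getBytes());
                    event.setHeaders(header);
                    events.add(event);
                }

                // write the events to the channel
                this.getChannelProcessor().processEventBatch(events);

                // update the offset in the metadata table
                sqlSourceHelper.updateOffset2DB(result.size());
            }

            // wait before the next query
            Thread.sleep(sqlSourceHelper.getRunQueryDelay());

            return Status.READY;
        } catch (InterruptedException e) {
            LOG.error("Error processing row", e);
            // restore the interrupt flag so the caller can see the interruption
            Thread.currentThread().interrupt();

            return Status.BACKOFF;
        }
    }

    @Override
    public synchronized void stop() {
        LOG.info("Stopping sql source {} ...", getName());

        try {
            // close the resources
            if (sqlSourceHelper != null) {
                sqlSourceHelper.close();
            }
        } finally {
            super.stop();
        }
    }
}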
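One detail worth noting: process() calls row.getBytes() without arguments, which encodes with the JVM's default charset, while the helper already exposes a configurable charset name (default.charset.resultset, UTF-8 by default). A minimal sketch of encoding row strings with an explicit charset follows; EventBodySketch and encodeRows are hypothetical names, and the sample rows are made up for illustration.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class EventBodySketch {

    /** Encodes each row with the named charset instead of the platform default. */
    static List<byte[]> encodeRows(List<String> rows, String charsetName) {
        Charset charset = Charset.forName(charsetName);
        List<byte[]> bodies = new ArrayList<>();
        for (String row : rows) {
            bodies.add(row.getBytes(charset));
        }
        return bodies;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        rows.add("1,zhangsan,");
        rows.add("2,\u674e\u56db,"); // a name with two CJK characters, written as Unicode escapes

        for (byte[] body : encodeRows(rows, "UTF-8")) {
            System.out.println(body.length + " bytes: " + new String(body, StandardCharsets.UTF_8));
        }
    }
}
```

In the source itself, the same idea would amount to passing a Charset built from getDefaultCharsetResultSet() to getBytes, so that the event bodies do not depend on the machine the agent runs on.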

5.5 Testing

5.5.1 Prepare the JARs

  1. Copy the MySQL driver JAR into Flume's lib directory
[atguigu@hadoop102 flume]$ cp /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar /opt/module/flume/lib/
  2. Package the project and copy its JAR into Flume's lib directory

5.5.2 Prepare the Configuration File

  1. Create the configuration file and open it
[atguigu@hadoop102 job]$ touch mysql.conf
[atguigu@hadoop102 job]$ vim mysql.conf
  2. Add the following content
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.atguigu.source.SQLSource
a1.sources.r1.connection.url = jdbc:mysql://192.168.9.102:3306/mysqlsource
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = 000000
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
#a1.sources.r1.incremental.column.name = id
#a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=5000

# Describe the sink
a1.sinks.k1.type = logger

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
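Flume hands each component only the settings under its own prefix, which is why the helper reads run.query.delay rather than a1.sources.r1.run.query.delay. The sketch below imitates that with plain maps to show how the keys above reach the helper and when its defaults apply. ContextSketch and its methods are hypothetical stand-ins and are not Flume's Context class.

```java
import java.util.HashMap;
import java.util.Map;

final class ContextSketch {

    /** Keeps only the entries under the prefix and strips the prefix from their keys. */
    static Map<String, String> subProperties(Map<String, String> all, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : all.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    /** Returns the integer value for the key, or the default when the key is absent. */
    static int getInteger(Map<String, String> context, String key, int defaultValue) {
        String value = context.get(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("a1.sources.r1.table", "student");
        conf.put("a1.sources.r1.run.query.delay", "5000");
        conf.put("a1.sinks.k1.type", "logger");

        Map<String, String> context = subProperties(conf, "a1.sources.r1.");
        // Set in the file, so the configured value wins over the helper's default of 10000.
        System.out.println(getInteger(context, "run.query.delay", 10000));
        // Not set in the file, so the default of 0 is used.
        System.out.println(getInteger(context, "start.from", 0));
    }
}
```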

5.5.3 Prepare the MySQL Tables

  1. Create the mysqlsource database
CREATE DATABASE mysqlsource;
  2. In the mysqlsource database, create the data table student and the metadata table flume_meta
CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
);
CREATE TABLE `flume_meta` (
  `source_tab` varchar(255) NOT NULL,
  `currentIndex` varchar(255) NOT NULL,
  PRIMARY KEY (`source_tab`)
);
  3. Add data to the student table
1 zhangsan
2 lisi
3 wangwu
4 zhaoliu

5.5.4 Run the Test and Check the Results

  1. Run the job
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/mysql.conf -Dflume.root.logger=INFO,console
  2. The results are shown in Figure 6-2: