Quick Learning: Advanced Flume, a Custom MySQLSource

  • February 19, 2020
  • Notes

Chapter 5: Advanced Flume, a Custom MySQLSource

5.1 Custom Source Overview

A Source is the component responsible for receiving data into a Flume Agent. Source components can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy. Although the officially provided source types already cover a lot of ground, they sometimes cannot satisfy the requirements of real-world development, and in those cases we need to implement a custom Source for the specific requirement.

For example: to monitor MySQL in real time and transfer its data to HDFS or another storage framework, we need to implement a MySQLSource ourselves. The official documentation also describes the interfaces for writing a custom source:

Official documentation: https://flume.apache.org/FlumeDeveloperGuide.html#source

5.2 Steps for Customizing MySQLSource

According to the official guide, a custom MySqlSource needs to extend the AbstractSource class and implement the Configurable and PollableSource interfaces, then override the following methods (a minimal skeleton follows below):

getBackOffSleepIncrement()    // not used here
getMaxBackOffSleepInterval()  // not used here
configure(Context context)    // initialize the context
process()                     // fetch the data (the interaction with MySql is fairly involved, so it is handled by a dedicated class, SQLSourceHelper), wrap it into Events and write them to the Channel; this method is called in a loop
stop()                        // release the related resources
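For reference, here is a minimal, hedged skeleton of such a source (the class name MySkeletonSource is purely illustrative; the complete SQLSource implementation, including SQLSourceHelper, is given in section 5.4):

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

// Minimal skeleton only: it shows which class to extend and which methods to override.
public class MySkeletonSource extends AbstractSource implements Configurable, PollableSource {

    @Override
    public long getBackOffSleepIncrement() {
        return 0;   // not used in this example
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;   // not used in this example
    }

    @Override
    public void configure(Context context) {
        // read the source's parameters from the agent configuration here
    }

    @Override
    public Status process() throws EventDeliveryException {
        // poll the external system, wrap the records into Events,
        // hand them to getChannelProcessor(), then report the status
        return Status.READY;
    }

    @Override
    public synchronized void stop() {
        // release resources, then let the parent finish stopping
        super.stop();
    }
}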

5.3 Custom MySQLSource Components

The custom MySQLSource is made up of two classes: SQLSourceHelper, which encapsulates the interaction with MySql (building the incremental query, executing it, and maintaining the offset in a metadata table), and SQLSource, which implements the Flume interfaces and turns the query results into Events.

5.4 Code Implementation

5.4.1 Import the POM Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
</dependencies>

5.4.2 Add the Configuration Files

Add jdbc.properties and log4j.properties to the classpath.

jdbc.properties:

dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop102:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8
dbUser=root
dbPassword=000000

log4j.properties:

#--------console-----------
log4j.rootLogger=info,myconsole,myfile
log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
#log4j.appender.myconsole.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n

#log4j.rootLogger=error,myfile
log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myfile.File=/tmp/flume.log
log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
log4j.appender.myfile.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
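To quickly confirm that the two files are actually visible on the classpath, a small hedged check like the one below can be used (JdbcPropsCheck is only an illustrative name; SQLSourceHelper's static block, shown in the next section, loads jdbc.properties the same way):

import java.io.InputStream;
import java.util.Properties;

public class JdbcPropsCheck {
    public static void main(String[] args) throws Exception {
        // load jdbc.properties from the classpath, just like SQLSourceHelper does
        Properties p = new Properties();
        try (InputStream in = JdbcPropsCheck.class.getClassLoader()
                .getResourceAsStream("jdbc.properties")) {
            p.load(in);
        }
        System.out.println("dbUrl  = " + p.getProperty("dbUrl"));
        System.out.println("dbUser = " + p.getProperty("dbUser"));
    }
}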

5.4.3 SQLSourceHelper

  1. Property description:
  2. Method description:
  3. Code analysis:
  4. Code implementation:
package com.atguigu.source;

import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class SQLSourceHelper {

    private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);

    private int runQueryDelay,  // interval between two queries
            startFrom,          // starting id
            currentIndex,       // current id
            recordSize = 0,     // number of rows returned per query (accumulated as the offset)
            maxRow;             // maximum rows per query

    private String table,            // table to operate on
            columnsToSelect,         // columns to select, supplied by the user
            customQuery,             // query supplied by the user
            query,                   // query that is actually built
            defaultCharsetResultSet; // result set charset

    // context, used to read the configuration file
    private Context context;

    // default values for the fields above; they can be overridden in the flume job configuration
    private static final int DEFAULT_QUERY_DELAY = 10000;
    private static final int DEFAULT_START_VALUE = 0;
    private static final int DEFAULT_MAX_ROWS = 2000;
    private static final String DEFAULT_COLUMNS_SELECT = "*";
    private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";

    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static String connectionURL, connectionUserName, connectionPassword;

    // load static resources
    static {
        Properties p = new Properties();
        try {
            p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
            connectionURL = p.getProperty("dbUrl");
            connectionUserName = p.getProperty("dbUser");
            connectionPassword = p.getProperty("dbPassword");
            Class.forName(p.getProperty("dbDriver"));
        } catch (IOException | ClassNotFoundException e) {
            LOG.error(e.toString());
        }
    }

    // obtain a JDBC connection
    private static Connection InitConnection(String url, String user, String pw) {
        try {
            Connection conn = DriverManager.getConnection(url, user, pw);
            if (conn == null)
                throw new SQLException();
            return conn;
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    // constructor
    SQLSourceHelper(Context context) throws ParseException {

        // initialize the context
        this.context = context;

        // parameters with defaults: read from the flume job configuration, fall back to the defaults
        this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
        this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
        this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
        this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);

        // parameters without defaults: read from the flume job configuration
        this.table = context.getString("table");
        this.customQuery = context.getString("custom.query");
        connectionURL = context.getString("connection.url");
        connectionUserName = context.getString("connection.user");
        connectionPassword = context.getString("connection.password");
        conn = InitConnection(connectionURL, connectionUserName, connectionPassword);

        // validate the configuration; throw if a parameter without a default was not set
        checkMandatoryProperties();

        // get the current id
        currentIndex = getStatusDBIndex(startFrom);

        // build the query
        query = buildQuery();
    }

    // validate the configuration (table, query and database connection parameters)
    private void checkMandatoryProperties() {
        if (table == null) {
            throw new ConfigurationException("property table not set");
        }
        if (connectionURL == null) {
            throw new ConfigurationException("connection.url property not set");
        }
        if (connectionUserName == null) {
            throw new ConfigurationException("connection.user property not set");
        }
        if (connectionPassword == null) {
            throw new ConfigurationException("connection.password property not set");
        }
    }

    // build the SQL statement
    private String buildQuery() {
        String sql = "";

        // get the current id
        currentIndex = getStatusDBIndex(startFrom);
        LOG.info(currentIndex + "");

        if (customQuery == null) {
            sql = "SELECT " + columnsToSelect + " FROM " + table;
        } else {
            sql = customQuery;
        }

        StringBuilder execSql = new StringBuilder(sql);

        // use the id as the offset
        if (!sql.contains("where")) {
            execSql.append(" where ");
            execSql.append("id").append(">").append(currentIndex);
            return execSql.toString();
        } else {
            int length = execSql.toString().length();
            return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
        }
    }

    // run the query
    List<List<Object>> executeQuery() {
        try {
            // rebuild the sql on every poll, because the id offset changes
            customQuery = buildQuery();

            // collection that holds the results
            List<List<Object>> results = new ArrayList<>();

            // re-prepare the statement with the newly built sql
            ps = conn.prepareStatement(customQuery);

            ResultSet result = ps.executeQuery();

            while (result.next()) {

                // collection that holds one row (multiple columns)
                List<Object> row = new ArrayList<>();

                // put the returned columns into the collection
                for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
                    row.add(result.getObject(i));
                }

                results.add(row);
            }

            LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());

            return results;
        } catch (SQLException e) {
            LOG.error(e.toString());

            // reconnect
            conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
        }
        return null;
    }

    // convert the result set to strings: each row is a small list, and each list becomes one string
    List<String> getAllRows(List<List<Object>> queryResult) {

        List<String> allRows = new ArrayList<>();

        if (queryResult == null || queryResult.isEmpty())
            return allRows;

        StringBuilder row = new StringBuilder();

        for (List<Object> rawRow : queryResult) {

            Object value = null;

            for (Object aRawRow : rawRow) {
                value = aRawRow;
                if (value == null) {
                    row.append(",");
                } else {
                    row.append(aRawRow.toString()).append(",");
                }
            }

            allRows.add(row.toString());
            row = new StringBuilder();
        }

        return allRows;
    }

    // update the offset metadata after every result set; the offset (the id) must be recorded
    // so the program can resume where it stopped after an interruption
    void updateOffset2DB(int size) {
        // use source_tab as the KEY: insert if it does not exist, otherwise update
        // (one record per source table)
        String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"
                + this.table
                + "','" + (recordSize += size)
                + "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";

        LOG.info("updateStatus Sql:" + sql);

        execSql(sql);
    }

    // execute an SQL statement
    private void execSql(String sql) {
        try {
            ps = conn.prepareStatement(sql);
            LOG.info("exec::" + sql);
            ps.execute();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    // get the offset for the current id
    private Integer getStatusDBIndex(int startFrom) {
        // query the current id from the flume_meta table
        String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");

        if (dbIndex != null) {
            return Integer.parseInt(dbIndex);
        }

        // if there is no record, this is either the first query or nothing has been written yet;
        // return the initially supplied value
        return startFrom;
    }

    // query a single value (the current id)
    private String queryOne(String sql) {
        ResultSet result = null;
        try {
            ps = conn.prepareStatement(sql);
            result = ps.executeQuery();

            while (result.next()) {
                return result.getString(1);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    // close related resources
    void close() {
        try {
            ps.close();
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    int getCurrentIndex() {
        return currentIndex;
    }

    void setCurrentIndex(int newValue) {
        currentIndex = newValue;
    }

    int getRunQueryDelay() {
        return runQueryDelay;
    }

    String getQuery() {
        return query;
    }

    String getConnectionURL() {
        return connectionURL;
    }

    private boolean isCustomQuerySet() {
        return (customQuery != null);
    }

    Context getContext() {
        return context;
    }

    public String getConnectionUserName() {
        return connectionUserName;
    }

    public String getConnectionPassword() {
        return connectionPassword;
    }

    String getDefaultCharsetResultSet() {
        return defaultCharsetResultSet;
    }
}
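If you want to exercise SQLSourceHelper outside of a running agent, a small hedged smoke test like the one below works; it must live in the same package because the helper's constructor and methods are package-private, the connection values simply mirror the agent configuration in section 5.5.2, and the class name HelperSmokeTest is only illustrative:

package com.atguigu.source;

import org.apache.flume.Context;

import java.util.List;

public class HelperSmokeTest {
    public static void main(String[] args) throws Exception {
        // parameter names match what SQLSourceHelper reads in its constructor
        Context context = new Context();
        context.put("connection.url", "jdbc:mysql://hadoop102:3306/mysqlsource");
        context.put("connection.user", "root");
        context.put("connection.password", "000000");
        context.put("table", "student");

        // requires jdbc.properties on the classpath and a reachable MySQL instance
        SQLSourceHelper helper = new SQLSourceHelper(context);

        // on a fresh setup this runs "SELECT * FROM student where id>0"
        // (the offset comes from flume_meta, or from start.from when that table is empty)
        List<List<Object>> rows = helper.executeQuery();
        System.out.println(helper.getAllRows(rows));

        helper.close();
    }
}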

5.4.4 MySQLSource

Code implementation:

package com.atguigu.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class SQLSource extends AbstractSource implements Configurable, PollableSource {

    // logger
    private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);

    // the sqlSourceHelper
    private SQLSourceHelper sqlSourceHelper;

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    @Override
    public void configure(Context context) {
        try {
            // initialize
            sqlSourceHelper = new SQLSourceHelper(context);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // query the data table
            List<List<Object>> result = sqlSourceHelper.executeQuery();

            // collection that holds the events
            List<Event> events = new ArrayList<>();

            // event headers
            HashMap<String, String> header = new HashMap<>();

            // if data was returned, wrap it into events
            if (result != null && !result.isEmpty()) {

                List<String> allRows = sqlSourceHelper.getAllRows(result);

                Event event = null;

                for (String row : allRows) {
                    event = new SimpleEvent();
                    event.setBody(row.getBytes());
                    event.setHeaders(header);
                    events.add(event);
                }

                // write the events to the channel
                this.getChannelProcessor().processEventBatch(events);

                // update the offset in the metadata table
                sqlSourceHelper.updateOffset2DB(result.size());
            }

            // wait before the next poll
            Thread.sleep(sqlSourceHelper.getRunQueryDelay());

            return Status.READY;
        } catch (InterruptedException e) {
            LOG.error("Error processing row", e);

            return Status.BACKOFF;
        }
    }

    @Override
    public synchronized void stop() {

        LOG.info("Stopping sql source {} ...", getName());

        try {
            // release resources
            sqlSourceHelper.close();
        } finally {
            super.stop();
        }
    }
}

5.5 Testing

5.5.1 Prepare the Jar Packages

  1. Put the MySql driver jar into Flume's lib directory
[atguigu@hadoop102 flume]$ cp /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar /opt/module/flume/lib/
  2. Package the project and put the resulting jar into Flume's lib directory

5.5.2 Prepare the Configuration File

  1. Create the configuration file and open it
[atguigu@hadoop102 job]$ touch mysql.conf
[atguigu@hadoop102 job]$ vim mysql.conf
  2. Add the following content
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.atguigu.source.SQLSource
a1.sources.r1.connection.url = jdbc:mysql://192.168.9.102:3306/mysqlsource
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = 000000
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
#a1.sources.r1.incremental.column.name = id
#a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=5000

# Describe the sink
a1.sinks.k1.type = logger

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5.5.3 Prepare the MySql Tables

  1. Create the MySqlSource database
CREATE DATABASE mysqlsource;
  2. In the MySqlSource database, create the data table Student and the metadata table Flume_meta
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `flume_meta` (
`source_tab` varchar(255) NOT NULL,
`currentIndex` varchar(255) NOT NULL,
PRIMARY KEY (`source_tab`)
);
  3. Add data to the data table (an insert example is shown below the rows)
1 zhangsan
2 lisi
3 wangwu
4 zhaoliu
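One hedged way to load these rows from the mysql client (id is AUTO_INCREMENT, but the values are given explicitly here so they match the ids shown above):

insert into student(id, name) values(1, 'zhangsan');
insert into student(id, name) values(2, 'lisi');
insert into student(id, name) values(3, 'wangwu');
insert into student(id, name) values(4, 'zhaoliu');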

5.5.4 Run the Test and Check the Results

  1. Run the job
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/mysql.conf -Dflume.root.logger=INFO,console
  2. The results are shown in Figure 6-2: