Big Data Development - Spark Streaming: Writing Processed Data to MySQL
- February 28, 2021
- Notes
The previous post, 大數據開發-Spark-開發Streaming處理數據 && 寫入Kafka, covered reading data from Kafka with Streaming, processing it, and writing the results back to Kafka, which suits scenarios such as recommendations and real-time tagging. Writing real-time results to MySQL is another common pattern. As an example, suppose we process vehicle-dispatch GPS records and write them to MySQL.
1. Setup
The target table is defined as follows:
create database test;
use test;
DROP TABLE IF EXISTS car_gps;
CREATE TABLE IF NOT EXISTS car_gps(
  deployNum VARCHAR(30) COMMENT 'dispatch number',
  plateNum  VARCHAR(10) COMMENT 'license plate number',
  timeStr   VARCHAR(20) COMMENT 'timestamp',
  lng       VARCHAR(20) COMMENT 'longitude',
  lat       VARCHAR(20) COMMENT 'latitude',
  dbtime    TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'time the row was inserted',
  PRIMARY KEY(deployNum, plateNum, timeStr)
);
2. Writing the Program
First, add the MySQL JDBC driver dependency:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.44</version>
</dependency>
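Reading Kafka with Structured Streaming also requires the spark-sql-kafka-0-10 connector on the classpath. In addition, the code below references a BusInfo case class from com.hoult.structed.bean that the original post does not show. As a purely hypothetical sketch, assuming each Kafka message is a comma-separated line of deployNum, plateNum, timeStr, lng, lat and that lglat stores longitude and latitude joined by an underscore, it might look like this:

package com.hoult.structed.bean

// Hypothetical sketch -- the original post does not show BusInfo.
// Assumes each Kafka message is "deployNum,plateNum,timeStr,lng,lat";
// lglat stores "lng_lat", matching value.lglat.split("_") in JdbcHelper below.
case class BusInfo(deployNum: String, plateNum: String, timeStr: String, lglat: String)

object BusInfo {
  // Parse one raw Kafka record; return null on malformed input so callers can filter it out
  def apply(msg: String): BusInfo = {
    val arr = msg.split(",")
    if (arr.length == 5) BusInfo(arr(0), arr(1), arr(2), s"${arr(3)}_${arr(4)}")
    else null
  }
}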
2.1 Writing to MySQL with JDBC
package com.hoult.Streaming.work

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

import com.hoult.structed.bean.BusInfo
import org.apache.spark.sql.ForeachWriter

class JdbcHelper extends ForeachWriter[BusInfo] {
  var conn: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    if (conn == null) {
      conn = JdbcHelper.openConnection
    }
    true
  }

  override def process(value: BusInfo): Unit = {
    // Write one record into the MySQL table; lglat holds "lng_lat"
    val arr: Array[String] = value.lglat.split("_")
    val sql = "insert into car_gps(deployNum,plateNum,timeStr,lng,lat) values(?,?,?,?,?)"
    statement = conn.prepareStatement(sql)
    statement.setString(1, value.deployNum)
    statement.setString(2, value.plateNum)
    statement.setString(3, value.timeStr)
    statement.setString(4, arr(0))
    statement.setString(5, arr(1))
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Close the statement before the connection
    if (null != statement) statement.close()
    if (null != conn) conn.close()
  }
}
object JdbcHelper {
  var conn: Connection = _
  val url = "jdbc:mysql://hadoop1:3306/test?useUnicode=true&characterEncoding=utf8"
  val username = "root"
  val password = "123456"

  def openConnection: Connection = {
    if (null == conn || conn.isClosed) {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection(url, username, password)
    }
    conn
  }
}
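A note on this design: the companion object keeps one shared Connection per executor JVM, and open simply reuses it for every partition and epoch, which keeps connection churn low. A single java.sql.Connection is generally not safe for concurrent use, though, so when several tasks run in the same JVM (for example with local[*]) it may be safer to give each writer its own connection or use a small connection pool.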
2.2 Writing to MySQL with foreach
package com.hoult.Streaming.work

import com.hoult.structed.bean.BusInfo
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object KafkaToJdbc {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")

    // 1. Get a SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(KafkaToJdbc.getClass.getName)
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // 2. Define the Kafka source
    val kafkaDf: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "linux121:9092")
      .option("subscribe", "test_bus_info")
      .load()

    // 3. Process the data
    val kafkaValDf: DataFrame = kafkaDf.selectExpr("CAST(value AS STRING)")
    // Convert to a Dataset[String]
    val kafkaDs: Dataset[String] = kafkaValDf.as[String]
    // Parse the longitude/latitude records into a case class so individual
    // fields are easy to access, dropping malformed records
    val busInfoDs: Dataset[BusInfo] = kafkaDs.map(BusInfo(_)).filter(_ != null)

    // 4. Write the stream to the MySQL table
    busInfoDs.writeStream
      .foreach(new JdbcHelper)
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}
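An alternative worth noting (not used in the original post): on Spark 2.4+, instead of a custom ForeachWriter you can use the foreachBatch sink and write each micro-batch through Spark's built-in JDBC writer. A minimal sketch, reusing busInfoDs and the same table and credentials as above:

// Sketch only: replaces the writeStream block above with a foreachBatch sink.
// Each micro-batch is written with the DataFrame JDBC writer.
val writeBatch: (Dataset[BusInfo], Long) => Unit = (batchDf, batchId) => {
  batchDf
    .selectExpr(
      "deployNum", "plateNum", "timeStr",
      "split(lglat, '_')[0] AS lng",
      "split(lglat, '_')[1] AS lat")
    .write
    .format("jdbc")
    .option("url", "jdbc:mysql://hadoop1:3306/test?useUnicode=true&characterEncoding=utf8")
    .option("dbtable", "car_gps")
    .option("user", "root")
    .option("password", "123456")
    .mode("append")
    .save()
}

busInfoDs.writeStream
  .foreachBatch(writeBatch)
  .outputMode("append")
  .start()
  .awaitTermination()

Because car_gps has a composite primary key, a replayed micro-batch can fail on duplicate keys with plain inserts; switching the statement to INSERT ... ON DUPLICATE KEY UPDATE is one way to make the write idempotent.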
2.3 Creating the Topic and Producing Test Data
kafka-topics.sh --zookeeper linux121:2181/myKafka --create --topic test_bus_info --partitions 2 --replication-factor 1
kafka-console-producer.sh --broker-list linux121:9092 --topic test_bus_info
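To smoke-test the pipeline, paste a record into the console producer (in whatever format your BusInfo parser expects, e.g. the comma-separated layout sketched above) and then query the car_gps table in MySQL, for example with SELECT * FROM test.car_gps, to confirm the row arrived with its dbtime populated.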
Wu Xie (吳邪), Xiao San Ye, a rookie wandering through backend, big data, and AI.
Follow for more.