一、概述
Hudi(Hadoop Upserts Deletes and Incrementals),簡稱Hudi,是一個流式資料湖平臺
,支援對海量資料快速更新,內建表格式,支援事務的儲存層、 一系列表服務、資料服務(開箱即用的攝取工具)以及完善的運維監控工具,它可以以極低的延遲將資料快速儲存到HDFS或雲端儲存(S3)的工具,最主要的特點支援記錄級別的插入更新(Upsert)和刪除,同時還支援增量查詢。
GitHub地址:https://github。com/apache/hudi
官方文件:https://hudi。apache。org/cn/docs/overview
關於Apache Hudi 資料湖 也可以參考我這篇文章:大資料Hadoop之——新一代流式資料湖平臺 Apache Hudi
二、Hudi CLI
構建hudi後,可以透過cd hudi cli&&
。/hudi-cli。sh
啟動shell。一個hudi表駐留在DFS上的一個稱為basePath的位置,我們需要這個位置才能連線到hudi表。Hudi庫有效地在內部管理此表,使用。hoodie子資料夾跟蹤所有元資料。
編譯生成的包如下:
# 啟動。/hudi-cli/hudi-cli。sh
三、Spark 與 Hudi 整合使用
Hudi 流式資料湖平臺,協助管理資料,藉助HDFS檔案系統儲存資料,使用Spark操作資料。
Hadoop 安裝可參考我這篇文章:大資料Hadoop原理介紹+安裝+實戰操作(HDFS+YARN+MapReduce)
Hadoop HA安裝可參考我這篇文章:大資料Hadoop之——Hadoop 3。3。4 HA(高可用)原理與實現(QJM)
Spark 環境配置可以參考我這篇文章:大資料Hadoop之——計算引擎Spark
1)Spark 測試
cd $SPARK_HOMEhdfs dfs -mkdir /tmp/hdfs dfs -put README。md /tmp/hdfs dfs -text /tmp/README。md# 啟動spark-shell。/bin/spark-shell ——master local[2]val datasRDD = sc。textFile(“/tmp/README。md”)# 行數datasRDD。count()# 讀取第一行資料datasRDD。first()val dataframe = spark。read。textFile(“/tmp/README。md”)dataframe。printSchemadataframe。show(10,false)
2)Spark 與 Hudi 整合使用
官方示例:https://hudi。apache。org/docs/quick-start-guide/
在spark-shell命令列,對Hudi表資料進行操作,需要執行spark-shell命令是,新增相關的依賴包,命令如下:
1、啟動spark-shell
【第一種方式】線上聯網下載相關jar包
### 啟動spark-shell,使用spark-shell操作hudi資料湖### 第一種方式。/bin/spark-shell \ ——master local[2] \ ——packages org。apache。hudi:hudi-spark3。2-bundle_2。12:0。12。0 \ ——conf ‘spark。serializer=org。apache。spark。serializer。KryoSerializer’ \ ——conf ‘spark。sql。catalog。spark_catalog=org。apache。spark。sql。hudi。catalog。HoodieCatalog’ \ ——conf ‘spark。sql。extensions=org。apache。spark。sql。hudi。HoodieSparkSessionExtension’### 上述命令需要聯網,基於ivy下載下載相關jar包到本地,然後載入到CLASSPATH,其中包含三個jar包。
【第二種方式】離線使用已經下載好的jar包
### 第二種方式,使用——jarscd /opt/apachewget https://repo1。maven。org/maven2/org/apache/spark/spark-avro_2。12/3。3。0/spark-avro_2。12-3。3。0。jarcd $SPARK_HOME。/bin/spark-shell \——master local[2] \——jars /opt/apache/hudi-0。12。0/packaging/hudi-spark-bundle/target/hudi-spark3。2-bundle_2。12-0。12。0。jar,/opt/apache/hudi-0。12。0/hudi-examples/hudi-examples-spark/target/lib/unused-1。0。0。jar,/opt/apache/spark-avro_2。12-3。3。0。jar \——conf ‘spark。sql。catalog。spark_catalog=org。apache。spark。sql。hudi。catalog。HoodieCatalog’ \——conf “spark。serializer=org。apache。spark。serializer。KryoSerializer”
2、匯入park及Hudi相關包
import org。apache。hudi。QuickstartUtils。_import scala。collection。JavaConversions。_import org。apache。spark。sql。SaveMode。_import org。apache。hudi。DataSourceReadOptions。_import org。apache。hudi。DataSourceWriteOptions。_import org。apache。hudi。config。HoodieWriteConfig。_import org。apache。hudi。common。model。HoodieRecord
3、定義變數
val tableName = “hudi_trips_cow”# 儲存到HDFSval basePath = “hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/hudi_trips_cow”# 儲存到本地# val basePath = “file:///tmp/hudi_trips_cow”
4、模擬生成Trip乘車資料
##構建DataGenerator物件,用於模擬生成10條Trip乘車資料val dataGen = new DataGenerator val inserts = convertToStringList(dataGen。generateInserts(10))
其中,DataGenerator可以用於生成測試資料,用來完成後續操作。
5、將模擬資料List轉換為DataFrame資料集
##轉成dfval df = spark。read。json(spark。sparkContext。parallelize(inserts,2))##檢視資料結構df。printSchema()##檢視資料df。show()# 指定欄位查詢df。select(“rider”,“begin_lat”,“begin_lon”,“driver”,“end_lat”,“end_lon”,“fare”,“partitionpath”,“ts”,“uuid”)。show(10,truncate=false)
6、將資料寫入到hudi
# 將資料儲存到hudi表中,由於Hudi誕生時基於Spark框架,所以SparkSQL支援Hudi資料來源,直接透過format指定資料來源Source,設定相關屬性儲存資料即可,注意,hudi不是正真儲存資料,而是管理資料。df。write。format(“hudi”)。 options(getQuickstartWriteConfigs)。 option(PRECOMBINE_FIELD_OPT_KEY, “ts”)。 option(RECORDKEY_FIELD_OPT_KEY, “uuid”)。 option(PARTITIONPATH_FIELD_OPT_KEY, “partitionpath”)。 option(TABLE_NAME, tableName)。 mode(Overwrite)。 save(basePath)## 重要引數說明#引數:getQuickstartWriteConfigs,設定寫入/更新資料至Hudi時,Shuffle時分割槽數目#引數:PRECOMBINE_FIELD_OPT_KEY,資料合併時,依據主鍵欄位#引數:RECORDKEY_FIELD_OPT_KEY,每條記錄的唯一id,支援多個欄位#引數:PARTITIONPATH_FIELD_OPT_KEY,用於存放資料的分割槽欄位
本地儲存
HDFS 儲存
四、Flink 與 Hudi 整合使用
官方示例:https://hudi。apache。org/docs/flink-quick-start-guide
1)啟動flink叢集
下載地址:http://flink。apache。org/downloads。html
### 1、下載軟體包wget https://dlcdn。apache。org/flink/flink-1。14。6/flink-1。14。6-bin-scala_2。12。tgztar -xf flink-1。14。6-bin-scala_2。12。tgzexport FLINK_HOME=/opt/apache/flink-1。14。6### 2、設定HADOOP_CLASSPATH# HADOOP_HOME is your hadoop root directory after unpack the binary package。export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`export HADOOP_CONF_DIR=‘/opt/apache/hadoop/etc/hadoop’### 3、啟動單節點flink 叢集# Start the Flink standalone cluster,這裡先修改slot數量,預設是1,這裡改成4# taskmanager。numberOfTaskSlots: 4cd $FLINK_HOME。/bin/start-cluster。sh# 測試可用性。/bin/flink run examples/batch/WordCount。jar
2) 啟動flink SQL 客戶端
# 【第一種方式】指定jar包。/bin/sql-client。sh embedded -j 。。/hudi-0。12。0/packaging/hudi-flink-bundle/target/hudi-flink1。14-bundle-0。12。0。jar shell# 【第二種方式】還可以將jar包放在$FINK_HOME/lib目錄下。/bin/sql-client。sh embedded shell
3)新增資料
—— sets up the result mode to tableau to show the results directly in the CLISET ‘sql-client。execution。result-mode’ = ‘tableau’;CREATE TABLE t1( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ( ‘connector’ = ‘hudi’, ‘path’ = ‘hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1’, ‘table。type’ = ‘MERGE_ON_READ’ —— this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE);INSERT INTO t1 VALUES (‘id1’,‘Danny’,23,TIMESTAMP ‘1970-01-01 00:00:01’,‘par1’);—— insert data using valuesINSERT INTO t1 VALUES (‘id1’,‘Danny’,23,TIMESTAMP ‘1970-01-01 00:00:01’,‘par1’), (‘id2’,‘Stephen’,33,TIMESTAMP ‘1970-01-01 00:00:02’,‘par1’), (‘id3’,‘Julian’,53,TIMESTAMP ‘1970-01-01 00:00:03’,‘par2’), (‘id4’,‘Fabian’,31,TIMESTAMP ‘1970-01-01 00:00:04’,‘par2’), (‘id5’,‘Sophia’,18,TIMESTAMP ‘1970-01-01 00:00:05’,‘par3’), (‘id6’,‘Emma’,20,TIMESTAMP ‘1970-01-01 00:00:06’,‘par3’), (‘id7’,‘Bob’,44,TIMESTAMP ‘1970-01-01 00:00:07’,‘par4’), (‘id8’,‘Han’,56,TIMESTAMP ‘1970-01-01 00:00:08’,‘par4’);
HDFS上檢視
4)查詢資料(批式查詢)
select * from t1;
5)更新資料
—— this would update the record with key ‘id1’insert into t1 values (‘id1’,‘Danny’,27,TIMESTAMP ‘1970-01-01 00:00:01’,‘par1’);
6)Streaming Query(流式查詢)
首先建立表t2,設定相關屬性,以
流的方式查詢讀取,對映到上面表:t1
read。streaming。enabled
設定為true,表明透過streaming的方式讀取表資料;
read。streaming。check-interval
指定了source監控新的commits的間隔時間4s
table。type
設定表型別為 MERGE_ON_READ
CREATE TABLE t2( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ( ‘connector’ = ‘hudi’, ‘path’ = ‘hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1’, ‘table。type’ = ‘MERGE_ON_READ’, ‘read。streaming。enabled’ = ‘true’, —— this option enable the streaming read ‘read。start-commit’ = ‘20210316134557’, —— specifies the start commit instant time ‘read。streaming。check-interval’ = ‘4’ —— specifies the check interval for finding new source commits, default 60s。);—— Then query the table in stream modeselect * from t2;
注意:檢視可能會遇到如下錯誤:
[ERROR] Could not execute SQL statement。 Reason: java。lang。ClassNotFoundException: org。apache。hadoop。hive。ql。io。parquet。MapredParquetInputFormat
【解決】新增
hadoop-mapreduce-client-core-xxx。jar
和
hive-exec-xxx。jar
到Flink lib中。
cp /opt/apache/hadoop-3。3。2/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3。3。2。jar $FLINK_HOME/libcp 。/hudi-0。12。0/hudi-examples/hudi-examples-spark/target/lib/hive-exec-2。3。1-core。jar $FLINK_HOME/lib
Hive 與 Hudi的整合,小夥伴可以先看官網文件:https://hudi。apache。org/docs/syncing_metastore/#flink-setup
Spark 和 Hudi整合,Flink 與 Hudi整合先到這裡了,還有很多其它大資料元件與Hudi的整合示例講解會放在後面文章講解,請小夥伴耐心等待,有任何疑問歡迎留言,會持續更新【大資料+雲原生】相關的文章~