大資料Hadoop之——Apache Hudi 資料湖實戰操作

一、概述

Hudi(Hadoop Upserts Deletes and Incrementals),簡稱Hudi,是一個流式資料湖平臺

,支援對海量資料快速更新,內建表格式,支援事務的儲存層、 一系列表服務、資料服務(開箱即用的攝取工具)以及完善的運維監控工具,它可以以極低的延遲將資料快速儲存到HDFS或雲端儲存(S3)的工具,最主要的特點支援記錄級別的插入更新(Upsert)和刪除,同時還支援增量查詢。

GitHub地址:https://github。com/apache/hudi

官方文件:https://hudi。apache。org/cn/docs/overview

關於Apache Hudi 資料湖 也可以參考我這篇文章:大資料Hadoop之——新一代流式資料湖平臺 Apache Hudi

大資料Hadoop之——Apache Hudi 資料湖實戰操作

二、Hudi CLI

構建hudi後,可以透過cd hudi cli&&

。/hudi-cli。sh

啟動shell。一個hudi表駐留在DFS上的一個稱為basePath的位置,我們需要這個位置才能連線到hudi表。Hudi庫有效地在內部管理此表,使用。hoodie子資料夾跟蹤所有元資料。

編譯生成的包如下:

大資料Hadoop之——Apache Hudi 資料湖實戰操作

# 啟動。/hudi-cli/hudi-cli。sh

大資料Hadoop之——Apache Hudi 資料湖實戰操作

三、Spark 與 Hudi 整合使用

Hudi 流式資料湖平臺,協助管理資料,藉助HDFS檔案系統儲存資料,使用Spark操作資料。

大資料Hadoop之——Apache Hudi 資料湖實戰操作

Hadoop 安裝可參考我這篇文章:大資料Hadoop原理介紹+安裝+實戰操作(HDFS+YARN+MapReduce)

Hadoop HA安裝可參考我這篇文章:大資料Hadoop之——Hadoop 3。3。4 HA(高可用)原理與實現(QJM)

Spark 環境配置可以參考我這篇文章:大資料Hadoop之——計算引擎Spark

1)Spark 測試

cd $SPARK_HOMEhdfs dfs -mkdir /tmp/hdfs dfs -put README。md /tmp/hdfs dfs -text /tmp/README。md# 啟動spark-shell。/bin/spark-shell ——master local[2]val datasRDD = sc。textFile(“/tmp/README。md”)# 行數datasRDD。count()# 讀取第一行資料datasRDD。first()val dataframe = spark。read。textFile(“/tmp/README。md”)dataframe。printSchemadataframe。show(10,false)

大資料Hadoop之——Apache Hudi 資料湖實戰操作

2)Spark 與 Hudi 整合使用

官方示例:https://hudi。apache。org/docs/quick-start-guide/

在spark-shell命令列,對Hudi表資料進行操作,需要執行spark-shell命令是,新增相關的依賴包,命令如下:

1、啟動spark-shell

【第一種方式】線上聯網下載相關jar包

### 啟動spark-shell,使用spark-shell操作hudi資料湖### 第一種方式。/bin/spark-shell \ ——master local[2] \ ——packages org。apache。hudi:hudi-spark3。2-bundle_2。12:0。12。0 \ ——conf ‘spark。serializer=org。apache。spark。serializer。KryoSerializer’ \ ——conf ‘spark。sql。catalog。spark_catalog=org。apache。spark。sql。hudi。catalog。HoodieCatalog’ \ ——conf ‘spark。sql。extensions=org。apache。spark。sql。hudi。HoodieSparkSessionExtension’### 上述命令需要聯網,基於ivy下載下載相關jar包到本地,然後載入到CLASSPATH,其中包含三個jar包。

【第二種方式】離線使用已經下載好的jar包

### 第二種方式,使用——jarscd /opt/apachewget https://repo1。maven。org/maven2/org/apache/spark/spark-avro_2。12/3。3。0/spark-avro_2。12-3。3。0。jarcd $SPARK_HOME。/bin/spark-shell \——master local[2] \——jars /opt/apache/hudi-0。12。0/packaging/hudi-spark-bundle/target/hudi-spark3。2-bundle_2。12-0。12。0。jar,/opt/apache/hudi-0。12。0/hudi-examples/hudi-examples-spark/target/lib/unused-1。0。0。jar,/opt/apache/spark-avro_2。12-3。3。0。jar \——conf ‘spark。sql。catalog。spark_catalog=org。apache。spark。sql。hudi。catalog。HoodieCatalog’ \——conf “spark。serializer=org。apache。spark。serializer。KryoSerializer”

2、匯入park及Hudi相關包

import org。apache。hudi。QuickstartUtils。_import scala。collection。JavaConversions。_import org。apache。spark。sql。SaveMode。_import org。apache。hudi。DataSourceReadOptions。_import org。apache。hudi。DataSourceWriteOptions。_import org。apache。hudi。config。HoodieWriteConfig。_import org。apache。hudi。common。model。HoodieRecord

3、定義變數

val tableName = “hudi_trips_cow”# 儲存到HDFSval basePath = “hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/hudi_trips_cow”# 儲存到本地# val basePath = “file:///tmp/hudi_trips_cow”

4、模擬生成Trip乘車資料

##構建DataGenerator物件,用於模擬生成10條Trip乘車資料val dataGen = new DataGenerator val inserts = convertToStringList(dataGen。generateInserts(10))

其中,DataGenerator可以用於生成測試資料,用來完成後續操作。

5、將模擬資料List轉換為DataFrame資料集

##轉成dfval df = spark。read。json(spark。sparkContext。parallelize(inserts,2))##檢視資料結構df。printSchema()##檢視資料df。show()# 指定欄位查詢df。select(“rider”,“begin_lat”,“begin_lon”,“driver”,“end_lat”,“end_lon”,“fare”,“partitionpath”,“ts”,“uuid”)。show(10,truncate=false)

6、將資料寫入到hudi

# 將資料儲存到hudi表中,由於Hudi誕生時基於Spark框架,所以SparkSQL支援Hudi資料來源,直接透過format指定資料來源Source,設定相關屬性儲存資料即可,注意,hudi不是正真儲存資料,而是管理資料。df。write。format(“hudi”)。 options(getQuickstartWriteConfigs)。 option(PRECOMBINE_FIELD_OPT_KEY, “ts”)。 option(RECORDKEY_FIELD_OPT_KEY, “uuid”)。 option(PARTITIONPATH_FIELD_OPT_KEY, “partitionpath”)。 option(TABLE_NAME, tableName)。 mode(Overwrite)。 save(basePath)## 重要引數說明#引數:getQuickstartWriteConfigs,設定寫入/更新資料至Hudi時,Shuffle時分割槽數目#引數:PRECOMBINE_FIELD_OPT_KEY,資料合併時,依據主鍵欄位#引數:RECORDKEY_FIELD_OPT_KEY,每條記錄的唯一id,支援多個欄位#引數:PARTITIONPATH_FIELD_OPT_KEY,用於存放資料的分割槽欄位

本地儲存

大資料Hadoop之——Apache Hudi 資料湖實戰操作

HDFS 儲存

大資料Hadoop之——Apache Hudi 資料湖實戰操作

四、Flink 與 Hudi 整合使用

官方示例:https://hudi。apache。org/docs/flink-quick-start-guide

1)啟動flink叢集

下載地址:http://flink。apache。org/downloads。html

### 1、下載軟體包wget https://dlcdn。apache。org/flink/flink-1。14。6/flink-1。14。6-bin-scala_2。12。tgztar -xf flink-1。14。6-bin-scala_2。12。tgzexport FLINK_HOME=/opt/apache/flink-1。14。6### 2、設定HADOOP_CLASSPATH# HADOOP_HOME is your hadoop root directory after unpack the binary package。export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`export HADOOP_CONF_DIR=‘/opt/apache/hadoop/etc/hadoop’### 3、啟動單節點flink 叢集# Start the Flink standalone cluster,這裡先修改slot數量,預設是1,這裡改成4# taskmanager。numberOfTaskSlots: 4cd $FLINK_HOME。/bin/start-cluster。sh# 測試可用性。/bin/flink run examples/batch/WordCount。jar

大資料Hadoop之——Apache Hudi 資料湖實戰操作

2) 啟動flink SQL 客戶端

# 【第一種方式】指定jar包。/bin/sql-client。sh embedded -j 。。/hudi-0。12。0/packaging/hudi-flink-bundle/target/hudi-flink1。14-bundle-0。12。0。jar shell# 【第二種方式】還可以將jar包放在$FINK_HOME/lib目錄下。/bin/sql-client。sh embedded shell

3)新增資料

—— sets up the result mode to tableau to show the results directly in the CLISET ‘sql-client。execution。result-mode’ = ‘tableau’;CREATE TABLE t1( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ( ‘connector’ = ‘hudi’, ‘path’ = ‘hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1’, ‘table。type’ = ‘MERGE_ON_READ’ —— this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE);INSERT INTO t1 VALUES (‘id1’,‘Danny’,23,TIMESTAMP ‘1970-01-01 00:00:01’,‘par1’);—— insert data using valuesINSERT INTO t1 VALUES (‘id1’,‘Danny’,23,TIMESTAMP ‘1970-01-01 00:00:01’,‘par1’), (‘id2’,‘Stephen’,33,TIMESTAMP ‘1970-01-01 00:00:02’,‘par1’), (‘id3’,‘Julian’,53,TIMESTAMP ‘1970-01-01 00:00:03’,‘par2’), (‘id4’,‘Fabian’,31,TIMESTAMP ‘1970-01-01 00:00:04’,‘par2’), (‘id5’,‘Sophia’,18,TIMESTAMP ‘1970-01-01 00:00:05’,‘par3’), (‘id6’,‘Emma’,20,TIMESTAMP ‘1970-01-01 00:00:06’,‘par3’), (‘id7’,‘Bob’,44,TIMESTAMP ‘1970-01-01 00:00:07’,‘par4’), (‘id8’,‘Han’,56,TIMESTAMP ‘1970-01-01 00:00:08’,‘par4’);

大資料Hadoop之——Apache Hudi 資料湖實戰操作

HDFS上檢視

大資料Hadoop之——Apache Hudi 資料湖實戰操作

4)查詢資料(批式查詢)

select * from t1;

大資料Hadoop之——Apache Hudi 資料湖實戰操作

5)更新資料

—— this would update the record with key ‘id1’insert into t1 values (‘id1’,‘Danny’,27,TIMESTAMP ‘1970-01-01 00:00:01’,‘par1’);

6)Streaming Query(流式查詢)

首先建立表t2,設定相關屬性,以

流的方式查詢讀取,對映到上面表:t1

read。streaming。enabled

設定為true,表明透過streaming的方式讀取表資料;

read。streaming。check-interval

指定了source監控新的commits的間隔時間4s

table。type

設定表型別為 MERGE_ON_READ

CREATE TABLE t2( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ( ‘connector’ = ‘hudi’, ‘path’ = ‘hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1’, ‘table。type’ = ‘MERGE_ON_READ’, ‘read。streaming。enabled’ = ‘true’, —— this option enable the streaming read ‘read。start-commit’ = ‘20210316134557’, —— specifies the start commit instant time ‘read。streaming。check-interval’ = ‘4’ —— specifies the check interval for finding new source commits, default 60s。);—— Then query the table in stream modeselect * from t2;

注意:檢視可能會遇到如下錯誤:

[ERROR] Could not execute SQL statement。 Reason: java。lang。ClassNotFoundException: org。apache。hadoop。hive。ql。io。parquet。MapredParquetInputFormat

【解決】新增

hadoop-mapreduce-client-core-xxx。jar

hive-exec-xxx。jar

到Flink lib中。

cp /opt/apache/hadoop-3。3。2/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3。3。2。jar $FLINK_HOME/libcp 。/hudi-0。12。0/hudi-examples/hudi-examples-spark/target/lib/hive-exec-2。3。1-core。jar $FLINK_HOME/lib

大資料Hadoop之——Apache Hudi 資料湖實戰操作

Hive 與 Hudi的整合,小夥伴可以先看官網文件:https://hudi。apache。org/docs/syncing_metastore/#flink-setup

Spark 和 Hudi整合,Flink 與 Hudi整合先到這裡了,還有很多其它大資料元件與Hudi的整合示例講解會放在後面文章講解,請小夥伴耐心等待,有任何疑問歡迎留言,會持續更新【大資料+雲原生】相關的文章~