Flink實時計算應用實踐:問題剖析及解決方案

Flink實時計算應用實踐:問題剖析及解決方案

1。簡介

隨公司業務的發展,對實時計算的需求也越來越多,目前Flink已廣泛應用於實時ETL,實時數倉、特徵工程和線上資料服務等業務場景。

本文首先簡單介紹了Flink實時計算基本概念,然後引出基於Flink進行實時計算開發過程當中碰到的一些問題,透過對這些常見實時計算問題的產生原因進行深入分析,進而給出相應問題的解決方案。

2。Flink基本概念

Flink是一個分散式大資料處理引擎和框架,可對有無界和有界資料流進行有狀態的計算,能夠部署在各種叢集環境中,對各種規模大小的資料進行快速計算。

在進行Flink應用開發前,我們先理解流treams、狀態State、時間Time ,Watermark等Flink基礎語義概念以及Flink 兼顧靈活性和方便性的多層次API。

2.1 流(Stream)

Stream分為有限資料流與無限資料流,unbounded stream 是有始無終的資料流,即無限資料流;而bounded stream 是限定大小的有始有終的資料集合,即有限資料流,二者的區別在於無限資料流的資料會隨時間的推演而持續增加,計算持續進行且不存在結束的狀態,相對的有限資料流資料大小固定,計算最終會完成並處於結束的狀態。

2.2 狀態(State)

狀態是計算過程中的產生的計算結果,在容錯恢復和Checkpoint中有重要的作用。流計算在本質上是Incremental Processing,因此需要將計算的中間結果儲存到狀態中,例如按天統計每個區域的GMV,因此當訂單資料到達時,需要將前面計算的該訂單所屬區域的GMV資料從狀態中讀取出來,然後再與該訂單額進行累加,累加完後將結果再次更新到狀態中;Flink正是透過將計算中間結果儲存到狀態後端中,才保證在整個分散式系統執行失敗或者掛掉的情況下做到Exactly-once,增強容災的效果。圖2-1是運算元在聚合計算時,讀寫state狀態示意圖。

Flink實時計算應用實踐:問題剖析及解決方案

圖2-1運算元讀寫狀態示意圖

2.3 時間語義(Time)

Flink的時間語義分為Event time、Ingestion time和Processing time。Flink的無限資料流是一個持續的過程,時間是我們判斷業務狀態是否滯後,資料處理是否及時的重要依據。

Flink實時計算應用實踐:問題剖析及解決方案

圖2-2 Flink時間語義

Event Time是在事件生產端產生該事件的時間,例如使用者下的訂單,訂單的下單時間就是事件產生的時間Event Time。

Ingestion time是事件進入Flink的時間。

Processing time是事件被處理時,當前的系統時間。

2.4 Watermark

流處理從事件產生,到流經source,再到operator運算元,中間是有一個過程和時間的,雖然大部分情況下,流到operator運算元的資料都是按照事件產生的時間順序來的,但是也不排除由於網路、分散式等原因,導致亂序的產生。

因此一旦出現亂序,如果只根據Event Time來決定window的執行,不能保證在視窗關閉之前,所有的資料都能到達,但我們又不能無限期的等待下去,此時我們需要有一個機制來儲存在一個特定時間後,必須觸發視窗計算,這一個機制就是watermark。Watermark特點如下:

Watermark是一種衡量Event Time進展的機制。

Watermark是用於處理亂序事件的, 它與window相結合,可以正確處理亂序事件。

watermark是單調遞增的。watermark = Math。max(eventTime - dalayTime ,watermark), 一旦watermark大於等於某個視窗的end_time時,就會觸發視窗的計算,因此Watermark就是用觸發視窗計算的。圖2-3是無序資料流中的watermark示例,它的最大亂序時間為2。

Flink實時計算應用實踐:問題剖析及解決方案

圖2-3 無序資料的watermark

Watermark具有傳遞性。如果一個運算元上游有多個運算元,則該運算元的watermark值為取上游所有流入該運算元的watermark的最小值。圖2-4為watermark的傳遞性圖。

Flink實時計算應用實踐:問題剖析及解決方案

圖2-4 Watermark的傳遞性

2.5 API層

API 通常分為三層,由上而下可分為SQL 、Table API、DataStream API、ProcessFunction三層,API的表達能力及業務抽象能力都非常強大,但越接近SQL層,表達能力會逐步減弱,抽象能力會增強,反之,ProcessFunction 層API的表達能力非常強,可以進行多種靈活方便的操作,但抽象能力也相對越小。圖2-5為Flink API的層結構圖。

Flink實時計算應用實踐:問題剖析及解決方案

圖2-5 Flink API層結構圖

3。Flink實時計算常見問題分析

3.1 資料亂序問題分析

資料亂序是指Flink在使用Event Time處理流式資料時,由於分散式或網路原因,導致資料到達處理機制進行處理時並不是按照資料產生的時間先後順序到達的。導致資料亂序的場景如下:

1、kafka多分割槽導致資料亂序

圖3-1中的數字表示資料產生的時間順序,資料生產端按資料產生時間將資料推送到kafka,因由kafka存在多個分割槽,在kafka的預設分發機制下,資料到流動如圖3-1所示:

Flink實時計算應用實踐:問題剖析及解決方案

圖3-1 Kafka多分割槽亂序圖

圖3-1所示,假如Flink的source的並行度與kafka的分割槽一致,資料經過Flink的source後,再經過keyby分割槽開窗後,資料就會存在亂序情況,例如key為0的分割槽運算元,視窗接受到的資料順序是:1,11,26,2。

2、由於網路延遲導致資料亂序

當存在多個數據生產端時,每一個生產端都將產生的資料,透過網路傳輸將資料傳送到kafka,這裡為了說明網路延遲,將kafka的分割槽設定為1。由於每一個生產端到kafka端走的網路路徑的不同,可能存在有一些存在鏈路阻塞,長短等情況,從而導致資料到kafak的時間並不是按照資料產生的時間順序到達的。圖3-2為網路原因導致資料亂序的示意圖。

Flink實時計算應用實踐:問題剖析及解決方案

圖3-2 網路延遲導致資料亂序圖

3.2 Flink大狀態場景及問題分析

Flink大狀態形成的主是因為flink在計算過程中,存在大量的計算中間結果。那麼在那些場景下會使Flink的狀態比較大,目前在我們實踐中主要有下面幾種場景:

1、資料量大且開窗的時間比較長,例如開1天,1周甚至1個月的視窗。這種開窗時間比較長,要快取的資料就比較大,因此狀態也會比較大。例如按天統計UV資料。

2、資料量比較大的二條流join時相互等待的時間比較長,例如訂單資料與物流的配送資料做join得到訂單的物流狀態,這種情況下,訂單資料可能要等待物流配送資料最大時間要1天左右。這也會導致二條流的狀態比較大。

3、Group By分組的key的資料量大,計算的指標項非常多或計算的步驟多複雜度高,這種情況會導致要儲存大量的中間計算結果,從而導致狀態比較大。

圖3-3是一個後端曝光流與前端使用者點選流join場景下checkpoint資訊圖,在二條流的資料相互等待30分鐘的情況下,checkpoint就已經比較大了,如果等待時間更長,狀態會更大。

Flink實時計算應用實踐:問題剖析及解決方案

圖3-3 Flink checkpoint圖

在狀態比較大的場景下,可能存在以下問題:

1、Flink的堆外記憶體超額使用,導致Yarn將Task Manager(以下簡稱TM)的容器kill掉,導致任務Failover。這種情況主要是Flink對RocksDB的使用記憶體沒有限制,當狀態越來越大的時,堆外記憶體使用量就會超額使用,一旦TM的記憶體超過Yarn分配給TM所在容器的記憶體,就會導致Yarn將容器kill掉。這種情況我們可以透過Yarn的日誌分析得到原因。

2、Checkpoint狀態大,Checkpoint時間長。這種情況,我們可以首先從Sub Task checkpoint資訊入手。如圖3-4所示:

Flink實時計算應用實踐:問題剖析及解決方案

圖3-4 subTask checkpoint詳情圖

先看checkpoint Data Size大小,如果狀態很大,再看End to End Duration時間,如果時間比較大,比如做一次checkpoint要花十幾甚至幾十分鐘的話,我們接著再看具體的sub Task checkpoint的時間耗時情況。

1、Checkpoint時sub Task的Latest Acknowledgement的狀態一直為n/a

這種情況下要去分析是任務是否存在背壓的情況。我們可以透過Flink Web UI檢視運算元merics的inPoolUsage和outPoolUsage的使用率。如果outPoolUsage的使用低而inPoolUsage使用率高,說明背壓是由該運算元產生的。如果inPoolUsage使用率低,而outPoolUsage使用率很高則說明是下游運算元計算處理不及時導致上游運算元背壓。繼續按上面的方法排查,最終找到產生背壓的運算元

Flink實時計算應用實踐:問題剖析及解決方案

圖3-5 運算元輸入輸出緩衝區使用率圖

2、關注Sync Duration和Async Duration時間

◇Sync Duration時間很長,說明磁碟IO可能成為效能瓶頸

◇Async Duration時間很長,說明是網路IO可能成為效能瓶頸或是資料同步執行緒太少

3.3 資料傾斜問題

資料傾斜由於資料分佈不均勻,造成資料大量的集中到一點,造成資料熱點。在這種Flink實時計算場景中,我們會發現在某個運算元的處理上,一些sub task處理的資料量很大,有一些sub task處理的資料量很小,從flink的叢集介面上,我們也可以看到運算元的背壓情況,在這種場景下,我們透過增加運算元的並行度,並不能改善運算元的背壓情況。例如PV的場景中,有一些網頁的點選量很高,而有一些網頁的點量很少,從而導致某些sub task背壓嚴重。

Flink實時計算應用實踐:問題剖析及解決方案

圖3-6 資料傾斜示意圖

4。實時計算常見問題的解決方案

4.1 資料亂序場景的處理

Flink處理資料亂序的方式有三種:

1、Watermark

我們在設定watermark時,可以設定一個最大的亂序時間,而watermark是以事件時間減去所允許的最大亂序時間作為watermark,因此相當於多給了資料一定的時間,然後關閉視窗,觸發計算。

2、允許遲到(allowedLateness)

如果在watermark的基礎上有的資料還是可能會遲到,這時我們可以再多給資料一定的可以遲到的時間,此時當watermark到達視窗大小時觸發計算,但是不會關閉視窗,而是直到所允許的遲到時間後,才會真正關閉視窗。

3、側輸出流

當資料遲到的時間非常久,前兩種都失效時使用,相當於遲到資料歸放入一個分支流中進行單獨計算。

4。1。1 Flink SQL處理亂序

◇建立kafka動態表

在建立Kafka動態表時,我們使用watermark for orderTime as orderTime – interval ‘2’MINUTE語句來說明提取的watermark為訂單時間減2分鐘。

Flink實時計算應用實踐:問題剖析及解決方案

◇開窗聚合計算

Flink實時計算應用實踐:問題剖析及解決方案

如果想每10秒輸出一次計算結果,可以設定下面引數:

Flink實時計算應用實踐:問題剖析及解決方案

在SQL層面,目前只支援使用watermark來處理資料亂序問題。

4。2。1 DataStream API處理亂序

Flink實時計算應用實踐:問題剖析及解決方案

從上面程式碼可以看出,我們使用時間分配器時,使用WatermarkStrateg。

forBoundedOutOfOrderness(Duration。ofMillis(jobPrarameter。getOutOfBoundMs()))方法來設定資料的最大亂序時間。

Flink實時計算應用實踐:問題剖析及解決方案

在開視窗後,我們又使用DataStream的allowedLatenes(Time。seconds(30))來設定視窗延遲關閉時間為30秒。

如果資料在watermark大於等於視窗的結束時間+最大延遲時間之後才到達,這時,如果我們不做任何處理的話的資料就會被丟棄掉,如果當我們不想丟棄掉這些資料,我們還可以透過側輸出流來解決。解決步驟如下:

1、建立側輸出的Tag標誌物件。

2、在allowedLateness之後,設定將資料輸出到outputTag標誌的側輸出流。

3、從視窗計算後得到的流中透過getSideOutput(outputTag)獲取遲到資料的側輸出流。

4、得到遲到資料的側輸出流後,我們就可以根據業務需要對側輸出流的資料進行相應的處理。在下面的程式碼中我們將視窗聚合計算得到的流與側輸出流進行connect連線後,得到一個ConnectedStreams物件,在ConnectedStreams物件上使用flatMap方法的CoFlatMapFunction將側輸出流的資料累加到聚合計算的結果流上時進行輸出,從而保證了聚合計算後的資料的準確性。

Flink實時計算應用實踐:問題剖析及解決方案

因此,與SQL相比,使用DataStream API我們可以更加靈活的處理資料,因此在處理資料亂序時,根據需要,我們可以同時使用三種處理亂序的方案來處理亂序資料。而SQL目前只支援使用Watermark的方式來處理亂序資料。

4.2 Flink大狀態問題解決方案

在上面的Flink大狀態場景及問題分析章節,我們可以知道大狀態會導致如下問題:

1、記憶體超額使用

2、Checkpoint時間過長

以上問題,我們可以透過以下辦法來解決:

4。2。1 減少狀態大小

如果狀態比較大,我們是否可以減少狀態呢,答案是肯定的。例如當我們狀態使用的POJO類物件時,我們可以使用ProtoStuff來減少物件的儲存空間,從而減少物件序列化後的儲存空間大小。下面是使用ProtoStuff進行序列化和反序列化的程式碼示例。

Flink實時計算應用實踐:問題剖析及解決方案

注:在目前使用的ProtoStuff的版本,由於不支援時間型別,所以對時間型別可以轉換成字串或Long型別來儲存。

4。2。2 記憶體超額使用

如果TM是因為記憶體不足被kill掉,解決方案有二種:

1、想辦法減少狀態大小(如上面的第一點)。

2、增加TM的記憶體,如原來TM為4G,可以增加到6G,甚至更多。

4。2。3 Checkpoint時間長

◇修改checkpoint策略

預設情況下,flink checkpoint 是全量備份,當狀態很大,全量備份會導致每次的checkpoint耗時會很長,開啟checkpoint的增量配置,在可以顯著減少每次checkpoint狀態的大小,以及checkpoint的時間。修改flink-conf。yaml中的配置,將state。backend。incremental設定為true。

◇運算元背壓導致checkpoint時間過長

如果是某個運算元的背壓原因導致checkpoint時間很長,我們可以去最佳化該運算元,找出最耗時的操作進行最佳化。下面是我遇到一些場景的最佳化方案:

1、與維度表join。這種情況下,一是加快取並設定資料過期時間。二是使用Redis來存放維度表的資料。

2、資料傾斜。這一塊我們將在資料傾斜一節專門來講,這裡就不再說了。

運算元的計算邏輯複雜。將計算過程拆分到多個運算元中執行,而不是在一個運算元中完成所有的邏輯計算。

◇磁碟IO導致checkpoint時間過長

如果是因為磁碟IO問題,我們可以透過以下方案來解決:

1、檢查磁碟是否為SSD磁碟, 如果不是,則建議使用SSD磁碟。

2、掛載多塊磁碟,在flink-conf。yaml中透過配置state。backend。rocksdb。localdir來指定多個掛載目錄。不同的目錄掛載到不同的磁碟。

3、透過上面的操作後,還是存在IO問題,比如有的磁碟IO使用高,有的使用低。在這種情況下,可能是由於多個sub task共用同一塊磁碟的問題,導致負載不均衡。解決這一個問題,通常的策略就是負載均衡策略。通用的負載均衡策略有hash, 隨機以及輪循等策略。Flink預設使用的策略是隨機策略,原始碼如下:

Flink實時計算應用實踐:問題剖析及解決方案

如果想使用其他策略,可以修改這裡的原始碼。

◇資料傳輸導致checkpoint時間過長

Async Duration是指RocksDB將本地checkpoint的狀態備份到其他持久化儲存系統所花費的時間,其他儲存系統如HDFS檔案系統。如果是由於Async Duration時間長,先檢查網路IO使用是否很高,如果不是很高,說明RocksDB將本地狀態資料上傳到HDFS時花費的時間太多,這時我們可以提高RocksDB的資料傳輸執行緒數量,因為RocksDB預設的資料傳輸執行緒數量為1,所以我們可以增加執行緒數量來減少Async Duration的時間。增加資料傳輸執行緒數量的程式碼設定如下:

Flink實時計算應用實踐:問題剖析及解決方案

4.3 資料傾斜問題解決方案

4。3。1 Local-Global Aggregation策略

Flink對於資料傾斜的解決方案是採用Local-Global Aggregation策略。Local-Global將一個組聚合分為兩個階段,即首先在上游進行區域性聚合,然後在下游進行全域性聚合。它類似於MapReduce的Combine + Reduce模式。它的原理如下圖4-1所示。

Flink實時計算應用實踐:問題剖析及解決方案

圖4-1 Local-Global Aggregation最佳化示意圖

Flink sql使用Local-Global Aggregation策略非常簡單,只要設定如下配置即可:

Flink實時計算應用實踐:問題剖析及解決方案

4。3。2 Split-Distinct Aggregation

Local-Global 最佳化可有效消除一般聚合的資料傾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在處理distinct 的聚合時,它的效能並不令人滿意。例如下面的SQL:

Flink實時計算應用實踐:問題剖析及解決方案

如果不同鍵(即user_id)的值稀疏,則 COUNT DISTINCT不擅長減少記錄。即使啟用了Local-Global最佳化,也無濟於事。因為累加器仍然包含幾乎所有的原始記錄,全域性聚合將成為瓶頸。如下圖4-2左側所示。

Flink實時計算應用實踐:問題剖析及解決方案

圖4-2 Split-Distinct Aggregation最佳化示意圖

Split-Distinct Aggregation最佳化的想法是將不同的聚合(例如COUNT(DISTINCT col))分成兩個級別。

第一步是按照day和bucket鍵值進行分組聚合。bucket鍵值等於HASH_CODE(user_id) % BUCKET_NUM。BUCKET_NUM預設1024,可透過table。optimizer。distinct-agg。split。bucket-num選項配置。

第二步按原始鍵(即day)分組,SUM用於聚合來自不同儲存桶的 COUNT DISTINCT 值。因為相同的distinct key只會在同一個bucket中計算,所以變換是等價的。

上面的sql透過split-distinct aggregation最佳化後,SQL相當於下面的SQL:

Flink實時計算應用實踐:問題剖析及解決方案

Flink SQL要開啟split-distinct aggregation最佳化,只需要設定下面的配置即可:

Flink實時計算應用實踐:問題剖析及解決方案

5。總結

本文深入分析了Flink實時計算應用實踐中的常見問題:資料亂序,大狀態作業最佳化,資料傾斜,並且對相應問題提出了可行的解決方案。後續我們將探討Flink在智慧風控,實時資料入湖,實時監控等其它業務場景的應用實踐。