国产人妻精品区一区二区,国产农村妇女毛片精品久久,JAPANESE日本丰满少妇,日本公妇理伦a片,射精专区一区二区朝鲜

基于flink的數字集成方案
作者 | 移動Labs2023-03-14

1、概述

在實際私有化物聯網平(ping)臺項(xiang)目中,部(bu)分存量(liang)(liang)設備(bei)(bei)(bei)由于異構(gou)總(zong)線、多(duo)制式以太網(wang)(wang)、協議多(duo)樣化(hua)等因素導(dao)致無法(fa)直(zhi)接連(lian)接物(wu)聯(lian)網(wang)(wang)平(ping)臺,大(da)量(liang)(liang)數(shu)據(ju)較難集成,平(ping)臺側(ce)(ce)和設備(bei)(bei)(bei)側(ce)(ce)面(mian)臨大(da)量(liang)(liang)定(ding)制化(hua)開發,成本較高。因此難以推動(dong)客戶(hu)或設備(bei)(bei)(bei)廠商(shang)進行存量(liang)(liang)設備(bei)(bei)(bei)接入改造,導(dao)致設備(bei)(bei)(bei)無法(fa)直(zhi)連(lian)物(wu)聯(lian)網(wang)(wang)平(ping)臺,無法(fa)達到物(wu)聯(lian)網(wang)(wang)平(ping)臺對企業所(suo)有(you)設備(bei)(bei)(bei)數(shu)據(ju)進行統一(yi)納管。

企業(ye)內部存量的(de)數(shu)(shu)據采集系(xi)統(tong)多為“煙囪式”,各個廠商(shang)的(de)系(xi)統(tong)只(zhi)需對接自己廠商(shang)的(de)設備(bei)即可,數(shu)(shu)據孤島問題(ti)突(tu)出。

各(ge)“煙囪”的數據格式(shi)各(ge)不(bu)相同,定制化采集(ji)任務代碼不(bu)可(ke)復用(yong),費(fei)時費(fei)力,難以(yi)同時支撐多個項目。

除(chu)了設(she)備(bei)(bei)數(shu)(shu)據(ju)采(cai)集(ji)外,還有業務(wu)數(shu)(shu)據(ju)采(cai)集(ji)需(xu)求,傳(chuan)統物聯網(wang)系(xi)統只能(neng)采(cai)集(ji)設(she)備(bei)(bei)數(shu)(shu)據(ju)而(er)無法集(ji)成業務(wu)數(shu)(shu)據(ju)。

基于flink的數字集成方案

2、技術選型

數(shu)字(zi)集成技術(shu)通(tong)過對不同系統(tong)數(shu)據(ju)(ju)的(de)(de)抽取(Extract),數(shu)據(ju)(ju)清洗和轉換(Transformation)以及輸入最(zui)終的(de)(de)目(mu)標系統(tong)(Load),打(da)通(tong)各個業務孤島,實(shi)(shi)現(xian)數(shu)據(ju)(ju)互聯互通(tong),助力(li)企(qi)業數(shu)字(zi)化轉型。由于(yu)物聯網(wang)場景下的(de)(de)數(shu)據(ju)(ju)處理(li)大多都要求實(shi)(shi)時(shi)性,所(suo)以要求實(shi)(shi)現(xian)時(shi)具備(bei)實(shi)(shi)時(shi)數(shu)據(ju)(ju)處理(li)能(neng)力(li)。實(shi)(shi)時(shi)計(ji)(ji)算也(ye)被稱作流(liu)計(ji)(ji)算,代(dai)(dai)表(biao)是Storm、Spark Streaming、Flink等大數(shu)據(ju)(ju)技術(shu)。計(ji)(ji)算引(yin)擎也(ye)在不斷更新迭(die)代(dai)(dai),從第一代(dai)(dai)的(de)(de)Hadoop MapReduce,到(dao)第二代(dai)(dai)的(de)(de)Spark,再(zai)(zai)到(dao)第三代(dai)(dai)的(de)(de)Flink技術(shu),從批處理(li)到(dao)微批,再(zai)(zai)到(dao)真正的(de)(de)流(liu)式計(ji)(ji)算。

Apache Flink是一個(ge)開源的(de)(de)流(liu)(liu)(liu)處理框架,應(ying)用于(yu)分布(bu)式(shi)、高性能、高可(ke)用的(de)(de)數(shu)(shu)據(ju)流(liu)(liu)(liu)應(ying)用程序。可(ke)以(yi)(yi)處理有(you)限數(shu)(shu)據(ju)流(liu)(liu)(liu)和無限數(shu)(shu)據(ju),即能夠處理有(you)邊界和無邊界的(de)(de)數(shu)(shu)據(ju)流(liu)(liu)(liu)。無邊界的(de)(de)數(shu)(shu)據(ju)流(liu)(liu)(liu)就是真正意義上的(de)(de)流(liu)(liu)(liu)數(shu)(shu)據(ju),所(suo)以(yi)(yi)Flink是支(zhi)持流(liu)(liu)(liu)計(ji)算的(de)(de)。Flink可(ke)以(yi)(yi)部署(shu)在各種集(ji)群環(huan)境,可(ke)以(yi)(yi)對各種大小規模的(de)(de)數(shu)(shu)據(ju)進(jin)行快速(su)計(ji)算。

Flink框架具(ju)備強大的流式ETL的能(neng)力,依靠其豐富的算子(zi)實現。

2.1 Source算子

Flink可以使用(yong)StreamExecutionEnvironment.addSource(source)來為我們(men)的程序添加數據來源。

Flink已經提供(gong)了(le)若干實(shi)現好的source functions,當(dang)然(ran)也可(ke)通過實(shi)現SourceFunction來(lai)自(zi)定義非并(bing)行的source或者實(shi)現ParallelSourceFunction接口或者擴展RichParallelSourceFunction來(lai)自(zi)定義并(bing)行的source。

Flink在流(liu)處理上的source大(da)致有(you)4大(da)類(lei):

  • 基(ji)于本(ben)地集合的source(Collection-based-source)

  • 基于文(wen)件(jian)的(de)source(File-based-source)- 讀取(qu)文(wen)本文(wen)件(jian),即符(fu)合TextInputFormat規范的(de)文(wen)件(jian),并(bing)將其作為字符(fu)串返(fan)回

  • 基(ji)于網絡(luo)套接字的(de)source(Socket-based-source)- 從socket讀取。元素可以用分隔符切分。

  • 自定義(yi)的(de)source(Custom-source)

使用自定義Source算子可實現豐富的數據抽取(qu)功能。

2.2 Transform轉換算子

① map

將DataStream中的每(mei)一個元素(su)轉換為另外一個元素(su),如將元素(su)x變為原(yuan)來的5倍:

dataStream.map { x => x * 5 }

② FlatMap

采用(yong)一(yi)(yi)個(ge)數(shu)(shu)據元并(bing)生成零個(ge),一(yi)(yi)個(ge)或多個(ge)數(shu)(shu)據元。如(ru),將句(ju)子分(fen)割為單詞(ci)的flatmap函數(shu)(shu):

dataStream.flatMap { str => str.split(" ") }

③ Filter

計(ji)算每個數(shu)據(ju)元的(de)布爾函數(shu),并(bing)保存函數(shu)返回true的(de)數(shu)據(ju)元。如,過濾掉零(ling)值的(de)過濾器:

dataStream.filter { x != 0 }

當然flink還具備(bei)很(hen)多其他功(gong)能的(de)轉(zhuan)(zhuan)換(huan)算子(zi),如(ru)KeyBy、Reduce、Aggregations等(deng),通過豐(feng)富(fu)的(de)轉(zhuan)(zhuan)換(huan)算子(zi),flink可實(shi)現(xian)對(dui)數據的(de)清洗和轉(zhuan)(zhuan)換(huan)功(gong)能。

2.3 Sink算子

Flink的sink算(suan)子(zi)支(zhi)持將數據輸(shu)出(chu)到(dao)(dao)(dao):本地(di)文件、本地(di)集(ji)合、HDFS,除此之外(wai),還支(zhi)持:sink到(dao)(dao)(dao)kafka、sink到(dao)(dao)(dao)mysql、sink到(dao)(dao)(dao)redis以及自定義sink算(suan)子(zi)。

通過自定義sink算子將清洗轉換完成(cheng)的數據輸入目標系統。

3、數字集成實現

實現過程如下:

實現過程

第一步(bu),抽象(xiang)定義基(ji)礎控件(jian)類(lei)

數(shu)字集成基于flink可抽象定義3類(lei)基礎功能控件,每類(lei)控件又可根據不同的(de)功能實現具(ju)體的(de)子類(lei)功能控件;詳細(xi)如下:

基礎功能(neng)控(kong)件(jian)分為三(san)類:數據(ju)源控(kong)件(jian)、數據(ju)輸出(chu)控(kong)件(jian)、數據(ju)處理控(kong)件(jian)。

基于flink的數字集成方案

數(shu)(shu)據(ju)源(yuan)控件(jian):將Source算子抽象(xiang)定義成(cheng)具備抽取數(shu)(shu)據(ju)功能的數(shu)(shu)據(ju)源(yuan)控件(jian)類,并制(zhi)定相應的配置規范,使用時(shi)只(zhi)需根據(ju)規范配置文件(jian),系統根據(ju)配置文件(jian)創建具體的實(shi)例化對象(xiang),實(shi)現數(shu)(shu)據(ju)抽取功能;

數據(ju)(ju)操作控(kong)件(jian):根據(ju)(ju)不同的基礎功能需求將Transform算子抽象(xiang)成數據(ju)(ju)處理控(kong)件(jian)類,制(zhi)定相應(ying)的配置規(gui)范(fan),使用時只需根據(ju)(ju)規(gui)范(fan)配置文(wen)件(jian),系統根據(ju)(ju)配置創建相應(ying)的實例化對象(xiang)實現數據(ju)(ju)處理功能;

數(shu)據(ju)輸(shu)出(chu)(chu)控件:將Sink算子抽象成數(shu)據(ju)輸(shu)出(chu)(chu)控件類(lei),制定相應的配置規(gui)范(fan),使用時只需根據(ju)規(gui)范(fan)配置文件,系統根據(ju)配置創建實例化(hua)對象實現數(shu)據(ju)輸(shu)出(chu)(chu)功能。

同(tong)時系統內部明確定義flink算子之間流轉的(de)數(shu)據格(ge)式(shi)作為內部流轉數(shu)據格(ge)式(shi)以及根據配置(zhi)輸出每(mei)個基(ji)礎功能控(kong)件輸出的(de)數(shu)據格(ge)式(shi)。

第二步,根據抽象定(ding)(ding)義的基礎功能控件(jian),制定(ding)(ding)具體配置規范

基礎功(gong)能(neng)控(kong)件規范如下:

基礎功能控件規范

通過(guo)以上兩(liang)步規(gui)范定義后,在(zai)(zai)同一個(ge)系(xi)統中,同一個(ge)處(chu)理過(guo)程只(zhi)需(xu)要定義一個(ge)基礎功能控件(jian)規(gui)范。如Kafka消費者(zhe)(zhe)所(suo)需(xu)的(de)配(pei)置如Kafka集(ji)群地(di)址、消費群組、數(shu)(shu)(shu)(shu)據所(suo)在(zai)(zai)topic、數(shu)(shu)(shu)(shu)據所(suo)在(zai)(zai)分區(qu)key,消費位置等,只(zhi)需(xu)要規(gui)定上述(shu)舉(ju)例這樣一個(ge)Kafka消費控件(jian)并(bing)開發(fa)實(shi)現,該(gai)控件(jian)就(jiu)可(ke)(ke)以在(zai)(zai)該(gai)系(xi)統中復(fu)用,每次配(pei)置的(de)數(shu)(shu)(shu)(shu)據處(chu)理工作流(liu),復(fu)用Kafka消費控件(jian)類并(bing)根據新(xin)配(pei)置的(de)源系(xi)統提供的(de)Kafka集(ji)群地(di)址、數(shu)(shu)(shu)(shu)據所(suo)在(zai)(zai)topic等配(pei)置即可(ke)(ke)實(shi)例化該(gai)工作流(liu)所(suo)需(xu)的(de)kafka 消費者(zhe)(zhe),實(shi)現過(guo)程從開發(fa)無(wu)數(shu)(shu)(shu)(shu)次Kafka Consumer的(de)代(dai)碼變(bian)為實(shi)現一次Kafka Consumer控件(jian)代(dai)碼,大量節省開發(fa)時(shi)間和(he)開發(fa)成本。

第三步(bu),通過對基(ji)本功(gong)(gong)能的(de)抽象(xiang),實(shi)現(xian)如HTTP請求(qiu)、kafka生產、數(shu)據(ju)遍歷(li)、條(tiao)件循(xun)環、數(shu)據(ju)映射、MySQL寫操(cao)作等(deng)基(ji)礎(chu)功(gong)(gong)能控件并實(shi)現(xian),再根據(ju)各(ge)個基(ji)礎(chu)功(gong)(gong)能運行的(de)先后邏輯組裝相應配置執行腳(jiao)本來編排組建成一個完(wan)整flink流(liu)處理鏈路,即(ji)可完(wan)成不同系統間的(de)數(shu)據(ju)集成功(gong)(gong)能。

如(ru)在私有(you)(you)化項目中有(you)(you)將設備廠(chang)商(shang)云(yun)平(ping)(ping)臺(tai)中智能(neng)門鎖(suo)(suo)狀態信(xin)息(xi)同步(bu)至自有(you)(you)云(yun)平(ping)(ping)臺(tai)進行智能(neng)門鎖(suo)(suo)控制的需(xu)求,由(you)于智能(neng)門鎖(suo)(suo)設備協(xie)議與自有(you)(you)物聯(lian)網(wang)平(ping)(ping)臺(tai)數(shu)據采(cai)集協(xie)議不適(shi)配,無法直連(lian),由(you)設備廠(chang)商(shang)云(yun)平(ping)(ping)臺(tai)提供(gong)智能(neng)門鎖(suo)(suo)狀態信(xin)息(xi)推(tui)送功能(neng),由(you)自有(you)(you)物聯(lian)網(wang)平(ping)(ping)臺(tai)提供(gong)推(tui)送數(shu)據接(jie)收接(jie)口,完成(cheng)智能(neng)門鎖(suo)(suo)狀態信(xin)息(xi)的同步(bu)功能(neng)。

在此案例中,通(tong)過flink框架的(de)(de)自(zi)定義Source算(suan)子實現HTTP POST功(gong)能(neng)接口的(de)(de)HTTP監聽控件(jian)完成設備廠商云平臺的(de)(de)推送數(shu)(shu)據(ju)(ju)接收功(gong)能(neng),將(jiang)接收到(dao)的(de)(de)智(zhi)能(neng)門鎖狀態(tai)信(xin)息(xi)根據(ju)(ju)智(zhi)能(neng)門鎖ID、狀態(tai)status與自(zi)有云平臺存儲(chu)的(de)(de)狀態(tai)進行比較的(de)(de)IF分支控件(jian),將(jiang)存在狀態(tai)變(bian)化(hua)的(de)(de)智(zhi)能(neng)門鎖狀態(tai)信(xin)息(xi)數(shu)(shu)據(ju)(ju)向后序(xu)Sink算(suan)子流轉(zhuan),通(tong)過自(zi)定義Sink算(suan)子實現自(zi)有云平臺數(shu)(shu)據(ju)(ju)上傳功(gong)能(neng),完成智(zhi)能(neng)門鎖狀態(tai)信(xin)息(xi)的(de)(de)跨(kua)平臺更新功(gong)能(neng)。

基于flink的數字集成方案

第四步(bu),根(gen)據組建好的執(zhi)行邏輯生成有向(xiang)無環圖,提交Flink運行,具體如下:

通過(guo)對不同(tong)的(de)基礎(chu)功(gong)能控件(jian),基于(yu)有(you)向無環(huan)(huan)(huan)圖,將基礎(chu)功(gong)能控件(jian)放入(ru)有(you)向無環(huan)(huan)(huan)圖的(de)頂(ding)點,其(qi)中(zhong)(zhong)整(zheng)個圖中(zhong)(zhong)只有(you)一個數據(ju)源控件(jian),且無其(qi)他基礎(chu)功(gong)能控件(jian)可以(yi)將數據(ju)傳(chuan)輸(shu)(shu)(shu)給(gei)它;數據(ju)輸(shu)(shu)(shu)出(chu)控件(jian)和(he)數據(ju)操作控件(jian)可以(yi)多(duo)個,對應(ying)多(duo)條(tiao)分支處理邏輯。將數據(ju)傳(chuan)輸(shu)(shu)(shu)方(fang)向作為有(you)向無環(huan)(huan)(huan)圖的(de)邊,以(yi)此(ci)(ci)連(lian)接和(he)組(zu)織(zhi)跨(kua)系統數據(ju)傳(chuan)輸(shu)(shu)(shu)過(guo)程中(zhong)(zhong)針(zhen)對數據(ju)的(de)不同(tong)邏輯順序,生成一條(tiao)完(wan)整(zheng)的(de)數據(ju)傳(chuan)輸(shu)(shu)(shu)處理鏈路,將此(ci)(ci)圖完(wan)整(zheng)實現,提交flink執行,即可實現完(wan)整(zheng)的(de)數據(ju)抽(chou)取、轉換(huan)以(yi)及輸(shu)(shu)(shu)出(chu)的(de)數字集成功(gong)能。

4、總結

最后(hou)我(wo)們來總結下基于Flink的(de)數字(zi)集(ji)成(cheng)能(neng)(neng)(neng)力的(de)實現(xian)。得益于flink在ETL數據(ju)(ju)集(ji)成(cheng)上(shang)的(de)豐富(fu)(fu)能(neng)(neng)(neng)力以及算子之(zhi)間易于處(chu)理的(de)基礎功(gong)(gong)能(neng)(neng)(neng),我(wo)們將(jiang)(jiang)flink的(de)3類(lei)算子進行(xing)抽象定(ding)義實現(xian)3類(lei)基礎功(gong)(gong)能(neng)(neng)(neng)控件,實現(xian)不同(tong)的(de)數據(ju)(ju)處(chu)理過程。根據(ju)(ju)不同(tong)的(de)功(gong)(gong)能(neng)(neng)(neng)需求,通(tong)過Source算子實現(xian)從消(xiao)息隊(dui)列(lie)、API、數據(ju)(ju)庫等(deng)多種(zhong)數據(ju)(ju)源抽取數據(ju)(ju)的(de)功(gong)(gong)能(neng)(neng)(neng);通(tong)過豐富(fu)(fu)的(de)Transform算子實現(xian)數據(ju)(ju)的(de)清洗、篩選、轉換的(de)功(gong)(gong)能(neng)(neng)(neng);最后(hou)可(ke)(ke)通(tong)過Sink算子實現(xian)將(jiang)(jiang)目標格式數據(ju)(ju)輸入(ru)目標系統接收(shou)數據(ju)(ju)的(de)渠(qu)道如消(xiao)息隊(dui)列(lie)、數據(ju)(ju)庫、API等(deng)。綜上(shang)所述,基于Flink的(de)數字(zi)集(ji)成(cheng)能(neng)(neng)(neng)力是可(ke)(ke)以實現(xian)并且具備(bei)豐富(fu)(fu)功(gong)(gong)能(neng)(neng)(neng)和可(ke)(ke)擴展性的(de)。

熱門文章
近日,智次方·物聯網智庫與瀚云科技工業互聯網研究院副院長孫文荊就工業互聯網平臺如何拉動中小企業參與數字化轉型等話題進行了深入交流,并以此為基礎撰寫了本篇文章,以期為讓讀者更了解我國工業互聯網平臺發展現
2023-03-14
X