首頁  >  文章  >  系統教程  >  即時抽取和基於日誌的資料同步一致性

即時抽取和基於日誌的資料同步一致性

王林
王林轉載
2024-01-16 14:36:05661瀏覽

作者:王東

#宜信技術研發中心架構師

  • 目前就職於宜信技術研發中心,擔任架構師,負責串流運算和大數據業務產品解決方案。
  • 曾任職於Naver china(韓國最大搜尋引擎公司)中國研發中心資深工程師,多年從事CUBRID分散式資料庫叢集開發和CUBRID資料庫引擎開發http://www.cubrid.org/blog/news/cubrid- cluster-introduction/

主題簡介:

  1. #DWS的背景介紹
  2. dbus wormhole整體架構與技術實作方案
  3. DWS的實際運用案例
前言

大家好,我是王東,來自宜信技術研發中心,這是我來社群的第一次分享,如果有什麼不足,請大家多多指正、包涵。

這次分享的主題是《基於日誌的DWS平台實作與應用》,主要是分享一下目前我們在宜信中所做的一些事情。這個主題裡麵包含到2個團隊很多兄弟姊妹的努力的結果(我們團隊和山巍團隊的成果)。這次就由我代為執筆,盡我努力跟大家介紹一下。

其實整個實作從原理上來說是比較簡單的,當然也牽涉到不少技術。我會試著用盡量簡單的方式來表達,讓大家了解這個事情的原理和意義。過程中,大家有問題可以隨時提出,我會盡力去解答。

DWS是一個簡稱,是由3個子項目組成,我稍後再做解釋。

一、背景

事情是從公司前段時間的需求說起,大家知道宜信是網路金融企業,我們的許多數據與標準網路企業不同,大致來說就是:

即時抽取和基於日誌的資料同步一致性

#玩資料的人都知道資料是非常有價值的,然後這些資料是保存在各個系統的資料庫中,如何讓需要資料的使用方得到一致性、即時的資料呢?

過去的一般做法有幾種是:
  1. DBA開放各系統的備庫,在業務低高峰期(如夜間),使用方各自抽取所需資料。由於抽取時間不同,各個資料使用方資料不一致,資料發生衝突,而且重複抽取,相信不少DBA很頭痛這個事情。
  2. 公司統一的大數據平台,透過Sqoop 在業務低高峰期到各個系統統一抽取數據, 並保存到Hive表中, 然後為其他數據使用方提供數據服務。這種做法解決了一致性問題,但時效性差,基本上是T 1的時效。
  3. 基於trigger的方式取得增量變更,主要問題是業務方侵入性大,而且trigger也帶來效能損失。

這些方案都不算完美。我們在了解和考慮了不同實現方式後,最後借鑒了 linkedin的思想,認為要同時解決資料一致性和即時性,比較合理的方法應該是來自於log。

即時抽取和基於日誌的資料同步一致性

#(此圖來自:https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad- idea/)

把增量的Log當作一切系統的基礎。後續的數據使用方,透過訂閱kafka來消費log。

例如:

  • 大數據的使用方可以將資料儲存到Hive表或Parquet檔案給Hive或Spark查詢;
  • 提供搜尋服務的使用方可以儲存到Elasticsearch或HBase 中;
  • 提供快取服務的使用方可以將日誌快取到Redis或alluxio;
  • 資料同步的使用方可以將資料儲存到自己的資料庫中;
  • 由於kafka的日誌是可以重複消費的,並且緩存一段時間,各個使用方可以透過消費kafka的日誌來達到既能保持與資料庫的一致性,也能保證即時性;

為什麼要使用log和kafka作為基礎,而不使用Sqoop來抽取呢?因為:

即時抽取和基於日誌的資料同步一致性

#為什麼不使用dual write(雙寫)呢? ,請參考https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/

我這裡就不多做解釋了。

二、總體架構

於是我們提出了建構一個基於log的公司級的平台的想法。

下面解釋一下DWS平台, DWS平台是有3個子項目組成:

  1. Dbus(資料匯流排):負責將資料即時從來源端即時抽出,並轉換為約定的自帶schema的json格式資料(UMS 資料),放入kafka中;
  2. Wormhole(資料交換平台):負責從kafka讀出資料 將資料寫入目標;
  3. Swifts(即時計算平台):負責從kafka中讀出數據,即時計算,並將數據寫回kafka。

即時抽取和基於日誌的資料同步一致性

#圖中:

  • Log extractor和dbus共同完成資料擷取和資料轉換,抽取包含全量和增量抽取。
  • Wormhole可以將所有日誌資料保存到HDFS; 也可以將資料落地到所有支援jdbc的資料庫,落地到HBash,Elasticsearch,Cassandra等;
  • Swifts支援以配置和SQL的方式實現對進行串流計算,包括支援串流join,look up,filter,window aggregation等功能;
  • Dbus web是dbus的配置管理端,rider除了組態管理以外,還包括對Wormhole和Swifts運行時管理,資料品質校驗等。

由於時間關係,我今天主要介紹DWS中的Dbus和Wormhole,在需要的時候附帶介紹一下Swifts。

三、dbus解決方案
日誌解析

如前面所說,Dbus主要解決的是將日誌從來源端即時的抽出。這裡我們以MySQL為例子,簡單說明如何實作。

我們知道,雖然MySQL InnoDB有自己的log,MySQL主備同步是透過binlog來實現的。如下圖:

即時抽取和基於日誌的資料同步一致性

#圖片來自:https://github.com/alibaba/canal

#而binlog有三種模式:
  1. Row 模式:日誌中會記錄成每一行資料被修改的形式,然後在slave端再對相同的資料進行修改。
  2. Statement 模式: 每一筆會修改資料的sql都會記錄到 master的bin-log。 slave在複製的時候SQL程序會解析成和原來master端執行過的相同的SQL來再次執行。
Mixed模式: MySQL會根據執行的每一個具體的sql語句來區分對待記錄的日誌形式,也就是在Statement和Row之間選擇一種。

他們各自的優缺點如下:即時抽取和基於日誌的資料同步一致性

#這裡來自:http://www.jquerycn.cn/a_13625

由於statement 模式的缺點,在與我們的DBA溝通過程中了解到,實際生產過程中都使用row 模式進行複製。這使得讀取全量日誌成為可能。

通常我們的MySQL佈局是採用2個master主庫(vip) 1個slave從庫1個backup容災庫的解決方案,由於容災庫通常是用於異地容災,實時性不高也不便於部署。

為了最小化對來源端產生影響,顯然我們讀取binlog日誌應該從slave從函式庫讀取。

讀取binlog的方案比較多,github上不少,參考https://github.com/search?utf8=✓&q=binlog。最後我們選用了阿里的canal做位日誌抽取方。

###Canal最早被用於阿里中美機房同步, canal原理相對比較簡單:###
  1. Canal模擬MySQL Slave的互動協議,偽裝自己為MySQL Slave,向MySQL Slave傳送dump協定
  2. MySQL master收到dump請求,開始推送binary log給Slave(也就是canal)
  3. Canal解析binary log物件(原始為byte流)

即時抽取和基於日誌的資料同步一致性

#圖片來自:https://github.com/alibaba/canal

#解決方案

Dbus 的MySQL版主要解決方案如下:即時抽取和基於日誌的資料同步一致性

#對於增量的log,透過訂閱Canal Server的方式,我們得到了MySQL的增量日誌:
  • 依照Canal的輸出,日誌是protobuf格式,開發增量Storm程序,將資料即時轉換為我們定義的UMS格式(json格式,稍後我會介紹),並儲存到kafka中;
  • 增量Storm程式也負責捕捉schema變化,以控製版本號碼;
  • 增量Storm的設定資訊保存在Zookeeper中,以滿足高可用需求。
Kafka既作為輸出結果也作為處理過程中的緩衝器和訊息解構區。

在考慮使用Storm作為解決方案的時候,我們主要是認為Storm有以下優點:
  • 技術相對成熟,較穩定,與kafka搭配也算標準組合;
  • 即時性比較高,能夠滿足即時性需求;
  • 滿足高可用需求;
透過配置Storm並發度,可以活動效能擴展的能力; 全量抽取

#對於流水錶,有增量部分就夠了,但是許多表需要知道最初(已存在)的資訊。這時候我們需要initial load(第一次載入)。

對於initial load(第一次載入),同樣開發了全量抽取Storm程式透過jdbc連接的方式,從來源端資料庫的備庫進行拉取。 initial load是拉全部數據,所以我們推薦在業務低高峰期進行。還好只做一次,不需要每天做。

全量抽取,我們借鏡了Sqoop的想法。將全量抽取Storm分為了2 個部分:
  1. 資料分片
實際抽選

資料分片需要考慮分片列,依照設定和自動選擇列將資料依照範圍來分片,並將分片資訊儲存到kafka。 即時抽取和基於日誌的資料同步一致性

#以下是具體的分片策略:即時抽取和基於日誌的資料同步一致性

#全量抽取的Storm程式是讀取kafka的分片訊息,採用多個並發度並行連接資料庫備庫進行拉取。因為抽取的時間可能很長。抽取過程中將即時狀態寫入Zookeeper中,方便心跳程式監控。 即時抽取和基於日誌的資料同步一致性

#統一訊息格式

#無論是增量或全量,最終輸出到kafka中的訊息都是我們約定的一個統一訊息格式,稱為UMS(unified message schema)格式。

如下圖所示:即時抽取和基於日誌的資料同步一致性 即時抽取和基於日誌的資料同步一致性

######

訊息中schema部分,定義了namespace 是由 類型 資料來源名 schema名 表名 版本號 分庫號 分錶號 能夠描述整個公司的所有表,透過一個namespace就能唯一定位。

  • _ums_op_ 表示資料的類型是I(insert),U(update),D(刪除);
  • _ums_ts_ 發生增刪改的事件的時間戳,顯然新的資料發生的時間戳更新;
  • _ums_id_ 訊息的唯一id,保證訊息是唯一的,但這裡我們保證了訊息的先後順序(稍後解釋);

payload是指具體的數據,一個json包裡面可以包含1條至多條數據,提高數據的有效載荷。

UMS中支援的資料類型,參考了Hive類型並進行簡化,基本上包含了所有資料類型。

全量和增量的一致性

在整個資料傳輸中,為了盡量的保證日誌訊息的順序性,kafka我們使用的是1個partition的方式。在一般情況下,基本上是順序的和唯一的。

但是我們知道寫kafka會失敗,有可能重寫,Storm也用重做機制,因此,我們並不嚴格保證exactly once和完全的順序性,但保證的是at least once。

因此_ums_id_變得特別重要。

對於全量抽取,_ums_id_是唯一的,從zk中每個並發度分別取不同的id片區,保證了唯一性和性能,填寫負數,不會與增量數據衝突,也保證他們是早於增量訊息的。

對於增量抽取,我們使用的是MySQL的日誌檔案號碼 日誌偏移量作為唯一id。 Id作為64位元的long整數,高7位元用於日誌檔案號,低12位元作為日誌偏移量。

例如:000103000012345678。 103 是日誌檔案號,12345678 是日誌偏移量。

這樣,從日誌層面保證了物理唯一性(即使重做也這個id號也不變),同時也保證了順序性(還能定位日誌)。透過比較_ums_id_ 消費日誌就能透過比較_ums_id_知道哪一則訊息更新。

其實_ums_ts_與_ums_id_意圖是類似的,只不過有時候_ums_ts_可能會重複,即在1毫秒中發生了多個操作,這樣就得靠比較_ums_id_了。

心跳監控與預警

#整個系統涉及到資料庫的主備同步,Canal Server,多個並發度Storm進程等各個環節。

因此對流程的監控和預警就特別重要。

透過心跳模組,例如每分鐘(可設定)對每個被抽取的表插入一條心態資料並保存發送時間,這個心跳表也被抽取,跟著整個流程下來,與被同步表在實際上走相同的邏輯(因為多個並發的的Storm可能有不同的分支),當收到心跳包的時候,即便沒有任何增刪改的數據,也能證明整條鏈路是通的。

Storm程式和心跳程式將資料傳送公開的統計topic,再由統計程式儲存到influxdb中,使用grafana進行展示,就可以看到如下效果:

即時抽取和基於日誌的資料同步一致性

#圖中是某業務系統的即時監控資訊。上面是即時流量狀況,下面是即時延時情況。可以看到,即時性還是很不錯的,基本上1~2秒資料就已經到末端kafka。

Granfana提供的是一種即時監控能力。

如果出現延遲,則是透過dbus的心跳模組發送郵件警報或簡訊警報。

即時脫敏

#考慮到資料安全性,對於有脫敏需求的場景,Dbus的全量storm和增量storm程式也完成了即時脫敏的功能。脫敏方式有3種:

即時抽取和基於日誌的資料同步一致性

#總結一下:簡單的說,Dbus就是將各種來源的數據,即時的導出,並以UMS的方式提供訂閱, 支援即時脫敏,實際監控和警報。

四、Wormhole解決方案

說完Dbus,該說一下Wormhole,為什麼兩個專案不是一個,而要透過kafka來對接呢?

其中很大一個原因就是解耦,kafka具有天然的解耦能力,程式直接可以透過kafka做異步的訊息傳遞。 Dbus和Wornhole內部也使用了kafka做訊息傳遞和解耦。

另外一個原因是,UMS是自描述的,透過訂閱kafka,任何有能力的使用方來直接消費UMS來使用。

雖然UMS的結果可以直接訂閱,但還需要開發的工作。 Wormhole解決的是:提供一鍵式的配置,將kafka中的資料落地到各種系統中,讓沒有開發能力的資料使用方透過wormhole來實現使用資料。

即時抽取和基於日誌的資料同步一致性

#如圖所示,Wormhole 可以將kafka中的UMS 落地到各種系統,目前用的最多的HDFS,JDBC的資料庫和HBase。

在技術堆疊上, wormhole選擇使用spark streaming來進行。

在Wormhole中,一條flow是指從一個namaspace從來源端到目標端。一個spark streaming服務於多條flow。

即時抽取和基於日誌的資料同步一致性

#選用Spark的理由是很充分的:

  • Spark天然的支援各種異質儲存系統;
  • 雖然Spark Stream比Storm延時稍差,但Spark有著更好的吞吐量和更好的運算效能;
  • Spark在支援平行計算方面有更強的靈活性;
  • Spark提供了一個技術堆疊內解決Sparking Job,Spark Streaming,Spark SQL的統一功能,便於後期開發;

這裡補充說一下Swifts的作用:

  • Swifts的本質是讀取kafka中的UMS數據,進行即時計算,將結果寫入kafka的另外一個topic。
  • 即時運算可以是很多種方式:例如過濾filter,projection(投影),lookup, 串流join window aggregation,可以完成各種具有業務價值的串流即時運算。

Wormhole和Swifts比較如下:

即時抽取和基於日誌的資料同步一致性

#落HDFS

#透過Wormhole Wpark Streaming程式消費kafka的UMS,首先UMS log可以被儲存到HDFS上。

kafka一般只保存若干天的信息,不會保存全部信息,而HDFS中可以保存所有的歷史增刪改的信息。這就使得很多事情變成可能:

  • 透過重播HDFS中的日誌,我們能夠還原任意時間的歷史快照。
  • 可以做拉鍊表,還原每筆記錄的歷史訊息,以便於分析;
  • 當程式出現錯誤是,可以透過回灌(backfill),重新消費訊息,重新形成新的快照。

可以說HDFS中的日誌是很多的事情基礎。

介於Spark原生對parquet支援的很好,Spark SQL能夠對Parquet提供很好的查詢。 UMS落地到HDFS上是保存到Parquet檔案中的。 Parquet的內容是所有log的增刪改資訊以及_ums_id_,_ums_ts_都會存下來。

Wormhole spark streaming根據namespace 將資料分佈儲存到不同的目錄中,即不同的表和版本放在不同目錄中。

即時抽取和基於日誌的資料同步一致性

#由於每次寫的Parquet都是小文件,大家知道HDFS對於小文件效能並不好,因此另外還有一個job,每天定時將這些的Parquet文件進行合併成大文件。

每個Parquet檔案目錄都帶有檔案資料的起始時間和結束時間。這樣在回灌資料時,可以根據選取的時間範圍來決定需要讀取哪些Parquet文件,不必讀取全部資料。

插入或更新資料的冪等性

常常我們遇到的需求是,將資料經過加工落地到資料庫或HBase。那麼這裡牽涉到的一個問題就是,什麼樣的資料可以被更新到資料?

這裡最重要的一個原則就是資料的冪等性。

無論是遇到增刪改任何的數據,我們面臨的問題都是:

  1. 該更新哪一行;
  2. 更新的策略是什麼。

對於第一個問題,其實就需要定位資料要找一個唯一的鍵,常見的有:

  1. 使用業務庫的主鍵;
  2. 由業務方指定幾個欄位做聯合唯一索引;

對於第二個問題,就涉及到_ums_id_了,因為我們已經保證了_ums_id_大的值更新,因此在找到對應資料行後,根據這個原則來進行替換更新。

即時抽取和基於日誌的資料同步一致性

#之所以要軟刪除和加入_is_active_列,是為了這樣一種情況:

如果已經插入的_ums_id_比較大,是刪除的資料(表示這個資料已經刪除了), 如果不是軟刪除,此時插入一個_ums_id_小的資料(舊資料),就會真的插入進去。

這就導致舊資料被插入了。不冪等了。所以被刪除的資料依然保留(軟刪除)是有價值的,它能被用來保證資料的冪等性。

HBase的保存

#插入資料到Hbase中,相當要簡單一些。不同的是HBase可以保留多個版本的資料(當然也可以只保留一個版本)預設是保留3個版本;

因此插入資料到HBase,需要解決的問題是:

  1. 選擇適合的rowkey:Rowkey的設計是可以選的,使用者可以選擇來源表的主鍵,也可以選擇若干列做聯合主鍵。
  2. 選擇適當的version:使用_ums_id_ 較大的偏移量(例如100億) 作為row的version。

Version的選擇很有意思,利用_ums_id_的唯一性和自增性,與version本身的比較關係一致:即version較大等價於_ums_id_較大,對應的版本較新。

從提高效能的角度,我們可以將整個Spark Streaming的Dataset集合直接插入HBase,不需要比較。讓HBase基於version自動替我們判斷哪些資料可以保留,哪些資料不需要保留。

Jdbc的插入資料:

插入資料到資料庫中,保證冪等的原理雖然簡單,要想提高效能在實作上就變得複雜很多,總不能一條一條的比較然後在插入或更新。

我們知道Spark的RDD/dataset都是以集合的方式來操作以提高效能,同樣的我們需要以集合運算的方式實現冪等性。

具體思路是:

  1. 先根據集合中的主鍵到目標資料庫中查詢,得到一個已有資料集合;
  2. 與dataset中的集合比較,分出兩類:

A:不存在的數據,也就是這部分數據insert就可以;

B:存在的數據,比較_ums_id_, 最終只將哪些_ums_id_更新較大row到目標資料庫,小的直接拋棄。

使用Spark的同學都知道,RDD/dataset都是可以partition的,可以使用多個worker並進行操作以提高效率。

在考慮並發情況下,插入和更新都可能出現失敗,那麼還有考慮失敗後的策略。

例如:因為別的worker已經插入,那麼因為唯一性約束插入失敗,那麼需要改為更新,還要比較_ums_id_看是否能夠更新。

對於無法插入其他情況(例如目標系統有問題),Wormhole還有重試機制。說起來細節特別多。這裡就不多介紹了。

有些還在開發中。

插入到其他儲存中的就不多介紹了,總的原則是:根據各自儲存自身特性,設計基於集合的,並發的插入資料實作。這些都是Wormhole為了效能而做的努力,使用Wormhole的使用者不必關心 。

五、運用案例
即時行銷

說了那麼多,DWS有什麼實際運用呢?下面我來介紹某系統使用DWS實現了的即時行銷。

即時抽取和基於日誌的資料同步一致性

#如上圖所示:

系統A的資料都保存到自己的資料庫中,我們知道,宜信提供許多金融服務,其中包括借款,而藉款過程中很重要的就是信用審核。

借款人需要提供證明具有信用價值的信息,例如央行徵信報告,是具有最強信用數據的數據。而銀行流水,網購流水也是具有較強的信用屬性的資料。

借款人透過Web或手機APP在系統A中填寫信用資訊時,可能會某些原因無法繼續,雖然可能這個借款人是一個優質潛在客戶,但以前由於無法或很久才能知道這個訊息,所以實際上這樣的客戶是流失了。

應用了DWS以後,借款人已經填寫的資訊已經記錄到資料庫中,並透過DWS即時的進行抽取、計算和落地到目標庫中。根據對客戶的評分,評價出優質客戶。然後立刻將這個客戶的資訊輸出到客服系統中。

客服人員在很短的時間(幾分鐘以內)就透過打電話的方式聯繫上這個借款人(潛客),進行客戶關懷,將這個潛客轉換為真正的客戶。我們知道借款有時是效性的,如果時間太久就沒有價值了。

如果沒有即時抽取/計算/落庫的能力,那麼這一切都無法實現。

即時報表系統

#另外一個即時報表的應用如下:

即時抽取和基於日誌的資料同步一致性

#我們資料使用方的資料來自多個系統,以前是透過T 1的方式獲得報表信息,然後指導第二天的運營,這樣時效性很差。

透過DWS,將資料從多個系統中即時抽取,計算和落地,並提供報表展示,使得營運可以及時做出部署和調整,快速應對。

六、總結

#說了那麼多,大致總結一下:

  • DWS技術上基於主流即時串流大數據技術框架,高可用大吞吐強水平擴容,低延遲高容錯最終一致。
  • DWS能力上支援異質多源多目標系統,支援多資料格式(結構化半結構化非結構化資料)和即時技術能力。
  • DWS將三個子專案合併作為一個平台推出,使得我們具備了即時的能力, 驅動各種即時場景應用。

適合情境包括:即時同步/即時運算/即時監控/即時報表/即時分析/即時洞察/即時管理/即時營運/即時決策

感謝大家的聆聽,這次分享到此為止。

Q&A

Q1:Oracle log reader有開源方案嗎?

A1:對於Oracle業界也有許多商業解決方案,例如:Oracle GoldenGate(原來的goldengate), Oracle Xstream, IBM InfoSphere Change Data Capture(原來的DataMirror),Dell SharePlex (原來的Quest ),國內的DSG superSync等,開源的方案好用的很少。

Q2:這個專案投入了多少人力物力?感覺有點複雜。

Q2:DWS是三個子項目組成,平均每個項目5~7人。是有點複雜,其實也是試圖用大數據技術來解決我們公司目前遇到的困難。

因為是搞大數據相關技術,所有團隊裡面的兄弟姊妹都還是比較happy的:)

其實這裡面,Dbus和Wormhole相對固定模式化,容易輕鬆重複使用。 Swifts即時運算是與每個業務相關比較大的,自訂比較強,相對比較麻煩一些。

Q3:宜信的這個DWS系統會開源麼?

A3:我們也考慮過向社群貢獻,就像宜信的其他開源專案一樣,目前專案剛剛成形,還有待進一步磨練,我相信未來的某個時候,我們會給它開源出來。

Q4:架構師怎麼理解,是不是系統工程師?

A4:不是系統工程師,在我們宜信有多位架構師,應該算是以技術驅動業務的技術管理人員。包含產品設計,技術管理等。

Q5:複製方案是否為OGG?

A5:OGG與上面提到的其他商業解決方案都是可選方案。

文章來源:DBAplus社群(dbaplus)

以上是即時抽取和基於日誌的資料同步一致性的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:linuxprobe.com。如有侵權,請聯絡admin@php.cn刪除