Kafka 串接 Flink CDC 寫入 S3 上的 Apache Paimon:資料流架構實作解析(Part 3)

Published on: | Last updated:

又來了,Kafka 到 Paimon 的這條路...這次加點 Avro

嗯...上次我們是怎麼做的?我想想。喔對,上次是把 IoT 的 JSON 資料,直接丟到 Kafka Topic,然後用 Flink CDC action,把它同步到 Apache Paimon。很直接,但...說真的,那個 JSON 就只是個 byte stream,感覺有點...粗糙。

真實世界的數據流大概也就是長這樣啦。不過,總是會想讓它更好、更快、更優化一點吧?所以...這次我們來加點料。把上次那個 payload,嗯,稍微「拋光」一下。

這次我們打算先用 Avro 來做序列化。這樣做的話,就得在 Confluent Schema Registry 註冊一個 Avro schema,還有 key。聽起來多一個步驟,但其實...嗯,後面會知道為什麼這麼搞。

整個資料流程的樣子,從 Kafka 到 Paimon on S3
整個資料流程的樣子,從 Kafka 到 Paimon on S3

重點一句話

老實說,從純 JSON 串流換成 Avro 序列化,不只是改個設定而已。你得為了 Debezium-Avro 格式,回頭去改造你的 Python producer 和 JSON 結構,這一步...嗯,花了我最多時間。

為什麼要搞這麼麻煩?Schema Evolution

直接用 JSON 不是不行,但你想想,IoT 設備的資料格式很可能會變。今天多一個感測器讀數,明天加個 GPS 座標。如果用純 JSON,每次變動,下游的 Flink job 可能就掛了,或者你得手動去改 table schema,很煩。

用了 Avro 和 Schema Registry 之後,事情就變得...比較可控。你可以定義 schema 的演進規則。例如,新增一個欄位,只要它有預設值,舊的 consumer 也能讀,不會出錯。這就是所謂的 schema evolution。Paimon 本身也支援 schema evolution,所以這兩者搭配起來,整個管線的彈性就高很多。你看,我們後面會從最簡單的 payload 開始,一步步加上欄位,看看 Flink 和 Paimon 是不是真的能跟上。

我們的資料一樣是從 Python 程式產生的,模擬 IoT 感測器。然後...這次環境也稍微升級了一下。Confluent Kafka、Apache Flink、還有 Apache Paimon 都用了新一點的版本。反正,所有程式碼都在 Git repository 裡,老樣子,一堆 Makefiles 和 Docker-compose。就不細講環境怎麼搭了,README 裡面都有。

怎麼做?痛苦但必要的步驟

好,來講講到底做了什麼。整個過程,最卡關的地方,真的不是 Flink 那端,而是源頭。

第一步,你得先把整個 stack 跑起來。這部分照著 repo 裡的 `README.md` 和 `Makefile` 做就好。`make run`,等一下,再 `make deploy`,然後建 Topic、註冊 schema...那些腳本跑一跑。

最關鍵的改變:改造 Python Producer 和 Payload

這就是那個...嗯,我卡最久的地方。之前,我們的 JSON 很單純:

{
    "ts": 123421452622,
    "siteId": 1009,
    "deviceId": 1042,
    "measurement": 1013.3997
}

但為了要配合 Flink 的 Kafka CDC connector 讀取 `debezium-avro` 格式,你不能再這麼任性了。connector 需要一個特定的結構。說到底,這東西其實是 Debezium 定下來的規矩。這也讓我想到,你看 Confluent 的文件,他們會講得很美好。但實務上,像 Jark Wu 他們在 Alibaba Cloud 處理那麼大規模的 Flink,他們會更在意這種格式一致性跟 CDC 的細節。所以...嗯,學到了。

所以,我必須把 Python code (`simulate.py` 裡面) 大改,讓它產生的 JSON 長得像下面這樣:

{
    "before": null,
    "after": {
        "ts": 123421452622,
        "siteId": 1009,
        "metadata": {
            "deviceId": 1042,
            "sensorId": 10180,
            "unit": "Psi"
        },
        "measurement": 1013.3997
    },
    "source": { ... },
    "op": "c",
    "ts_ms": 166...
}

你看,多了好幾層。最重要的是 `before` 和 `after` 這兩個 block,CDC 就是靠這個來判斷是新增 (`c`reate)、更新 (`u`pdate) 還是刪除 (`d`elete)。因為我們是 append-only,所以 `op` 就固定是 `c`,`before` 是 `null`。

還有一個,`siteId`。因為我把它設定成 Kafka topic 的 key,所以它必須在 payload 的根層級。這也是一個...嗯,試了才知道的眉角。說真的,搞定這個 payload 格式,大概...花了整個過程 80% 的時間吧。最後還是靠 ChatGPT 幫忙拼湊出最後一塊拼圖,哈,工具嘛,能用就用。

Flink Action 指令的變化

搞定了源頭,Flink 這邊的指令也要跟著調整。主要就是多了跟 Kafka、Avro、Schema Registry 相關的參數。

這是最後跑起來的 `flink run` 指令:

/opt/flink/bin/flink run \
        /opt/flink/lib/paimon/paimon-flink-action-0.9.0.jar \
        kafka_sync_table \
        -Dpipeline.name='sync-kafka-topic-to-paimon-s3' \
        --kafka_conf properties.bootstrap.servers=broker:29092 \
        --kafka_conf topic=factory_iot \
        --kafka_conf value.format=debezium-avro \
        --kafka_conf key.format=debezium-avro \
        --kafka_conf key.field=siteId \
        --kafka_conf properties.group.id=123456 \
        --kafka_conf schema.registry.url=http://schema-registry:9081 \
        --kafka_conf scan.startup.mode=earliest-offset \
        --catalog_conf metastore=hive \
        --catalog_conf uri=thrift://metastore:9083 \
        --warehouse s3a://warehouse/paimon/ \
        --database iot \
        --table factory_iot \
        --table_conf changelog-producer=input \
        --table_conf write-mode=append-only

你看,多了 `--kafka_conf value.format=debezium-avro`、`key.format` 還有 `schema.registry.url`。這些就是告訴 Flink Action:「嘿,Topic 裡面的東西不是普通 JSON 喔,是用 Avro 包裝過的 Debezium 格式,schema 在那個 URL 裡,你自己去拿」。

喔對了,還有一個 `--kafka_conf scan.startup.mode=earliest-offset`。這個...文件上好像沒特別強調,但最好還是加上。就算 Topic 是新的,加上這個比較保險,確保 job 會從最早的 offset 開始讀,不會漏掉資料。

Flink UI 顯示 Job 正在執行
Flink UI 顯示 Job 正在執行

風險與應變...或者說,Dependency Hell

這件事...嗯,真的很煩。我不是 Java 開發者,所以搞清楚那些 JAR 檔要怎麼湊在一起,真的像在探險,隨時都會精神錯亂。為了讓 Avro 和 Schema Registry 能動,`flink/lib` 目錄下要加好幾個 JAR 檔。

我整理了一下,跟上次純 JSON 的版本比,到底多了哪些東西。這應該比看 `getlibs.sh` 清楚。

比較項目 之前 (純 JSON pipeline) 現在 (Avro pipeline)
核心 JAR

就...paimon-flink-action.jar,還有 Flink Kafka connector 的基本 JAR。

除了基本的,還得加上一堆 Kafka Avro Serializer 和 Schema Registry Client 的 JAR。不然它根本不知道 `debezium-avro` 是什麼鬼。

設定複雜度

很簡單,指定 topic 跟 bootstrap server 就差不多了。

多了 `value.format`、`key.format`、`schema.registry.url`...每個都不能錯。

Payload 準備

沒什麼好準備的,Python dict 轉 JSON 字串,送出去,收工。

這就是大魔王。要手動組出 Debezium 那個 `before`/`after` 結構。超煩。

除錯難度

還好,訊息看不懂頂多就是 JSON 格式錯了。

喔...地獄。可能是 Avro schema 不對、Registry 連不上、JAR 檔版本衝突、payload 結構錯...隨便一個都能卡半天。

老實說,這邊真的要感謝 Giannis Polyzos 和 Jark Wu。在我卡關的時候,他們提點了幾個「你少了這個、漏了那個」的關鍵,才讓我走出來。這就是開源社群...嗯,好玩的地方吧。

來測試 Schema Evolution 吧

好,痛苦的部分結束了。現在來看看我們這麼辛苦是為了什麼。跑起來之後,Flink job manager 看起來一切正常。

一開始,我們送的是最簡單的 payload (`app_iot1`)。接著,我跑了 `app_iot2`,這個版本會在 metadata 裡面多加 `ts_human`(人類可讀的時間)和一個 `location` 物件(經緯度)。

Payload 結構演進,從簡單到複雜
Payload 結構演進,從簡單到複雜

結果呢?Flink job 完全沒事,Paimon 那邊的 table schema 也自動增加了這幾個欄位。不需要重啟 job,也不需要手動 `ALTER TABLE`。完美。

再來,我們覺得運氣不錯,再跑 `app_iot3`,這次在 metadata 又多加了一個 `deviceType` 欄位。結果一樣,資料無縫接軌地流進去,schema 自動更新。

這就是...嗯,這就是我們一開始選擇 Avro 和 Schema Registry 的回報。前面雖然痛苦,但換來了後面這個彈性。我覺得,這很酷。

所以,這樣值得嗎?

總結一下...我們算是建立了一個能自動處理 schema 變動的資料管線。從 Kafka Topic,流經 Flink,最後用 Apache Parquet 格式存到 Paimon。雖然過程中有不少坑,特別是 payload 格式和 JAR 檔相依性那邊...但看到 schema evolution 能這麼順暢地運作,就覺得,嗯,這半天的折騰還算值得。

希望這次的踩坑紀錄對你有幫助。這種東西,真的是充滿了兔子洞,一不小心就鑽進去出不來了...但那一半的樂趣,不也就是這樣嗎?

換你說說:你在處理 Kafka Schema Registry 或 Debezium 格式時,踩過最深的坑是什麼?是哪個參數或設定,讓你卡了最久才恍然大悟?在下面留言分享一下吧,讓大家少走點冤枉路。

Related to this topic:

Comments

  1. profile
    Guest 2025-09-14 Reply
    喂,這份CDC和IoT數據流的指南看起來有點複雜耶。不是說好的簡單嗎?感覺像是在炫技,普通工程師真的看得懂嘛?
  2. profile
    Guest 2025-06-11 Reply
    喂,這篇文章看起來有點深奧耶!我家孩子最近也在學資料工程,不過這些技術術語聽起來真的有點嚇人。你們是不是要把機器變聰明啊?
  3. profile
    Guest 2025-06-08 Reply
    業界大咖!這篇CDC技術指南真的很讚啊,尤其是跨平台數據流的設計。Avro序列化確實是IoT領域的遊戲規則改變者,不知道你對Schema Registry有什麼獨特見解嗎?期待聽聽你的專業觀點!
  4. profile
    Guest 2025-05-15 Reply
    嗨!我是一名大學生,最近在研究Kafka和Flink的相關技術。能否分享一些學習資源或實作範例?特別是有關Avro序列化和CDC的部分,我會非常感激!謝謝!
  5. profile
    Guest 2025-05-02 Reply
    這些技術聽起來很有趣!但我有點不明白什麼是CDC和Avro序列化,對孩子的學習會有幫助嗎?還是說這些內容太複雜了呢?
  6. profile
    Guest 2025-04-24 Reply
    這些主題聽起來真的很有趣,但我有點疑惑,關於如何處理模式演變的部分,你們是不是能再深入探討一下?畢竟在實際應用中,這可是個棘手的問題啊!
  7. profile
    Guest 2025-04-23 Reply
    這些文章真的很有幫助!我最近也在學習如何利用IoT數據流,想知道大家對於Kafka和Flink的使用經驗是什麼?感覺這些技術能讓我們的生活變得更便利呢!