又來了,Kafka 到 Paimon 的這條路...這次加點 Avro
嗯...上次我們是怎麼做的?我想想。喔對,上次是把 IoT 的 JSON 資料,直接丟到 Kafka Topic,然後用 Flink CDC action,把它同步到 Apache Paimon。很直接,但...說真的,那個 JSON 就只是個 byte stream,感覺有點...粗糙。
真實世界的數據流大概也就是長這樣啦。不過,總是會想讓它更好、更快、更優化一點吧?所以...這次我們來加點料。把上次那個 payload,嗯,稍微「拋光」一下。
這次我們打算先用 Avro 來做序列化。這樣做的話,就得在 Confluent Schema Registry 註冊一個 Avro schema,還有 key。聽起來多一個步驟,但其實...嗯,後面會知道為什麼這麼搞。
重點一句話
老實說,從純 JSON 串流換成 Avro 序列化,不只是改個設定而已。你得為了 Debezium-Avro 格式,回頭去改造你的 Python producer 和 JSON 結構,這一步...嗯,花了我最多時間。
為什麼要搞這麼麻煩?Schema Evolution 啊
直接用 JSON 不是不行,但你想想,IoT 設備的資料格式很可能會變。今天多一個感測器讀數,明天加個 GPS 座標。如果用純 JSON,每次變動,下游的 Flink job 可能就掛了,或者你得手動去改 table schema,很煩。
用了 Avro 和 Schema Registry 之後,事情就變得...比較可控。你可以定義 schema 的演進規則。例如,新增一個欄位,只要它有預設值,舊的 consumer 也能讀,不會出錯。這就是所謂的 schema evolution。Paimon 本身也支援 schema evolution,所以這兩者搭配起來,整個管線的彈性就高很多。你看,我們後面會從最簡單的 payload 開始,一步步加上欄位,看看 Flink 和 Paimon 是不是真的能跟上。
我們的資料一樣是從 Python 程式產生的,模擬 IoT 感測器。然後...這次環境也稍微升級了一下。Confluent Kafka、Apache Flink、還有 Apache Paimon 都用了新一點的版本。反正,所有程式碼都在 Git repository 裡,老樣子,一堆 Makefiles 和 Docker-compose。就不細講環境怎麼搭了,README 裡面都有。
怎麼做?痛苦但必要的步驟
好,來講講到底做了什麼。整個過程,最卡關的地方,真的不是 Flink 那端,而是源頭。
第一步,你得先把整個 stack 跑起來。這部分照著 repo 裡的 `README.md` 和 `Makefile` 做就好。`make run`,等一下,再 `make deploy`,然後建 Topic、註冊 schema...那些腳本跑一跑。
最關鍵的改變:改造 Python Producer 和 Payload
這就是那個...嗯,我卡最久的地方。之前,我們的 JSON 很單純:
{
"ts": 123421452622,
"siteId": 1009,
"deviceId": 1042,
"measurement": 1013.3997
}
但為了要配合 Flink 的 Kafka CDC connector 讀取 `debezium-avro` 格式,你不能再這麼任性了。connector 需要一個特定的結構。說到底,這東西其實是 Debezium 定下來的規矩。這也讓我想到,你看 Confluent 的文件,他們會講得很美好。但實務上,像 Jark Wu 他們在 Alibaba Cloud 處理那麼大規模的 Flink,他們會更在意這種格式一致性跟 CDC 的細節。所以...嗯,學到了。
所以,我必須把 Python code (`simulate.py` 裡面) 大改,讓它產生的 JSON 長得像下面這樣:
{
"before": null,
"after": {
"ts": 123421452622,
"siteId": 1009,
"metadata": {
"deviceId": 1042,
"sensorId": 10180,
"unit": "Psi"
},
"measurement": 1013.3997
},
"source": { ... },
"op": "c",
"ts_ms": 166...
}
你看,多了好幾層。最重要的是 `before` 和 `after` 這兩個 block,CDC 就是靠這個來判斷是新增 (`c`reate)、更新 (`u`pdate) 還是刪除 (`d`elete)。因為我們是 append-only,所以 `op` 就固定是 `c`,`before` 是 `null`。
還有一個,`siteId`。因為我把它設定成 Kafka topic 的 key,所以它必須在 payload 的根層級。這也是一個...嗯,試了才知道的眉角。說真的,搞定這個 payload 格式,大概...花了整個過程 80% 的時間吧。最後還是靠 ChatGPT 幫忙拼湊出最後一塊拼圖,哈,工具嘛,能用就用。
Flink Action 指令的變化
搞定了源頭,Flink 這邊的指令也要跟著調整。主要就是多了跟 Kafka、Avro、Schema Registry 相關的參數。
這是最後跑起來的 `flink run` 指令:
/opt/flink/bin/flink run \
/opt/flink/lib/paimon/paimon-flink-action-0.9.0.jar \
kafka_sync_table \
-Dpipeline.name='sync-kafka-topic-to-paimon-s3' \
--kafka_conf properties.bootstrap.servers=broker:29092 \
--kafka_conf topic=factory_iot \
--kafka_conf value.format=debezium-avro \
--kafka_conf key.format=debezium-avro \
--kafka_conf key.field=siteId \
--kafka_conf properties.group.id=123456 \
--kafka_conf schema.registry.url=http://schema-registry:9081 \
--kafka_conf scan.startup.mode=earliest-offset \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://metastore:9083 \
--warehouse s3a://warehouse/paimon/ \
--database iot \
--table factory_iot \
--table_conf changelog-producer=input \
--table_conf write-mode=append-only
你看,多了 `--kafka_conf value.format=debezium-avro`、`key.format` 還有 `schema.registry.url`。這些就是告訴 Flink Action:「嘿,Topic 裡面的東西不是普通 JSON 喔,是用 Avro 包裝過的 Debezium 格式,schema 在那個 URL 裡,你自己去拿」。
喔對了,還有一個 `--kafka_conf scan.startup.mode=earliest-offset`。這個...文件上好像沒特別強調,但最好還是加上。就算 Topic 是新的,加上這個比較保險,確保 job 會從最早的 offset 開始讀,不會漏掉資料。
風險與應變...或者說,Dependency Hell
這件事...嗯,真的很煩。我不是 Java 開發者,所以搞清楚那些 JAR 檔要怎麼湊在一起,真的像在探險,隨時都會精神錯亂。為了讓 Avro 和 Schema Registry 能動,`flink/lib` 目錄下要加好幾個 JAR 檔。
我整理了一下,跟上次純 JSON 的版本比,到底多了哪些東西。這應該比看 `getlibs.sh` 清楚。
| 比較項目 | 之前 (純 JSON pipeline) | 現在 (Avro pipeline) |
|---|---|---|
| 核心 JAR |
就...paimon-flink-action.jar,還有 Flink Kafka connector 的基本 JAR。 |
除了基本的,還得加上一堆 Kafka Avro Serializer 和 Schema Registry Client 的 JAR。不然它根本不知道 `debezium-avro` 是什麼鬼。 |
| 設定複雜度 |
很簡單,指定 topic 跟 bootstrap server 就差不多了。 |
多了 `value.format`、`key.format`、`schema.registry.url`...每個都不能錯。 |
| Payload 準備 |
沒什麼好準備的,Python dict 轉 JSON 字串,送出去,收工。 |
這就是大魔王。要手動組出 Debezium 那個 `before`/`after` 結構。超煩。 |
| 除錯難度 |
還好,訊息看不懂頂多就是 JSON 格式錯了。 |
喔...地獄。可能是 Avro schema 不對、Registry 連不上、JAR 檔版本衝突、payload 結構錯...隨便一個都能卡半天。 |
老實說,這邊真的要感謝 Giannis Polyzos 和 Jark Wu。在我卡關的時候,他們提點了幾個「你少了這個、漏了那個」的關鍵,才讓我走出來。這就是開源社群...嗯,好玩的地方吧。
來測試 Schema Evolution 吧
好,痛苦的部分結束了。現在來看看我們這麼辛苦是為了什麼。跑起來之後,Flink job manager 看起來一切正常。
一開始,我們送的是最簡單的 payload (`app_iot1`)。接著,我跑了 `app_iot2`,這個版本會在 metadata 裡面多加 `ts_human`(人類可讀的時間)和一個 `location` 物件(經緯度)。
結果呢?Flink job 完全沒事,Paimon 那邊的 table schema 也自動增加了這幾個欄位。不需要重啟 job,也不需要手動 `ALTER TABLE`。完美。
再來,我們覺得運氣不錯,再跑 `app_iot3`,這次在 metadata 又多加了一個 `deviceType` 欄位。結果一樣,資料無縫接軌地流進去,schema 自動更新。
這就是...嗯,這就是我們一開始選擇 Avro 和 Schema Registry 的回報。前面雖然痛苦,但換來了後面這個彈性。我覺得,這很酷。
所以,這樣值得嗎?
總結一下...我們算是建立了一個能自動處理 schema 變動的資料管線。從 Kafka Topic,流經 Flink,最後用 Apache Parquet 格式存到 Paimon。雖然過程中有不少坑,特別是 payload 格式和 JAR 檔相依性那邊...但看到 schema evolution 能這麼順暢地運作,就覺得,嗯,這半天的折騰還算值得。
希望這次的踩坑紀錄對你有幫助。這種東西,真的是充滿了兔子洞,一不小心就鑽進去出不來了...但那一半的樂趣,不也就是這樣嗎?
換你說說:你在處理 Kafka Schema Registry 或 Debezium 格式時,踩過最深的坑是什麼?是哪個參數或設定,讓你卡了最久才恍然大悟?在下面留言分享一下吧,讓大家少走點冤枉路。
