第8章. Kafkaストリームでストリームを結合する
この作品はAIを使って翻訳されている。ご意見、ご感想をお待ちしている:translation-feedback@oreilly.com
現在、orders ストリームには商品の詳細情報は含まれていないが、前章では、Debeziumをセットアップして、MySQLのproducts テーブルへの変更をキャプチャし、products ストリームに書き込むようにした。この章では、ストリームプロセッサを使用して、orders ストリームとproducts ストリームを結合する方法を学ぶ。新しいストリームをApache Pinotに入れ、ダッシュボードを売れ筋商品とカテゴリーで更新する。
Kafkaストリームで注文を充実させる
第4章では、Kafka Streamsを使用して、orders ストリームに対するウィンドウ集約を作成し、直近数分間の注文数と売上を計算できるようにした。このセクションでは、orders ストリームに含まれるすべての注文項目を、products ストリームの詳細とともに格納する、enriched-order-items という新しいストリームを作成する。図8-1に、これから作成するストリームの詳細を示す。
図8-1. Kafka Streamsを使用したエンリッチド・オーダーのアーキテクチャ
図8-1をさらに分解して、図8-2に示すように、Kafka Streamsグラフに存在するさまざまなプロセッサを視覚化することができる。
図8-2. Kafkaストリームプロセッサ
さて、これから構築するものの概要がわかったので、早速取りかかろう。例8-1に示すように、関心のある製品の詳細は、payload プロパティの下にある。
例8-1. products ストリームのメッセージ
{"payload":{"before":null,"after":{"id":1,"name":"Moroccan Spice Pasta Pizza - Veg","description":"A pizza with...","category":"veg pizzas","price":335,"image":"https://oreil.ly/LCGSv","created_at":"2022-12-05T16:56:02Z","updated_at":1670259362000}}}
このevent のpayload.after.id は、あるオーダーのitems のproductId にマッピングされる。payload.before 以下はすべて無視することにする。なぜなら、我々は更新されたオーダーにしか興味がなく、どのような変更がなされたかには興味がないからだ。
これらのメッセージをアンパックするために、Debeziumのシリアライズ/デシリアライズコード。pom.xmlファイルに以下の依存関係を追加する:
<dependencies> ...