投稿者

インターシステムズジャパン
記事 Toshihiko Minamoto · 4月 23 15m read

InterSystems IRIS Interoperability(相互運用性)によるデータストリーミング

現代のデータアーキテクチャでは、リアルタイムのデータ収集、データ変換、データ移動、データロードのソリューションを活用し、データレイク、分析用倉庫、ビッグデータリポジトリを構築しております。様々なソースからのデータを、それらを利用する操作に影響を及ぼすことなく分析することを可能にします。これを実現するためには、継続的、拡張的、弾力的、かつ堅牢なデータフローを確立することが不可欠です。そのための最も一般的な方法は、CDC(変更データキャプチャ) 技術によるものです。CDCは小さなデータセットの生成を監視し、このデータを自動的に収集して、分析用データリポジトリを含む1つ以上の受信先に配信します。主な利点は、分析におけるD+1(データ生成の翌日)の遅延が解消される点です。データは生成されるとすぐにソースで検知され、その後、対象の宛先へ複製されるためです。

本記事では、CDCシナリオにおいて最もよく使用される2つのデータソース(データソースおよびデータ宛先として)についてご説明いたします。データソース(元)としては、SQLデータベースおよびCSVファイルにおけるCDCの活用方法について探ってまいります。データ宛先として、カラム型データベース(一般的な高性能分析データベースのシナリオ)とKafkaトピック(クラウドおよび/または複数のリアルタイムデータコンシューマーへストリーミングデータを送信する標準的なアプローチ)を使用いたします。

 

概要

本記事では、以下の相互運用性シナリオに対するサンプルを提示いたします:

 

 

  1. SQLCDCAdapterは、SQLInboundAdapterを利用してSQLデータベース内の新規レコードを監視し、JDBC接続とSQL言語を用いてそれらを抽出いたします。
  2. SQLCDCAdapterは、収集したデータをメソッド内にてカプセル化し、CDCProcess(BPL表記を用いたビジネスプロセス)へ配信いたします。
  3. CDCプロセスはSQLデータをメッセージとして受信し、SQL操作を用いてデータをIRISに永続化するとともに、Kafka操作を用いて取得したデータをKafkaトピックへ送信いたします。
  4. SQLOperationは、メッセージデータを列指向ストレージとしてモデル化されたInterSystems IRIS持続性クラスに永続化いたします。コラム指向ストレージは、分析データに対して優れたクエリ・パフォーマンスを提供するオプションです。
  5. KafkaオペレーションはメッセージをJSON形式に変換し、Kafkaトピックへ送信します。そこではクラウドデータレイクやその他のサブスクライバーがメッセージを消費することができます。
  6. これらのデータフローはリアルタイムで実行され、継続的なデータフローを確立します。
  7. BAMサービスは、縦欄式のテーブルからビジネス指標をリアルタイムで算出いたします。
  8. BIダッシュボードは、結果として得られたビジネス指標をユーザーにリアルタイムで表示します。

サンプルのインストール

iris-cdc-sample (https://openexchange.intersystems.com/package/iris-cdc-sample)は、上記で説明したシナリオを実装したサンプルアプリケーションです。インストールするには、以下のステップに従って行います:

1.リポジトリを任意のローカルディレクトリにクローンまたはgit pullします:

$ git clone https://github.com/yurimarx/iris-cdc-sample.git

2. このディレクトリでターミナルを開き、下記のコマンドを実行いたします:

$ docker-compose build

3. プロジェクトでIRISコンテナを実行します:

$ docker-compose up -d

サンプルコンポーネント

このサンプルでは、以下のコンテナが使用されています:

  • IRIS:InterSystems IRISプラットフォーム(以下を含む):

    • IRISカラム型データベース(収集したデータストレージ用)。
    • IRISの相互運用性により、CDC(変更データキャプチャ)プロセスを実行するプロダクション環境を構築します。プロダクション環境では外部データベース(PostgreSQL)からデータを収集し、IRISに永続化するとともに、Kafkaトピックへも送信します。
    • IRIS BAM(ビジネス・アクティビティ監視)により、製品別のリアルタイム売上指標を算出し、ダッシュボードに表示いたします。
  • salesdb:リアルタイムで収集される販売データを格納するPostgreSQLデータベース。

  • zookeeper:Kafkaブローカーを管理するために使用されるサービスです。

  • kafka:販売トピックを持つKafkaブローカーは、販売データをリアルタイムイベントとして受信し配布するために利用されます。

  • kafka-ui:トピックおよびイベントの管理と操作のためのKafka Webインターフェースです。

services:
  iris:
    build:
      context: .
      dockerfile: Dockerfile
    restart: always
    command: --check-caps false --ISCAgent false
    ports:
      - 1972
      - 52795:52773
      - 53773
    volumes:
      - ./:/home/irisowner/dev/
    networks:
      - cdc-network
  salesdb:
    image: postgres:14-alpine
    container_name: sales_db
    restart: always
    environment:
      POSTGRES_USER: sales_user
      POSTGRES_PASSWORD: welcome1
      POSTGRES_DB: sales_db
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
      - postgres_data:/var/lib/postgresql/data/
    ports:
      - "5433:5432"
    networks:
      - cdc-network
zookeeper:

image: confluentinc/cp-zookeeper:7.5.0

container_name: zookeeper

hostname: zookeeper

networks:

- cdc-network

ports:

- "2181:2181"

environment:

ZOOKEEPER_CLIENT_PORT: 2181

ZOOKEEPER_TICK_TIME: 2000

kafka:

image: confluentinc/cp-kafka:7.5.0

container_name: kafka

hostname: kafka

networks:

- cdc-network

ports:

- "9092:9092"

depends_on:

- zookeeper

environment:

KAFKA_BROKER_ID: 1

KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092

KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

kafka-ui:

image: provectuslabs/kafka-ui:latest

container_name: kafka-ui

hostname: kafka-ui

networks:

- cdc-network

ports:

- "8080:8080"

depends_on:

- kafka

environment:

KAFKA_CLUSTERS_0_NAME: local_kafka_cluster

KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

volumes:

postgres_data:

driver: local

networks: cdc-network: driver: bridge  

 

カラム型テーブルの作成

縦欄式のテーブルは、以下のような正規化されていないデータを格納するために使用されます:

     
製品名 店舗名 販売価格(価格)
オレンジ 店舗1 120
オレンジ  店舗1 200
バナナ 店舗2 100
バナナ 店舗1 120
オレンジ 店舗2 110

製品名および店舗名の値は反復的に使用されるため、データを縦欄式(行ではなく列単位)で保存することで、ストレージスペースを節約し、優れたデータ検索パフォーマンスを実現します。従来、このタイプのデータ処理にはBIキューブの作成が必要でした。しかし、カラム型ストレージはこの課題を解決し、操作データからキューブへの複製作業を不要にします。 以下に、サンプルにおける売上縦欄式のテーブルを作成する方法を示します:

1. dc.cdcパッケージ内に、新しいSales ObjectScriptクラスを作成します。

2.以下のソースコードを記述します:

Class dc.cdc.Sales Extends %Persistent [ DdlAllowed, Final ]
{

Parameter STORAGEDEFAULT = "columnar"; Parameter USEEXTENTSET = 1; Property ProductName As %String; Property StoreName As %String; Property SalesValue As %Double; }  

 

3. STORAGEDEFAULT パラメータは「columnar」に設定することで、dc_cdc.Sales テーブルが従来行単位の形式ではなく、カラム型ストレージを確実に使用します。

収集したデータを活用するためのビジネス操作の作成  

SalesSqlServiceを使用して販売データをStreamContainerに収集した後(実装は不要です。「Doing CDC」セクションの操作環境セットアップで設定を終了)、ビジネス操作によりStreamContainerをプロセスし、PostgreSQLから販売データを抽出し、販売テーブルに保存する必要があります。以下の手順を遂行します: 1. dc.cdcパッケージ内にSalesOperationクラスを作成します。 2. 以下のソースコードを記述します:

Class dc.cdc.SalesOperation Extends Ens.BusinessOperation
{
Method ProcessSalesData(pRequest As Ens.StreamContainer, Output pResponse As Ens.StringResponse) As %Status

{

Set tSC = $$$OK

Set pResponse = ##class(Ens.StringResponse).%New()
<span class="hljs-keyword">Try</span> {
    
    <span class="hljs-keyword">Set</span> tStream = pRequest.Stream
    
    <span class="hljs-keyword">Do</span> tStream.Rewind()

    <span class="hljs-keyword">Set</span> content = <span class="hljs-string">""</span>
    <span class="hljs-keyword">While</span> 'tStream.AtEnd {
        <span class="hljs-keyword">Set</span> content = content _ tStream.<span class="hljs-keyword">Read</span>(<span class="hljs-number">4096</span>) 
    }

    <span class="hljs-keyword">Set</span> tDynamicObject = {}.<span class="hljs-built_in">%FromJSON</span>(content)
    
    <span class="hljs-keyword">Set</span> sales = <span class="hljs-keyword">##class</span>(dc.cdc.Sales).<span class="hljs-built_in">%New</span>()
    <span class="hljs-keyword">Set</span> sales.ProductName = tDynamicObject.<span class="hljs-string">"product_name"</span>
    <span class="hljs-keyword">Set</span> sales.StoreName = tDynamicObject.<span class="hljs-string">"store_name"</span>
    <span class="hljs-keyword">Set</span> sales.SalesValue = tDynamicObject.<span class="hljs-string">"sales_value"</span>
    <span class="hljs-keyword">Set</span> tSC = sales.<span class="hljs-built_in">%Save</span>()

    <span class="hljs-keyword">Set</span> pResponse.StringValue = tDynamicObject.<span class="hljs-built_in">%ToJSON</span>()
    
} <span class="hljs-keyword">Catch</span> (ex) {
    <span class="hljs-keyword">Set</span> tSC = ex.AsStatus()
    <span class="hljs-keyword">Set</span> pResponse.StringValue = <span class="hljs-string">"Error while saving sales data!"</span>
    <span class="hljs-built_in">$$$LOGERROR</span>(<span class="hljs-string">"Error while saving sales data: "</span> _ ex.DisplayString())
}

<span class="hljs-keyword">Quit</span> tSC
}
XData MessageMap

{

<MapItems>

<MapItem MessageType="Ens.StreamContainer">

<Method>ProcessSalesData</Method>

</MapItem>

</MapItems>

}

}  

 

3. ProcessSalesData方法は、MessageMapの定義により、StreamContainer型のメッセージを受信します。

4. この方法は、JSON文字列として取得された販売データを読み取り、そのJSONをDynamicObjectにロードし、Salesオブジェクトを作成し、そのプロパティ値を設定し、それをSalesテーブルに保存します。

5. 最後に、この方法はレスポンス内の販売データを表すJSON文字列を返します。

 

販売モニタリングのためのBAMサービスの作成

InterSystems IRIS for InteroperabilityにはBAM機能が含まれており、アナリティクスダッシュボードを使用して操作環境でリアルタイムに処理されるビジネスデータを監視することができるようになります。BAMサービスを作成するには、以下の手順に従って取得します: 1. dc.cdc パッケージ内に、Ens.BusinessMetric を継承する SalesMetric という新しいクラスを作成します。 2. 以下のソースコードを記述します:

Class dc.cdc.SalesMetric Extends Ens.BusinessMetric
{
Property TotalSales As Ens.DataType.Metric(UNITS = "$US") [ MultiDimensional ];

Query MetricInstances() As %SQLQuery

{

SELECT distinct(ProductName) FROM dc_cdc.Sales

}
/// このクラスの指標セットを計算し、更新すること

Method OnCalculateMetrics() As %Status

{

Set product = ..%Instance

Set SalesSum = 0.0

&sql(

select sum(SalesValue) into :SalesSum from dc_cdc.Sales where ProductName = :product

)
<span class="hljs-keyword">Set</span> <span class="hljs-built_in">..TotalSales</span> = SalesSum

<span class="hljs-keyword">Quit</span> <span class="hljs-built_in">$$$OK</span>
}

}  

 

3. TotalSalesプロパティにより、製品ごとの売上合計をリアルタイムで監視することが可能となります。 4. MetricInstancesクエリは、監視対象となる製品を定義します。 5. OnCalculateMetrics方法は、各製品ごとの売上高の合計を計算します。 6. このクラスはダッシュボードにおいて、製品別の総売上高をリアルタイムで生成するために活用されます。

CDC - 変更データキャプチャプロセスの実行とプロダクション

必要なすべてのETL(抽出、変換、ロード)プロセスを含む最終的なプロダクションダイアグラムは、以下のとおりです:

 

次の手順に従ってください。

1. CDCプロダクションへ移動します:http://localhost:52795/csp/user/EnsPortal.ProductionConfig.zen?PRODUCTION=dc.cdc.CDCProduction 2. 新しいEnsLib.JavaGateway.ServiceをJavaという名で作成します(SalesSqlServiceに必要です)。 3. SalesSqlService(SQLCDCService)というビジネスサービスを生成し、以下のパラメータを設定します:

a. DSN(PostgreSQLの接続文字列):jdbc:postgresql://sales_db:5432/sales_db. b.認証情報:PostgreSQLにアクセスするためのユーザー名(sales_user)とパスワード(welcome1)でpg_credを作成します。 c. ターゲト設定名:SalesProcess(CDCプロセス) d. クエリ(消費されるデータを選択するために):select * from sales。 e. キースフィールド名(IRISが既に収集された行を追跡するために使用する列):id。 f. Java Gateway Service(CDCアダプターがJDBCを使用するため必要です):Java(本環境ではJava Gatewayを使用します)。 g. JDBCドライバー: org.postgresql.Driver。 h. JDBCクラスパス(PostgreSQLと接続するためのドライバーで、Dockerfileスクリプト経由でコピーされます): /home/irisowner/dev/postgresql-42.7.8.jar.  

4. 新しいdc.cdc.SalesMetricオブジェクトを作成し、SalesMetricと命名します。 5. 以下のパラメータで新しいEnsLib.Kafka.Operationオブジェクトを生成し、SalesKafkaOperation(Kafka操作)と命名します:

a. ClientID: iris b. サーバー: kafka:9092

6. SalesOperation という名前の新しい dc.cdc.SalesOperation を構築します。 7. SalesProcessという名前のビジネスプロセスを開発します。BPLの実装ロジックは以下の通りとします:

a. 最後のダイアグラム:

 

b. 2つのコンテキストプロパティを作成します:

i. Sales(型:Ens.StringResponse): 販売データをJSON文字列として保存します。 ii. KafkaMessage(型:EnsLib.Kafka.Message): で収集されたデータをKafkaトピック「sales-topic」へ送信するために使用します。

c.コールを生成し、販売テーブルに保存し、以下を設定します:

i. 対象:SalesOperation ii. リクエストメッセージクラス:Ens.StreamContainer(データはストリームとして収集されます) iii. リクエストアクション:

 

iv. レスポンスメッセージクラス: Ens.StringResponse(ストリームは、収集されたデータのJSON文字列表現に変換されます)

v. レスポンスアクション:

 

d. コードブロックを作成し、ObjectScriptコードを記述します。このコードは、販売データをイベントとしてKafkaブローカーのsales-topicに公開するために必要なプロパティをKafkaメッセージに設定します(JSON文字列として):

Set context.KafkaMessage.topic = "sales-topic"
 Set context.KafkaMessage.value = context.Sales.StringValue 
 Set context.KafkaMessage.key = "iris"

e. 「Kafka Sales Topic に送信」というコールを設計し、以下を設計します。

i. 対象:SalesKafkaOperation ii. リクエストメッセージクラス:%Library.Persistent (KafkaMessage is Persistent) iii. リクエストアクション:

 

f. 「Send Response」を名前とするアサインを作成し、以下の内容で設定します:

i. プロパティ: response.StringValue ii. 値: 「Process finished!(処理が完了しました!)」

CDCの結果を示すこと

CDCProductionを有効にした後、データベース管理ツール(DBeaverまたはPgAdmin)を使用してPostgreSQLのsalesテーブルにいくつかのレコードを登録し、プロダクションメッセージの結果をご確認しんます。

 

シーケンス・ダイアグラムを参照し、CDCプロセスを理解します(任意のメッセージヘッダーリンクをクリックします):

 

アナリティクスダッシュボードにおけるBAMモニタリングの表示

リアルタイムでデータを収集する際には、当然ながらダッシュボードで結果を即座に確認します。以下の手順で実現できます: 1. アナリティクス > ユーザーポータルへ移動します:

 

2. 「ダッシュボードを追加」をクリックします:

 

3. 以下のプロパティを設定し、「OK」をクリックします:

a. フォルダ:Ens/Analytics b. ダッシュボードの名前:Sales BAM

4. ウィジェットをタップします:

 

5. プラスボタンをクリックします:

 

6. 以下の通りウィジェットを設定します:  

7. 全体ダッシュボード領域を覆うよう、新しいウィジェットを調整します。

8. それでは、WidgetSalesを選択します:

9. コントロールを選択します。   

10. プラスボタンをクリックします:   

11. 以下の図のようにコントロールを設定します(リアルタイムで総売上高を表示し、自動更新されます):

12. 新しい値が収集されるたびに、ダッシュボードでは総売上高の更新値が即座に表示されます。

詳細については:

InterSystemsのドキュメントは、CDC、BAM、Kafka、および相互運用性に関するプロダクション環境について、より深く理解するお手伝いをいたします。詳細については、以下のページをご覧ください:

  1. BAM: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=EGIN_options#EGIN_options_bam
  2. Kafka: https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=ITECHREF_kafka
  3. SQLアダプター(SQLテーブル向けCDC):https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=ESQL_intro
  4. ETL/CDCプロダクションの構築:https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=PAGE_interop_languages
  5. BPL(ビジュアルローコードビジネスプロセス):https://docs.intersystems.com/iris20252/csp/docbook/Doc.View.cls?KEY=EBPL_use
元の記事へ さんが書いた @Yuri Marx