Dockerを用いたkafka検証環境構築

今年のテーマの一つにリアルタイム処理とかストリーミングデータ処理があります。気象データを毎秒送信するプログラムを擬似センサとして扱って、センサから上がってくるデータをリアルタイムに可視化・異常検知・蓄積して、蓄積したデータを学習して異常検知モデルを継続的に学習させるようなシステム(のサンプル)を作りたいなと思っています。
手始めにkafkaというメッセージキューについて試したので忘れないよう記録に残しておきます。(環境構築で疲れたので大したこと試せてないですが、、、)
githubに用意されているshellを実行するだけでお試しはほぼできるので、一部はPythonでソースを書きながら試してみます。



kafkaの概要

kafkaとはApacheが提供するOSSの分散メッセージキューです。イメージ的にはGCP のPub/Subです。こちらのQiita記事に非常に詳しく情報がまとめられているので詳しくは記事を参照してください。

Topic, Consumer, Producerについて

kafkaは大雑把にこの3つを知っておく必要があります。イメージレベルで記載すると、Topic:情報を書き込む場所、Producer:Topicに書き込む人、Consumer:Topicを読み取る人となります。Consumerは自発的にTopicを見に行くか、Topicに書き込みが発生したらそのデータを転送してもらうことができます。1つのTopicに複数のProducerやConsumerがいても問題ありません。

Consumerが複数いる場合はグループ内の誰かしら1人にメッセージが届くことになるので、分散処理に最適です。センサデータを可視化するシステムと今後の学習用にDBに保存するシステムで別グループを設定することで、それぞれのシステムに全データを送ることができます。システム内で分散したいときはグループ内にConsumerを複数用意すればOKです。

Dockerで構築

docker-composeも必要です。まずはこちらをcloneしてdocker-compose.ymlの「KAFKA_ADVERTISED_HOST_NAME」をdockerが使っているネットワークに合わせて編集します。※よくわからなければ「172.17.0.1」でOK
その後docker-composeでコンテナを立ち上げましょう。せっかくなのでkafka用のコンテナを3つ立ち上げます。

git clone https://github.com/wurstmeister/kafka-docker.git
cd kafka-docker
vim docker-compose.yml

docker-compose up -d --scale kafka=3

kafkaコンテナに入るためのshellも用意されているので先程設定した「KAFKA_ADVERTISED_HOST_NAME」を引数に実行します。普通にdocker run -it で入っても良いと思います。

./start-kafka-shell.sh 172.17.0.1

Topicを作ってみる

Topicを作るためのshellも用意されているので書きを実行して 「test」というTopicを作ってみましょう。docker-composeでコンテナを起動する際に1コンテナで起動していた場合は「–replication-factor」は1としてください。(0かも)可用性のためにレプリするオプションですが1コンテナだとレプリできません。

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test --partitions 4 --replication-factor 2 --bootstrap-server `broker-list.sh`

最後のbroker-list.shでDockerコンテナが使っているポートを確認して渡しています。また、–partitionsはkafkaのTopicに書き込まれたデータをどれだけ分割して保持するかを指定してます。デフォルトは2らしいです。

これでTopicが作られたのでProducerとConsumerを用意してTopicに値を書いたり受け取ったりしてみましょう。

Producerを作ってみる

例によってProducerを作るshellが用意されているので実行します。

$KAFKA_HOME/bin/kafka-console-producer.sh --topic=test --broker-list=`broker-list.sh`

> hello kafka
> can you hear me?

まだConsumerがいないので、ここではTopicにデータが書き込まれるだけで何も起こりません。

Consumerを作ってみる

今までの作業とは別のターミナルを用意して、kafkaコンテナに入りましょう。その後、下記を実行します。(まぁ、先程のProducerをCtrl + Cで止めても良いのですが、せっかくならリアルタイムにメッセージが届くところを確認したいですよね?)

$KAFKA_HOME/bin/kafka-console-consumer.sh --topic=test --from-beginning --bootstrap-server `broker-list.sh`

するとConsumerが作成され、–from-beginningとしているのでConsumerが作成される前のメッセージも読み取り対象となり、先程入力したメッセージが表示されるはずです。このまま前のターミナルに戻ってメッセージを入力するとConsumer側のコンソールにリアルタイムに表示されます。

ConsumerとProducerをPythonで作ってみる

せっかくなのでPythonでkafkaにメッセージを送ったり受け取ったりしてみます。コンテナ内に入ったConsumer用、Producer用コンソールでそれぞれ下記を実行してpythonとライブラリを追加します。

apk add python3
pip3 install kafka-python

Consumer用のソースとProducer用のソースをそれぞれ追加します。

【consumer.py】
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
   'test',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    # group_idが同じConsumerが複数いるとメッセージが分散して届く
    group_id='realtime-block',
    # 受け取ったデータをデシリアライズする方法を指定することも可能
    # 先程JSON形式じゃないデータを送ってしまったので今回は使わない
    ### value_deserializer=lambda m: loads(m.decode('utf-8')),
    # このIP:PortはKAFKA_ADVERTISED_HOST_NAMEとコンテナに割り当てられたポート
    bootstrap_servers=['172.17.0.1:32768','172.17.0.1:32769','172.17.0.1:32770'])

# これでメッセージが来るまで待ち続け、来たら表示することが可能
for m in consumer:
    print(m.value)

【producer.py】
from kafka import KafkaProducer
from json import dumps
from time import sleep

producer = KafkaProducer(
   value_serializer=lambda m: dumps(m).encode('utf-8'), 
   bootstrap_servers=['172.17.0.1:32768','172.17.0.1:32769','172.17.0.1:32770'])
# producer.send()だけだと何故か送信できなかったので、送信結果をgetする。
# 多分何かしらの設定をすればsend()だけで送れるはず。
producer.send("test", value={"hello": "producer"}).get(timeout=10)
for i in range(100):
    tmp_value = {'id': i}
    producer.send('test', value=tmp_value).get(timeout=10)
    sleep(1)

それぞれ実行することでメッセージのやり取りができていることが確認できます。ProducerはConsumerがどんなグループを持っているかを意識せず、ただ単にTopicに書き込めば良く、Consumer側は好きにグループを追加したり同じグループ内にConsumerを増やして分散して良いのがポイントだと思います。あと、Producerのスクリプトはバックグラウンド実行を利用して複数並列に実行すると楽しいです(笑)。
※なぜproducer.send()だけでは送信できず、producer.send().get()で送信結果を求めないといけないのかは不明です。。。

ちなみに、常にデータを待ち受けるのではなく、好きな量で刈り取ることもできます。上で作ったConsumerとgroup_idを分けることでリアルタイムで待ち受けるConsumerとデータの取り合いを防いでいます。(同じにするとid:1は刈り取り側、id:2はリアルタイム側って片方にしかデータが行かなくなります。はじめ取り合いしているって発想がなくて何が届いているかデバッグする文が入っています。動作確認するには良いかなと思うので残しています。)

from kafka import KafkaConsumer
from json import loads
from time import sleep

consumer = KafkaConsumer(
               'test',
               auto_offset_reset='earliest',
               enable_auto_commit=True,
               group_id='batch-block', # ここをrealtime-blockにすると取り合い
               value_deserializer=lambda m: loads(m.decode('utf-8')),
               bootstrap_servers=['172.17.0.1:32768','172.17.0.1:32769','172.17.0.1:32770'])

id_count = [0 for _ in range(100)]
for i in range(10):
    print('### start {} times loop ###'.format(str(i+1)))
    fetch_data = consumer.poll(timeout_ms=1000, max_records=100)
    if len(fetch_data) == 0:
        print('empty')
        continue
    for key, value in fetch_data.items():
        print(key)
        # print(value)
        print('current fetch data size is :{}'.format(len(value)))
        for cons_data in value:
            if not('id' in cons_data.value):
                continue
            tmp_id = cons_data.value['id']
            id_count[tmp_id] = id_count[tmp_id] + 1
    sleep(5)

print(id_count)

次回はkafka用コンテナの外にConsumer、Producerを作成します。Dockerのネットワークを作成することで別のdocker-composeで立ち上げたコンテナ間の通信を実現することが可能です。


カテゴリー:AnalyticsPf,docker,python

Output不足なエンジニア

統計が好きになれず、機械学習やったら必然的に統計が必要になるだろうと思ったら想像以上に機械学習にハマる。数学は芸術なので商売にするつもりはないけど、DeepLearningは数学じゃないし商売にしたいと思っているところ。画像処理がメイン。