kafkaコンテナに別コンテナからアクセスする

前回はdocker-composeを使ってkafkaの検証環境を構築する方法を記載しました。今回は別のコンテナからkafkaにメッセージを送信したり取得したりする方法を記載します。Dockerのネットワークを作成して、複数のdocker-compose環境で共有することでコンテナ間の通信を可能にします。また、kafka-dockerではkafka(厳密にはkafkaのbroker)のポートが固定されず、外部から利用するには不向きなのでポート範囲を指定することで対応します。



Dockerコンテナ間の通信について

異なるDockerコンテナで通信を行うためにはコンテナが同じネットワークに参加している必要があります。昔はlinkさせる方法があったようですが、現在は非推奨です。ネットワークを作ってそのネットワークを使うよう設定するだけなので、同じネットワークに参加させるほうが管理が楽だと思います。

ネットワークの生成

ネットワークを作成する前に、現在存在するネットワークを見てみましょう。

docker network ls

NETWORK ID          NAME                           DRIVER              SCOPE
f9bd73b04794        bridge                         bridge              local
9c6af1fa80b1        host                           host                local
44a475555828        none                           null                local

この例だと3つのネットワークが存在しています。bridgeがデフォルトで使われるネットワークなのでbridgeに参加させるのも良いですが、せっかくなので新しくネットワークを作って利用します。

※docker-composeコマンドでコンテナを起動したことがある場合は、デフォルトでbridge以外のネットワークが作られて、composeで管理されているコンテナを参加させます。今回はkafkaを作るためのdocker-composeと疑似センサを作るためのdocker-composeを分けるのでdocker-composeにネットワークを作らせず、予め作ったネットワークに参加させます。

ネットワークは下記コマンドで作成することができます。この記事では今後もこの名称を使いますが、sample_docker_networkは任意の名称に置き換えてください。
前回記事にしたとおりkafkaの起動にIPアドレスが必要なのでついでに今回作成したネットワークで使うIPを確認しておきます。jqコマンドが無い場合はパイプ以前を実行すればjsonで出力されるのでそこから確認してください。

docker network create -d bridge sample_docker_network

docker network inspect sample_docker_network | jq -r ".[].IPAM.Config[].Gateway"

docker-compose で使用するネットワークを指定する

既にネットワークを作成したのでそのネットワークを使用する用に設定していきます。また、kafkaにプログラムで接続する際にポートを指定する必要があるのでポートの範囲も制限しておきます。kafka接続の際は指定されたポートのいずれかで接続できればOKなので、予め範囲を決めておいて、その全てを指定することで確実に接続できます。
kafka-docker の docker-composeを下記のように修正します。ネットワークの指定とkafkaコンテナが使えるポート範囲の指定、KAFKA_ADVERTISED_HOST_NAMEが変更内容になります。

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:  # ネットワークを指定
      - sample_docker_network  # 先程作ったもの
  kafka:
    build: .
    ports:
      - 10090-10099:9092  # ポート範囲下限-上限で範囲を制限できる
    networks:  # ネットワークを指定
      - sample_docker_network  # 先程作ったもの
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.26.0.1  # 先程確認したIP
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
networks:  # servicesと同じ階層にも記載が必要
  sample_docker_network:  # 先程作ったもの
    external: true  # コンテナ外部から利用するために必要

※ポート範囲を制限しているのでこの例だと10個までしかスケールできません
※apt-getでdocker-composeを入れている場合ポート範囲の制限をするとコンテナが起動できないので、apt-get remove して公式を参考に入れ直してください。

以上で準備完了です。前回作成したコンテナが残っていると恐らく起動に失敗すると思うので停止して削除してから起動してください。前回作成したTopicが失われるので再度作成しておかないと後述のプログラム実行時にエラーになります。

kafkaを利用する別コンテナの作成

先程と同じようにdocker-composeにネットワークを指定することでコンテナ間の通信を可能にします。kafka-dockerとは別ディレクトリに下記ファイルを用意します。プログラム的にはkafkaのポートとして取りうる全てを列挙するようにした以外は前回と同様です。

【Dockerfile】
FROM python:3.7.5-slim

RUN pip install kafka-python
RUN mkdir /work
COPY ./app/ /work/ # ./appのファイルは後述
【docker-compose.yml】
version: '2'
services:
  sensor:
    build: .
    ports:
      - "20001"
    networks:
      - sample_docker_network
networks:
  sample_docker_network:
    external: true
【app/consumer.py】
from kafka import KafkaConsumer
from json import loads

KAFKA_IP = '172.26.0.1'  # ネットワークのGateway
KAFKA_PORT_MIN = 10090  # kafkaのポート範囲下限
KAFKA_PORT_RANGE = 10  # kafkaポート範囲

# 172.26.0.1:10090, ~, 172.26.0.1:10099 とkafkaポートが取りうる全てを列挙
bootstraps = [
    KAFKA_IP + ':' + str(KAFKA_PORT_MIN + i) for i in range(KAFKA_PORT_RANGE)
]
consumer = KafkaConsumer(
   'test',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group-1',
    value_deserializer=lambda m: loads(m.decode('utf-8')),
    bootstrap_servers=bootstraps
)

for m in consumer:
    print(m.value)
【app/producer.py】
from kafka import KafkaProducer
from json import dumps
from time import sleep

KAFKA_IP = '172.26.0.1'  # ネットワークのGateway
KAFKA_PORT_MIN = 10090  # kafkaのポート範囲下限
KAFKA_PORT_RANGE = 10  # kafkaポート範囲

# 172.26.0.1:10090, ~, 172.26.0.1:10099 とkafkaポートが取りうる全てを列挙
bootstraps = [
    KAFKA_IP + ':' + str(KAFKA_PORT_MIN + i) for i in range(KAFKA_PORT_RANGE)
]

producer = KafkaProducer(
   value_serializer=lambda m: dumps(m).encode('utf-8'),
   bootstrap_servers=bootstraps)

producer.send("test", value={"hello": "producer"}).get(timeout=10)
for i in range(100):
    tmp_value = {'id': i}
    producer.send('test', value=tmp_value).get(timeout=10)
    sleep(1)

これらのファイルが用意できたらコンテナを起動して、/work に配置されたpythonファイルを実行すればkafkaにメッセージを送信したりメッセージを取得したりできるはずです。

長かったのでまとめ

コンテナ間通信について

Dockerコンテナ間の通信は同一ネットワークに参加させることで可能になります。docker-composeでサービスごとに利用ネットワークを指定することが可能なので、通信が必要な場合は同一ネットワークになるよう指定しましょう。

コンテナのポート範囲について

ポート範囲下限-上限でコンテナのポート範囲を指定することが可能です。予めポート範囲を決めておけばkafkaへの接続時に全指定してしまえば実際にどのポートを使っていても接続可能です。ポート範囲を指定してdocker-compose up できない場合はdocker-composeを最新にしましょう。apt-getだとNG(将来的には大丈夫でしょうけど)


カテゴリー:AnalyticsPf,docker,python

Output不足なエンジニア

統計が好きになれず、機械学習やったら必然的に統計が必要になるだろうと思ったら想像以上に機械学習にハマる。数学は芸術なので商売にするつもりはないけど、DeepLearningは数学じゃないし商売にしたいと思っているところ。画像処理がメイン。