以前この記事でKafkaをdocker-composeで構築したときのように、docker-composeを使うとクラスタ数を容易に調整できるのでsparkのような分散処理を試して見るには最適です。元々sparkとkafkaでストリーミング処理を実現するまでを記載するつもりだったのですが、若干長くなりそうだったので今回はSparkクラスタを構築するところまで、次回はPySparkでkafkaのデータをストリーミング処理する方法と分けます。
docker-composeとDockerfileの用意
Sparkを動作させる、クラスタ数を容易に変更するという要望はこのリポジトリで満たせます(他だとWorkerノード数が固定になっていることが多い)。個人のリポジトリなのでSparkやHadoopバージョンの最新を追従しているわけではないようです。でもDockerfileでバージョンが指定できるようになっているのでcloneしてちょっと変更すれば簡単にスケーラブルなSparkクラスタが用意できます。
SPARK_VERSIONの変更
Dockerfileが親切なのですぐわかると思いますが、「ENV SPARK_VERSION 2.4.1」を任意のバージョンに合わせて変更するだけでOKです。
AWS Glueの検証目的なら2.4.3(2020年7月時点)です。次回のソースでは3.0.0のAPIを利用するので3.0.0とします。
git clone https://github.com/suraj95/Spark-on-Docker.git
vim docker/Dockerfile
⇒ENV SPARK_VERSION 2.4.1 を変更
kafka-clientのJARを追加する
pysparkでストリーミングのインプットとしてkafkaを指定する際にkafkaのクライアント用のjarがないと下記エラーが発生します。
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
SPARK_HOME/jars にjarを追加するようにDockerfileに追記します。Sparkのインストールが終わって最後のCMDの手前に下記を追加します。
(kafkaのクライアント用jarバージョンはkafkaコンテナに入って、KAFKA_HOME/libs 配下を確認すればわかります。)
KAFKA_CLIENT_JAR_VERSION 2.5.0
WORKDIR $SPARK_HOME/jars
RUN curl -O https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/${KAFKA_CLIENT_JAR_VERSION}/kafka-clients-${KAFKA_CLIENT_JAR_VERSION}.jar
以上でSparkクラスタが構築できます。次回はkafkaストリーミング処理を実装してこのクラスタで実行してみます。