PySparkとKafkaでStreaming処理を行う

前回docker-composeで構築したsparkクラスタで今回はStreaming処理を行います。Structured Streamingを利用することで簡単にKafkaのデータをストリームで受け取って処理することが可能です。

早速サンプルを載せます。将来的には今取り組んでいるリアルタイム異常検知システムのサンプル全体をgithubで公開しようと思いますが、リポジトリ構成を検討中なので一部抜粋して記載します。(全体的にリファクタリングが必要なのもありかなり先になると思います)



Streamingデータの読み出し

SparkSessionからreadStreamを利用することでストリーミングでデータを処理することが可能になります。細かいことは後にしてまずソースを先に載せます。
一点だけ注意ですが、readStreamとwriteStream(後述)はペアで存在する必要があります。readStreamだけしてprintSchemaで確認しながら実装していきたいところですが、wirteStreamも必要なのでまずは読み進めてもらって単純に読み出したデータをファイルやコンソールに出力できる状態にしてから色々実装したほうが良いと思います。

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("KafkaSample") \
    .getOrCreate()

kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", '172.26.0.1:10090,...,172.26.0.1:10099') \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .option("groupId", "streamtest") \
    .load()

readStreamのformatでデータソースの種類を指定します。今回はkafkaを使用しましたが、他にもcsv, parquet, socketなどが利用可能です。Structured Streamingのドキュメントでも最初の例はsocketになっています。

formatを指定したらformatに合わせたoptionを指定します。大体以前記事にしたkafka-pythonでconsumerを作成するときと同じ考えで問題ないですが、brokerのリストの形式が下記の通り異なるので注意が必要です。

  • kafka-python: ‘IP:PORT’のlist
  • pyspark: ‘IP:PORT,…,IP:PORT’の文字列

Kafkaデータの加工 sparkSQL

kafkaから読み出すデータもDataFrameとして扱えるので下記スキーマを持っています。詳細はkafkaとの統合のドキュメントを参照してください。

ColumnType
keybinary
valuebinary
topicstring
partitionint
offsetlong
timestamptimestamp
timestampTypeint
headers (optional)array
kafkaデータのスキーマ

kafkaのtopicに送ったデータはvalueとして保持されているので、valueをselectしてきて必要に応じて加工します。一旦Stringとして受け取って、from_jsonで展開します。また、kafkaがデータを受信したrecieve_timeも取得します。
ちなみに今回の題材でkafkaに送信される気象データはarea: 地域, send_time: 疑似sensorがデータを送信したUnixtime, weather_data: 気象データ(json形式文字列)となっています。

from pyspark.sql.types import *
from pyspark.sql import functions

input_schema = StructType([
    StructField('area', StringType(), True),
    StructField('send_time', StringType(), True),
    StructField('weather_data', StringType(), True)
])

# kafka_stream_dfは上述のソースでkafkaから取得してきたものです。
df = kafka_stream_df \
    .selectExpr("CAST(value AS STRING) AS json",
                "CAST(timestamp AS LONG) AS recive_time") \
    .select(functions.from_json(functions.col("json"),
                                input_schema).alias('tmp_data'),
            "recive_time") \
    .select("tmp_data.*", "recive_time")

selectExprもselectも1カラム毎にカンマでクエリを区切る必要があるので注意です。select(“col1 AS name1, col2 AS name2”)としたくなりますが。。。
この題材ではweather_dataが更にjsonなので同様にselectExprでStringとして取得してselectにfrom_jsonを使って展開しますが、やってること同じなので省略してます。

Kafkaデータの加工 DataFrame

当然SQLでなくDataFrameで加工することも可能です。個人的にはJSONを展開するまではSQLのほうがわかりやすいと思いますが、DataFrameにしてからは好みだと思います。DataFrameはpandasと少しずつAPIが異なるのでpandasに慣れていると混乱しますが、SQLよりプログラムチックに書きやすいと思います。

# 欠損値の補完は同じ
df = df.fillna('///') # ///は気象データを観測しなかったときに入るみたい
# replaceは結構変わる。置換前の値List, 置換後の値List, 置換実施カラム名のListを渡す
df = df.replace(['///', '北', '北北東',...], 
                ['', 'N', 'NNE',...],
                ['wind_velocity_max_direction',...])
# for で回して処理することも可能。下記はスペース以降を取り除く例
# withColumnは本来SELECT 〇〇 AS 別名 みたいに使うものだが別名を指定しなければ上書きになる
for col in df.schema.names:
    df = df.withColumn(col, functions.regexp_replace(col, ' .*$', ''))

Streamingデータの書き込み

書き込みにはDataFrame.writeStreamを利用します。読み出しと同様に様々な出力先が選択できます。Streaming処理なので基本的には常時動かし続けると思いますが、一定時間経過したら終了させることも可能です(kafkaへの書き込み のソース参照)。

kafkaへの書き込み

kafkaへ書き込む場合はスキーマが決まっています。最低限valueというカラムが必要です。こちらも詳細はkafkaとの統合のドキュメントを参照してください。

ColumnType
key (optional)string or binary
value (required)string or binary
headers (optional)array
topic (*optional)string
partition (optional)int
kafkaデータのスキーマ
# valueとkeyの作成. valueはto_jsonを使うと楽
writer = df.selectExpr("CAST(key AS STRING)", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "172.26.0.1:10090,...,172.26.0.1:10099") \
    .option("topic", "output_topic") \
    .start()
# Ctrl + Cが来るまで実行し続ける
writer.awaitTermination()

# もし一定時間や特定の条件で終了したい場合はstop()を利用する
# sleep(5)
# writer.stop()

CSVへの書き込み

csvの場合は特にスキーマの指定は無いのでそのまま出力可能です。出力先はディレクトリ指定です。1ファイルに出力させることは(多分)できません。

writer = df \
    .writeStream \
    .format("csv") \
    .option("path", "stream_output") \
    .start()
writer.awaitTermination()
# もし一定時間や特定の条件で終了したい場合はstop()を利用する
# sleep(5)
# writer.stop()

以上でkafkaのデータをStreamingで処理することが可能になります。Streamingではsortがサポートされていないなど色々と制限があるので本当にStreamingで処理する必要があるのか検討することも大切だと思います。実際通常のconsumerはpush型もpull型も利用可能なので、データが来たらすぐ処理する・定期的に刈り取る、などの実装で対処したほうがシンプルかつデバッグもしやすくなると思います(どっちにしろsortはミニバッチというか刈り取ったデータ内でしかできませんけど…。sortはデータの加工側ではなく利用側に任せたりsort用にバッチを用意するなど工夫が必要かもしれません)。


カテゴリー:AnalyticsPf,docker,python

Output不足なエンジニア

統計が好きになれず、機械学習やったら必然的に統計が必要になるだろうと思ったら想像以上に機械学習にハマる。数学は芸術なので商売にするつもりはないけど、DeepLearningは数学じゃないし商売にしたいと思っているところ。画像処理がメイン。