Google Cloud DataflowをPythonで動かしてみる
前回のGoogle Cloud DataflowをJavaで動かしてみるに続き、次はPythonで試しました。
前提
Pythonライブラリインストール
pip install apache-beam[gcp]
というコマンドはbashではそのままでいいですが、zshでは私の環境ではシングルクオートで囲む必要がありました。
$ pip install 'apache-beam[gcp]'
ソースコード
Pythonの1ファイルのみです。
import argparse from past.builtins import unicode import apache_beam as beam from apache_beam.io import ReadFromText from apache_beam.io import WriteToText from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions class ExtractWordsFn(beam.DoFn): def process(self, element): return element.split(" ") def run(argv=None, save_main_session=True): parser = argparse.ArgumentParser() parser.add_argument( '--inputPath', dest='inputPath', required=True, help='Path of the file to read from') parser.add_argument( '--outputPath', dest='outputPath', required=True, help='Path of the file to write to') known_args, pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = save_main_session with beam.Pipeline(options=pipeline_options) as p: def format_result(word, count): return '%s: %d' % (word, count) ( p | 'ReadLines' >> ReadFromText(known_args.inputPath) | 'Split' >> (beam.ParDo(ExtractWordsFn()).with_output_types(unicode)) | (beam.ParDo(ExtractWordsFn()).with_output_types(unicode)) | 'PairWIthOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum) | 'Format' >> beam.MapTuple(format_result) | 'WriteCounts' >> WriteToText(known_args.outputPath) ) if __name__ == '__main__': run()
実行方法
$ python mysample.py --project=PROJECT_ID --region=us-central1 --runner=DataflowRunner --temp_location=TEMP_GS_PATH --inputPath=gs://apache-beam-samples/shakespeare/kinglear.txt --outputPath=OUTPUT_GS_PATH
上記コマンドの中、以下の3箇所は自分の環境に合わせて書き換えます。
PROJECT_ID
: GCPのプロジェクトID、TEMP_GS_PATH
: Dataflowの実行に必要なファイルを置くGCSのパス。このパスの中に自動で必要なファイルがアップロードされる。例:gs://sample-bucket/dataflow/temp/
OUTPUT_GS_PATH
: 出力先GCSパス。 例:gs://sample-bucket/dataflow/output
と指定するとgs://sample-bucket/dataflow/output-00000-of-00003
のような名前のファイルが作成される
コマンドの中で入力として指定している gs://apache-beam-samples/shakespeare/kinglear.txt
にはApacheが用意しているサンプル入力ファイルがあり、誰でも読み込み可能です。
このコマンドを実行すると、4分〜5分で完了します。GCPコンソールでDataflowのジョブを見ると次のように表示されています。