Google Cloud DataflowをPythonで動かしてみる

前回のGoogle Cloud DataflowをJavaで動かしてみるに続き、次はPythonで試しました。

前提

Pythonライブラリインストール

pip install apache-beam[gcp] というコマンドはbashではそのままでいいですが、zshでは私の環境ではシングルクオートで囲む必要がありました。

$ pip install 'apache-beam[gcp]'

ソースコード

Pythonの1ファイルのみです。

import argparse

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    return element.split(" ")

def run(argv=None, save_main_session=True):
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--inputPath',
      dest='inputPath',
      required=True,
      help='Path of the file to read from')
  parser.add_argument(
      '--outputPath',
      dest='outputPath',
      required=True,
      help='Path of the file to write to')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  with beam.Pipeline(options=pipeline_options) as p:
    def format_result(word, count):
      return '%s: %d' % (word, count)

    ( p |
      'ReadLines' >> ReadFromText(known_args.inputPath) |
      'Split' >> (beam.ParDo(ExtractWordsFn()).with_output_types(unicode)) |
      (beam.ParDo(ExtractWordsFn()).with_output_types(unicode)) |
      'PairWIthOne' >> beam.Map(lambda x: (x, 1)) |
      'GroupAndSum' >> beam.CombinePerKey(sum) |
      'Format' >> beam.MapTuple(format_result) |
      'WriteCounts' >> WriteToText(known_args.outputPath)
    )

if __name__ == '__main__':
  run()

実行方法

$ python mysample.py --project=PROJECT_ID --region=us-central1 --runner=DataflowRunner --temp_location=TEMP_GS_PATH --inputPath=gs://apache-beam-samples/shakespeare/kinglear.txt --outputPath=OUTPUT_GS_PATH

上記コマンドの中、以下の3箇所は自分の環境に合わせて書き換えます。

  • PROJECT_ID: GCPのプロジェクトID、
  • TEMP_GS_PATH: Dataflowの実行に必要なファイルを置くGCSのパス。このパスの中に自動で必要なファイルがアップロードされる。例: gs://sample-bucket/dataflow/temp/
  • OUTPUT_GS_PATH: 出力先GCSパス。 例: gs://sample-bucket/dataflow/output と指定すると gs://sample-bucket/dataflow/output-00000-of-00003 のような名前のファイルが作成される

コマンドの中で入力として指定している gs://apache-beam-samples/shakespeare/kinglear.txt にはApacheが用意しているサンプル入力ファイルがあり、誰でも読み込み可能です。

このコマンドを実行すると、4分〜5分で完了します。GCPコンソールでDataflowのジョブを見ると次のように表示されています。

f:id:suzuki-navi:20201009233622p:plain

JavaPythonの違い

  • Dataflowの処理時間が違う
    • Javaは3分前後
    • Pythonは4分から5分程度かかる
  • Javaでの --stagingLocation で指定するGoogle Cloud Storageの中身とPythonでの --temp_location で指定するGoogle Cloud Storageの中身は、どちらも自動で実行時に必要なファイルが置かれるが、構成が違う
    • Java--stagingLocation で指定のディレクトリ直下に大量のjarファイルが保存される
    • Python--temp_location で指定のディレクトリ内に日時を含むテンポラリのディレクトリが作られ、その中になにかが保存される。ファイル数もJavaよりずっと少ない