Python/Perlから子プロセスの標準入力にデータを渡す

先日も似た記事を書きました。

Python/Perlからdiffコマンドを起動して2つの文字列の差分を表示

このときは、外部プロセスとの通信に名前付きパイプを使いました。

今回は、子プロセスの標準入力に親プロセスからデータを送ります。名前のないパイプを作成し、パイプの読み取り側を子プロセスの標準入力にし、パイプの書き込み側を親プロセスで保持してデータをパイプに書き込みます。

以下のサンプルコードは、子プロセスにwcコマンドを起動して、文字数を数えています。

(プロセスをforkしたり、パイプをごにょごにょしたりする処理はPerlが書きやすいって思っていたのですが、Pythonでも同じようにできるんですね、という発見で、先日の記事も今回の記事も書きました)

Python

import os
import sys

def exec_wcc(input):
    sys.stdout.flush()
    # execDiff呼び出す前に標準出力のバッファリングが残っていると
    # fork後に重複して出力される

    reader, writer = os.pipe()

    # wcコマンド起動
    pid = os.fork()
    if pid == 0:
        os.close(writer)

        # 標準入力をパイプからの入力で置き換える
        os.dup2(reader, 0) # 0は標準入力の意味

        os.execvp("wc", ["wc", "-c"])

    os.close(reader)

    # パイプにデータを書き込み
    writer = os.fdopen(writer, "w")
    writer.write(input)
    writer.close()

    os.waitpid(pid, 0)

exec_wcc("Hello, world!")
# 13と表示される

Perl

use strict;
use warnings;

# 上記Pythonの例と同様の
# プロセスforkを手動で書く方法
sub exec_wcc_1 {
    my ($input) = @_;

    pipe(PIPE_READER, PIPE_WRITER);

    # wcコマンド起動
    my $pid = fork;
    die $! unless defined($pid);
    if ($pid == 0) {
        close(PIPE_WRITER);

        # 標準入力をパイプからの入力で置き換える
        open(STDIN, "<&=", fileno(PIPE_READER)) or die $!;

        exec("wc", "-c");
    }

    close(PIPE_READER);

    # パイプにデータを書き込み
    print PIPE_WRITER $input;
    close(PIPE_WRITER);

    waitpid($pid, 0);
}

# Perlっぽい簡潔な書き方
sub exec_wcc_2 {
    my ($input) = @_;
    open(WRITER, "|-", "wc -c");
    print WRITER $input;
    close(WRITER);
}

exec_wcc_1("Hello, world!");
# 13と表示される

exec_wcc_2("Hello, world!");
# 13と表示される

gcloudコマンドをUbuntuにインストール

Google Cloudのgcloud, gsutilなどのコマンドをインストールしてみます。

Compute Engineですと、初めからインストールされていて、認証も通っているようです。今回はGoogle Cloudと関係ないAWS EC2インスタンスにインストールしてみました。

参考 Using the Google Cloud SDK installer  |  Cloud SDK Documentation

OSはUbuntu Server 20.04 LTSです。

$ curl https://sdk.cloud.google.com | bash

これを実行すると、インストール先を聞かれます。コマンドラインでの対話型です。

インストールが完了すると ~/.bashrc に以下の記述が追記されます。

# The next line updates PATH for the Google Cloud SDK.
if [ -f '/home/ubuntu/google-cloud-sdk/path.bash.inc' ]; then . '/home/ubuntu/google-cloud-sdk/path.bash.inc'; fi

# The next line enables shell command completion for gcloud.
if [ -f '/home/ubuntu/google-cloud-sdk/completion.bash.inc' ]; then . '/home/ubuntu/google-cloud-sdk/completion.bash.inc'; fi

このPATH設定を読み込みます。

$ . .bashrc

~/google-cloud-sdk にインストールされていることがわかります。

$ which gcloud
/home/ubuntu/google-cloud-sdk/bin/gcloud

$ gcloud --version
Google Cloud SDK 331.0.0
bq 2.0.65
core 2021.03.05
gsutil 4.59

$ ll google-cloud-sdk/bin
total 205188
drwxrwxr-x 3 ubuntu ubuntu      4096 Mar 15 14:53 ./
drwxrwxr-x 9 ubuntu ubuntu      4096 Mar 15 14:53 ../
-rwxr-xr-x 1 ubuntu ubuntu 144642600 Jan  1  1980 anthoscli*
drwxrwxr-x 3 ubuntu ubuntu      4096 Mar 15 14:53 bootstrapping/
-rwxr-xr-x 1 ubuntu ubuntu      4973 Jan  1  1980 bq*
-rwxr-xr-x 1 ubuntu ubuntu      3061 Jan  1  1980 dev_appserver.py*
-rwxr-xr-x 1 ubuntu ubuntu      4993 Jan  1  1980 docker-credential-gcloud*
-rwxr-xr-x 1 ubuntu ubuntu       945 Jan  1  1980 endpointscfg.py*
-rwxr-xr-x 1 ubuntu ubuntu      4965 Jan  1  1980 gcloud*
-rwxr-xr-x 1 ubuntu ubuntu      5449 Jan  1  1980 git-credential-gcloud.sh*
-rwxr-xr-x 1 ubuntu ubuntu      5013 Jan  1  1980 gsutil*
-rwxr-xr-x 1 ubuntu ubuntu      4991 Jan  1  1980 java_dev_appserver.sh*
-rwxr-xr-x 1 ubuntu ubuntu  65393116 Jan  1  1980 kuberun*

対話型を避けて、スクリプトでもインストールできるようにするには、以下のようにすればよいようです。

$ bash <(curl https://sdk.cloud.google.com)  --disable-prompts --install-dir=$HOME

この場合、 ~/.bashrc は変更されないようです。手動で以下を書き加えればよいです。

. $HOME/google-cloud-sdk/path.zsh.inc
. $HOME/google-cloud-sdk/completion.zsh.inc

実際に使うには認証情報を設定する必要があります。

$ gcloud init

このコマンドも対話型です。対話中に認証用のURLが表示されますので、これをGoogleアカウントでログインしているブラウザでアクセスします。ブラウザ上で承認すると、コードが表示されますので、これを端末にコピーします。

プロジェクトの選択、リージョンの選択をすると、完了です。 ~/.boto~/.config/gcloud/ が作成されます。

以下のコマンドで、Cloud Storageのバケット一覧が表示されますので、認証が通っていることが確認できます。

$ gsutil ls

AWS Step FunctionsのRetryとCatchを使ってみる

f:id:suzuki-navi:20210311081043p:plain

AWSのStep FunctionsのRetryとCatchをServerless Frameworkから使ってみます。

Step Functionsは、CloudFormationだとState machine定義の記述が面倒です。JSONを文字列にしたものをCloudFormationのテンプレートに書くか、別ファイルにしないといけないようです。Serverless FrameworkならYAMLで直接serverless.ymlに定義できます。今回はServerless Frameworkで試しました。

Serverless FrameworkからStep Functionsを定義するにはプラグインのインストールが必要です。

$ serverless plugin install -n serverless-step-functions

ソースコード

試しに作った serverless.yml

service: sample-stepfunction

frameworkVersion: '2'

plugins:
  - serverless-step-functions

provider:
  name: aws
  region: ap-northeast-1
  lambdaHashingVersion: 20201221

resources:
  Resources:
    # Step Functions用CloudWatch LogGroup作成
    sampleStateMachineLogGroup:
      Type: AWS::Logs::LogGroup
      Properties:
        LogGroupName: /aws/states/sampleStateMachine-Logs

functions:
  # 3つのLambda作成
  samplefunc1:
    handler: handler.main1
    runtime: python3.8
  samplefunc2:
    handler: handler.main2
    runtime: python3.8
  samplefunc3:
    handler: handler.main3
    runtime: python3.8

# Step Functions作成
stepFunctions:
  stateMachines:
    sampleStateMachine:
      name: sampleStateMachine
      loggingConfig:
        level: ALL
        includeExecutionData: true
        destinations:
          - Fn::GetAtt: [sampleStateMachineLogGroup, Arn]
      definition:
        StartAt: Status1
        States:
          Status1:
            Type: Task
            Resource:
              Fn::GetAtt: [samplefunc1, Arn]
            Next: Status2
          Status2:
            Type: Task
            Resource:
              Fn::GetAtt: [samplefunc2, Arn]
            Retry:
              - ErrorEquals:
                - States.ALL
                IntervalSeconds: 1
                MaxAttempts: 2
                BackoffRate: 2
            Catch:
              - ErrorEquals:
                - States.ALL
                Next: Status3
            Next: End
          Status3:
            Type: Task
            Resource:
              Fn::GetAtt: [samplefunc3, Arn]
            Next: End
          End:
            Type: Pass
            End: true

handler.py

import time

def main1(event, context):
    print(event)
    return {"foo": "main1"}

def main2(event, context):
    print(event)
    time.sleep(10) # Lambdaをタイムアウトさせる
    return {"foo": "main2"} # タイムアウトになるのでこの値は返ってこない

def main3(event, context):
    print(event)
    return {"foo": "main3"}

3つのLambdaを定義しています。

  • samplefunc1: 入力をログに残した後、正常に終了するLambda
  • samplefunc2: 入力をログに残した後、必ずタイムアウトのエラーになってしまうLambda
  • samplefunc3: 入力をログに残した後、正常に終了するLambda

それぞれStep FunctionsのStatus1, Status2, Status3に対応します。

実行結果

f:id:suzuki-navi:20210311081107p:plain

Status2にはRetryでMaxAttempts: 2の指定があるので、2回のリトライ、つまりsamplefunc2が3回実行されます。samplefunc2に渡される入力は3回とも同じです。

Status2にはCatchで'Next: Status3'の指定もあるので、samplefunc2が3回実行されたあとに、Status3に遷移します。samplefunc3への入力は次のようなオブジェクトになります。

{
  "Error": "Lambda.Unknown",
  "Cause": "The cause could not be determined because Lambda did not return an error type. Returned payload: {\"errorMessage\":\"2021-03-10T11:12:36.559Z ea381979-e25c-41e3-9fe5-39360cfa0ab6 Task timed out after 6.01 seconds\"}"
}

エラーハンドラに元の入力を引き渡す

Status2のCatchの指定に次のように ResultPath: "$.error" の指定を追加します。

            Catch:
              - ErrorEquals:
                - States.ALL
                Next: Status3
                ResultPath: "$.error"

これをデプロイして実行すると、Status3への入力は、Status2への入力にエラー情報を追加したものに変わります。元の入力を保持しておきたい場合はこの指定が必要のようです。

{
  "foo": "main1",
  "error": {
    "Error": "Lambda.Unknown",
    "Cause": "The cause could not be determined because Lambda did not return an error type. Returned payload: {\"errorMessage\":\"2021-03-10T11:25:56.258Z 42c9ade8-93e1-499d-9d60-68b0a6c7ccdf Task timed out after 6.01 seconds\"}"
  }
}

WindowsのWSLからテキストのコピペでSSH経由でファイルを送る方法

WSLから clip.exe というコマンドでテキストをクリップボードにコピーできることを知りました。

なので、こんなことができます。WindowsからTera Termなどでどこか遠くのサーバに接続し、Windowsにあるファイルをテキスト貼り付けで遠くのサーバに送信です。scpとかrsyncとか不要です。

<ローカルファイルをクリップボードにコピー>

WSL上で、適当なファイルをtarコマンドとbase64コマンドでテキストにしてクリップボードに送ります。以下のコマンドはカレントディレクトリにあるJPGファイルすべてをコピーしています。

$ tar cvzf - *.jpg  | base64 | clip.exe

クリップボードから遠くにペースト>

遠くのサーバに接続しているTera Termなどで、以下のコマンドでエンターを押した後、マウスの右クリックなどでクリップボードからの貼り付けをします。Tera Termであれば改行を含むテキストの貼り付け確認画面がでるかもしれません。

$ base64 -d | tar xvzf -

貼り付けると、長々とBASE64のテキストがスクロールされます。Ctrl-Dを押すと貼り付け完了で、サーバのカレントディレクトリにファイルがただちに解凍されます。

ファイルが大きいとクリップボードを介するテキストが長くなりますので、ファイルサイズに上限があるかもしれません。

AWS SQSをPythonで試してみる

CloudFormationでSQSのQueueを作成して、Pythonコードでメッセージの送受信をするところまで試してみます。

CloudFormation

template.yaml

AWSTemplateFormatVersion: '2010-09-09'

Resources:
  SampleSQS:
    Type: AWS::SQS::Queue

Outputs:
  SampleSQSArn:
    Value: !GetAtt SampleSQS.Arn

Propertiesなしの全部デフォルト値のQueueをこれで作成できます。

デプロイ

$ aws cloudformation deploy --template-file template.yaml --stack-name samplesqs

Queueの名前は以下のコマンドでわかります。

$ aws cloudformation describe-stacks --stack-name samplesqs | jq ".Stacks[0].Outputs"
[
  {
    "OutputKey": "SampleSQSArn",
    "OutputValue": "arn:aws:sqs:ap-northeast-1:xxxxxxxxxxxx:samplesqs-SampleSQS-XXXXXXXXXXXX"
  }
]

QueueのURLがあとで必要になるのですが、このコマンドでわかります。

$ aws sqs get-queue-url --queue-name samplesqs-SampleSQS-X9UDN0YW5ADO
{
    "QueueUrl": "https://ap-northeast-1.queue.amazonaws.com/xxxxxxxxxxxx/samplesqs-SampleSQS-XXXXXXXXXXXX"
}

Pythonコード

PythonコードでQueueのメッセージ送受信をしてみます。

sample.py

import json
import boto3

profile = "default"

session = boto3.session.Session(profile_name = profile)
sqs_client = session.client("sqs")

queue_url = "https://ap-northeast-1.queue.amazonaws.com/xxxxxxxxxxxx/samplesqs-SampleSQS-XXXXXXXXXXXX"

# 送信
idx = 1
while idx <= 10: # 10個のメッセージを送信
    message = {"msg": "Hello, world!", "foo": idx}
    sqs_client.send_message(
        QueueUrl = queue_url,
        MessageBody = json.dumps(message),
    )
    idx += 1

# 受信
while True:
    res = sqs_client.receive_message(
        QueueUrl = queue_url,
    )

    # 受信したものをなにか処理 (このサンプルでは表示するだけ)
    delete_entries = [] # 処理済みメッセージ一覧
    id = 1
    for msg in res.get("Messages", []):
        message = json.loads(msg["Body"])
        print(message)
        delete_entries.append({"Id": str(id), "ReceiptHandle": msg["ReceiptHandle"]})
        id += 1

    # 処理済みメッセージを削除
    if len(delete_entries) > 0: # 0件でdelete_message_batchを呼び出すとエラーになる
        sqs_client.delete_message_batch(
            QueueUrl = queue_url,
            Entries = delete_entries,
        )
    else:
        break

実行してみます。

$ python sample.py
{'msg': 'Hello, world!', 'foo': 3}
{'msg': 'Hello, world!', 'foo': 6}
{'msg': 'Hello, world!', 'foo': 1}
{'msg': 'Hello, world!', 'foo': 2}
{'msg': 'Hello, world!', 'foo': 4}
{'msg': 'Hello, world!', 'foo': 10}
{'msg': 'Hello, world!', 'foo': 7}
{'msg': 'Hello, world!', 'foo': 5}
{'msg': 'Hello, world!', 'foo': 8}
{'msg': 'Hello, world!', 'foo': 9}

できました。順序は保証されていないので、受信したメッセージの順番はめちゃめちゃです。

Vue.js / Nuxt.js / Vuexの書き方のメモ

すぐに忘れてしまうのでテンプレートを残しておきます。

Vue.js / Nuxt.js

import Element from "~/components/element.vue";

export default {
  components: {
    "element": Element,
  },
  data() { return {
    field1: value1,
    field3: value3,
  } },
  computed: {
    field2() {
      return value2;
    },
    field3: {
      get() {
        return this.field3Impl;
      },
      set(newVal) {
        this.field3Impl = newVal;
      },
    },
  },
  methods: {
    method1() {
      // なにか
    },
  },
  created() {
    // なにか
  },
  beforeRouteUpdate(to, from, next) {
    // to.path が新しいURLのパス部分
    next(); // 最後に呼ぶ
  },
  mounted() {
    // なにか
  },
  beforeDestroy() {
    // なにか
  },
}

Vuex

  • state
    • 値の集まり、ステータス
  • mutations
    • ステータスの操作
    • 定義の例: increment(state, n)
    • 呼び出しの例: context.commit("increment", 10)
  • actions
    • 非同期処理を含む任意の処理
    • 定義の例: increment (context, n)
    • 呼び出しの例: context.dispatch("increment", n)

Vue.js

const store = new Vuex.Store({
    state: {
        count: 0,
    },
    mutations: {
        increment(state, n) {
            // なにか
            state.count += n;
        },
    },
    actions: {
        increment(context, n) {
            // なにか
            context.commit("increment", n);
        },
    },
}

Nuxt.js

export const state = () => ({
    count: 0,
});
export const mutations = {
    increment(state, n) {
        // なにか
        state.count += n;
    },
};
export const actions = {
    increment(context, n) {
        // なにか
        context.commit("increment", n);
    },
};

Python/Perlからdiffコマンドを起動して2つの文字列の差分を表示

PythonまたはPerlからfork, execしてdiffコマンドを起動し、2つの文字列の差分を表示させるサンプルコードです。

diffコマンドへの文字列受け渡しは名前付きパイプを使っています。diffしたいだけなのに3つも子プロセスを起動しちゃってます。

Python

import os
import sys
import tempfile

def execDiff(content1, content2, name1 = "1", name2 = "2"):
    sys.stdout.flush()
    # execDiff呼び出す前に標準出力のバッファリングが残っていると
    # fork後に重複して出力される

    # FIFOを置くテンポラリディレクトリ作成
    tmpdir = tempfile.mkdtemp()

    # FIFO作成
    fifo1 = tmpdir + "/" + name1
    fifo2 = tmpdir + "/" + name2
    os.mkfifo(fifo1)
    os.mkfifo(fifo2)

    # diffコマンド起動
    pid = os.fork()
    if pid == 0:
        os.execvp("diff", ["diff", "-u", fifo1, fifo2])

    # 2つの文字列をdiffコマンドに渡すためのプロセスをfork
    pid1 = os.fork()
    if pid1 == 0:
        writer1 = open(fifo1, "w")
        writer1.write(content1)
        writer1.close()
        sys.exit()
    pid2 = os.fork()
    if pid2 == 0:
        writer2 = open(fifo2, "w")
        writer2.write(content2)
        writer2.close()
        sys.exit()

    # 子プロセス終了まで待機
    os.waitpid(pid1, 0)
    os.waitpid(pid2, 0)
    os.waitpid(pid, 0)

    # FIFOとテンポラリディレクトリ削除
    os.remove(fifo1)
    os.remove(fifo2)
    os.rmdir(tmpdir)

execDiff("abc\nghi\n", "abc\ndef\nghi\n", "before", "after")
$ python sample.py
--- /tmp/tmpvvdrq8yq/before     2021-02-28 18:16:39.715880739 +0900
+++ /tmp/tmpvvdrq8yq/after      2021-02-28 18:16:39.715880739 +0900
@@ -1,2 +1,3 @@
 abc
+def
 ghi

Perl

use POSIX qw/mkfifo/;
use File::Temp qw/tempdir/;

sub execDiff {
    my ($content1, $content2, $name1, $name2) = @_;
    $name1 = "1" unless defined($name1);
    $name2 = "2" unless defined($name2);

    # FIFOを置くテンポラリディレクトリ作成
    my $tmpdir = tempdir();

    # FIFO作成
    my $fifo1 = "$tmpdir/$name1";
    my $fifo2 = "$tmpdir/$name2";
    mkfifo($fifo1, 0700) or die $!;
    mkfifo($fifo2, 0700) or die $!;

    # diffコマンド起動
    my $pid = fork;
    die $! unless defined($pid);
    if ($pid == 0) {
        exec("diff", "-u", $fifo1, $fifo2);
    }

    # 2つの文字列をdiffコマンドに渡すためのプロセスをfork
    my $pid1 = fork;
    die $! unless defined($pid1);
    if ($pid1 == 0) {
        open(my $writer1, ">", $fifo1) or die $!;
        print $writer1 $content1;
        close($writer1);
        exit(0);
    }
    my $pid2 = fork;
    die $! unless defined($pid2);
    if ($pid2 == 0) {
        open(my $writer2, ">", $fifo2) or die $!;
        print $writer2 $content2;
        close($writer2);
        exit(0);
    }

    # 子プロセス終了まで待機
    waitpid($pid, 0);
    waitpid($pid1, 0);
    waitpid($pid2, 0);

    # FIFOとテンポラリディレクトリ削除
    unlink($fifo1);
    unlink($fifo2);
    rmdir($tmpdir);
}

execDiff("abc\nghi\n", "abc\ndef\nghi\n", "before", "after");
$ perl sample.pl
--- /tmp/i4YAOgOSeh/before      2021-02-28 18:16:56.985034423 +0900
+++ /tmp/i4YAOgOSeh/after       2021-02-28 18:16:56.985034423 +0900
@@ -1,2 +1,3 @@
 abc
+def
 ghi