Python/Perlから子プロセスの標準入力にデータを渡す
先日も似た記事を書きました。
Python/Perlからdiffコマンドを起動して2つの文字列の差分を表示
このときは、外部プロセスとの通信に名前付きパイプを使いました。
今回は、子プロセスの標準入力に親プロセスからデータを送ります。名前のないパイプを作成し、パイプの読み取り側を子プロセスの標準入力にし、パイプの書き込み側を親プロセスで保持してデータをパイプに書き込みます。
以下のサンプルコードは、子プロセスにwcコマンドを起動して、文字数を数えています。
(プロセスをforkしたり、パイプをごにょごにょしたりする処理はPerlが書きやすいって思っていたのですが、Pythonでも同じようにできるんですね、という発見で、先日の記事も今回の記事も書きました)
Python
import os import sys def exec_wcc(input): sys.stdout.flush() # execDiff呼び出す前に標準出力のバッファリングが残っていると # fork後に重複して出力される reader, writer = os.pipe() # wcコマンド起動 pid = os.fork() if pid == 0: os.close(writer) # 標準入力をパイプからの入力で置き換える os.dup2(reader, 0) # 0は標準入力の意味 os.execvp("wc", ["wc", "-c"]) os.close(reader) # パイプにデータを書き込み writer = os.fdopen(writer, "w") writer.write(input) writer.close() os.waitpid(pid, 0) exec_wcc("Hello, world!") # 13と表示される
Perl
use strict; use warnings; # 上記Pythonの例と同様の # プロセスforkを手動で書く方法 sub exec_wcc_1 { my ($input) = @_; pipe(PIPE_READER, PIPE_WRITER); # wcコマンド起動 my $pid = fork; die $! unless defined($pid); if ($pid == 0) { close(PIPE_WRITER); # 標準入力をパイプからの入力で置き換える open(STDIN, "<&=", fileno(PIPE_READER)) or die $!; exec("wc", "-c"); } close(PIPE_READER); # パイプにデータを書き込み print PIPE_WRITER $input; close(PIPE_WRITER); waitpid($pid, 0); } # Perlっぽい簡潔な書き方 sub exec_wcc_2 { my ($input) = @_; open(WRITER, "|-", "wc -c"); print WRITER $input; close(WRITER); } exec_wcc_1("Hello, world!"); # 13と表示される exec_wcc_2("Hello, world!"); # 13と表示される
gcloudコマンドをUbuntuにインストール
Google Cloudのgcloud, gsutilなどのコマンドをインストールしてみます。
Compute Engineですと、初めからインストールされていて、認証も通っているようです。今回はGoogle Cloudと関係ないAWS EC2インスタンスにインストールしてみました。
参考 Using the Google Cloud SDK installer | Cloud SDK Documentation
OSはUbuntu Server 20.04 LTSです。
$ curl https://sdk.cloud.google.com | bash
これを実行すると、インストール先を聞かれます。コマンドラインでの対話型です。
インストールが完了すると ~/.bashrc
に以下の記述が追記されます。
# The next line updates PATH for the Google Cloud SDK. if [ -f '/home/ubuntu/google-cloud-sdk/path.bash.inc' ]; then . '/home/ubuntu/google-cloud-sdk/path.bash.inc'; fi # The next line enables shell command completion for gcloud. if [ -f '/home/ubuntu/google-cloud-sdk/completion.bash.inc' ]; then . '/home/ubuntu/google-cloud-sdk/completion.bash.inc'; fi
このPATH設定を読み込みます。
$ . .bashrc
~/google-cloud-sdk
にインストールされていることがわかります。
$ which gcloud /home/ubuntu/google-cloud-sdk/bin/gcloud $ gcloud --version Google Cloud SDK 331.0.0 bq 2.0.65 core 2021.03.05 gsutil 4.59 $ ll google-cloud-sdk/bin total 205188 drwxrwxr-x 3 ubuntu ubuntu 4096 Mar 15 14:53 ./ drwxrwxr-x 9 ubuntu ubuntu 4096 Mar 15 14:53 ../ -rwxr-xr-x 1 ubuntu ubuntu 144642600 Jan 1 1980 anthoscli* drwxrwxr-x 3 ubuntu ubuntu 4096 Mar 15 14:53 bootstrapping/ -rwxr-xr-x 1 ubuntu ubuntu 4973 Jan 1 1980 bq* -rwxr-xr-x 1 ubuntu ubuntu 3061 Jan 1 1980 dev_appserver.py* -rwxr-xr-x 1 ubuntu ubuntu 4993 Jan 1 1980 docker-credential-gcloud* -rwxr-xr-x 1 ubuntu ubuntu 945 Jan 1 1980 endpointscfg.py* -rwxr-xr-x 1 ubuntu ubuntu 4965 Jan 1 1980 gcloud* -rwxr-xr-x 1 ubuntu ubuntu 5449 Jan 1 1980 git-credential-gcloud.sh* -rwxr-xr-x 1 ubuntu ubuntu 5013 Jan 1 1980 gsutil* -rwxr-xr-x 1 ubuntu ubuntu 4991 Jan 1 1980 java_dev_appserver.sh* -rwxr-xr-x 1 ubuntu ubuntu 65393116 Jan 1 1980 kuberun*
対話型を避けて、スクリプトでもインストールできるようにするには、以下のようにすればよいようです。
$ bash <(curl https://sdk.cloud.google.com) --disable-prompts --install-dir=$HOME
この場合、 ~/.bashrc
は変更されないようです。手動で以下を書き加えればよいです。
. $HOME/google-cloud-sdk/path.zsh.inc . $HOME/google-cloud-sdk/completion.zsh.inc
実際に使うには認証情報を設定する必要があります。
$ gcloud init
このコマンドも対話型です。対話中に認証用のURLが表示されますので、これをGoogleアカウントでログインしているブラウザでアクセスします。ブラウザ上で承認すると、コードが表示されますので、これを端末にコピーします。
プロジェクトの選択、リージョンの選択をすると、完了です。 ~/.boto
と ~/.config/gcloud/
が作成されます。
以下のコマンドで、Cloud Storageのバケット一覧が表示されますので、認証が通っていることが確認できます。
$ gsutil ls
AWS Step FunctionsのRetryとCatchを使ってみる
AWSのStep FunctionsのRetryとCatchをServerless Frameworkから使ってみます。
Step Functionsは、CloudFormationだとState machine定義の記述が面倒です。JSONを文字列にしたものをCloudFormationのテンプレートに書くか、別ファイルにしないといけないようです。Serverless FrameworkならYAMLで直接serverless.yml
に定義できます。今回はServerless Frameworkで試しました。
Serverless FrameworkからStep Functionsを定義するにはプラグインのインストールが必要です。
$ serverless plugin install -n serverless-step-functions
ソースコード
試しに作った serverless.yml
service: sample-stepfunction frameworkVersion: '2' plugins: - serverless-step-functions provider: name: aws region: ap-northeast-1 lambdaHashingVersion: 20201221 resources: Resources: # Step Functions用CloudWatch LogGroup作成 sampleStateMachineLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: /aws/states/sampleStateMachine-Logs functions: # 3つのLambda作成 samplefunc1: handler: handler.main1 runtime: python3.8 samplefunc2: handler: handler.main2 runtime: python3.8 samplefunc3: handler: handler.main3 runtime: python3.8 # Step Functions作成 stepFunctions: stateMachines: sampleStateMachine: name: sampleStateMachine loggingConfig: level: ALL includeExecutionData: true destinations: - Fn::GetAtt: [sampleStateMachineLogGroup, Arn] definition: StartAt: Status1 States: Status1: Type: Task Resource: Fn::GetAtt: [samplefunc1, Arn] Next: Status2 Status2: Type: Task Resource: Fn::GetAtt: [samplefunc2, Arn] Retry: - ErrorEquals: - States.ALL IntervalSeconds: 1 MaxAttempts: 2 BackoffRate: 2 Catch: - ErrorEquals: - States.ALL Next: Status3 Next: End Status3: Type: Task Resource: Fn::GetAtt: [samplefunc3, Arn] Next: End End: Type: Pass End: true
handler.py
import time def main1(event, context): print(event) return {"foo": "main1"} def main2(event, context): print(event) time.sleep(10) # Lambdaをタイムアウトさせる return {"foo": "main2"} # タイムアウトになるのでこの値は返ってこない def main3(event, context): print(event) return {"foo": "main3"}
3つのLambdaを定義しています。
- samplefunc1: 入力をログに残した後、正常に終了するLambda
- samplefunc2: 入力をログに残した後、必ずタイムアウトのエラーになってしまうLambda
- samplefunc3: 入力をログに残した後、正常に終了するLambda
それぞれStep FunctionsのStatus1, Status2, Status3に対応します。
実行結果
Status2にはRetryでMaxAttempts: 2
の指定があるので、2回のリトライ、つまりsamplefunc2が3回実行されます。samplefunc2に渡される入力は3回とも同じです。
Status2にはCatchで'Next: Status3'の指定もあるので、samplefunc2が3回実行されたあとに、Status3に遷移します。samplefunc3への入力は次のようなオブジェクトになります。
{ "Error": "Lambda.Unknown", "Cause": "The cause could not be determined because Lambda did not return an error type. Returned payload: {\"errorMessage\":\"2021-03-10T11:12:36.559Z ea381979-e25c-41e3-9fe5-39360cfa0ab6 Task timed out after 6.01 seconds\"}" }
エラーハンドラに元の入力を引き渡す
Status2のCatchの指定に次のように ResultPath: "$.error"
の指定を追加します。
Catch: - ErrorEquals: - States.ALL Next: Status3 ResultPath: "$.error"
これをデプロイして実行すると、Status3への入力は、Status2への入力にエラー情報を追加したものに変わります。元の入力を保持しておきたい場合はこの指定が必要のようです。
{ "foo": "main1", "error": { "Error": "Lambda.Unknown", "Cause": "The cause could not be determined because Lambda did not return an error type. Returned payload: {\"errorMessage\":\"2021-03-10T11:25:56.258Z 42c9ade8-93e1-499d-9d60-68b0a6c7ccdf Task timed out after 6.01 seconds\"}" } }
WindowsのWSLからテキストのコピペでSSH経由でファイルを送る方法
WSLから clip.exe
というコマンドでテキストをクリップボードにコピーできることを知りました。
なので、こんなことができます。WindowsからTera Termなどでどこか遠くのサーバに接続し、Windowsにあるファイルをテキスト貼り付けで遠くのサーバに送信です。scpとかrsyncとか不要です。
<ローカルファイルをクリップボードにコピー>
WSL上で、適当なファイルをtarコマンドとbase64コマンドでテキストにしてクリップボードに送ります。以下のコマンドはカレントディレクトリにあるJPGファイルすべてをコピーしています。
$ tar cvzf - *.jpg | base64 | clip.exe
<クリップボードから遠くにペースト>
遠くのサーバに接続しているTera Termなどで、以下のコマンドでエンターを押した後、マウスの右クリックなどでクリップボードからの貼り付けをします。Tera Termであれば改行を含むテキストの貼り付け確認画面がでるかもしれません。
$ base64 -d | tar xvzf -
貼り付けると、長々とBASE64のテキストがスクロールされます。Ctrl-Dを押すと貼り付け完了で、サーバのカレントディレクトリにファイルがただちに解凍されます。
ファイルが大きいとクリップボードを介するテキストが長くなりますので、ファイルサイズに上限があるかもしれません。
AWS SQSをPythonで試してみる
CloudFormationでSQSのQueueを作成して、Pythonコードでメッセージの送受信をするところまで試してみます。
CloudFormation
template.yaml
AWSTemplateFormatVersion: '2010-09-09' Resources: SampleSQS: Type: AWS::SQS::Queue Outputs: SampleSQSArn: Value: !GetAtt SampleSQS.Arn
Propertiesなしの全部デフォルト値のQueueをこれで作成できます。
デプロイ
$ aws cloudformation deploy --template-file template.yaml --stack-name samplesqs
Queueの名前は以下のコマンドでわかります。
$ aws cloudformation describe-stacks --stack-name samplesqs | jq ".Stacks[0].Outputs" [ { "OutputKey": "SampleSQSArn", "OutputValue": "arn:aws:sqs:ap-northeast-1:xxxxxxxxxxxx:samplesqs-SampleSQS-XXXXXXXXXXXX" } ]
QueueのURLがあとで必要になるのですが、このコマンドでわかります。
$ aws sqs get-queue-url --queue-name samplesqs-SampleSQS-X9UDN0YW5ADO { "QueueUrl": "https://ap-northeast-1.queue.amazonaws.com/xxxxxxxxxxxx/samplesqs-SampleSQS-XXXXXXXXXXXX" }
Pythonコード
PythonコードでQueueのメッセージ送受信をしてみます。
sample.py
import json import boto3 profile = "default" session = boto3.session.Session(profile_name = profile) sqs_client = session.client("sqs") queue_url = "https://ap-northeast-1.queue.amazonaws.com/xxxxxxxxxxxx/samplesqs-SampleSQS-XXXXXXXXXXXX" # 送信 idx = 1 while idx <= 10: # 10個のメッセージを送信 message = {"msg": "Hello, world!", "foo": idx} sqs_client.send_message( QueueUrl = queue_url, MessageBody = json.dumps(message), ) idx += 1 # 受信 while True: res = sqs_client.receive_message( QueueUrl = queue_url, ) # 受信したものをなにか処理 (このサンプルでは表示するだけ) delete_entries = [] # 処理済みメッセージ一覧 id = 1 for msg in res.get("Messages", []): message = json.loads(msg["Body"]) print(message) delete_entries.append({"Id": str(id), "ReceiptHandle": msg["ReceiptHandle"]}) id += 1 # 処理済みメッセージを削除 if len(delete_entries) > 0: # 0件でdelete_message_batchを呼び出すとエラーになる sqs_client.delete_message_batch( QueueUrl = queue_url, Entries = delete_entries, ) else: break
実行してみます。
$ python sample.py {'msg': 'Hello, world!', 'foo': 3} {'msg': 'Hello, world!', 'foo': 6} {'msg': 'Hello, world!', 'foo': 1} {'msg': 'Hello, world!', 'foo': 2} {'msg': 'Hello, world!', 'foo': 4} {'msg': 'Hello, world!', 'foo': 10} {'msg': 'Hello, world!', 'foo': 7} {'msg': 'Hello, world!', 'foo': 5} {'msg': 'Hello, world!', 'foo': 8} {'msg': 'Hello, world!', 'foo': 9}
できました。順序は保証されていないので、受信したメッセージの順番はめちゃめちゃです。
Vue.js / Nuxt.js / Vuexの書き方のメモ
すぐに忘れてしまうのでテンプレートを残しておきます。
Vue.js / Nuxt.js
import Element from "~/components/element.vue"; export default { components: { "element": Element, }, data() { return { field1: value1, field3: value3, } }, computed: { field2() { return value2; }, field3: { get() { return this.field3Impl; }, set(newVal) { this.field3Impl = newVal; }, }, }, methods: { method1() { // なにか }, }, created() { // なにか }, beforeRouteUpdate(to, from, next) { // to.path が新しいURLのパス部分 next(); // 最後に呼ぶ }, mounted() { // なにか }, beforeDestroy() { // なにか }, }
Vuex
- state
- 値の集まり、ステータス
- mutations
- ステータスの操作
- 定義の例:
increment(state, n)
- 呼び出しの例:
context.commit("increment", 10)
- actions
- 非同期処理を含む任意の処理
- 定義の例:
increment (context, n)
- 呼び出しの例:
context.dispatch("increment", n)
Vue.js
const store = new Vuex.Store({ state: { count: 0, }, mutations: { increment(state, n) { // なにか state.count += n; }, }, actions: { increment(context, n) { // なにか context.commit("increment", n); }, }, }
Nuxt.js
export const state = () => ({ count: 0, }); export const mutations = { increment(state, n) { // なにか state.count += n; }, }; export const actions = { increment(context, n) { // なにか context.commit("increment", n); }, };
Python/Perlからdiffコマンドを起動して2つの文字列の差分を表示
PythonまたはPerlからfork, execしてdiffコマンドを起動し、2つの文字列の差分を表示させるサンプルコードです。
diffコマンドへの文字列受け渡しは名前付きパイプを使っています。diffしたいだけなのに3つも子プロセスを起動しちゃってます。
Python
import os import sys import tempfile def execDiff(content1, content2, name1 = "1", name2 = "2"): sys.stdout.flush() # execDiff呼び出す前に標準出力のバッファリングが残っていると # fork後に重複して出力される # FIFOを置くテンポラリディレクトリ作成 tmpdir = tempfile.mkdtemp() # FIFO作成 fifo1 = tmpdir + "/" + name1 fifo2 = tmpdir + "/" + name2 os.mkfifo(fifo1) os.mkfifo(fifo2) # diffコマンド起動 pid = os.fork() if pid == 0: os.execvp("diff", ["diff", "-u", fifo1, fifo2]) # 2つの文字列をdiffコマンドに渡すためのプロセスをfork pid1 = os.fork() if pid1 == 0: writer1 = open(fifo1, "w") writer1.write(content1) writer1.close() sys.exit() pid2 = os.fork() if pid2 == 0: writer2 = open(fifo2, "w") writer2.write(content2) writer2.close() sys.exit() # 子プロセス終了まで待機 os.waitpid(pid1, 0) os.waitpid(pid2, 0) os.waitpid(pid, 0) # FIFOとテンポラリディレクトリ削除 os.remove(fifo1) os.remove(fifo2) os.rmdir(tmpdir) execDiff("abc\nghi\n", "abc\ndef\nghi\n", "before", "after")
$ python sample.py --- /tmp/tmpvvdrq8yq/before 2021-02-28 18:16:39.715880739 +0900 +++ /tmp/tmpvvdrq8yq/after 2021-02-28 18:16:39.715880739 +0900 @@ -1,2 +1,3 @@ abc +def ghi
Perl
use POSIX qw/mkfifo/; use File::Temp qw/tempdir/; sub execDiff { my ($content1, $content2, $name1, $name2) = @_; $name1 = "1" unless defined($name1); $name2 = "2" unless defined($name2); # FIFOを置くテンポラリディレクトリ作成 my $tmpdir = tempdir(); # FIFO作成 my $fifo1 = "$tmpdir/$name1"; my $fifo2 = "$tmpdir/$name2"; mkfifo($fifo1, 0700) or die $!; mkfifo($fifo2, 0700) or die $!; # diffコマンド起動 my $pid = fork; die $! unless defined($pid); if ($pid == 0) { exec("diff", "-u", $fifo1, $fifo2); } # 2つの文字列をdiffコマンドに渡すためのプロセスをfork my $pid1 = fork; die $! unless defined($pid1); if ($pid1 == 0) { open(my $writer1, ">", $fifo1) or die $!; print $writer1 $content1; close($writer1); exit(0); } my $pid2 = fork; die $! unless defined($pid2); if ($pid2 == 0) { open(my $writer2, ">", $fifo2) or die $!; print $writer2 $content2; close($writer2); exit(0); } # 子プロセス終了まで待機 waitpid($pid, 0); waitpid($pid1, 0); waitpid($pid2, 0); # FIFOとテンポラリディレクトリ削除 unlink($fifo1); unlink($fifo2); rmdir($tmpdir); } execDiff("abc\nghi\n", "abc\ndef\nghi\n", "before", "after");
$ perl sample.pl --- /tmp/i4YAOgOSeh/before 2021-02-28 18:16:56.985034423 +0900 +++ /tmp/i4YAOgOSeh/after 2021-02-28 18:16:56.985034423 +0900 @@ -1,2 +1,3 @@ abc +def ghi