今回はPythonでマルチプロセスのプログラムを実行する際に行う、プロセス間通信の実装方法について書きたいと思います。ライブラリはmultiprocessingを使用します。
これまで書いた記事ではQueueを使った方法でしたが、今回は共有メモリー(Array)を使用して通信を行ってみました。
また、記事の中ではRaspberry Pi 3を使って実装していますが、マルチコアのCPUを搭載していれば、デスクトップPCなど他のハードウェアでも使用できます。
環境
ハードウェア:Raspberry Pi 3 ModelB
OS:Raspbian 10.11
Python:3.7.3
Raspberry Pi(ラズベリーパイ)とは
Raspberry Pi 5は、Raspberry Pi財団が開発した最新のシングルボードコンピュータで、従来モデルよりも大幅な性能向上を実現しています。
搭載される64ビットクアッドコアプロセッサは、クロック速度が最大2.4GHzに達し、前世代よりも処理速度が格段に向上しました。また、4GBまたは8GBのRAMを選択できるため、教育用途からリソースを必要とするプロジェクトまで幅広く対応可能です。
グラフィックス性能も強化され、4K解像度でのデュアルディスプレイ出力をサポートしています。さらに、新たにPCIeインターフェイスが追加され、外部ストレージや高速デバイスとの接続が容易になりました。Wi-Fi 6やBluetooth 5.2の導入により、無線通信も高速かつ安定。
教育、IoT、AI開発、メディア再生など、多様な用途に対応するRaspberry Pi 5は、初心者から上級者まで幅広いユーザーにとって魅力的な選択肢となっています。
Raspberry Piでできることについては以下の記事で解説していますので、あわせてご覧ください。
やりたいこと
当ブログでは過去にPython3のmultiprocessingを使用し、マルチプロセスでの並列処理を行う記事を公開しています。
これまで公開したプログラムでは、プロセス間でのデータの受け渡しにmultiprocessingのQueue(キュー)を使用してきました。Queueを使ったデータの受け渡しでは、データは先入れ先出しになります。
非同期に実行されているプロセス間で、すべてのデータを確実に相手のプロセスに渡したい場合はQueueを使用すると良いのですが、Raspberry Piで行っているロボット制御では少し事情が違ってきます。
例えば、画像処理プロセスで取得した情報をモーター制御プロセスに渡したい場合があるとします。画像処理プロセスはカメラのフレームレート毎に処理を実行した場合、一秒間に30回データを出力します。
しかし、モーター制御プロセスはロボットの1モーションが1秒以上かかるため、Queueのデータを読み出しに行く周期もそれ以上の時間になります。
このような場合、Queueにどんどんデータが溜まっていってしまい、その瞬間の画像処理結果を読み出したい場合でも、欲しいデータにすぐにアクセスできません。
そこで今回はQueueではなく、共有メモリー(Array)を使った瞬時値の受け渡しを行ってみたいと思います。
共有メモリー(Array)とは
共有メモリーを使ったプロセス間通信では、ある決められたメモリー領域を複数のプロセスで共有し、別々のプロセスから書き込み、読み出しをすることでデータを受け渡します。
一次元配列でのデータの受け渡しが可能ですが、今回は1変数のみで使用します。
@pocokhcさんのページでmultiprocessingで使用可能なプロセス間通信の時間計測など、詳細なデータがありますので参考にしてみてください。
メモリアクセス時の排他処理
共有メモリーを使った通信では、メモリを書き込むタイミングとメモリを読み出すタイミングがバッティングした場合、正常に値を読み出せない場合があります。
そのため、以下のようなフラグを使った排他処理を実装しました。
- 許可フラグをFalseに設定します
- 読み出し側のプロセスは参照許可フラグを確認し、False(読み出し禁止)の場合は一定時間をおいて再度フラグを確認します。
- 参照ができない状態になっている間に、書き込み側プロセスがメモリの値を書き換えます。そして書き込みが完了したら、参照許可フラグをTrue(読み出し可能)に変更します。
- 読み出し側プロセスが参照許可フラグがTrueになっているのを確認後、データの読み出しを行います。
作成したソースコード
以下のようなソースコードを作成してみました。
sleepメソッドについては、ログを確認しやすくするためのものですので、実際に使用するプログラムでは削除してください。(入ったままだと処理が遅くなります)
~~処理の流れ~~
プロセス①では、共有メモリーに書き込みを行う「ps_write」というメソッドを実行します。
プロセス②では、共有メモリーから読み出しを行う「ps_read」というメソッドを実行します。
初期状態では共有メモリーにはInt型の「5」が格納されており、参照許可フラグは「True」(読み出し可能)に設定されていますので、プロセス②が初期値の「5」という値を読み出すことができます。
その後、プロセス①が参照許可フラグを「False」(読み出し禁止)に変更し、メモリーへ書き込みを行います。
その間、プロセス②はメモリーにアクセスできず、待ち状態となります。
そしてプロセス②が共有メモリーの値を「10」に書き換え、参照許可フラグを「True」に戻すと、プロセス①が共有メモリーから「10」を読み出します。
import multiprocessing as mp
import ctypes
from ntpath import join
from time import sleep
#共有メモリ書き込み処理
def ps_write(*args):
val = args[0] #データのアドレス
flag = args[1] #フラグのアドレス
sleep(2) #ログ確認のためのディレイ
#書き込み開始
flag.value = False
val[0] = 10
sleep(2) #ログ確認のためのディレイ
#書き込み終了
flag.value = True
#共有メモリ参照処理
def ps_read(*args):
val = args[0]
flag = args[1]
while True:
if not flag.value: # データが変わるまで待つ
print("prosess wait")
sleep(0.1)
continue
#共有メモリからデータを初見出し
data = val[0]
print("process2 = ", data)
if __name__ == '__main__':
#パラメータサイズ
size = 1
#プロセス間通信用の共有メモリーを生成
arr = mp.Array(ctypes.c_int, [0]*size )
flag = mp.Value(ctypes.c_bool, False)
#パラメータ初期値
arr[0] = 5 #初期値
flag.value = True #読み出し可能状態
#プロセスを生成
p1 = mp.Process(target=ps_write, args=(arr, flag)) #プロセス①
p2 = mp.Process(target=ps_read, args=(arr,flag)) #プロセス②
#プロセス実行
p1.start()
p2.start()
#プロセスを終了
p1.join()
p2.join()
9~10行目:
共有メモリーのアドレス情報を引数として受け取ります。ここで受け取っているのは変数の実態ではなく、間接的な参照先となりますので、C言語のポインタをイメージされると良いと思います。
14行目:
参照許可フラグを「False」に設定し、読み出し禁止状態にします。
15行目:
共有メモリーの値を書き換えます。
20行目:
参照許可フラグを「True」に設定し、別プロセスからの共有メモリーの読み出しを許可します。
25~26行目:
共有メモリーのアドレス情報を引数として受け取ります。
29行目:
参照許可フラグを確認します。「False」の場合には、フラグが「True」に代わるまでwhile文でループして待機します。
「True」に変わったら、読み出し処理に移行します。
35行目:
共有メモリーの値を読み出します。
42行目:
共有メモリーには一次元配列が格納できるため、格納するデータサイズを指定します。今回は1データのみなので「1」としました。
45行目:
プロセス間通信で受け渡す値を格納するためのArrayを作成します。型はCのint型です。
46行目:
共有メモリーにアクセスする際の排他制御を行うためのフラグを作成します。型はCのbool型です。
49~50行目:
共有メモリーに格納する値の初期値を代入します。
53~54行目:
プロセスを生成します。
57~58行目:
プロセスを実行します。
実行結果
以下が実行時のターミナルの画像です。
初期状態は参照許可フラグが「True」となっていますので、初期値の5を読み出すことができています。
その後、プロセス①が参照許可フラグを「False」に変更したため、プロセス②は待ち(wait)状態になっているのがわかります。
そして最後に、プロセス①が書き込み終了後、参照許可フラグを「True」に戻したため、プロセス②は共有メモリーの書き換えられた値「10」を読み出すことができているのが確認できました。
\ Pythonを自宅で好きな時に学べる! /
まとめ
multiprocessingの共有メモリー(Array)を使ったプロセス間通信を行うことで、瞬時値や一次元配列でまとめたデータを転送できるようになりました。
通信方式によって、通信にかかる遅延時間等も変わってきますので、実装するシステムに合わせてキューや共有メモリーなどの手段を検討されると良いかと思います。
今後は共有メモリーを使って、Raspberry Pi ロボットの画像処理データをモーター制御プロセスにフィードバックしていけるようにシステムを構成していく予定です。
また、Pythonのプログラミングをさらに深く学びたい方にはUdemyの以下の講座がおすすめです。
また、以下の記事で効率的にPythonのプログラミングスキルを学べるプログラミングスクールの選び方について解説しています。最近ではほとんどのスクールがオンラインで授業を受けられるようになり、仕事をしながらでも自宅で自分のペースで学習できるようになりました。
スキルアップや副業にぜひ活用してみてください。
スクールではなく、自分でPythonを習得したい方には、いつでもどこでも学べる動画学習プラットフォームのUdemyがおすすめです。
講座単位で購入できるため、スクールに比べ非常に安価(セール時1200円程度~)に学ぶことができます。私も受講しているおすすめの講座を以下の記事でまとめていますので、ぜひ参考にしてみてください。
それでは、また次の記事でお会いしましょう。
\ Raspberry Piを使ったアプリ開発を学びたい人には自宅で学べるUdemyがおすすめ! /
講座単位で購入できます!
コメント