近年、エッジデバイスでの高速な物体検出が注目されています。
今回は、Raspberry Pi 5にUSB接続されたWebカメラからリアルタイムに映像を取得し、Hailo-8L AIアクセラレータを使用して物体検出を行うPythonスクリプトについて解説します。
このスクリプトでは、YOLO(You Only Look Once)モデルを使用し、高速かつ効率的な物体検出を実現しています。
Raspberry Pi 5 Hailo-8L AIアクセラレータとは
Raspberry Pi 5とHailo-8L AIアクセラレータは、エッジデバイスでのリアルタイムAI推論を可能にする強力な組み合わせです。
Raspberry Pi 5の小型で汎用性のある設計に、Hailo-8Lが持つ高性能なニューラルネットワーク推論機能を加えることで、低消費電力で高速な物体検出や画像分類が実現します。
Hailo-8Lは高い処理効率を誇り、重いAI処理をエッジで行う用途に最適です。この組み合わせにより、IoTやスマートデバイスのリアルタイムAI対応が手軽に構築可能となります。
システム構成
今回作成したアプリケーションを実行するにあたり以下のようなハードウェア構成を用意しました。
Webカメラ
今回使用したロジクールのWebCam C270は、USB接続に対応した手頃な価格のウェブカメラで、Raspberry Piなどのデバイスで簡単にセットアップできます。
最大解像度は720p(1280×720ピクセル)で、30fpsのフレームレートで映像を提供するため、ビデオ通話やAIプロジェクトのリアルタイム映像取得に適しています。また、ほとんどのLinuxディストリビューションで追加ドライバーなしで動作し、手軽に使用可能です。
AIアクセラレータ
今回、推論処理を高速化するために使用するのはRaspberry Pi 5用に発売されているAI Kitに含まれるHailo-8L AIアクセラレータを使用します。(HATは使用していません)
詳細は以下の記事で解説しています。
ケース(AIアクセラレータ拡張スロット)
今回はHailo-8L AIアクセラレータを搭載可能な拡張スロットを持つRaspberry Pi用ケースPironman5を使用しました。
事前準備
Hailo-8L AIアクセラレータを使用するにあたり、開発元が公開している以下の公式リポジトリにあるサンプルプログラムを実行できる環境を構築しておいてください。
具体的な手順は以下の記事で解説しています。
Pyrhonコード解説
今回作成したコードは、Raspberry Pi 5に接続されたWebカメラからリアルタイムで映像を取得し、Hailo-8L AIアクセラレータを使って物体検出を行うPythonプログラムです。
コードでは、GStreamerパイプラインを用いて映像の取得・処理を効率化し、YOLOモデルで推論を実行して検出結果をオーバーレイ表示します。これにより、カメラの映像に映る物体(例:「person」)をリアルタイムで検出し、検出数やメッセージを映像上に表示できるようにしています。
必要なライブラリとモジュールのインポート
最初に、スクリプトで使用するライブラリやモジュールをインポートします。これには、GStreamerとそのPythonバインディング、OpenCV、Hailo SDK、その他のヘルパーモジュールが含まれます。
- GStreamer: マルチメディアフレームワークで、映像の取得、処理、表示を効率的に行います。
- OpenCV: コンピュータビジョンライブラリで、フレームへのテキスト描画や画像処理に使用します。
- Hailo SDK: Hailo-8L AIアクセラレータを制御し、推論を行うためのSDKです。
ユーザー定義のコールバッククラスと関数の作成
コールバッククラス
user_app_callback_class
は、フレームごとの情報を保持し、必要なデータや関数を提供するためのクラスです。このクラスは、既存のapp_callback_class
を継承し、ユーザー独自の変数や関数を追加できます。
- 新しい変数の追加: 例えば、
new_variable
という変数を追加しています。 - 新しい関数の追加:
new_function
という関数を定義し、カスタムメッセージを返すようにしています。
コールバック関数
app_callback
は、GStreamerパイプラインからフレームデータが到着したときに呼び出される関数です。この関数では、以下の処理を行います。
- フレームデータの取得: GStreamerのバッファから現在のフレームを取得します。
- フレームカウントの更新: 処理したフレーム数をカウントし、ログに出力します。
- 映像情報の取得: フレームのフォーマット、幅、高さなどの情報を取得します。
- フレームの加工: 必要に応じて、フレームをNumPy配列に変換します。
- 物体検出結果の取得: Hailo SDKを使用して、フレーム内の物体検出結果を取得します。
- 検出結果の解析: 検出された物体のラベル、位置、信頼度を解析し、特定のクラス(例:”person”)に対する処理を行います。
- 情報の表示: 検出数やカスタムメッセージをフレームに描画します。
- フレームの更新: 加工したフレームを
user_data
にセットし、後続の処理で使用できるようにします。
GStreamerアプリケーションクラスの定義
GStreamerDetectionApp
クラスは、GStreamerパイプラインを構築し、映像の取得から推論、表示までの一連の処理を管理します。
初期化処理
- ソースタイプとビデオソースの設定: コマンドライン引数から、使用するビデオソース(USBカメラ、Raspberry Piカメラ、ビデオファイル)とそのパスを取得します。
- Hailoのパラメータ設定: 使用するモデルに応じて、バッチサイズ、入力画像の幅と高さ、フォーマット、NMS(Non-Maximum Suppression)のしきい値などを設定します。
- HEFファイルのパス設定: 選択したモデルに対応するHEF(Hailo推論エンジンファイル)のパスを設定します。
- ラベルファイルの設定: 必要に応じて、カスタムのラベルJSONファイルを指定できます。
- コールバック関数の設定: 前述の
app_callback
関数を設定します。 - パイプラインの作成:
create_pipeline
メソッドを呼び出して、GStreamerパイプラインを構築します。
パイプラインの構築
get_pipeline_string
メソッドで、GStreamerパイプラインの文字列を生成します。パイプラインは以下の要素で構成されています。
- 映像ソースの設定: ソースタイプに応じて、USBカメラ、Raspberry Piカメラ、ビデオファイルから映像を取得する要素を設定します。
- 前処理: 映像をモデルの入力サイズやフォーマットに合わせて変換します。具体的には、解像度の変更やフォーマットの変換を行います。
- 推論処理:
hailonet
要素を使用して、Hailo-8L AIアクセラレータで推論を行います。 - 後処理:
hailofilter
要素で推論結果の後処理を行い、検出結果を取得します。 - コールバックの挿入:
identity
要素を使用して、app_callback
関数を呼び出します。 - オーバーレイと表示:
hailooverlay
要素で検出結果を映像にオーバーレイし、fpsdisplaysink
で映像を表示します。
コマンドライン引数の解析
スクリプトは、以下のコマンドライン引数を受け取ります。
--network
: 使用するモデルを指定します。デフォルトはyolov6n
です。--hef-path
: カスタムのHEFファイルを指定できます。--labels-json
: カスタムのラベルJSONファイルを指定できます。--source-type
: ビデオソースのタイプを指定します(usb
、rpi
、file
)。デフォルトはusb
です。--video-source
: ビデオソースのデバイスファイルやファイルパスを指定します。デフォルトは/dev/video0
です。
これらの引数を解析し、アプリケーションの設定に反映させます。
メイン処理の実行
スクリプトのエントリーポイントでは、以下の処理を行います。
- ユーザーコールバッククラスのインスタンス作成:
user_app_callback_class
のインスタンスを作成します。 - コマンドライン引数の解析: 前述の引数を解析し、設定を取得します。
- アプリケーションのインスタンス作成:
GStreamerDetectionApp
のインスタンスを作成し、必要な設定とユーザーデータを渡します。 - パイプラインの実行:
app.run()
を呼び出して、GStreamerパイプラインの実行を開始します。
作成した全体のソースコード
作成した全体のソースコードは以下の通りです。
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import hailo
from hailo_rpi_common import (
get_default_parser,
QUEUE,
get_caps_from_pad,
get_numpy_from_buffer,
GStreamerApp,
app_callback_class,
)
# -----------------------------------------------------------------------------------------------
# ユーザー定義のコールバッククラス
# -----------------------------------------------------------------------------------------------
# app_callback_classを継承したクラスを定義
class user_app_callback_class(app_callback_class):
def __init__(self):
super().__init__()
self.new_variable = 42 # 新しい変数の例
def new_function(self): # 新しい関数の例
return "The meaning of life is: "
# -----------------------------------------------------------------------------------------------
# ユーザー定義のコールバック関数
# -----------------------------------------------------------------------------------------------
# パイプラインからデータが利用可能になったときに呼び出されるコールバック関数
def app_callback(pad, info, user_data):
# probe infoからGstBufferを取得
buffer = info.get_buffer()
# bufferが有効かチェック
if buffer is None:
return Gst.PadProbeReturn.OK
# user_dataを使用してフレーム数をカウント
user_data.increment()
string_to_print = f"Frame count: {user_data.get_count()}\n"
# パッドからキャプス(形式、幅、高さ)を取得
format, width, height = get_caps_from_pad(pad)
# user_data.use_frameがTrueの場合、バッファからビデオフレームを取得
frame = None
if user_data.use_frame and format is not None and width is not None and height is not None:
# ビデオフレームを取得
frame = get_numpy_from_buffer(buffer, format, width, height)
# バッファから検出結果を取得
roi = hailo.get_roi_from_buffer(buffer)
detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
# 検出結果を解析
detection_count = 0
for detection in detections:
label = detection.get_label() # ラベルを取得
bbox = detection.get_bbox() # バウンディングボックスを取得
confidence = detection.get_confidence() # 信頼度を取得
if label == "person":
string_to_print += f"Detection: {label} {confidence:.2f}\n"
detection_count += 1
if user_data.use_frame:
# メインスレッドでないため、imshowは使用できない
# フレームに検出数を表示
cv2.putText(frame, f"Detections: {detection_count}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# user_dataのnew_variableとnew_functionを使用する例
# フレームにカスタムメッセージを表示
cv2.putText(frame, f"{user_data.new_function()} {user_data.new_variable}", (10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# フレームをBGRに変換
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
# フレームをuser_dataにセット
user_data.set_frame(frame)
# 結果をコンソールに出力
print(string_to_print)
return Gst.PadProbeReturn.OK
# -----------------------------------------------------------------------------------------------
# ユーザー定義のGStreamerアプリケーション
# -----------------------------------------------------------------------------------------------
# hailo_rpi_common.GStreamerAppクラスを継承したクラスを定義
class GStreamerDetectionApp(GStreamerApp):
def __init__(self, args, user_data):
# 親クラスのコンストラクタを呼び出す
super().__init__(args, user_data)
# 追加の初期化コードをここに追加できる
# --- 変更点: ソースタイプとビデオソースを設定 ---
self.source_type = args.source_type # ソースタイプを設定(usb, rpi, file)
self.video_source = args.video_source # ビデオソースを設定(デバイスファイルやファイルパス)
# モデルに基づいて設定すべきHailoパラメータを設定
self.batch_size = 2
self.network_width = 640
self.network_height = 640
self.network_format = "RGB"
nms_score_threshold = 0.3 # NMSのスコアしきい値
nms_iou_threshold = 0.45 # NMSのIoUしきい値
# 一時的なコード: 新しいポストプロセスの.soファイルが存在するかチェック
new_postprocess_path = os.path.join(self.current_path, '../resources/libyolo_hailortpp_post.so')
if os.path.exists(new_postprocess_path):
self.default_postprocess_so = new_postprocess_path
else:
self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')
if args.hef_path is not None:
self.hef_path = args.hef_path
# ネットワークに基づいてHEFファイルのパスを設定
elif args.network == "yolov6n":
self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef')
elif args.network == "yolov8s":
self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef')
elif args.network == "yolox_s_leaky":
self.hef_path = os.path.join(self.current_path, '../resources/yolox_s_leaky_h8l_mz.hef')
else:
assert False, "Invalid network type"
# ユーザー定義のラベルJSONファイル
if args.labels_json is not None:
self.labels_config = f' config-path={args.labels_json} '
else:
self.labels_config = ''
# コールバック関数を設定
self.app_callback = app_callback
# NMSのしきい値などを文字列として設定
self.thresholds_str = (
f"nms-score-threshold={nms_score_threshold} "
f"nms-iou-threshold={nms_iou_threshold} "
f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
)
# プロセスタイトルを設定
setproctitle.setproctitle("Hailo Detection App")
# パイプラインを作成
self.create_pipeline()
# パイプラインの文字列を生成するメソッド
def get_pipeline_string(self):
# ソースタイプに応じてsource_elementを設定
if self.source_type == "rpi":
source_element = (
"libcamerasrc name=src_0 ! "
f"video/x-raw, format={self.network_format}, width=1536, height=864 ! "
+ QUEUE("queue_src_scale")
+ "videoscale ! "
f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "
)
elif self.source_type == "usb":
source_element = (
f"v4l2src device={self.video_source} name=src_0 ! "
"video/x-raw, width=640, height=480, framerate=30/1 ! "
)
else:
source_element = (
f"filesrc location=\"{self.video_source}\" name=src_0 ! "
+ QUEUE("queue_dec264")
+ " qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
" video/x-raw, format=I420 ! "
)
# 共通のパイプライン要素を追加
source_element += QUEUE("queue_scale")
source_element += "videoscale n-threads=2 ! "
source_element += QUEUE("queue_src_convert")
source_element += "videoconvert n-threads=3 name=src_convert qos=false ! "
source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "
# 全体のパイプライン文字列を構築
pipeline_string = (
"hailomuxer name=hmux "
+ source_element
+ "tee name=t ! "
+ QUEUE("bypass_queue", max_size_buffers=20)
+ "hmux.sink_0 "
+ "t. ! "
+ QUEUE("queue_hailonet")
+ "videoconvert n-threads=3 ! "
f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "
+ QUEUE("queue_hailofilter")
+ f"hailofilter so-path={self.default_postprocess_so} {self.labels_config} qos=false ! "
+ QUEUE("queue_hmuc")
+ "hmux.sink_1 "
+ "hmux. ! "
+ QUEUE("queue_hailo_python")
+ QUEUE("queue_user_callback")
+ "identity name=identity_callback ! "
+ QUEUE("queue_hailooverlay")
+ "hailooverlay ! "
+ QUEUE("queue_videoconvert")
+ "videoconvert n-threads=3 qos=false ! "
+ QUEUE("queue_hailo_display")
+ f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
)
# パイプライン文字列を出力
print(pipeline_string)
return pipeline_string
if __name__ == "__main__":
# ユーザーコールバッククラスのインスタンスを作成
user_data = user_app_callback_class()
parser = get_default_parser()
# 追加の引数をここで追加
parser.add_argument(
"--network",
default="yolov6n",
choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'],
help="使用するネットワーク。デフォルトはyolov6n。",
)
parser.add_argument(
"--hef-path",
default=None,
help="HEFファイルへのパス。",
)
parser.add_argument(
"--labels-json",
default=None,
help="カスタムラベルJSONファイルへのパス。",
)
# --- 変更点: ソースタイプとビデオソースの引数を追加 ---
parser.add_argument(
"--source-type",
default="usb",
choices=['usb', 'rpi', 'file'],
help="ビデオソースのタイプ: 'usb'(USBカメラ)、'rpi'(Raspberry Piカメラ)、または'file'(ビデオファイル)",
)
parser.add_argument(
"--video-source",
default="/dev/video0",
help="ビデオソースのデバイス(例: '/dev/video0')またはファイルパス",
)
# 引数をパース
args = parser.parse_args()
# アプリケーションのインスタンスを作成
app = GStreamerDetectionApp(args, user_data)
# アプリケーションを実行
app.run()
こちらのコードをrealtime_webcam_yolo_detection.py
というファイル名で使用します。
実行方法
ターミナルで以下のコマンドを実行してください。
python realtime_webcam_yolo_detection.py
デフォルトでは、USBカメラ(/dev/video0
)から映像を取得し、yolov6n
モデルを使用してオブジェクト検出を行います。
オプションの引数
スクリプトは以下のコマンドライン引数をサポートしています。
--source-type
: ビデオソースのタイプを指定します。選択肢は以下の通りです。'usb'
: USBカメラを使用(デフォルト)'rpi'
: Raspberry Piカメラを使用'file'
: ビデオファイルを使用
--video-source
: ビデオソースのデバイスファイルまたはファイルパスを指定します。デフォルトは'/dev/video0'
です。--network
: 使用するモデルを指定します。選択肢は以下の通りです。'yolov6n'
(デフォルト)'yolov8s'
'yolox_s_leaky'
--hef-path
: カスタムのHEFファイルへのパスを指定します。--labels-json
: カスタムのラベルJSONファイルへのパスを指定します。
使用例
別のUSBカメラデバイスを使用する場合
python realtime_webcam_yolo_detection.py --video-source /dev/video1
yolov8s
モデルを使用する場合
python realtime_webcam_yolo_detection.py --network yolov8s
ビデオファイルをソースとして使用する場合
python realtime_webcam_yolo_detection.py --source-type file --video-source /path/to/video.mp4
注意事項
- 依存関係の確認: スクリプトを実行する前に、以下のライブラリやパッケージが正しくインストールされていることを確認してください。
- Python 3.x
- Hailo SDK
- GStreamerおよび関連プラグイン
- OpenCV (
cv2
モジュール) - その他、
gi
,setproctitle
,numpy
などの必要なパッケージ
- デバイスの確認: USBカメラが正しく接続されており、デバイスファイル(例:
/dev/video0
)が存在することを確認してください。デバイスファイルは以下のコマンドで確認できます。bashコードをコピーするls /dev/video*
- 権限の確認: 必要に応じて、カメラデバイスへのアクセス権限を設定してください。
- Hailo-8L AIアクセラレータの接続: Hailo-8Lデバイスが正しく接続されており、ドライバやファームウェアが適切に設定されていることを確認してください。
実行結果
先ほどのPythonコードを実行すると以下のようにカメラの映像に対してリアルタイムにオブジェクト検出が可能になりました。
まとめ
このスクリプトは、Raspberry Pi 5とHailo-8L AIアクセラレータを組み合わせることで、エッジデバイス上での高速かつリアルタイムな物体検出を実現しています。GStreamerを活用することで、映像処理の各ステップを効率的にパイプライン化し、ハードウェアの性能を最大限に引き出しています。
この実装をベースに、さらなる機能拡張や応用が可能です。例えば、特定の物体の検出に特化したアプリケーションや、検出結果を他のシステムと連携させるIoTアプリケーションなどが考えられます。
コメント