本記事では、Raspberry Pi 5上でHailo-8L AIアクセラレータを活用し、リアルタイムで物体検出を行うプログラムの解説を行います。
Hailo-8Lは、組み込みAIデバイス向けの高性能な推論エンジンを提供し、このサンプルコードを通じてPythonでのAI推論パイプラインの構築方法を学ぶことができます。
GStreamerを使用したビデオ処理や、HailoのAPIを利用した推論結果の取得など、エッジデバイスでのAI活用を実現するための実践的な例を紹介します。
今回使用するHailo-8L AIアクセラレータのハードウェアについては以下の記事で詳細を解説していますので、あわせてご覧ください。
環境
ハードウェア:Raspberry Pi 5
Hailo-8L AIアクセラレータ
Hailo-8L AIアクセラレータサンプルPythonコード解説
hailo-rpi5-examples
リポジトリは、Raspberry Pi 5上でHailo-8L AIアクセラレータを活用するためのサンプルプロジェクトを提供しています。
オブジェクト検出やポーズ推定、インスタンスセグメンテーションなどのPythonベースのパイプラインが含まれており、HailoのPython APIの使い方や、Raspberry Piカメラライブラリとの統合方法、Hailo Dataflow Compilerを使用したモデル展開手順が示されています。
エンベデッドデバイスでのAI活用を支援するリソースです。
コードの概要
このコードは、Raspberry Pi 5上でHailo-8L AIアクセラレータを利用して、リアルタイムに物体検出を行うプログラムです。
GStreamerを使用してビデオストリームを処理し、Hailoの推論エンジンを利用して検出結果を取得します。
コードの構造は、ユーザー定義のコールバッククラスとコールバック関数、GStreamerパイプラインのセットアップで構成されます。
インポートと初期設定
最初に必要なモジュールやライブラリをインポートしています。
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
from hailo_rpi_common import (
get_caps_from_pad,
get_numpy_from_buffer,
app_callback_class,
)
from detection_pipeline import GStreamerDetectionApp
gi
ライブラリを使用してGStreamerをインポートし、バージョンを指定しています。GStreamerはマルチメディアフレームワークで、ビデオのストリーミング処理に利用されます。numpy
とcv2
(OpenCV)は画像処理のために使用します。hailo
ライブラリはHailoの推論エンジン用で、Hailo-8L AIアクセラレータのインターフェースを提供します。hailo_rpi_common
からは、キャプス(フォーマットやサイズ)の取得やバッファ処理を支援する関数をインポートしています。
ユーザー定義のコールバッククラス
次に、app_callback_class
を継承してuser_app_callback_class
を作成しています。
class user_app_callback_class(app_callback_class):
def __init__(self):
super().__init__()
self.new_variable = 42 # 新しい変数(例:42を初期値として設定)
def new_function(self): # 新しい関数の定義(例)
return "The meaning of life is: "
user_app_callback_class
は、ユーザーがカスタマイズできるクラスで、追加の変数や関数を定義できます。new_variable
という変数を追加して、初期値を42
に設定しています。これは例示用の値ですが、必要に応じて他の目的に使うこともできます。new_function
というメソッドを定義し、簡単な文字列を返す関数の例を示しています。これも例示的なものであり、アプリケーションに応じて適宜変更可能です。
コールバック関数
コールバック関数app_callback
は、GStreamerパイプラインからデータが利用可能になったときに呼び出されます。この関数では、以下の処理を行います。
def app_callback(pad, info, user_data):
# GstBuffer(メディアデータのバッファ)をプローブ情報から取得
buffer = info.get_buffer()
# バッファが有効かどうかを確認
if buffer is None:
return Gst.PadProbeReturn.OK # 無効な場合は処理を継続
# user_dataを使ってフレーム数をカウント
user_data.increment()
string_to_print = f"Frame count: {user_data.get_count()}\n" # フレーム数を表示する文字列を作成
# パッドからキャプス(フォーマット、幅、高さ)を取得
format, width, height = get_caps_from_pad(pad)
# user_data.use_frameがTrueの場合、ビデオフレームをバッファから取得可能
frame = None
if user_data.use_frame and format is not None and width is not None and height is not None:
# ビデオフレームを取得
frame = get_numpy_from_buffer(buffer, format, width, height)
- GstBufferの取得:
info.get_buffer()
を使って、パイプラインから送られてきたメディアデータのバッファを取得します。バッファが無効(None)の場合は、そのまま処理を継続します。
- フレーム数のカウント:
user_data.increment()
でフレーム数をカウントし、現在のフレーム数を表示するための文字列を作成します。
- 画像サイズの取得:
get_caps_from_pad
関数を使って、パッドからフォーマット、幅、高さを取得します。これにより、ビデオデータの解像度や形式を把握できます。
- ビデオフレームの取得:
user_data.use_frame
がTrue
の場合、バッファからビデオフレームを取得して、後の処理で使用します。
検出結果の解析
次に、バッファから物体検出の結果を取得し、それを解析します。
# バッファから検出結果を取得
roi = hailo.get_roi_from_buffer(buffer)
detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
# 検出結果を解析
detection_count = 0
for detection in detections:
label = detection.get_label() # 検出されたオブジェクトのラベルを取得
bbox = detection.get_bbox() # バウンディングボックスを取得
confidence = detection.get_confidence() # 信頼度を取得
if label == "person": # ラベルが「person」の場合
string_to_print += f"Detection: {label} {confidence:.2f}\n" # 検出結果を表示
detection_count += 1
hailo.get_roi_from_buffer
関数でバッファからROI(Region of Interest)を取得し、get_objects_typed
を使って物体検出結果をリスト形式で取得します。- 各検出結果について、ラベル、バウンディングボックス、信頼度を取得し、検出されたオブジェクトが「person」(人)の場合に、検出結果をコンソールに表示します。
フレームに検出結果を表示
フレームを取得している場合には、検出結果をフレームに描画します。
if user_data.use_frame:
# 注意:imshowを使うとメインスレッドでないため正しく表示できない
# フレームに検出数を表示
cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# 新しい変数と新しい関数の結果をフレームに表示
cv2.putText(frame, f"{user_data.new_function()} {user_data.new_variable}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# フレームをBGR形式に変換
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
user_data.set_frame(frame) # 更新したフレームを設定
cv2.putText
を使用して、検出数やユーザー定義のテキストをフレームに描画します。- 最後にフレームを
RGB
からBGR
形式に変換し、user_data.set_frame
でフレームを更新します。
メイン処理の実行
最後に、メイン処理部分でGStreamerパイプラインをセットアップし、アプリケーションを実行します。
if __name__ == "__main__":
# ユーザー定義コールバッククラスのインスタンスを作成
user_data = user_app_callback_class()
# GStreamerDetectionAppを初期化し、パイプラインを実行
app = GStreamerDetectionApp(app_callback, user_data)
app.run() # アプリケーションを実行
user_app_callback_class
のインスタンスを作成し、GStreamerDetectionApp
を初期化します。app.run()
でGStreamerパイプラインを開始し、リアルタイムで推論処理を実行します。
全体のソースコード
全体のソースコードは以下の通りです。
処理内容がわかりやすいよう、日本語でコメントを追加しています。
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
from hailo_rpi_common import (
get_caps_from_pad,
get_numpy_from_buffer,
app_callback_class,
)
from detection_pipeline import GStreamerDetectionApp
# -----------------------------------------------------------------------------------------------
# ユーザー定義のコールバッククラス
# -----------------------------------------------------------------------------------------------
# app_callback_classを継承してカスタマイズしたクラスを作成
class user_app_callback_class(app_callback_class):
def __init__(self):
super().__init__()
self.new_variable = 42 # 新しい変数(例:42を初期値として設定)
def new_function(self): # 新しい関数の定義(例)
return "The meaning of life is: " # 返り値として意味を持つ文字列を返す
# -----------------------------------------------------------------------------------------------
# ユーザー定義のコールバック関数
# -----------------------------------------------------------------------------------------------
# この関数は、パイプラインからデータが利用可能になったときに呼び出されるコールバック関数
def app_callback(pad, info, user_data):
# GstBuffer(メディアデータのバッファ)をプローブ情報から取得
buffer = info.get_buffer()
# バッファが有効かどうかを確認
if buffer is None:
return Gst.PadProbeReturn.OK # 無効な場合は処理を継続
# user_dataを使ってフレーム数をカウント
user_data.increment()
string_to_print = f"Frame count: {user_data.get_count()}\n" # フレーム数を表示する文字列を作成
# パッドからキャプス(フォーマット、幅、高さ)を取得
format, width, height = get_caps_from_pad(pad)
# user_data.use_frameがTrueの場合、ビデオフレームをバッファから取得可能
frame = None
if user_data.use_frame and format is not None and width is not None and height is not None:
# ビデオフレームを取得
frame = get_numpy_from_buffer(buffer, format, width, height)
# バッファから検出結果を取得
roi = hailo.get_roi_from_buffer(buffer)
detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
# 検出結果を解析
detection_count = 0
for detection in detections:
label = detection.get_label() # 検出されたオブジェクトのラベルを取得
bbox = detection.get_bbox() # バウンディングボックスを取得
confidence = detection.get_confidence() # 信頼度を取得
if label == "person": # ラベルが「person」の場合
string_to_print += f"Detection: {label} {confidence:.2f}\n" # 検出結果を表示
detection_count += 1
if user_data.use_frame:
# 注意:imshowを使うとメインスレッドでないため正しく表示できない
# フレームに検出数を表示
cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# 新しい変数と新しい関数の結果をフレームに表示
cv2.putText(frame, f"{user_data.new_function()} {user_data.new_variable}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# フレームをBGR形式に変換
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
user_data.set_frame(frame) # 更新したフレームを設定
print(string_to_print) # 検出結果をコンソールに表示
return Gst.PadProbeReturn.OK # 処理を続行
if __name__ == "__main__":
# ユーザー定義コールバッククラスのインスタンスを作成
user_data = user_app_callback_class()
# GStreamerDetectionAppを初期化し、パイプラインを実行
app = GStreamerDetectionApp(app_callback, user_data)
app.run() # アプリケーションを実行
まとめ
本記事では、Raspberry Pi 5とHailo-8L AIアクセラレータを使用した物体検出プログラムの詳細を解説しました。このコードを通じて、リアルタイムのAI推論を組み込みデバイス上で実現する方法を理解できたでしょう。
Hailo-8Lの強力な推論能力により、エッジデバイスでのAIアプリケーション開発が一層容易になり、今後のAIプロジェクトに役立つ実践的なスキルを身につけることができます。
また、私が使用しているHailo-8Lアクセラレーターを搭載できるRaspberry Pi 5用ケースPironman5もおすすめです。
冷却効率も非常に高く、機械学習の推論処理など高負荷のアプリケーションの運用にも最適です。
それでは、また次の記事でお会いしましょう。
コメント