article icon gst-python使っています

heroImage

はじめに

IBEXブログでは先んじてGStreamerに関する記事を掲載させていただいていますが、今回のブログ内容はGStreamerの応用編となります。GStreamer連載記事と併せて読んでいただけると幸いです。GStreamerは多種多様なエレメントが用意されており様々なマルチメディアの処理が可能ですが、既存のエレメントだけでは実現できないようなアプリケーションを作成したい場合、ユーザ独自の機能を実装する必要があります。その方法は色々とありますが、ここではgst-pythonを取り上げたいと思います。このブログではgst-pythonの概要説明と簡単なgst-pythonを用いたサンプルデモを動かしてみます。

gst-pythonとは

gst-pythonはGStreamerをPythonで使用するためのバインディングです。インストール済みであるGStreamerのプラグインやエレメントがそのままPythonで使用できるようになります。またPythonスクリプトからGStreamerのパイプラインを構築したり、パイプラインを流れるデータへのアクセスしたりすることが可能となります。Pythonが使えるようになるメリットは何といっても豊富なライブラリが使用できるようになることです。GStreamerとPythonが融合することで、実現できるアプリケーションの可能性が広がります。

gst-pythonを使用するための環境構築

gst-pythonを使用するために、まずはGStreamerのインストールから行います。GStreamer を使おう! その1 ~GStreamer の世界へようこそ編~を参考にインストールを行います。Ubuntuの場合は以下のコマンドになります。

sudo apt install libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev libgstreamer-plugins-bad1.0-dev gstreamer1.0-plugins-base gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-tools gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl gstreamer1.0-gtk3 gstreamer1.0-qt5 gstreamer1.0-pulseaudio

次にpython3-giのインストールを行います。このインストールを行うことでGObjectベースのライブラリをPythonから使用できるようになります。環境構築は以上です。

sudo apt install python3-gi

gst-pythonを用いたサンプルを動かすための環境構築

OpenCVのインストール

早速ですが、gst-pythonを動かしてみたいと思います。今回動作確認用のソースコードを用意しました。このコードではOpenCVを使用するため、コード実行前にライブラリのインストールを行います。

pip3 install opencv-python

今回動作確認を行った環境はUbuntu20.04でPythonのバージョンは3.8.10です。ライブラリのバージョンは以下で動作確認を行いました。

Package       Version
------------- ---------
dbus-python   1.2.16
numpy         1.24.4
opencv-python 4.10.0.84
pip           20.0.2
PyGObject     3.36.0
setuptools    45.2.0
wheel         0.34.2

コードの用意

今回実行するコードです。詳細は後述します。

ソースコード
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import numpy as np
import cv2

# 初期化
Gst.init(None)

# バラメータ
WIDTH        = 640
HEIGHT       = 480
FRAMERATE    = "10/1"
OUTFILE_NAME = "output.mp4"

# バスメッセージを受信した際の処理
def handle_bus_messages(bus, message, loop, src):
    t = message.type
    src_name = message.src.get_name()
    if t == Gst.MessageType.EOS:
        print(f"{src_name} received End-Of-Stream")
        if src_name == "input_pipeline":
            src.emit('end-of-stream')
        elif src_name == "output_pipeline":
            loop.quit()

# フレーム加工
def frame_processing(frame):
    frame_blur = cv2.GaussianBlur(frame, (15, 15), 0)
    return frame_blur

# パイプラインから映像フレームを受信した際の処理
def on_new_sample(sink, src):
    sample = sink.emit('pull-sample')
    inp_buf = sample.get_buffer()
    result, map_info = inp_buf.map(Gst.MapFlags.READ)
    if result:
        # 映像フレーム取得
        frame_bgr = np.ndarray(shape=(HEIGHT, WIDTH, 3), dtype=np.uint8, buffer=map_info.data)
        
        # フレーム加工
        frame_proc = frame_processing(frame_bgr)
        
        # output_pipelineに加工した映像フレームをプッシュ
        frame_i420 = cv2.cvtColor(frame_proc, cv2.COLOR_BGR2YUV_I420)
        out_buf = Gst.Buffer.new_wrapped(frame_i420.tobytes())
        out_buf.pts = inp_buf.pts
        out_buf.duration = inp_buf.duration
        out_buf.dts = inp_buf.dts
        src.emit('push-buffer', out_buf)
        
    inp_buf.unmap(map_info)
    return Gst.FlowReturn.OK


if __name__ == '__main__':
    # メインループの設定
    loop = GLib.MainLoop()

    # GStreamerのパイプラインを作成
    input_pipeline = Gst.parse_launch(
        f"videotestsrc num-buffers=100 ! videoconvert ! video/x-raw,format=BGR,width={WIDTH},height={HEIGHT}, framerate={FRAMERATE} ! appsink name=sink"
    )
    input_pipeline.set_name("input_pipeline")

    output_pipeline = Gst.parse_launch(
        f"appsrc name=src ! video/x-raw,format=I420, width={WIDTH}, height={HEIGHT}, framerate={FRAMERATE} ! x264enc ! h264parse ! mp4mux ! filesink location={OUTFILE_NAME}"
    )
    output_pipeline.set_name("output_pipeline")

    # output_pipelineのappsrcの設定
    src = output_pipeline.get_by_name("src")

    # input_pipelineのappsinkの設定
    sink = input_pipeline.get_by_name("sink")
    sink.set_property("emit-signals", True)
    sink.connect("new-sample", on_new_sample, src)

    # バスの設定
    input_pipeline_bus = input_pipeline.get_bus()
    input_pipeline_bus.add_signal_watch()
    input_pipeline_bus.connect("message", handle_bus_messages, loop, src)
    output_pipeline_bus = output_pipeline.get_bus()
    output_pipeline_bus.add_signal_watch()
    output_pipeline_bus.connect("message", handle_bus_messages, loop, src)

    # パイプラインを開始
    input_pipeline.set_state(Gst.State.PLAYING)
    output_pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        pass

    # パイプラインを停止
    input_pipeline.set_state(Gst.State.NULL)
    output_pipeline.set_state(Gst.State.NULL)

コードの概要

用意したコードの概要を説明します。videotestsrcエレメントで生成したテスト映像に対して、映像処理ライブラリであるOpenCVを使用してPythonで映像加工したものをMP4ファイルで保存するといった動作デモになります。GStreamerパイプラインは映像入力側(input_pipeline)と映像出力側(output_pipeline)の2本存在しています。これらのパイプラインに挟まれる形でPythonで映像フレームの加工を行う関数が存在しているという構成になります。今回フレーム加工ではブラー処理と文字挿入を行ってみました。

ソースコードの概要図
ソースコードの概要図

ライブラリのインポート

ここからは主要部分のコードを解説していきます。

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

上記のコードでPython上でGStreamerを使うためのライブラリをimportしています。GstモジュールはGStreamerのバインディングを提供するモジュールで、GLibモジュールはGLibライブラリのバインディングを提供するモジュールです。これらのモジュールを使用してパイプラインの作成やメインループの管理を行いGStreamerの機能をPythonで使用していきます。

初期化

# 初期化
Gst.init(None)

Gstモジュールの初期化を行う関数です。アプリケーションの実行前に必ず行う必要があります。コマンドライン引数を指定することが可能ですが、このコードではNoneを指定しています。

パイプラインの構築

    input_pipeline = Gst.parse_launch(
        f"videotestsrc num-buffers=100 ! videoconvert ! video/x-raw,format=BGR,width={WIDTH},height={HEIGHT}, framerate={FRAMERATE} ! appsink name=sink"
    )
    input_pipeline.set_name("input_pipeline")

入力側のパイプラインを構築しています。videotestsrcで映像の横幅=640ピクセル、高さ=480ピクセル、フレームレート=10fpsで合計100枚のフレームの映像(10秒)を生成します。生成する映像フォーマットはBGRです。appsinkはGStreamerパイプラインからデータを取得することができるエレメントです。このエレメントを繋ぎ合わせることでvideotestsrcで生成した映像を取得し、Pythonで映像データを直接操作することが可能になります。このパイプラインの名前はinput_pipelineとしています。

    output_pipeline = Gst.parse_launch(
        f"appsrc name=src ! video/x-raw,format=I420, width={WIDTH}, height={HEIGHT}, framerate={FRAMERATE} ! x264enc ! h264parse ! mp4mux ! filesink location={OUTFILE_NAME}"
    )
    output_pipeline.set_name("output_pipeline")

出力側のパイプラインを構築しています。appsrcはGStreamerパイプラインにデータを挿入することができるエレメントです。Pythonで映像加工したフレームを挿入することが可能になります。フレームのサイズ、フレームレートは入力と同じで、映像フォーマットはI420としています。x264encは流れてきた映像をH.264でエンコードするエレメントです。I420で出力しているのは、それがx264encの受け付けるフォーマットのひとつだからです。x264encはH.264ソフトウェアエンコーダのx264を利用します。h264parseはH.264ストリームを解析し、多重化できるようパケットを調整します。mp4muxはMP4コンテナに映像ストリームを多重化します。最後に多重化されたデータをfilesinkでファイルに書き出します。ファイル名はlocation=output.mp4です。このパイプラインの名前はoutput_pipelineとしています。

映像フレーム取得->加工->挿入までの実装

    # input_pipelineのappsinkの設定
    sink = input_pipeline.get_by_name("sink")
    sink.set_property("emit-signals", True)
    sink.connect("new-sample", on_new_sample, src)

input_pipelineのappsinkから映像フレームを取得できるような設定を行っています。emit-signalsというプロパティをTrueに設定することによってappsinkが新しいサンプルデータ(映像フレーム)を受け取った際に”new-sample”というシグナルが発行されるようになります。“new-sample”が発行された際にはon_new_sampleというコールバック関数が呼ばれるように設定しています。

# フレーム加工
def frame_processing(frame):
    frame_blur = cv2.GaussianBlur(frame, (15, 15), 0)
    return frame_blur

# パイプラインから映像フレームを受信した際の処理
def on_new_sample(sink, src):
    sample = sink.emit('pull-sample')
    inp_buf = sample.get_buffer()
    result, map_info = inp_buf.map(Gst.MapFlags.READ)
    if result:
        # 映像フレーム取得
        frame_bgr = np.ndarray(shape=(HEIGHT, WIDTH, 3), dtype=np.uint8, buffer=map_info.data)
        
        # フレーム加工
        frame_proc = frame_processing(frame_bgr)
        
        # output_pipelineに加工した映像フレームをプッシュ
        frame_i420 = cv2.cvtColor(frame_proc, cv2.COLOR_BGR2YUV_I420)
        out_buf = Gst.Buffer.new_wrapped(frame_i420.tobytes())
        out_buf.pts = inp_buf.pts
        out_buf.duration = inp_buf.duration
        out_buf.dts = inp_buf.dts
        src.emit('push-buffer', out_buf)
        
    inp_buf.unmap(map_info)
    return Gst.FlowReturn.OK

on_new_sample関数ではappsinkからの映像フレーム取得、フレームの加工処理、加工後の映像フレームをoutput_pipelineのappsrcに挿入を行っています。ここではnumpy形式で映像フレームを取得し、フレームの加工はOpenCVのGaussianBlur関数を用いてブラー処理を行っています。加工後のフレームはI420フォーマットに変換された後、appsrcに挿入されます。frame_processing関数の中身を自由に変えることで任意の画像処理が可能になります。

パイプラインのバス設定

    # バスの設定
    input_pipeline_bus = input_pipeline.get_bus()
    input_pipeline_bus.add_signal_watch()
    input_pipeline_bus.connect("message", handle_bus_messages, loop, src)
    output_pipeline_bus = output_pipeline.get_bus()
    output_pipeline_bus.add_signal_watch()
    output_pipeline_bus.connect("message", handle_bus_messages, loop, src)

GStreamerのパイプラインの各エレメントはバスに接続されており、パイプライン内で発生するイベントやメッセージがバスを介して通知されます。上記のコードでは、ここまでで作成したパイプラインのバスの設定を行っています。input_pipelineoutput_pipelineではパイプラインからメッセージが発行されるとhandle_bus_messagesというコールバック関数が呼ばれるように設定しています。

# バスメッセージを受信した際の処理
def handle_bus_messages(bus, message, loop, src):
    t = message.type
    src_name = message.src.get_name()
    if t == Gst.MessageType.EOS:
        print(f"{src_name} received End-Of-Stream")
        if src_name == "input_pipeline":
            src.emit('end-of-stream')
        elif src_name == "output_pipeline":
            loop.quit()

handle_bus_messages関数の処理内容です。input_pipelineでGst.MessageType.EOSというメッセージ(映像の終了を示す)が通知された際にoutput_pipelineのappsrcに対し”end-of-stream”を発行し、映像が終了した旨を伝えています。output_pipelineでGst.MessageType.EOSが通知されるとメインループが終了するようにしています。

パイプラインの制御

    # メインループの設定
    loop = GLib.MainLoop()

    <省略>

    # パイプラインを開始
    input_pipeline.set_state(Gst.State.PLAYING)
    output_pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        pass

    # パイプラインを停止
    input_pipeline.set_state(Gst.State.NULL)
    output_pipeline.set_state(Gst.State.NULL)

上記のコードでGStreamerパイプラインの開始と停止を行っています。GStreamerパイプラインにはステートがあり、パイプラインをPLAYINGにセットすることでパイプライン内のエレメントが処理を開始しパイプライン上をデータが流れ始めます。loop.run()でメインループが実行され、ループが実行されている間はパイプラインは処理を継続します。ループが終了するとステートをNULLにセットしパイプラインを初期状態に戻します。

コード実行(ブラー処理)

以下の映像は何もフレーム加工しない場合のframe_processingの例とテスト映像です。

def frame_processing(frame):
    return frame

それに対しフレーム加工としてブラー処理を行った場合のコードと映像が以下です。各カラーの境界や右下の砂嵐がぼやけた映像を作ることができました。

# フレーム加工
def frame_processing(frame):
    frame_blur = cv2.GaussianBlur(frame, (15, 15), 0)
    return frame_blur

コード実行(文字を挿入)

このコードはframe_processing関数を自由に変更することで様々なフレーム加工を行うことができます。次は試しにframe_processing関数を以下のコードに変更して実行してみます。“IBEX”という文字を一文字ずつ映像に表示する加工を行うように変更しました。なおappsinkから取得したフレームは読み取り専用なのでframe.copy()で複製してフレーム加工を行っています。

# フレーム加工
def frame_processing(frame):
    frame_text = frame.copy()
    cv2.putText(frame_text, text='I', org=(0,              int(HEIGHT/2)), fontFace=cv2.FONT_HERSHEY_COMPLEX, fontScale=3, color=(255, 0, 0), thickness=5)
    cv2.putText(frame_text, text='B', org=(int(WIDTH/4),   int(HEIGHT/2)), fontFace=cv2.FONT_HERSHEY_COMPLEX, fontScale=3, color=(0, 0, 255), thickness=5)
    cv2.putText(frame_text, text='E', org=(int(WIDTH/2),   int(HEIGHT/2)), fontFace=cv2.FONT_HERSHEY_COMPLEX, fontScale=3, color=(255, 0, 0), thickness=5)
    cv2.putText(frame_text, text='X', org=(int(WIDTH*3/4), int(HEIGHT/2)), fontFace=cv2.FONT_HERSHEY_COMPLEX, fontScale=3, color=(255, 0, 0), thickness=5)
    return frame_text

このコードを実行した際に出力された映像が以下になります。ちゃんと”IBEX”という文字が表示されました。

おわりに

今回ご説明させていただいた実装例は簡易的なものでしたが、GStreamerのエレメントをvideotestsrcからfilesrcやrtspsrc等に変更すれば任意の入力映像にも対応可能になります。また映像解析用途としてAI処理との組み合わせも可能になります。実際に弊社ではgst-pythonを用いて様々なシステムを構築しています。今後もgst-pythonを活用した映像処理の可能性を模索していきたいと思います。