読者です 読者をやめる 読者になる 読者になる

【SOT-ES100】無線による光伝送、使ってみた

 

 

1. はじめに

みなさん、こんにちは。

FutureStandardでエンジニアをしています、田本です。

会社のブログを書くのは初めてですが、よろしくお願いします。

 

今回は、弊社の監視カメラのある案件で使用した面白い通信機器を紹介したいと思います。

東洋電機さん(開発: 太陽誘電さん)の空間光伝送装置「SOT-ES100」です。

「光による無線伝送」という大変面白い製品ですので、その時の評価レポートをまとめました。

 

 

2. SOT-ES100とは?

f:id:FutureStandard:20170227184754j:plain

LANケーブルの信号を光に変換して無線で送受信するメディアコンバーターです。
他の通信方法と比較すると、下記のような利点があります。

・ケーブルを這わせたくないような場所からも、ネットワークに接続できる。

・初期設定不要で使用できる。

・電波の混雑などでWi-Fiが使えない場所でも通信できる。

Wi-Fiなどと比べ、通信の傍受がされづらい。

・電波法認証不要で使用できる。*1

・屋外でも使用できる。*2

・最大で200mまで通信可能。*3

現状の最大伝送速度は10Mbpsですが、将来的に100Mbpsに対応されるとのことです。

 

通信方法別比較表

 

有線LAN

無線LAN

(無指向性)

無線LAN

(指向性)

LTE

光無線通信

SOT-ES

シリーズ

最大通信速度

100Gpbs

1Gbps

30Mbps

100Mbps

10Mbps

最大通信距離

100m

100m

10km

-

200m

配線の有無

あり

なし

なし

なし

なし

電波の干渉

-

弱い

弱い

弱い

強い

通信の傍受

強い

弱い

弱い

弱い

強い

初期設定

不要

必要

必要

必要

不要

www.toyo-elec.co.jp

 

外観 

f:id:FutureStandard:20170304205739j:plain 

上面: LEDで通信強度がわかります。 

 

f:id:FutureStandard:20170403200716j:plain

 前面: 送光・受光部です。

 

f:id:FutureStandard:20170403200955j:plain

後面: LANケーブルの挿し込み口がついています。

 

 

3. 製品評価の概要

性能の実測や、光ならではの性質を利用した実験を行いました。

本体にLANケーブルが挿せるようになっていて、AとBの間でデータを送受信できます。

今回はAの方にNTTのモデムを、Bの方にパソコンをつないで実験してみました。

 

 

3. 1. スピードテストやってみた

まずはスピードテスト

 

スピードテストの構成図

f:id:FutureStandard:20170421115611p:plain

今回はUSENスピードテストを使用しました。
(http://www.usen.com/speedtest02/

 

f:id:FutureStandard:20170227185136p:plain

最初は機器の間隔を20cmに設定してスピードテストを行いました。*4
最大伝送速度が10Mbpsなので、約90%のスピードが出ていますね。*5

 

その後、装置の間隔を10m程度に変更して10回スピードテストを行いました。

試行回数

スピード[Mbps]

1

9.189

2

9.214

3

9.156

4

9.214

5

9.166

6

9.166

7

9.167

8

9.167

9

9.163

10

9.188

平均値

9.179

装置の間隔によるスピードの違いは、誤差の範囲内と言って問題ないと思います。

 

 

3. 2. 光を遮ってみた

光による無線通信ということで、装置間に障害物があると通信が途切れてしまうことがあります。

試しに養生テープや緩衝材の薄いスポンジで遮ってみましたが、
若干通信強度が落ちることがあっても、通信には問題ありませんでした。

f:id:FutureStandard:20170421125704j:plain

f:id:FutureStandard:20170421130720j:plain

製品試験では豪雨や濃霧などの悪条件でのテストもされているそうなので、予想以上に切れづらいです。

 

当たり前かもしれませんが、手で完全に覆うと切断してしまいます。

f:id:FutureStandard:20170421131816j:plain

通信強度が0になってしまいました。(赤丸で囲った部分)

 

  

3. 3. 鏡で反射させてみた

最後に鏡で反射させても通信できるのか実験してみました。 

f:id:FutureStandard:20170227185759j:plain

角度の調整がシビアでしたが、 問題なくインターネットに接続できました。

 

f:id:FutureStandard:20170403195545j:plain

わかりづらいですが、受信強度を示すLEDが光っていますね。(赤丸で囲った部分)

 

4. まとめ

今回の実験では最大伝送速度の約90%のスピードが出ていたので、今後100Mbpsに対応したときが非常に楽しみです。

天井など、障害物が少なく人が光を遮りづらい場所に設置すると、通信が切断されることなく使えると思います。

弊社では監視カメラシステムに、この光無線装置を活用しております。
興味がございましたら下記のリンクをご覧ください。

SCORER Surveillance

 

*1:いわゆる技適のこと

*2:防水ケースが必要

*3:今回使用した「SOT-ES100」は100mまで

*4:製品仕様に記載されている伝送距離の最短が20cmであるため

*5:装置を介さない場合は約67Mbpsだったので、回線がボトルネックになっているわけではありません

Raspberry Pi3 と ZeroMQ でフォグ・コンピューティングの雰囲気を体験(中編)

~ HTTP ではないマイクロサービスのすゝめ ~

この記事では、IoT の世界で旬の Raspberry Pi3 上に、ZeroMQPython を使った分散システムを構築し、フォグ・コンピューティングを疑似体験してみたいと思います。

前編は、ZeroMQ の入門だけで終わってしまったので、今回は応用編です。

ラズパイ3を使った『画風変換ネットワーク・カメラ』を作ります。



3.画風変換とは

ニューラルネットワークで『画風*1を学習し、一般画像をそれっぽく変換してしまうというものです。2015年夏頃に有名になったので、ご存じの方もいるのではと思います。

普通の風景写真が、ゴッホの絵のような風景写真になったり、キュービズムな風景写真になったりします。

PFN さんが、詳しい技術内容を記事にしてくれています。

本ブログ記事と同様の「画風変換カメラ」を、Android アプリ+サーバで作られた方もいるようです。その方は、速度に定評のある TensorFlow を使っていますね。また、動画配信プロトコルRTMP を駆使して、サーバに映像を飛ばしています。結構、大掛かりです。

developers.linecorp.com

今回は、導入が簡単な ZeroMQ、Chainer、そして弊社提供の SCORER SDK を利用して、さくっと半日ぐらいで、同じ機能を作ってしまおうと思います。

4.全体構成

こちらを御覧ください。

f:id:FutureStandard:20170302192410j:plain

ラズパイ3をネットワーク・カメラ化するのには、弊社の映像解析 IoT プラットフォームとして提供している SCORER SDK を利用します。SCORER には、OpenCV3.2 や ZeroMQ など、よく使うミドルウェアが最初からインストールされているので、高速開発に便利です。

画風の変換には、日本で人気の高いディープラーニングフレームワークである Chainer を利用します。Chainer をラズパイ3上で動かすのは、さすがにメモリ不足なので、パソコンに処理をオフロードします。この時の通信に、ZeroMQ を活用します。

画風変換用のパソコンは、オフィスにあった普通の Windows 10 パソコンです。*2

5.ラズパイ3の準備

ラズパイ3に USB カメラを接続し、SCORER SDK のイメージを書き込んだ MicroSD を挿入して起動します。起動したら Wifi もしくは有線で LAN にも繋ぎましょう。*3

カメラ制御ページ*4である http://<ラズパイ3のIPアドレス>:20001/ にアクセスし、カメラの画角を確認しましょう。

f:id:FutureStandard:20170302201355j:plain

CPU リソースを空けたいので、Detection MethodNone に変更し、Apply Now を押下しておきます。

なお、横道にそれますが、画面右上リンク Go to RPi-Monitor をクリックすると、ラズパイの稼働グラフなども見ることができます。

f:id:FutureStandard:20170302201435j:plain

5.1. Cloud9 で開発スタート

次に、開発用ページである http://<ラズパイ3のIPアドレス>:20002/ にアクセスし、Cloud9 を起動します。

利用を開始する前に、右上メニューの Cloud9 または Ctrl-, で Preferences を表示し、Python Support の Python Version が Python3 になっていることを確認します。*5

また、Workspace にホームディレクトリも表示しておきましょう。そのためには歯車マークをクリックします。

f:id:FutureStandard:20170302204845j:plain

5.2. カメラ画像の取得と ZeroMQ による送受信

SCORER SDK から、最新のカメラ映像を取得し、ZeroMQ で送信するのは簡単です。

今回は、ZeroMQ の Pub-Sub パターンを利用します。以下のファイルを Cloud9 上にドラッグ&ドロップして、ラズパイ3内に持ち込みましょう。

pub_images.py

import numpy as np
import time
import scorer

import zmq
import zmq_utils


# Setup ZeroMQ image publisher & receiver
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.set_hwm(1)
pub.bind("tcp://*:5558")

sub = context.socket(zmq.SUB)
sub.setsockopt_string(zmq.SUBSCRIBE, "image")
sub.set_hwm(1)
sub.bind("tcp://*:5559")


# Setup VideoCaputre Object
cap = scorer.VideoCapture(0)


last_tick_pub = last_tick_sub = time.time()
duration_pub = duration_sub = 0

while True:

    frame = cap.read()
    if frame == None:
        continue

    # Publish the image
    image = frame.get_bgr()
    zmq_utils.send_image(pub, image)

    tick = time.time()
    duration_pub = tick - last_tick_pub
    last_tick_pub = tick

    # Receive the transformed image
    result = None
    try:
        result = zmq_utils.recv_image(sub, flags=zmq.NOBLOCK)
    except zmq.error.Again:
        pass

    if result is not None:
        tick = time.time()
        duration_sub = tick - last_tick_sub
        last_tick_sub = tick

        # Output
        scorer.imshow(1, result)

    print("Send {0:>7.3f} ({1:>7.3f} fps)   Recv {2:>7.3f} ({3:>7.3f} fps)".
          format(duration_pub,
                 1.0/duration_pub if duration_pub>0 else 0.0,
                 duration_sub,
                 1.0/duration_sub if duration_sub>0 else 0.0))

pub_images.py@gist

ポイントはこちら。

  • カメラ画像を送信する PUB ソケットはポート5558、画風変換した画像を受け取る SUB ソケットはポート5559 で待受けます。
  • 画風変換の間に合わなかった画像は、キューに溜めずにすぐ捨てます。そのため、set_hwm(1) でキューの最大長を1にしています。メモリ消費やレイテンシーが良好になります。
  • SCORER SDK では OpenCV と同様のインタフェースでカメラ画像を取得できます。 cap オブジェクトがそのインタフェースです。
  • 取得したカメラ画像は、直ちに zmq_utils.send_image() でブロードキャストします。そして、キャプチャのフレームレートを計算します。
  • 画風変換された画像は、zmq_utils.recv_image() で受信します。変換結果がいつ来るか予測できないので NOBLOCK を指定し、データが到着したときだけ処理します。
  • 変換結果を受信できた場合だけ scorer.imshow() で結果を Web に表示し、受信のフレームレートを計算します。

ZeroMQ で Numpy 画像を送受信する関数は、zmq_utils.py として、pub_images.py の隣に置いておきます。一種のライブラリ・ファイルですね。

zmq_utils.py

import numpy as np
import json
import zmq


def send_image(socket, image, channel = b"image"):
    # Serialize a Numpy array
    dtype = str(image.dtype).encode('ascii')
    shape = json.dumps(image.shape).encode('ascii')
    data = image.tostring('C')

    socket.send_multipart([channel, dtype, shape, data])


def recv_image(socket, flags = 0):
    channel, dtype, shape, data = socket.recv_multipart(flags)

    # Deserialize a numpy array
    image = np.frombuffer(data, dtype=dtype.decode('ascii'))
    image.shape = json.loads(shape.decode('ascii'))
    return image

zmq_utils.py@gist

シリアライズの詳細は、本シリーズの前編を参照してください。

5.3. 画風変換のスタブ

画風変換のプログラムを動かす前に、スタブ・プログラムで処理の概要を確認しましょう。

mirror_images.py

import numpy as np
import zmq
import zmq_utils


# Setup ZeroMQ image publisher & receiver
context = zmq.Context()
sub = context.socket(zmq.SUB)
sub.setsockopt_string(zmq.SUBSCRIBE, "image")
sub.set_hwm(1)
sub.connect("tcp://localhost:5558")

pub = context.socket(zmq.PUB)
pub.set_hwm(1)
pub.connect("tcp://localhost:5559")

print("Starting...")

while True:

    image = zmq_utils.recv_image(sub)
    zmq_utils.send_image(pub, image)

mirror_images.py

  • PUB/SUBソケットは、pub_images.py とポート番号が逆になります。また、bind()ではなくconnect() になります。
  • 画像を recv_image() で受信したら、即、send_image()送信しています。

この送信と受信の間に、画風変換を入れていくことになります。

5.4. 単体で動かしてみる

Cloud9 の IDE 画面で、pub_images.py と mirror_images.py をそれぞれ ▶ Run すると、処理が動き始めます。

IDE 画面の中央下にあるコンソール・ペインに、ログが出力されると思います。

f:id:FutureStandard:20170302214832j:plain

ZeroMQ のお陰で、どちらのプログラムを先に Run しても問題ありません。

実行中に SDK のページ http://<ラズパイ3のIPアドレス>:20002/ に行き、Web Show 1 を表示すると、pub_images.py で呼んでいる scorer.imshow() の結果画像を確認することができます。

f:id:FutureStandard:20170302215243j:plain

以上でラズパイ3側の作業は全てです。

プログラムを STOP し、画風変換パソコンの準備に取り掛かります。

6.画風変換用パソコンの準備

今回は、オフィスにあった Windows10 パソコンを利用しました。

Windows10 への Chainer 導入方法は、以下の記事を参考にしました。

ありがとうございます!

6.1. インストール

Windows10 で Chainer を動かすのに必要なソフトウェアをインストールしていきます。

Python3.6 with CUDA

詳細は、上記の Qiita 記事を参照してください。

  1. Visual C++ Build Tools
    • C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\bin環境変数「PATH」に追加
    • C:\Program Files (x86)\Windows Kits\10\Include\10.0.10240.0\ucrt環境変数「INCLUDE」に追加
  2. CUDA Toolkit 8.0
  3. cuDNN 5.1
    • Zipファイルを展開後に内部の “bin” “include” “lib” を C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v8.0\ 内に上書きコピー
  4. Anaconda 4.3.0 / Python 3.6 version 64bit *6

インストールが終わり、コマンドプロンプトから python -V を実行すると、以下の表示が出るようになります。

Python 3.6.0 :: Anaconda 4.3.0 (64-bit)

※ もしGPUを利用せずCPUだけで使う場合、Anaconda のインストールのみでOKです。

pip パッケージ

Chainer 含め、Pythonのパッケージは pip コマンドで簡単に入ります。

コマンドプロンプトから、以下のコマンドを実行します。

pip install chainer
pip install pillow
pip install pyzmq

pyzmq は、ZeroMQ の Python バインディングだけでなく、ZeroMQ 本体もインストールしてくれます。

fast-neuralstyle をダウンロード

GitHub から、以下のレポジトリをダウンロードして、展開しておきます。

github.com

色々な画風のモデル(学習済みデータ)もあります。

github.com

GitHub から直接 Zip ファイルを取得・展開できます。

むしろ git clone したい方はこちら。

git clone https://github.com/yusuketomoto/chainer-fast-neuralstyle
git clone https://github.com/gafr/chainer-fast-neuralstyle-models

6.2. 画風変換を試してみる

早速、画風変換してみましょう。

レポジトリ内の sample_images/tubingen.jpg を変換しますが、利用したパソコンのグラボは GTX 650 Ti で 1280×720 の解像度を処理するにはメモリ不足でした。事前に 640x480 まで画像を縮小し、tubigen_vga.jpg として保存しておきます。

コマンドプロンプトで Gitプロジェクトの展開先に chdir し、generate.py を叩きます。

cd chainer-fast-neuralstyle
python generate.py --gpu 0 --model models\seurat.model --out ..\output.jpg --keep_colors sample_images\tubingen_vga.jpg
2.5774641036987305 sec

seurat.model という画風を tubingen.jpg に適用して、output.jpg が生成されました。かかった時間は 2.57 秒。

別途取得したレポジトリのモデルも使ってみます。

python generate.py --gpu 0 --model ..\chainer-fast-neuralstyle-models-master\models\cubist.model --out ..\output.jpg --keep_colors sample_images\tubingen_vga.jpg
2.7556166648864746 sec

CPU でも動作させてみます。--gpu -1 が CPU の指定です。

python generate.py --gpu -1 --model ..\chainer-fast-neuralstyle-models-master\models\cubist.model --out ..\output.jpg --keep_colors sample_images\tubingen_vga.jpg
5.451727628707886 sec

このパソコンのCPUは Core i7-3770 3.4GHz。GTX 650 Tiの2倍弱の時間がかかりました。

すべて順調に動作しているようです!

6.3. ZeroMQ と連結する

generate.py は、画像ファイルを読み込んで、変換して、ファイルに書き出すという構造になっています。画像ファイルの代わりに、ZeroMQ で受信した画像を変換し、結果も ZeroMQ で送り出すように改造します。

画像の扱いも、generate.py は Pillow ライブラリを使用しているので、PIL.Image オブジェクト(Pillow形式)と Numpy オブジェクト(OpenCV形式)の相互変換が必要となります。

以下のファイル transform_images.py を generate.py の隣にコピーしておきます。

transform_images.py

from __future__ import print_function
import numpy as np
import argparse
from PIL import Image, ImageFilter
import time

import chainer
from chainer import cuda, Variable, serializers
from net import *

import zmq
import zmq_utils
import time


parser = argparse.ArgumentParser(description='Real-time style transfer image generator')
parser.add_argument('--gpu', '-g', default=-1, type=int,
                    help='GPU ID (negative value indicates CPU)')
parser.add_argument('--model', '-m', default='models/style.model', type=str)
parser.add_argument('--out', '-o', default='out.jpg', type=str)
parser.add_argument('--median_filter', default=3, type=int)
parser.add_argument('--padding', default=50, type=int)
parser.add_argument('--keep_colors', action='store_true')
parser.set_defaults(keep_colors=False)
args = parser.parse_args()

# from 6o6o's fork. https://github.com/6o6o/chainer-fast-neuralstyle/blob/master/generate.py
def original_colors(original, stylized):
    h, s, v = original.convert('HSV').split()
    hs, ss, vs = stylized.convert('HSV').split()
    return Image.merge('HSV', (h, s, vs)).convert('RGB')

model = FastStyleNet()
serializers.load_npz(args.model, model)
if args.gpu >= 0:
    cuda.get_device(args.gpu).use()
    model.to_gpu()
xp = np if args.gpu < 0 else cuda.cupy

print("The model has been read")



# Setup ZeroMQ image publisher & receiver
context = zmq.Context()
sub = context.socket(zmq.SUB)
sub.setsockopt_string(zmq.SUBSCRIBE, "image")
sub.set_hwm(1)
sub.connect("tcp://<ラズパイ3のIPアドレス>:5558")

pub = context.socket(zmq.PUB)
pub.set_hwm(1)
pub.connect("tcp://<ラズパイ3のIPアドレス>:5559")

print("Starting...")

last = time.time()

while True:

    # Convert cv::Mat to PIL.Image
    original = zmq_utils.recv_image(sub)
    original = Image.fromarray(original)
    b,g,r = original.split()
    original = Image.merge("RGB",(r,g,b))

    image = np.asarray(original, dtype=np.float32).transpose(2, 0, 1)
    image = image.reshape((1,) + image.shape)

    if args.padding > 0:
        image = np.pad(image, [[0, 0], [0, 0], [args.padding, args.padding], [args.padding, args.padding]], 'symmetric')
        image = xp.asarray(image)

    x = Variable(image)

    y = model(x)

    result = cuda.to_cpu(y.data)

    print("result obtained")

    # Free the memory
    y.unchain_backward()

    if args.padding > 0:
        result = result[:, :, args.padding:-args.padding, args.padding:-args.padding]

    result = np.uint8(result[0].transpose((1, 2, 0)))
    med = Image.fromarray(result)

    if args.median_filter > 0:
        med = med.filter(ImageFilter.MedianFilter(args.median_filter))
    if args.keep_colors:
        med = original_colors(original, med)

    tick = time.time()
    print(tick - last, 'sec')
    last = tick

    #med.save(args.out)

    # Convert PIL.Image back to cv::Mat
    r,g,b = med.split()
    med = Image.merge("RGB",(b,g,r))
    med4pub = np.asarray(med)
    zmq_utils.send_image(pub, med4pub)

transform_images.py@gist

オリジナルの generate.py と比較すると、前述の mirror_images.py を組み込んでいるのがわかると思います。

PUBソケットとSUBソケットで connect() する際の IP アドレスは、ラズパイ3のIPアドレス を書いておきます。

なお、このプログラムを動かすのには、隣に zmq_utils.py も必要ですので、忘れずに transform_images.py と同じフォルダにコピーしておきます。

7.全体で動かしてみる

ここまで出来てしまえば、全体を動かすのは簡単です。

ZeroMQのお陰でどちらが先でも良いのですが、ラズパイ3で pub_images.py を動作させ、次に画風変換パソコンで transform_images.py を動作させてみます。*7

7.1. ラズパイ3側

IDE から ▶ Run にて実行、もしくは IDE 画面下のコンソールから、以下のコマンドを実行します。

python3 ./pub_images.py

7.2. 画風変換パソコン側

python transform_images.py --gpu 0 --keep_colors --model ..\chainer-fast-neuralstyle-models-master\models\edtaonisl.model

そして Web show を見ると、キャプチャした画像が変換され表示されるはずです!

f:id:FutureStandard:20170303233114j:plain

この時の画風変換パソコンのログを見るとこんな感じ。

f:id:FutureStandard:20170303233335p:plain

VGA 画像1枚を 0.9 秒程でループできているようです。

コマンド引数の --gpu 0--gpu -1 にして、CPUを使った時のログはこちら。

f:id:FutureStandard:20170303233438p:plain

画像1枚で6秒弱なので、複数枚の画像を連続で処理すると、GPU の恩恵が大きくなるようですね。

もう一つ、別の画風モデルで流した時の映像を貼っておきます。

f:id:FutureStandard:20170302231057j:plain

その時のコマンドラインはこちら。

python transform_images.py --gpu -1 --keep_colors --model ..\chainer-fast-neuralstyle-models-master\models\cubist.model

画風変換おもしろいかも!!

中編まとめ(つづく)

今回、mattyaさん、yusuketomotoさんが公開している素晴らしいライブラリのお陰で、半日足らずで、画風変換カメラを作ることができました。

Chainer を利用した画像処理は、以下の記事にもあるとおり、他にも色々とあります。

本記事のノウハウを活用することで、このようなアルゴリズムを、カメラと簡単に組み合わせ、実環境に適用することができると思います。

それでも計算リソースが不足する場合には、複数のコンピュータを駆使して処理を分担するハイパフォーマンス・クラスタ的な使い方をしたくなるでしょう。

後編では、フォグ・コンピューティングの真骨頂というべき?、エッジ側でのハイパフォーマンス・クラスタを実現するロードバランサーについて解説したいと思います。


*1:画風は、英語で “Style” と言うようです。

*2:私のノートPCでも動作しますが、CPUでの処理はあまりに遅かったので、NVidia グラボ搭載のパソコンにしました。

*3:iOS App の SCORER Starter を利用すると、画面・キーボード不要のヘッドレス構成でネットワークの設定ができます。

*4:Scorer Camera Control - SCC と呼びます。

*5:ここが Python2 だと import scorer に失敗します。

*6:Anacondaは、Python の科学技術計算に最適化されたディストリビューションです。公式の Python 3.6 の代わりにインストールします。

*7:mirror_images.py はスタブなので、今回は動かしません。

OpenCVデータをC++とPython間で交換する方法

1.はじめに

弊社では社長を含めてメンバーの中で一番年上の林です。新卒で業務系ソフトウエア開発会社にエンジニアとして入社したのち、海外での寄り道を経由して、ビジネス番組を衛星放送およびインターネットに配信している会社にて番組企画・プロデューサーとして働いた後、現在は縁あってFuture Standardにて主にRaspberry Piを利用したシステムの開発を行ってます。エンジニア歴としては、UNIXを利用したC言語システム開発の経験が長い一方、Webのフロント開発は苦手です。

先日のエントリにありましたとおり、弊社ではRaspberry Pi、USBカメラ、Pythonの組み合わせで、簡単に映像解析(顔検知、バーコード読み取り、AI物体認識など)を行うことができるSCORER SDKをリリースしました。そこで、本エントリではSCORER SDKで使われている技術の一部を説明します。

SCORER SDKの内部では、Gstreamer+OpenCVを使ってUSBカメラからの画像取得を行っています。このプログラムはC++で書かれています。

一方、SCORER SDKは使用言語がPythonとなります。そこでSCORER SDK内部では、C++OpenCV形式で保持されている画像データを、Python形式に交換しています。またSDKで操作された画像データをクラウドにアップロードするプログラムもC++で作られている事から、PythonからC++へのデータ交換も実施しています。この交換方法が本エントリーのテーマです。

2.C++OpenCVにおける画像データ

例えばある画像の高さ(rows)、幅(cols)、ピクセルデータ(data)がわかっている場合、C++OpenCVにおいては、下記の手法で画像データ(img)を作成する事ができます。 なお、今回は簡便化の為にC++側、Python側共に画素データの形式はBGR形式を前提とします。

static const int GRAY = 1;
cv::Mat create_image(int type, int rows, int cols, void* data){
    cv:: Mat img;
    if( type == GRAY){
        //グレイ画像
        img = cv::Mat(rows, cols, CV_8UC1 , data);   //CV_8UC1 はOpenCVの定数
    }else{
        //カラー画像
        img = cv::Mat(rows, cols, CV_8UC3 , data);   //CV_8UC3はOpenCVの定数
    }

    return img;
}

3.PythonOpenCVにおける画像データ

PythonOpenCVにてある画像の高さ(rows)、幅(cols)、ピクセルデータ(data)があらかじめ分かっている場合、画像データ(img)の生成方法を考えます。

PythonOpenCVでは、Numpy形式でデータを保持します。そこでピクセルデータをNumpy形式に交換する必要があります。SCORER SDKではピクセルデータをNumpy形式に交換する際、高速化のためNumpyが提供する機能であるfrombufferを利用しています。USBカメラからの画像取得など、連続したデータを扱う場合はfrombufferを利用する事を前提に考えた方が良いでしょう。

GRAY=1
def create_image(type, rows, cols, data):
    if type == GRAY:
        # グレイ画像
        # 画素データをnumpy形式に交換
        img=numpy.frombuffer(data, dtype=np.uint8).reshape((rows, cols))
        # グレイデータに交換
        gray = cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)
    else:
        # カラー画像
        # 画素データをnumpy形式に交換f。このimgをそのまま利用可能    
        img=numpy.frombuffer(data, dtype=np.uint8).reshape((rows, cols,3))

    return img

frombufferに関する参考記事:http://stackoverflow.com/questions/5674960/efficient-python-array-to-numpy-array-conversion

4.ZeroMQを利用したプロセス間通信

C++版、Python版それぞれのOpenCVにおいて画像データの作成方法が整理出来ましたので、次はC++のプログラムとPythonプログラムの間の通信方法を考えます。先日のエントリにも説明がありますが、SCORER SDKではこのようなプログラム間の通信には、軽量なメッセージング・ミドルウエアであるZeroMQを利用しています。ZeroMQでは利用する接続文字列を変える事で、プログラムを修正する必要なく同一マシン間でのプロセス間通信と、マシンもしくはネットワークを跨いだ通信が可能になります。そこで本エントリーでもZeroMQを利用します。

5.C++からPythonへの交換

それではC++プログラムから読み込んだBMP画像を、Pythonプログラムに送信して、ファイル書き出しするプログラムを見てみます。

5.1: 送信側C++プログラム

送信プログラムでは、引数で与えられた画像ファイルをOpenCV形式に読み込んだのちに、画像の情報をZeroMQを利用して送信します。

#include <opencv2/core/core.hpp>
#include <opencv2/highgui/highgui.hpp>
#include "zmq.hpp"
#include <stdlib.h>
#include <stdio.h>

void my_free(void *data, void *hint)
{
    free(data);
}

main( int argc, char *argv[]){
    cv::Mat image;
    int i;

    // File Open
    if( !strcmp(argv[2], "color") ){
        printf("color\n");
        image = cv::imread(argv[1], CV_LOAD_IMAGE_COLOR);
    }else{
        printf("gray\n");
        image = cv::imread(argv[1], CV_LOAD_IMAGE_GRAYSCALE);
    }

    // Image Info
    int32_t  info[3];
    info[0] = (int32_t)image.rows;
    info[1] = (int32_t)image.cols;
    info[2] = (int32_t)image.type();

    // Open ZMQ Connection
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REQ);
    socket.connect ("tcp://localhost:5555");

    // Send Rows, Cols, Type
    for(i=0; i<3; i++ ){
        zmq::message_t msg ( (void*)&info[i], sizeof(int32_t), NULL  );
        socket.send(msg, ZMQ_SNDMORE);
    }

    // Pixel data
    void* data = malloc(image.total() * image.elemSize());
    memcpy(data, image.data, image.total() * image.elemSize());

    // Send Pixel data
    zmq::message_t msg2(data, image.total() * image.elemSize(), my_free, NULL);
    socket.send(msg2);

    return 0;
}

5.2: 受信側Pythonプログラム

受信側では、送信プログラムから送られてきた画像情報を、PythonにおけるOpenCV形式に交換したのちに、ファイルに出力します。ZeroMQでは受信されたデータはbyte列となっています。Pythonプログラムにおいて、変数がbyte列のままでは画像の高さ、横幅といった数字を正しく扱えません。そこでこのプログラムは受信したバイト列をPythonで扱える数字に交換するために、struct.upackメソッドを利用してバイト列の交換を行っています。

import zmq
import cv2
import struct
import numpy as np

# Connection String
conn_str      = "tcp://*:5555"

# Open ZMQ Connection
ctx = zmq.Context()
sock = ctx.socket(zmq.REP)
sock.bind(conn_str)

# Receve Data from C++ Program
byte_rows, byte_cols, byte_mat_type, data=  sock.recv_multipart()

# Convert byte to integer
rows = struct.unpack('i', byte_rows)
cols = struct.unpack('i', byte_cols)
mat_type = struct.unpack('i', byte_mat_type)

if mat_type[0] == 0:
    # Gray Scale
    image = np.frombuffer(data, dtype=np.uint8).reshape((rows[0],cols[0]));
else:
    # BGR Color
    image = np.frombuffer(data, dtype=np.uint8).reshape((rows[0],cols[0],3));

# Write BMP Image
cv2.imwrite("recv.bmp", image);

6.PythonからC++への交換

それでは今度はPythonプログラムにて読み込んだBMP画像を、C++プログラムへ送信するプログラムの実例をみてみます。

6.1: 送信Pythonプログラム

5.1と同様に引数で与えられた画像ファイルをOpenCV形式に読み込んだのち、画像の情報をZeroMQを利用して送信します。

import zmq
import cv2
import sys
import numpy as np

conn_str="tcp://localhost:5556"

args = sys.argv

ctx = zmq.Context()
sock = ctx.socket(zmq.REQ)
sock.connect(conn_str)

if( args[2] == "color"):
    # Color
    img = cv2.imread(args[1], cv2.IMREAD_COLOR);
else:
    # Gray
    img = cv2.imread(args[1], cv2.IMREAD_GRAYSCALE);

height, width = img.shape[:2]
ndim = img.ndim

data = [ np.array( [height] ), np.array( [width] ), np.array( [ndim] ), img.data ]
sock.send_multipart(data)

6.2: 受信側C++プログラム

こちらの受信プログラムも5.2と同様にZeroMQ経由で送信されてきた画像情報をOpenCV形式に交換した後にファイルに出力しています。

#include <opencv2/core/core.hpp>
#include <opencv2/highgui/highgui.hpp>
#include "zmq.hpp"
#include <stdlib.h>
#include <stdio.h>

main( int argc, char *argv[]){
    int cnt=0;
    int rows, cols, type;
    cv::Mat img;
    void *data;

    // Open ZMQ Connection
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REP);
    socket.bind("tcp://*:5556");

    while(1){
        zmq::message_t rcv_msg;
        socket.recv(&rcv_msg, 0);

        // Receive Data from ZMQ
        switch(cnt){
         case 0:
            rows = *(int*)rcv_msg.data();
            break;
         case 1:
            cols = *(int*)rcv_msg.data();
            break;
         case 2:
            type = *(int*)rcv_msg.data();
            break;
         case 3:
            data = (void*)rcv_msg.data();
            printf("rows=%d, cols=%d type=%d\n", rows, cols, type);

            if (type == 2) {
                 img = cv::Mat(rows, cols, CV_8UC1, data);
            }else{
                 img = cv::Mat(rows, cols, CV_8UC3, data);
            }
            cv::imwrite("recv.bmp", img);
            break;
        }

        if( !rcv_msg.more() ){
            // No massage any more
            break;
        }

        cnt++;
    }

    return 0;
}

まとめ

本エントリでは、C++ <-> Python間におけるOpenCV形式の画像データの交換方法を解説してみました。OpenCV自体はC++で開発されているので、C++でプログラムを書いた方が最新の機能などを利用できるかもしれません。しかし、PythonOpenCVはNumpy形式でデータを扱うため、Numpyが提供する各種機能を活用する事でプログラムをシンプルに書ける場合もあるでしょう。

そこでOpenCVを活用したC++Pythonのプログラムを連携させたい場合が生じた場合などは、本エントリで例示した方法が役立てば幸いです。

Raspberry Pi3 と ZeroMQ でフォグ・コンピューティングの雰囲気を体験(前編)

~ HTTP ではないマイクロサービスのすゝめ ~

この記事では、IoT の世界で旬の Raspberry Pi3 上に、ZeroMQPython を使った分散システムを構築し、フォグ・コンピューティングを疑似体験してみたいと思います。

長くなってしまったので、記事を分けました。前編は、フォグ・コンピューティングと ZeroMQ の紹介です。



1.フォグ・コンピューティングって何?

昨今、IoTやエッジ・コンピューティングが注目されていますが、一部企業では、さらにその先に、 フォグ・コンピューティング というパラダイムを見据えています。

フォグ・コンピューティングの定義や意義については、既に様々なところに説明がありますので、そちらを御覧ください。

ざっくりした理解としては、“モノ” と “クラウド” との間に 分散システム を構築しましょうって話ですね。近場のコンピューティング・リソースを束ねて、データの保管や解析ができちゃえば、クラウドまで行く通信時間とコストを削減できるでしょうと。

ソフトウェア的には、多数の IoT ハードウェアを束ねて伸縮自在なサービスを作る感じですね。

2.ZeroMQ 入門

ZeroMQは、非常に高速・軽量で、“組込み型” のメッセージング・ミドルウェアです。

一般にメッセージング・ミドルウェアというのは、プロセス間やノード間で、データの受け渡しを担当してくれるモジュールです。

通信プログラムを書くときには、しばしば、低レベルAPIの仕様を意識したコーディングをしなければいけません。メッセージング・ミドルウェアはそれらの雑事を引き受けてくれるので、マイクロサービスなどの分散システムでよく使われています。RabbitMQ(AMQP)が有名ですね。*1 *2

世の中のメッセージング・ミドルウェアについては、以下に良い比較記事があります。

postd.cc

グラフを注意深く見ると、「ブローカーレス」と「ブローカード」では、 処理できる秒間メッセージ数の桁が違う ことがわかります。ブローカーレスは、場合によってはブローカードの100倍以上高速です。

《グラフ抜粋》 f:id:FutureStandard:20170216153315j:plain

この記事で「ブローカード」といっているものは、メッセージの送信側と受信側の間にメッセージ交換サービス*3を挟む クライアントサーバ・モデル のメッセージング・ミドルウェアです。他方の「ブローカーレス」では、ピアツーピア・モデル が基本となりますので、外部にメッセージ交換サービスは存在しません。

ZeroMQ は「ブローカーレス」タイプの代表格です。アプリケーション組み込み型(Embedded)のライブラリ実装となっており、アプリケーションのスレッドの1つ*4として動作します。

ハードウェア・リソースの乏しい IoT 機器では、データ管理のニーズで組み込み型の SQLite を採用することが多いと思います。ZeroMQ は「メッセージング・ミドルウェア界の SQLite」と言っても良いでしょう。

2.1.ZeroMQ のメリット

ZeroMQ のバイブル的ドキュメント The Guide“Why We Needed ZeroMQ” を見ると、ZeroMQ で嬉しいことが何なのか、リストアップされています。

以下に意訳を掲載します。どんなライブラリなのか、イメージできますでしょうか?

  1. バックグラウンド・スレッドで、ロック・フリーの非同期 Network I/O を正しく実装します。
  2. 通信ピアは、動的に通信への参加や脱退ができ、再接続は ZeroMQ によって自動的に処理されます。サービスの起動順番を気にする必要はありません。
  3. メッセージは、自動かつインテリジェントに、必要な時のみキューイングされます。
  4. メッセージ・キュー溢れ状態("High Water Mark"と呼びます)を適切に処理します。ZeroMQ で決められた 通信パターン に従い、送信側をブロック、もしくはメッセージを捨てます。
  5. 様々なトランスポート層を使って動作できます。この時、コードを書き換える必要がありません: TCPマルチキャスト、プロセス内通信、プロセス間通信
  6. 受信側が遅かったり、止まってしまった場合でも、ZeroMQ で決められた 通信パターン に従い、安全に処理できます。
  7. 開発者の求めるネットワーク・トポロジーに応じて、多様な 通信パターン を利用できます: Request-Reply、Pub-Sub など
  8. 1行のコードで、メッセージのキューイング、転送、キャプチャなどを目的としたプロキシーを書けます。プロキシーは、ネットワークの複雑さを軽減する効果があります。*5
  9. メッセージは、送信した“そのままの姿”で受信できます。10KB のメッセージを送信したら、受信側でも 10KB のメッセージとして受け取ることができます。*6
  10. メッセージは、バイナリ(Blob)として扱われます。0バイト~ギガバイトクラスのメッセージまで、任意のデータを送受信できます。メッセージにフォーマットを規定したい場合、Msgpack や Protocol Buffer などを ZeroMQ に載せることもできます。
  11. ネットワーク・エラーをインテリジェントに処理します。リトライして意味があるときには、ZeroMQ が自動でリトライします。
  12. 軽量なので、二酸化炭素の排出量を削減します。

最大の特徴は、実現したいネットワーク処理を、ZeroMQ の考える 通信パターン によって理解、分解して、表現するという手法です。通信パターンには、RPC(Remote Procedure Call)に適した Req-Rep パターンや、データ転送に適した Pub-Sub パターンなどがあります。

一方で、メッセージの保全*7はしてくれません。ここは「ブローカード」タイプのメッセージング・ミドルウェアとの大きな違いです。例えば、ZeroMQ で RPC を実装したとして、ワーカー・プロセスがクラッシュしてリクエストが失われても、ZeroMQ は何もしてくれません。メッセージの損失が許されないケースでは、アプリケーション側で、適切な検知処理やリトライ処理を作り込む必要があります。*8

案ずるより産むが易し。早速、動かしてみましょう。

2.2.ZeroMQ のインストール

メジャーな OS であれば、インストールは基本簡単です。

今回はハードウェアに Raspberry Pi3 を想定しているので、Raspbian OSのイメージを書き込んだ MicroSD カードを指して動作検証します。

プログラミング言語は、Python3 を利用します。*9

以下のコマンドを実行してください。

apt-get update
apt-get install libzmq3-dev
pip3 install pyzmq

※ 上記作業は、root ユーザにて行ってください。

2.3.Pub-Sub パターンで通信してみる

手始めに、ZeroMQ の Pub-Sub(Publisher-Subscriber)パターンで、Python の通信プログラムを書いてみます。

このパターンは1方向のデータ配信です。しばしば、ラジオ放送に例えられます。

  • Publisher は放送局です。聞いている人の有無によらずブロードキャストします。
  • Subscriber はリスナーです。聞きたいチャネルにチューニングして受信します。
  • 聞かないで既に流れてしまったデータを、あとから受信することはできません。

Publisher側とSubscribe側のコードを説明します。

※ このコードは、The Guide のサンプルコードを元にアレンジしています。

pub_server.py

Publisher をサーバ・プロセスとして実装します。ポート 5556 で、クライアントからのコネクションを待ち受けます。

2重ループになっており、外側ループは1秒1回の sleep ループです。内側ループは1~3チャネル用のデータ(数字)を作成し送信します。チャネルは単なる文字列です。

import zmq
import time

# ZeroMQ のバックグラウンド・スレッドのコンテキスト
context = zmq.Context()

# このサーバは、ポート5556で待ちます
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")


i = 0
while True:
    i += 1

    for ch in range(1,4):
        data = ch * i
        socket.send_string("{0} {1}".format(ch, data))
        print("Ch {0} <- {1} sent".format(ch, data))

    time.sleep(1)

pub_server.py@gist

sub_client.py

Subscriber はクライアント・プロセスとして実装します。localhost:5556 にいるサーバに接続します。

接続は ZeroMQ のバックグラウンド・スレッドが処理しますので、本当に TCP コネクションが張られているかどうかは、アプリケーション側は意識する必要がありません。

チャネルは、スクリプトの引数で渡します。

import sys
import zmq


if (len(sys.argv) != 2):
    print("Usage: # python3 {} <channel>".format(sys.argv[0]))
    sys.exit(1)

ch = sys.argv[1]


# ZeroMQ のバックグラウンド・スレッドのコンテキスト
context = zmq.Context()

# このクライアントは、ポート5556に接続します(バックグラウンドにて)
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")

# チャネルの目盛りをあわせる
socket.setsockopt_string(zmq.SUBSCRIBE, ch)


while True:
    string = socket.recv_string()
    ch, data = string.split()

    print("Ch {0} -> {1} received".format(ch, data))

sub_client.py@gist

動かしてみる(その1)

ターミナルを3つの開きます。1つめで pub_server.py を動かしてみましょう。

pi@raspberrypi:~ $ python3 ./pub_server.py
Ch 1 <- 1 sent
Ch 2 <- 2 sent
Ch 3 <- 3 sent
Ch 1 <- 2 sent
Ch 2 <- 4 sent
Ch 3 <- 6 sent
Ch 1 <- 3 sent
Ch 2 <- 6 sent
Ch 3 <- 9 sent
Ch 1 <- 4 sent
...

チャネル番号の倍数をブロードキャストしています。リスナーはまだいません。

次に、2つめのターミナルで sub_client.py を実行し、チャネル2を受信してみます。

pi@raspberrypi:~ $ python3 ./sub_client.py 2
Ch 2 -> 66 received
Ch 2 -> 68 received
Ch 2 -> 70 received
Ch 2 -> 72 received
Ch 2 -> 74 received
...

2の倍数を受信できていることがわかります。

3つめのターミナルで、再び sub_client.py を実行し、チャネル3を受信してみます。

pi@raspberrypi:~ $ python3 sub_client.py 3
Ch 3 -> 114 received
Ch 3 -> 117 received
Ch 3 -> 120 received
Ch 3 -> 123 received
Ch 3 -> 126 received
...

3の倍数を受信できています。

今度は、1つめのターミナルの pub_server.py を Ctrl-C で殺したり、再度スタートしてみましょう。sub_client.py 側では何のエラーもなく、データを受信しなくなったり、再度受信するようになるはずです。*10

構成をまとめると、こんな感じ。1つの Publisher に対して不特定多数の Subscriber が接続するパターンができています。

f:id:FutureStandard:20170216210303j:plain

ZeroMQでは、このような非常に短いプログラムで、1対多のプロセス間通信を簡単かつロバストに実現できます。

2.4.サーバとクライアントを逆にしてみる

ZeroMQ にとって、サーバ側(bind)クライアント側(connect)というのは『窓口どっちにしますか?』という区別に過ぎません。窓口がサーバで、クライアントがサービスを受けに出向くのです。

そもそも TCP は双方向通信ですし、データの流れる方向を決めているのは、PUB SUB という ソケットタイプ です。サーバ/クライアントという概念とデータの流れる方向は、独立した話になります。

再びラジオ放送で例えるなら、放送局側がリスナーの送った葉書を読み上げても良いわけです。*11

前回とは逆に、クライアント側を Publisher、サーバ側を Subscriber として、通信プログラムを動かしてみます。

pub_client.py

Publisher がクライアント側なので、socket.connect() でポート 5556 に接続します。

あとで実行結果をわかりやすくするため、スクリプトの引数に alphabet が渡されたときは、チャネル1に小文字、チャネル2に大文字の文字を送信するようにしました。

引数が number なら、チャネル1に自然数、チャネル2に2の倍数を送信します。

import sys
import zmq
import time


if (len(sys.argv) != 2):
    print("Usage: # python3 {} {{alphabet|number}}".format(sys.argv[0]))
    sys.exit(1)

mode = sys.argv[1]


# ZeroMQ のバックグラウンド・スレッドのコンテキスト
context = zmq.Context()

# このクライアントは、ポート5556に接続します(バックグラウンドにて)
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:5556")


i = 0
while True:
    i += 1

    if mode == "alphabet":

        lower = chr(ord('a') + (i - 1) % 26)
        socket.send_string("1 " + lower)
        print("Ch 1 <- {} sent".format(lower))

        upper = chr(ord('A') + (i - 1) % 26)
        socket.send_string("2 " + upper)
        print("Ch 2 <- {} sent".format(upper))

    else:

        for ch in range(1,3):
            number = ch * i
            socket.send_string("{0} {1}".format(ch, number))
            print("Ch {0} <- {1} sent".format(ch, number))

    time.sleep(1)

pub_client.py@gist

sub_server.py

Subscriberはサーバ側です。ポート5556で待受けます。

チャネルは、スクリプトの引数で渡します。

import sys
import zmq


if (len(sys.argv) != 2):
    print("Usage: # python3 {} <channel>".format(sys.argv[0]))
    sys.exit(1)

ch = sys.argv[1]


# ZeroMQ のバックグラウンド・スレッドのコンテキスト
context = zmq.Context()

# このサーバは、ポート5556で待ちます
socket = context.socket(zmq.SUB)
socket.bind("tcp://*:5556")

# Channel をサブスクライブ
socket.setsockopt_string(zmq.SUBSCRIBE, ch)


while True:
    string = socket.recv_string()
    ch, data = string.split()

    print("Ch {0} -> {1} received".format(ch, data))

sub_server.py@gist

動かしてみる(その2)

先ほどと同様に、3つのターミナルを開きます。1つめで pub_client.py を動かしてみます。

pi@raspberrypi:~ $ python3 ./pub_client.py number
Ch 1 <- 1 sent
Ch 2 <- 2 sent
Ch 1 <- 2 sent
Ch 2 <- 4 sent
Ch 1 <- 3 sent
Ch 2 <- 6 sent
...

チャネル1と2に、チャネル番号の倍数をブロードキャスト中。リスナーはまだいません。

次に、2つめのターミナルで sub_server.py を動かして、チャネル2のみ受信してみます。

pi@raspberrypi:~ $ python3 sub_server.py 2
Ch 2 -> 262 received
Ch 2 -> 264 received
Ch 2 -> 266 received
Ch 2 -> 268 received
Ch 2 -> 270 received
...

2の倍数を受信できています。

ここで、3つめのターミナルでも pub_client.py を動かし、今度は英字をブロードキャストしてみます。

pi@raspberrypi:~ $ python3 ./pub_client.py alphabet
Ch 1 <- a sent
Ch 2 <- A sent
Ch 1 <- b sent
Ch 2 <- B sent
Ch 1 <- c sent
...

sub_server.py では、別々の pub_client.py がブロードキャストしている数字と英字を、両方同時に受け取れているでしょうか。

...
Ch 2 -> 762 received
Ch 2 -> 764 received
Ch 2 -> 766 received
Ch 2 -> B received
Ch 2 -> 768 received
Ch 2 -> C received
Ch 2 -> 770 received
Ch 2 -> D received
Ch 2 -> 772 received
Ch 2 -> E received
Ch 2 -> 774 received
Ch 2 -> F received
...

2つのクライアントからのメッセージを、チャネル2だけ、受信できています!

途中でクライアント側やサーバ側のいずれかを Ctrl-C で止めたり、再度スタートすると、残りのスクリプトは何の問題も無く動き続けるのがわかると思います。

構成をまとめると、こんな感じ。多数のクライアントが Publish したデータを、1つの Subscriber が集約するパターンになっています。

f:id:FutureStandard:20170216221915j:plain

ZeroMQ が、多対1のメッセージ交換もよろしくハンドリングしてくれました。

2.5.Python vs. バイナリデータ

ZeroMQは、メッセージの中身に関与しないため、どんなバイナリデータ(Blob)でも送受信できますが、ここで言うバイナリデータとは バイト列 のことです。*12

一方、Python の扱うオブジェクトは、メモリ内部では、必ずしもシンプルなバイト列になっていません。

この2つの世界を橋渡しして、Python の内部表現から、ネットワークに送信可能なバイト列を生成することを リアライゼーション と呼びます。

ZeroMQ の Python バインディングである PyZMQ では、最初から簡単なシリアライゼーションの仕組みを提供しています。

Socketのメソッド 機能説明
send_json() Pythonオブジェクトを JSON 表現のバイト列に変換して送信
recv_json() JSON 表現のバイト列を受信して、Pythonオブジェクトを生成
send_pyobj() Pythonオブジェクトを pickle表現のバイト列に変換して送信
recv_pyobj() pickle 表現のバイト列を受信して、Pythonオブジェクトを生成
send_string() Pythonユニコード文字列を、単なるバイト列に変換して送信
recv_string() バイト列を受信して、Pythonユニコード文字列を生成

これまでのサンプルプログラムは、単なる文字列の送受信でしたので send_string() / recv_string() を用いました。

もし Python の整数「7」を、4バイトのバイト列にシリアライズして、ZeroMQ で送信する場合には、以下のコードを用います。

i = 7
data = i.to_bytes(4, 'little')
socket.send(data)

バイト列を受信後、Python の整数に戻す場合は以下のコードとなります。

data = socket.recv()
i = int.from_bytes(data, 'little')

バイト列リテラルは、b 接頭辞でも作れます。send_string() の代わりに send() を使って文字列を送信するコードは次のようになります。

socket.send(b"Any ASCII string")

日本語など、非ASCII 文字列を送信するときには、encode()メソッドを利用できます。

socket.send("日本語メッセージ".encode('utf-8'))

参考:

2.6.マルチパート・メッセージ

Pub-Sub の Python プログラムでは、メッセージは1つの文字列で構成されていました。より複雑なデータをやり取りする場合、データの切れ目 をメッセージに含められると便利です。

例えば、Python の Numpy 配列は、① 型(dtype) ② 形(shape) ③ バイト列で構成できます。この3つのデータを、不可分な単一メッセージにまとめて送受信できると、アプリケーション側の処理を簡単でできます。

ZeroMQ では、このような区切りの入ったメッセージを マルチパート・メッセージ と呼びます。メッセージの区切られた各パートは フレーム と呼びます。

マルチパート・メッセージは、ZeroMQ 内部でのルーティングなどに活用されていますし、アプリケーションから利用する際には Scatter-Gather API のように扱えますので、分割されると意味がなくなるようなデータを、自前で一旦連結してから送信するといった手間がいらなくなります。

ただし、あくまでもメッセージの区切りを入れるだけなので、フレーム単位で送受信している訳ではありません。すべてのフレームがメモリ内で準備できてから、メッセージ全体の送信が行われます。*13

早速、マルチパート・メッセージで Numpy 配列を送受信してみます。メッセージの形式は下図のとおりです。

f:id:FutureStandard:20170219213951j:plain

今回は、Pub-Sub パターンではなく Req-Rep(Request-Reply)パターンを使います。Req-Rep パターンは、データの単純な交換に向くパターンです。Reqソケット側は、必ず先にデータを送信し、その後必ずデータを受信しなければいけません。2回連続で送信することはできません。反対に Rep ソケット側は、必ず先に受信し、その後データを送信します。*14

Pub-Sub パターンと異なり、通信ピアは、互いに準備ができたと認識するまでデータと送信しないため、確実なデータ転送が可能です。

req_server.py

PyZMQ では、socket クラスに send_multipart() という便利なメソッドが用意されているため、Python リストを渡すだけで、マルチパートのメッセージを送信できます。受信でも recv_multipart() でリストを受け取れます。

import zmq
import numpy as np
import json

# ZeroMQ のバックグラウンド・スレッドのコンテキスト
context = zmq.Context()

# このサーバは、ポート5556で待ちます
socket = context.socket(zmq.REQ)
socket.bind("tcp://*:5556")

# 送信するデータの生成:2行3列の整数配列
M = np.asarray([[0,1,2],[2,1,0]])


# M をシリアライズ
dtype = str(M.dtype).encode('utf-8')
shape = json.dumps(M.shape).encode('utf-8')
data = M.tostring('C')

# マルチパートメッセージとして送信
socket.send_multipart([dtype, shape, data])

print("Request:\n{} sent".format(M))


# 計算結果を受け取る
[dtype2, shape2, data2] = socket.recv_multipart()

# Numpy Array にデシリアライズ
N = np.frombuffer(data2, dtype=dtype2.decode('utf-8'))
N.shape = json.loads(shape2.decode('utf-8'))

print("Reply:\n{} received".format(N))

req_server.py@gist

配列のバイト列を得るために tostring() *15を利用しています。 また、バイト列から Numpy 配列を再構築するために frombuffer() を利用しました。

rep_client.py

Repソケットから、Numpy 配列を受取り、全要素に +1 して1秒待ってから、結果を送信しています。

import zmq
import numpy as np
import json
import time

# ZeroMQ のバックグラウンド・スレッドのコンテキスト
context = zmq.Context()

# このクライアントは、ポート5556に接続します(バックグラウンドにて)
socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:5556")


# シリアライズされた Numpy Array を受信
[dtype, shape, data] = socket.recv_multipart()

# Numpy Array にデシリアライズ
M = np.frombuffer(data, dtype=dtype.decode('utf-8'))
M.shape = json.loads(shape.decode('utf-8'))

print("Request:\n{} received".format(M))

# なにか計算してみる; 各要素に+1
M = M + 1

# ちょっと待ってみる
time.sleep(1)

# 計算結果を返信
socket.send_multipart([dtype, shape, M.tostring('C')])

print("Reply:\n{} sent".format(M))

rep_client.py@gist

動かしてみる(その3)

サーバ側とクライアント側の どちらを先に起動しても結果は同じ です。ZeroMQ が諸々の面倒をみてくれています!

サーバ Req 側

pi@raspberrypi:~ $ python3 req_server.py
Request:
[[0 1 2]
 [2 1 0]] sent
Reply:
[[1 2 3]
 [3 2 1]] received

クライアント Rep 側

pi@raspberrypi:~ $ python3 rep_client.py
Request:
[[0 1 2]
 [2 1 0]] received
Reply:
[[1 2 3]
 [3 2 1]] sent

参考:

2.7.様々なソケットタイプ

ZeroMQ には Pub-Sub ソケットや、Req-Rep ソケット以外にも様々なソケットタイプがあります。以下に、ごく簡単にですが、ソケットタイプの紹介をします。

タイプ 概要 詳細
PUB ブロードキャスト送信 SUBと利用。接続中でない時やキュー溢れ時のメッセージは捨てられる。
SUB ブロードキャスト受信側 PUBと利用。接続中でない時やキュー溢れ時のメッセージは捨てられる。
PUSH データ配布送信 PULLと利用。複数接続では Round-robin でメッセージ配信。接続中でない時やキュー溢れ時の送信は待つ。
PULL データ配布受信側 PUSHと利用。複数接続では Fair-queued でメッセージ受信。接続中でない時やキュー溢れ時の受信は待つ。
REQ RPC 要求側 REPやROUTERと利用。必ず先に送信してから受信しなければならない。キュー溢れ時は待つ。
REP RPC 返信側 REQやDEALERと利用。必ず先に受信してから送信しなければならない。キュー溢れ時は待つ。
DEALER 非同期版 REQ REPやROUTERと利用。返信を待たず連続でデータ送信でき、複数接続で要求をばら撒ける。キュー溢れ時は待つ。
ROUTER 非同期版 REP REQやDEALERと利用。受信したデータの返信を、バラバラな順番で返せる高度なソケット。キュー溢れ時は待つ。
PAIR スレッド間パイプ プロセス内スレッドでの利用に特化している。キュー溢れ時は待つ。

詳細は ZeroMQ The Guide “Chapter 2 - Sockets and Patterns”を参照してください。

参考:

前編まとめ(つづく)

大分長々と書いてしまいましたが、ZeroMQ の説明は以上です。

次からは応用編に入ります。


*1:Spring AMQP × RabbitMQ

*2:マイクロサービスの終焉

*3:すなわちブローカー

*4:スレッド数を複数にして、全コアの性能を引き出すこともできます。

*5:このプロキシーとは、要はブローカーです

*6:メッセージを途中まで受信して残り部分を待つケースを考えないで良いということです

*7:突然の電源断に備えて、未処理のメッセージをローカルディスクに保管するなど

*8:そのための様々なベストプラクティスが The Guide の Chapter 4 で議論されています。

*9:ZeroMQ は多数の言語バインディングを提供していますので、興味のある方は、言語バインディング一覧のページを参考にしてください。

*10:注意深い方は、サーバの送信した最初のメッセージが、クライアントに届いていないことに気づくと思います。これは Slow Joiner 現象と呼ばれ、ZeroMQ の構造上の限界です。メッセージロスが許されないケースでの回避策が The Guide に掲載されています

*11:この時、サーバの役割である放送局は Subscriber、クライアントであるリスナーは Publisher になります。葉書のコーナー名がチャネルといったところでしょうか。

*12:Pythonの言葉で表現するなら、bytes オブジェクトや bytearray オブジェクトです。

*13:受信のときも、すべてのフレームがメモリ内に準備できてから、ZeroMQ からの受信成功の知らせが来ます。

*14:少々制約が厳しいように思いますが、この制約のお陰で Req 側と Rep 側のどちらを先にスタートしても同じ結果が得られます。より高度な通信のユースケースでは、代わりに DEALER ソケットと ROUTER ソケットを使います。

*15:Raspbian の Numpy バージョンが 1.8.2-2 だったためで、Numpy 1.9以降であれば tobytes() を利用できます。意味は同じです。

SCORER SDK正式版リリース

 

はじめに

1月31日にラズパイとUSBカメラを使ってPythonで簡単に映像解析(顔検知とかバーコード読み取りとかAI物体認識とか)を使うことができるSCORER SDKをリリースしたので簡単に紹介します。

SCORERとは?

ラズパイにUSBカメラをつけてスマホから登録すると、クラウド上に映像を長期間ためたり、撮った映像に顔が映ってたりすると自動的に何秒見ていたかなどカウントしてくれるスマートカメラのシステムです。

使えるアルゴリズムが多岐にわたるため、アパレルショップのマネキンとかにつけて視聴率を測ったり、飲食店の込み具合を測定したり、パスポートの文字を読み取ったりといろいろなことに活用できたりします。最近はやりのディープラーニングを使った認識などもつかえます。

f:id:FutureStandard:20170202234520p:plain

詳細ページはこちら

https://peraichi.com/landing_pages/view/scorer

SCORER SDKは、もともといろいろな用途に使えるSCORERをさらにプログラム可能にした開発環境で、温度や光センサーとカメラを連動したIoTシステム(明るくなったら写真撮るなど)を簡単に開発することができるようにしたものです。

詳細・ダウンロードはこちら

https://peraichi.com/landing_pages/view/scorersdk

ためしに使ってみよう 

いろいろ細かいことはおいておいて、実際使ってみた方が良さを分かっていただけると思うので、今回は顔をカメラに移したらLINEに通知するだけの簡単なシステムを組んでみたいと思います。

全体図

ハードウェア

f:id:FutureStandard:20170203000506j:plain

 ラズパイ3にBuffaloのUSBカメラと電源をさします。とりあえずこれだけ。

ソフトウェア

f:id:FutureStandard:20170203001922p:plain

顔を映し続けると何回も通知が行ってしまうので、そうはならないように1度検知したら5秒間は送らないという処理を入れます。

LINE notifyのアクセストークンを取得する

Lineに通知を行うために今回はLINE Notifyを使います。

ほんとはBotを使いたいところですが、双方向でやりとりが必要なBotだとローカルアドレスしかないラズパイだけで実現することは難しく、インターネット上にSSLのサーバーを立てなければいけないなど少し手間がかかりそうでしたので、一方的に通知ができるLINE notifyを選びました。

notify-bot.line.me

にアクセスして、スマホのLINEアプリとおなじログインID・PASSでログインします。

マイページを開くと

f:id:FutureStandard:20170204121131p:plain

という画面になるので、「トークンを発行する」を押します。

f:id:FutureStandard:20170204121241p:plain

1:1でLINE Notifyから通知を受け取る

を今回選び、トークン名を記入して発行するボタンを押すと

f:id:FutureStandard:20170204121551p:plain

トークンが発行されます。

このコードを下に書いてあるプログラムの

【Line Notifyのパーソナルアクセストークン】

に代入します。

プログラムを書く

SCORER SDKはラズパイをモニターにつないだりせず、PCのブラウザだけで簡単に使い始められる仕組みです。

開発環境までは下記のマニュアルをご覧ください

http://downloads.scorer.jp/sdk.html

 

【参考】ブラウザ内開発環境のCloud9の画面

f:id:FutureStandard:20170203003952p:plain

今回書いたプログラムがこちら

 【Line Notifyのパーソナルアクセストークン】

 には先ほど取得したトークンを各自入れてください。

 

 実質30行ちょいのコードですが、

cap = scorer.VideoCapture(0)

scorer.imshow(1, gray)

などSCORERライブラリを使ったところがいくつかあります。

可能な限りopenCVの書式に合わせ、openCVではできないWEB経由での画像配信(imshow相当)などを実装しています。

フレーム取得時間なんかも取れるので、通知に使ったフレームから5秒たつまで通知のIFブロックから除外するという処理を入れています。

顔検知については模様などを誤検知することがあるので、通知は最低サイズで足切りしています。

 画像送るところはLINE Notifyの公式ブログでもきちんと書かれたところがなかったので、

curl.setopt(pycurl.HTTPPOST, [('message', buffer.encode('utf-8')),('imageFile', (curl.FORM_FILE, 'detected.jpg'))])

このように一度detected.jpgで保存したものをフォームとして渡す処理をしています。

【参考】

developers.linecorp.com

検知結果

f:id:FutureStandard:20170204125620p:plain

検知されました!

 

検知の様子を動画にしてみました。

youtu.be

 

旧大統領だけでなく現大統領もいます

さいごに

今回は顔検知+LINE通知でしたが、これを温度センサー+Slack通知にもできますし、たくさんのSCORERカメラを用意して、雨雲監視網を作るなど様々なIoT的な用途に活用できるのが想像できるのではないでしょうか?

SCORERはとにかく簡単に、誰でもカメラやセンサーを使ったIoTの仕組みを最速で作り上げられるように考えて日々改善しているので、無料で使えるので是非一度おためしください!

https://peraichi.com/landing_pages/view/scorersdk

Express + Passport + Cognito でサーバーサイドのユーザー認証を手軽に実装

データの分析を生業にしているはずが、なぜか Node.js でサーバー&フロントエンドの開発をしている金田です。この度、Amazon Web Service (AWS) の提供する Cognito User Pools というユーザー認証機能を Node.js から簡単に利用できるモジュールを作成しましたので紹介できればと思います。

www.npmjs.com

概要

AWS には、Cognito User Pools というユーザー認証機能を簡単に実装できるマネージドサービスがあります。しかしながら、AWSから提供されているSDKは、基本的にクライアントサイドでの利用を想定しており、iOSAndroidJavaScript(ブラウザ版) しか提供されていません。今回、弊社で開発をしている SCORER という動画解析サービスの開発にあたり、Node.js を使ったサーバーサイドでの認証に Cognito User Pools を利用したかったため、Passport という認証用ミドルウェアを作成することで簡単に実装が行えるようにしました。

SCORER とは?

動画データの保存、解析、分析といった映像解析ソリューションを開発する際に必要になる機能を、ハードウェアからソフトウェアまでワンストップで提供するBaas (Backend as a Service) です。映像解析を活用したサービスを提供したい開発者が、簡単に映像解析技術を利用したサービスを開発できるプラットフォームを目指しています。現在は、標準機能として、下記の画像にあるように顔検知、動体検知、AR/QRコードの読み取りといった機能を利用することができます。

f:id:FutureStandard:20161019165038p:plain:w280 f:id:FutureStandard:20161019164759p:plain:w350

Cognito とは

Cognito はAWSの提供するユーザー認証基盤です。実は、Cognito には、Cognito Identity と Cognito User Pools の二つのサービスがあり、前者はFacebookGoogleTwitter といった外部もしくは自前のIDプロバイダを必要とするのに対して、後者はユーザー登録、多要素認証、パスワード変更といったユーザー認証に関わる一切のサービスを提供してくれるため、簡単に独自ユーザー認証基盤を作成することができます。当初、Cognito を使い始めたときに、この辺りの違いが理解できず苦労しました。

ただし、Cognito Identity と Cognito User Pools は連携をさせることもでき、この場合は、Cognito User Pools が外部IDプロバイダと同じ役割をすることになります。詳しくは、下記のリンクの Use case 17. あたりが詳しいです。
GitHub - aws/amazon-cognito-identity-js

参考:
Amazon Cognito User Poolsを使って、webサイトにユーザ認証基盤を作る - Qiita
[新機能] Amazon Cognito に待望のユーザー認証基盤「User Pools」が追加されました! | Developers.IO
Amazon Cognito User Poolsの情報をサーバサイドで取得する - Qiita

Passport とは

Express という Node.js のフレームワークとともに利用される認証用のミドルウェアです。FacebookGoogle等を使ったOAuth認証、ベーシック認証といった様々な認証方法をを統一的なAPIで利用できるため Express をで認証機能を利用する際は、事実上の標準となっています。今回は、 Cognito User Pools の認証ができる Passport のモジュールを作成して公開しました。

使い方

Express と Passport の使い方に関しては下記の記事が大変参考になりました。本稿の説明も、基本的にはこちらに書いてある Passport の実装方法に拠っています。 express実践入門 · GitHub

また、Cognito User Pools の設定はすでになされているものとします。詳しい設定方法に関しては下記の記事が詳しいのでご参照ください。

[新機能] Amazon Cognito に待望のユーザー認証基盤「User Pools」が追加されました! | Developers.IO
Amazon Cognito User Poolsを使って、webサイトにユーザ認証基盤を作る - Qiita

なお一点だけ注意点があり、JavaScript から利用する場合は、User Pools の App の設定画面で「Generate client secret」のチェックを必ず外してから作成を行ってください。

f:id:FutureStandard:20161019210305p:plain

インストール

npm のパッケージとして公開しているため、下記のコマンドを入力するだけでインストール可能です。必要に応じて --save などのオプションをつけてください。

$ npm install passport-cognito

ブラウザ側の実装

ブラウザ側からは、フォームを入力してボタンを押した際に、POSTで username と password を送るように設定します。例えば、jQueryで値を送る場合は下記のようになります。

$.ajax({
  type: 'POST',
  url: '/auth/cognito',
  data: { username: username, password: password }
})

サーバーサイドの実装

Passport と middlware の設定

まず、Passport モジュールの読み込みとセッション用の middleware を設定します。ここはほぼコピペでOKです。

app.js

var passport = require('passport');

// passportモジュールをLoad
require('./passport')(app);

// session用のmiddlewaresを有効化
app.use(passport.initialize());
app.use(passport.session());

Strategy の設定

次に、Cognito 認証の Passport モジュールを読み込み、コンストラクタから必要なオプションを与えてインスタンスを生成します。コンストラクタの2つ目の引数が、コールバック関数になっており、認証が成功した場合はここでトークンやユーザー情報が取得できますので、必要に応じて処理を書いてください。なお、この user オブジェクトにトークン情報などを格納しておくと、ルーティングの際に、req.user を参照することで情報を引き回すことができるようになります。

config/passport/cognito.js

var CognitoStrategy = require('passport-cognito')
 
module.exports = new CognitoStrategy({
    userPoolId: 'ap-northeast-1_eSjqLfqKc',
    clientId: 'vtvg02tr21zmxvspyvawtv09b',
    region: 'ap-northeast-1'
  },
  function(accessToken, idToken, refreshToken, user, cb) {
    process.nextTick(function() {
      ...
      cb(null, user);
    });
  }
);

Serialize/Deserialize の設定

セッションへの情報の格納と復元の設定を行います。なお、ここで上記のコールバック関数の引数で渡した user を session 情報に格納しているため、req.user で値を引き回すことができるようになります。ここもほぼコピペでOKです。

config/passport.js

module.exports = function(){
  var passport = require('passport');

  // sessionにユーザー(のキー)情報を格納する処理
  passport.serializeUser(function(user, done) {
    done(null, user);
  });

  // sessionからユーザー情報を復元する処理
  passport.deserializeUser(function(obj, done) {
    done(null, obj);
  });

  // 利用するstrategyを設定
  passport.use(require('./passport/cognito'));
}

Routing の設定

下記のエンドポイントにPOSTでリクエストがあった際に、passport.authenticate() が呼ばれるようにします。なお、この際、req.body.username と req.body.password でブラウザから送られてきた値が取得できている必要があります。

app.post('/auth/cognito',
  passport.authenticate('cognito', {
    successRedirect: '/',
    failureRedirect: '/login'
}));

最後に

上記の設定ができると、Cognito User Pools の認証は Passport モジュールが裏でいい感じに行ってくれ、AWSの他のサービスのアクセス等に必要なトークンの情報や User Pools に登録してあるユーザー情報が取得できるようになります。Express と Passport に多少は習熟する必要はありますが、上記のモジュールを使うと簡単にCognito User Pools の認証処理が実装できることをお分かりいただけたのではと思います。

なお最後にはなりますが、上記で紹介した SCORER の開発キットは Yahoo! ショッピングで絶賛販売しております!興味を持って頂けたらぜひ購入をお願いします。Raspberry Pi3、USBカメラ、SDカードが付いており、ほぼ原価で販売してますので大変お得になっております(笑)

Future Standard - Yahoo!ショッピング