Thor's Columns
PyQtでお手軽GUI開発♪―――は可能だったか? 第14回 簡単チャット編

PyQtでお手軽GUI開発♪―――は可能だったか? 第14回 簡単チャット編

 前回、ローカルマシンでの並列処理は使用しているPCがショボかったんであまり効果が出ませんでした。そこで最後の手段として、リモート並列処理をやってしまおうというのが今回のテーマです。

リモート並列処理

 リモート並列処理とはネットワークでつながった複数のPCを使って並列計算しようということです。

 このような場合、他のマシンとのデータ転送はローカルマシン内のプロセス同士に比べて遙かに遅くなってしまうのは仕方がありませんが、マンデルブロ計算のような遅い処理の場合、1時間かかる処理を15分で終わらせたいといったことになるので、そういうところは無視できます。


 ここで一番の問題になるのが、私が今までそんなことをやったことがない、ということでしたw

 これから何を作らなければならないかというと、要するにネットワーク上にサーバーを立てて、そこに接続したクライアントとデータ通信するシステムを構築しなければならないわけですが―――しかし、そういうのってマルチプロセスよりももっと無謀なんじゃないでしょうか?


 実際、そのあたりに関しては本家ドキュメントの18. プロセス間通信とネットワークにとてもとても詳しく書いてあって、おかげで見ても何のことやらよく分かりません。

 ところがそんなところにふっと見つけたのが、また同じくmultiprocessingモジュールにあった、17.2.2.10. リスナーとクライアントという項目でした。

 ここを見てみると……

17.2.2.10. リスナーとクライアント


通常、プロセス間でメッセージを渡すにはキューを使用するか Pipe() が返す Connection オブジェクトを使用します。


しかし multiprocessing.connection モジュールにはさらに柔軟な仕組みがあります。 このモジュールは、基本的にはソケットもしくは Windows の名前付きパイプを扱う高レベルのメッセージ指向 API を提供します。

 ―――ぱっと見にはパイプやキューのもうちょっと便利なバージョンかとも思えるのですが、なんですか? ソケットって、もしかしてTCP/IPのソケットのことでしょうか? と、思ってもうちょっと下の方を見ると……

注釈:‘0.0.0.0’ のアドレスを使用する場合、Windows 上の終点へ接続することができません。終点へ接続したい場合は ‘127.0.0.1’ を使用すべきです。

 注釈の内容はともかく、これって間違いなくIPアドレスのことです。ならば原理的にはこれで世界中のマシンに接続できる、ということになるのですが……


 そこでこの節をもっとよく読んでみると、まずサーバーの場合には、

  1. 指定したaddressのListenerオブジェクトを作ります。
  2. そのacceptメソッドで外部からの接続を待ちます。
  3. クライアントが接続してきたらConnectionオブジェクトが返ります。

 クライアントの場合には、

  1. サーバaddressを指定してClient関数を実行します。
  2. 接続に成功したらConnectionオブジェクトが返ります。

 そしてそのConnectionオブジェクトには例えば以下のようなメソッドが用意されています。

send(obj) コネクションの相手側へ recv() を使用して読み込むオブジェクトを送ります。
recv() コネクションの相手側から send() を使用して送られたオブジェクトを返します。
poll() 読み込み可能なデータがあるかどうかを返します。
close() コネクションをクローズします。

 sendメソッドで送れるオブジェクトはpickle化(シリアライズ)できるものに限られるということですが、リンク先を見てみれば普通のデータならまず問題ありません。

 要するにconnectionオブジェクトができてしまえば、あとはそれで通信し放題ということのように思えるのですが……


 というわけでこれまた半信半疑でドキュメントにあったサンプルをコピペして動かしてみたら―――なんと拍子抜けするくらいにあっさりとつながってしまいました!

 これまた本来ならばここで、まずはウンともスンとも(以下略)とかいった感じで地獄のように難航するはずだったんですが……


 ともかくつながってしまった以上、あとは実際にデータをやりとりする仕組みを作るだけです。

 まずドキュメントのサンプルはデータを一発送って終わりですが、実際にはポーリングループを書かなければなりません。サーバ側では複数のクライアントからの接続を管理しなければなりません。

 そういった場合には結局それらの機能をスレッド化する必要が出てきますが―――前回に予備検討しているのでこれもほとんど問題なさそうです。


 またデータやりとりのためのプロトコルも決めなければなりませんが、まあ今回のような場合なら、接続とデータのやりとり、終了といった信号を適当に決めておけばいいでしょう。


 しかし何しろ初めてなので、まずテスト用に簡単なチャットサーバーとクライアントを作ってみることにします。それができればマンデルブロサーバーに改造するのは容易です。


簡単なチャットサーバー

 というわけで以下が作ってみたチャットシステムです。


 まずチャットサーバーは以下のような動作をしています。

まず親スレッド(メインプロセス)は最初にクライアントを待つ待ち受けスレッドを作る。


待ち受けスレッドに接続が来た場合

  1. ステータスキューを使って親スレッドに接続情報を送る。
  2. 今後クライアントとのやりとりはそのスレッドで行う。
  3. 親スレッドはステータスキューの情報を見て、新しい待ち受けスレッドを作る。

クライアントからチャットメッセージが来た場合

  1. 子スレッドはメッセージキューにメッセージをセットする。
  2. 親スレッドはメッセージキューを監視してメッセージがあれば、接続している全ての子スレッドのメッセージバッファに値をセットする。
  3. 子スレッドはメッセージバッファに何か値が入っていたら、それをクライアントに送り返す。

クライアントから終了コマンドが来ていた場合

  1. 子スレッドはステータスキューにコマンドを入れてクライアントとは切断処理を行う。
  2. 親スレッドはステータスキューを監視して、接続の終わったスレッドがあれば消す。

 こういった仕様で書いてみたサンプルが以下のようなものです。

# -*- coding: utf-8 -*-
"""簡単なチャットサーバー"""

from threading import Thread
import time
from multiprocessing.connection import Listener

address = ('192.168.10.8', 6000)
endcode = ':end'

class serverthread(Thread):

    def __init__(self, Listener, Qstatus, Qmessage):
        """サーバースレッドの初期化"""

        super().__init__()

        self.listener = Listener
        self.QStatus = Qstatus
        self.Qmessage = Qmessage
        self.chatmessage = []
        self.connecting = False

    def run(self):
        """チャット処理を行うスレッド関数"""

        #外部からの接続を待つ
        with self.listener.accept() as conn:

            #接続が来たときの処理
            useraddr = listener.last_accepted
            self.connecting = True
            self.QStatus.append((True, self))       #親プロセスに状況を通知
            self.Qmessage.append('{} が接続したよ!'.format(useraddr))

            #メイン処理ループ
            s = ''
            while True:
                if conn.poll():                     #クライアントからメッセージが来てないか調べて
                    s = conn.recv()                 #来ていればデータを受け取る
                    if s == endcode:                #終了コードだった場合は抜ける
                        self.Qmessage.append('{} が切断したよ!'.format(useraddr))
                        break
                    self.Qmessage.append(s)         #親に通信(親がchatmessageに入れる)
                    time.sleep(0.03)                #ちょっと待っていれば親が処理する

                #chatmessageに親プロセスが値をセットしていた場合(他のユーザーからのメッセージ等も含め)
                if self.chatmessage != []:
                    for msg in self.chatmessage:
                        conn.send(msg)              #メッセージを送り出す
                    self.chatmessage.clear()
                time.sleep(0.01)

            #終了処理
            conn.send(endcode)                      #クライアントに終了を通知する
            conn.close()
            self.connecting = False
            self.QStatus.append((False, self))      #親プロセスに終了を通知する

if __name__ == '__main__':

    print('chatserver start')
    q_status = [(True, None)]       #各スレッドの状態を知らせるキュー(状態、スレッド)のタプル
                                    #開始時に最初の待ち受けを作らせるためTrueになっている
    q_message = []                  #チャットメッセージを知らせるキュー
    users = []                      #スレッドのリスト(users[-1]が待ち受けスレッドになる)

    with Listener(address, authkey=b'secret password') as listener:

        #親プロセスのメインループ
        while True:

            #コネクションのチェック
            if len(q_status) > 0:
                for stat in q_status:
                    #コネクションが来ていた場合新しい待ち受けスレッドを作る
                    if stat[0] == True:
                        users.append(
                                serverthread(listener, q_status, q_message))
                        users[-1].start()
                    #セッションが終了していた場合スレッドを終了させる
                    else:
                        stat[1].join
                        users.remove(stat[1])
                q_status.clear()

            #メッセージのチェック
            if len(q_message) > 0:
                #メッセージが来ていたら全員に知らせる
                for t in users:
                    if t.connecting:
                        t.chatmessage = t.chatmessage + q_message
                q_message.clear()

            time.sleep(0.01)

    #面倒なんで終了処理とかは無しw

 読んでもらえば難しいことは一切していません。

簡単なチャットクライアント

 続いてクライアントの仕組みです。


 まず以下のような簡単なウインドウを作ります。



 これはメニューにチャットの接続切断、それから終了コマンドがあるだけで、本体下のlineEditにメッセージを入力したらサーバーにメッセージが送られ、上のtextEditにサーバーからのチャットのメッセージが表示されるというものです。


 サーバーとの通信をするところはスレッド化していますが、やることがシンプルなのでサーバーのようにThreadのサブクラスを作るようなことはせず、下記のようにThreadオブジェクトのコンストラクタのtargetパラメータにrecive_message関数を指定する方法で作成しています。

# -*- coding: utf-8 -*-
"""簡単なチャットクライアント"""

import sys
import time
import os
from threading import Thread

from PyQt5 import QtWidgets as Qw
from multiprocessing.connection import Client

import uiconvert

import chatform

address = ('192.168.10.8', 6000)
endcode = ':end'

def recive_message(self, connection):
    """サーバーからメッセージを受け取って表示するスレッド関数"""

    s = ''
    while self.threadliving:
        if connection.poll():
            s = connection.recv()
            if s == endcode:
                break
            self.ui.textEdit.append(s)
        time.sleep(0.01)

class MyForm(Qw.QMainWindow):

    def __init__(self, parent=None):
        """MyFormの初期化を行う"""

        super().__init__(parent)
        self.ui = chatform.Ui_MainWindow()
        self.ui.setupUi(self)

        self.threadliving = False
        self.username = os.environ['USERNAME']
        self.lbl = Qw.QLabel()
        self.ui.statusbar.addPermanentWidget(self.lbl)
        self.lbl.setText('切断中')

    def startChat(self):
        """チャットの開始をする"""

        if not self.threadliving:
            self.connection = Client(address, authkey=b'secret password')
            self.messagegetter = Thread(
                    target=recive_message, args=(self, self.connection))
            self.threadliving = True
            self.messagegetter.start()
            self.lbl.setText('接続中')

    def endChat(self):
        """チャットの終了をする"""

        if self.threadliving:
            self.connection.send(endcode)   #サーバーに終了の通知をする
            self.messagegetter.join()       #スレッド終了を待つ
            self.threadliving = False
            self.connection.close()
            self.lbl.setText('切断中')
            self.ui.textEdit.append('切断しました')

    def sendMessage(self):
        """LineEditにテキストが入力された"""

        s = self.ui.lineEdit.text()
        if self.threadliving:
            if s == endcode:
                self.endChat()
            else:
                self.connection.send(
                        '{}: {}'.format(self.username, s))
        self.ui.lineEdit.clear()

    def closeEvent(self, event):
        """終了時には通信を終了する"""

        self.endChat()

if __name__ == '__main__':

    app = Qw.QApplication(sys.argv)
    wmain = MyForm()
    wmain.show()
    sys.exit(app.exec_())

 動かして見たのが以下の例です。



 これもほとんどトラブル無しにできてしまいました。

multiprocessing.connection.wait関数

 さて無事動いてめでたしめでたしと思っていたんですが、あとから見れば実はこれは必要以上に冗長なことをやっていたのでした。


 というのがサーバーの待ち受けに関してなのですが、connectionオブジェクトpoll関数の所に……

multiprocessing.connection.wait() を使って複数のコネクションオブジェクトを同時にポーリングできることに注意してください。

 ―――などということが書いてあったのです。

 最初は何のことかよく分からなかったんですが、要するにこの地味な名前の関数は複数のコネクションオブジェクトのリストをパラメータとして与えれば、その中から通信が来たコネクションのリストを返してくれる関数だったのです。


 前述のチャットサーバーでは待ち受けスレッドを作って、そこに接続が来たらキューでメインスレッドに通知して、メインスレッドが新しい待ち受けスレッドを作って云々ということをやっていました。

 しかしこのwait関数を使う前提であれば、待ち受けスレッドは来た接続をconnectionオブジェクトのリストに追加して新たな待ち受けを行うという処理だけになり、待ち受け関連でメインスレッドがすることはありません。

 そしてメインスレッドではそのconnectionリストをパラメータに取るwait関数をwhileループで回しているだけで、各クライアントとの通信処理は事足りてしまうわけです。connectionごとに新たなスレッドを作って云々なんてことをする必要はなかったのです。


 ―――しかし、ここでwaitを使って書き直すのももう面倒なので、とっとと本番のマンデルブロサーバーを作ってしまうことにします。

2017-05-18