1 year ago

#373324

test-img

SSAH

How to write pytest for Websocket Client endpoint

I used WebsocketApp to create a python Websocket client(synch) which interacts with Websocket server. Now, I am interested in integrating the Websocket client in end to end test. I tried to write a Pytest for Websocket client, but because I am a beginner with Pytest, so I blocked and I do not know how should I continue it.

this is my code:

import json

import websocket

no_message = 0
count = 1


def on_error(ws, error):
    print(error)


def on_close(ws, close_status_code, close_msg):
    print("############## closed ###############")


def on_message(ws, message):
    try:
        if message is not None:
            print("RESPONSE FROM SERVER.......", message)
            global no_message
            no_message += 1
            if no_message >= count:
                ws.close()
        if message is None:
            ws.close()

    except:
        print("error")
        ws.on_error()


def on_open(ws):
    global count
    message = {
        "session_id": "String",
        "destination": "127.0.0.1",
        "count": count,
        "command": "LIVE_PING",
    }
    count = message.get("count")
    ws.send(json.dumps(message))


if __name__ == "__main__":
    websocket.enableTrace(True)
    host = "ws://127.0.0.1:8000/"
    ws = websocket.WebSocketApp(
        host,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    ws.run_forever()

and this my pytest for:

import json
import unittest.mock as mock

import websocket


class Testws():
    def test_on_close(self):
        print("Close")

    def test_on_error(self):
        print('ERROR')

    def test_on_message(self, message):
        assert message == 'RESPONSE FROM SERVER....... {"session_id":"String","sample":0,"elapsed":15}'

    def test_on_open(self, ws):
        fake_message = {
            "session_id": "String",
            "destination": "127.0.0.1",
            "count": 2,
            "command": "LIVE_PING",
        }
        ws.send(json.dumps(fake_message))


c = Testws()
host = "ws://127.0.0.1:8000/"
ws = websocket.WebSocketApp(host, on_open=c.test_on_open, on_message=c.test_on_message, )
ws.run_forever()

I tried also using Monkeypatching, but I am confused and do not think it is the right way

import json

import websocket

from wsClient import synch_ws_client as cl
from websocket import WebSocketApp


def test_synch_ws_client(monkeypatch):
    def mock_on_message():
        print('RESPONSE FROM SERVER....... {"session_id":"String","sample":0,"elapsed":2}')
def mock_on_error():
    print('Error')

def mock_on_close():
    print('############## closed ###############')

def mock_on_open(ws):
    fake_massage = {
        "session_id": "String",
        "destination": "127.0.0.1",
        "count": 1,
        "command": "LIVE_PING"
    }
    ws.send(json.dumps(fake_massage))

monkeypatch.setattr(WebSocketApp, 'on_message', mock_on_message)
monkeypatch.setattr(WebSocketApp, 'on_close', mock_on_close)
monkeypatch.setattr(WebSocketApp, 'on_error', mock_on_error)
monkeypatch.setattr(WebSocketApp, 'on_open', mock_on_open)

assert cl.on_message() == 'RESPONSE FROM SERVER....... {"session_id":"String","sample":0,"elapsed":2}'

I do not know, how can I check for example the test_on_open and test_on_message modules are call successfully.

python-3.x

websocket

pytest

monkeypatching

0 Answers

Your Answer

Accepted video resources