1 year ago
#373324
SSAH
How to write pytest for Websocket Client endpoint
I used WebsocketApp to create a python Websocket client(synch) which interacts with Websocket server. Now, I am interested in integrating the Websocket client in end to end test. I tried to write a Pytest for Websocket client, but because I am a beginner with Pytest, so I blocked and I do not know how should I continue it.
this is my code:
import json
import websocket
no_message = 0
count = 1
def on_error(ws, error):
print(error)
def on_close(ws, close_status_code, close_msg):
print("############## closed ###############")
def on_message(ws, message):
try:
if message is not None:
print("RESPONSE FROM SERVER.......", message)
global no_message
no_message += 1
if no_message >= count:
ws.close()
if message is None:
ws.close()
except:
print("error")
ws.on_error()
def on_open(ws):
global count
message = {
"session_id": "String",
"destination": "127.0.0.1",
"count": count,
"command": "LIVE_PING",
}
count = message.get("count")
ws.send(json.dumps(message))
if __name__ == "__main__":
websocket.enableTrace(True)
host = "ws://127.0.0.1:8000/"
ws = websocket.WebSocketApp(
host,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
ws.run_forever()
and this my pytest for:
import json
import unittest.mock as mock
import websocket
class Testws():
def test_on_close(self):
print("Close")
def test_on_error(self):
print('ERROR')
def test_on_message(self, message):
assert message == 'RESPONSE FROM SERVER....... {"session_id":"String","sample":0,"elapsed":15}'
def test_on_open(self, ws):
fake_message = {
"session_id": "String",
"destination": "127.0.0.1",
"count": 2,
"command": "LIVE_PING",
}
ws.send(json.dumps(fake_message))
c = Testws()
host = "ws://127.0.0.1:8000/"
ws = websocket.WebSocketApp(host, on_open=c.test_on_open, on_message=c.test_on_message, )
ws.run_forever()
I tried also using Monkeypatching, but I am confused and do not think it is the right way
import json
import websocket
from wsClient import synch_ws_client as cl
from websocket import WebSocketApp
def test_synch_ws_client(monkeypatch):
def mock_on_message():
print('RESPONSE FROM SERVER....... {"session_id":"String","sample":0,"elapsed":2}')
def mock_on_error():
print('Error')
def mock_on_close():
print('############## closed ###############')
def mock_on_open(ws):
fake_massage = {
"session_id": "String",
"destination": "127.0.0.1",
"count": 1,
"command": "LIVE_PING"
}
ws.send(json.dumps(fake_massage))
monkeypatch.setattr(WebSocketApp, 'on_message', mock_on_message)
monkeypatch.setattr(WebSocketApp, 'on_close', mock_on_close)
monkeypatch.setattr(WebSocketApp, 'on_error', mock_on_error)
monkeypatch.setattr(WebSocketApp, 'on_open', mock_on_open)
assert cl.on_message() == 'RESPONSE FROM SERVER....... {"session_id":"String","sample":0,"elapsed":2}'
I do not know, how can I check for example the test_on_open and test_on_message modules are call successfully.
python-3.x
websocket
pytest
monkeypatching
0 Answers
Your Answer