Oct-25-2022, 09:40 PM
Hi,
Unfortunately, my experience in websocket programming is not yet great.
I am trying to migrate my current simplewebsocket server to the sockets module. In addition, asyncio should also be implemented.
Now I get one error message after the other and can't manage to debug my script.
Could someone please help me to get the script running?
Latest: Websocketserver.py
The old version:
Unfortunately, my experience in websocket programming is not yet great.
I am trying to migrate my current simplewebsocket server to the sockets module. In addition, asyncio should also be implemented.
Now I get one error message after the other and can't manage to debug my script.
Error:Traceback (most recent call last):
File "websocketserver5_asyncio.py", line 407, in <module>
asyncio.run(main())
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "websocketserver5_asyncio.py", line 384, in main
async with websockets.serve("192.168.0.24", PORTNUM, functools.partial(Websocket(), client)):
TypeError: __init__() missing 1 required positional argument: 'client'According to the documentation, it should actually be implemented correctly. -> https://websockets.readthedocs.io/en/stable/howto/faq.htmlCould someone please help me to get the script running?
Latest: Websocketserver.py
#!/usr/bin/env python
import asyncio
import json
import os
import secrets
import signal
import websockets
from websockets import connect
import sys
import shutil
import logging
import time
import sqlite3
from sqlite3 import Error
import functools
from functools import partial
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
logger = logging.getLogger(__name__)
# Modbus Config
UNIT = 0x1
# Adresses to Hardware
HOST = '192.168.0.24'
#HOST = '192.168.0.50'
# Portnumber
PORT = 5020
#PORT = 502
# Coil Output Base Address
COILBASE = 512
# Websocket Portnummer listen
PORTNUM = 8001
class Websocket():
def __init__(self, client): #, server, sock, address
try:
super(Websocket, self).__init__(server, sock, address)
self.modbus = client
self.functions = {
"relais1": partial(self.simple_switch, COILBASE + 16),
"relais2": partial(self.simple_switch, COILBASE + 1),
"relais3": partial(self.simple_switch, COILBASE + 2),
"set_temp1": partial(self.set_temperature1, 524),
"temperatur1": partial(self.read_temperature, 2),
"temperatur2": partial(self.read_temperature, 3),
"heizen1": partial(self.heating, 524),
"heizen2": partial(self.heating, 525),
"heatread": partial(self.simple_read, 524),
"systemp": self.systemp
}
except Exception as exception:
logger.exception("constructor")
raise
def __await__(self):
# see: http://stackoverflow.com/a/33420721/1113207
return self._async_init().__await__()
async def _async_init(self):
self._conn = connect("wss://echo.websocket.org")
self.websocket = await self._conn.__aenter__()
return self
def simple_switch(self, address, value):
""" value can be one of "on", "off" or "get" """
if value in ['on', 'off']:
rq = self.modbus.write_coil(address, value == 'on', unit=UNIT)
time.sleep(0.01)
print("rq = self.modbus.write_coil(address, value == on, unit=UNIT)")
elif value != 'get':
raise ValueError(value)
# Relaisregister reading
rp = self.modbus.read_coils(address, unit=UNIT)
time.sleep(0.01)
return "on" if rp.bits[0] else "off"
async def set_temperature1(self, address, value):
print("set_temperature1 was triggerd")
# Loop for temperature controll
print("database_reading")
print(self.database_reading()[1][0])
if (self.database_reading()[0][1]) == 524 and (self.database_reading()[1][1]) == 'on':
print("DB-Reading from set_temperature1 'heizen1': ", self.database_reading()[1][1])
while True:
print("Loop!!!!!!!")
rp = self.modbus.read_coils(address, unit=UNIT)
print(rp.bits[0])
time.sleep(0.01)
if rp.bits[0] == False:
rq = self.modbus.write_coil(address, value == "on", unit=UNIT)
time.sleep(0.01)
print("Heating is active!!!!")
print(address)
time.sleep(0.01)
print(type(self.read_temperature( address == 2, 'get')))
print(type(value))
if float(self.read_temperature( address == 2, 'get')) >= float(value):
print("Heating is on!!!!!!!")
time.sleep(0.01)
print(address)
rq = self.modbus.write_coil(address, value == "off", unit=UNIT)
time.sleep(0.01)
print("Heating Temperture is achieved! -> deactivate heating")
if (self.database_reading()[0][1]) == 524 and (self.database_reading()[1][1]) == 'off':
break
await asyncio.sleep(1)
# Breake müsste bei Heizen1 aus kommen, nicht wenn nur die Soll Temp erreicht wurde
# return NULL
def database_reading(self):
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
# Database state request | If heating button was pressed
sqliteConnection = sqlite3.connect("state")
sqliteConnection.row_factory = dict_factory
zeiger = sqliteConnection.cursor()
zeiger.execute("SELECT * FROM heating_relais_state")
# zeiger.execute("SELECT heating_relais_address = 524 FROM heating_relais_state")
row = zeiger.fetchone()
rowDict = dict(zip([c[0] for c in zeiger.description], row))
# row = json.dumps(row)
# row1 = json.loads(row)
print(type(row))
print("row1")
# print(row1)
# row1 = {}
# print(row[0])
# Convert a dict Array into List[]
result = row.items()
row1 = list(result)
print(row1)
# print(list(row.keya())[1])
# print(list(row.items())[1])
# print(list(row.values())[1])
print(row1[1][1])
# self.sendMessage(json.dumps(row))
# row = zeiger.fetchone()
# print("row2")
# print(row)
# self.sendMessage(json.dumps(row))
# print(rowDict)
# inhalt = zeiger.fetchall()
sqliteConnection.commit()
if sqliteConnection:
sqliteConnection.close()
print("The SQLite connection is closed")
# return json.dumps(row)
return row1
def simple_read(self, address, value):
try:
if value != 'get':
raise ValueError(value)
# Relaisregister reading | If heating wire is physically on
rp = self.modbus.read_coils(address, unit=UNIT)
time.sleep(0.01)
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
# Database state request | If heating button was pressed
sqliteConnection = sqlite3.connect("state")
sqliteConnection.row_factory = dict_factory
zeiger = sqliteConnection.cursor()
zeiger.execute("SELECT * FROM heating_relais_state")
# zeiger.execute("SELECT heating_relais_address = 524 FROM heating_relais_state")
row = zeiger.fetchone()
rowDict = dict(zip([c[0] for c in zeiger.description], row))
print("row12")
print(row)
self.sendMessage(json.dumps(row))
# Jump to the next row
row = zeiger.fetchone()
self.sendMessage(json.dumps(row))
# print(rowDict)
# inhalt = zeiger.fetchall()
sqliteConnection.commit()
if sqliteConnection:
sqliteConnection.close()
print("The SQLite connection is closed")
# json_object = json.dumps(inhalt)
# print(json_object)
time.sleep(0.01)
return "on" if rp.bits[0] else "off"
except AttributeError:
pass
# Send temperature data to client
def read_temperature(self, address, value):
if value != "get":
raise ValueError(value)
response = self.modbus.read_holding_registers(0x00, 8, unit=UNIT)
time.sleep(0.01)
t = response.registers[address]
time.sleep(0.01)
z = t/10
return z
# return response.registers[address]
# return response.registers[address]
# Save heating_state in database
# Get the heating_state with 'get'
def heating(self, address, value):
# Heizen an | Modul an dem der Heizdraht geschaltet wird
if value in ["on", "off"]:
# rq = self.modbus.write_coil(524, value == "on", unit=UNIT)
# time.sleep(0.05)
# rq = self.modbus.write_coil(address, value == "on", unit=UNIT)
# time.sleep(0.01)
# Database Handling | Heating_State = true does not mean Relaise_State = True | this depends on the temperature
sqliteConnection = sqlite3.connect("state")
zeiger = sqliteConnection.cursor()
sql_anweisung = ("""CREATE TABLE IF NOT EXISTS heating_relais_state (
heating_relais_address INTEGER,
state TEXT)""")
zeiger.execute(sql_anweisung)
# print(address)
# zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (address, value))
# zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (524, value))
# zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (525, value))
# zeiger.execute(sql_anweisung)
# if value == "on":
zeiger.execute("UPDATE heating_relais_state SET state =? WHERE heating_relais_address =?", (value, address))
# zeiger.execute(sql_anweisung)
zeiger.execute("SELECT * FROM heating_relais_state")
inhalt = zeiger.fetchall()
for inhalt in inhalt:
print(inhalt)
sqliteConnection.commit()
if sqliteConnection:
sqliteConnection.close()
print("The SQLite connection is closed")
elif value != 'get':
raise ValueError(value)
rp = self.modbus.read_coils(address, unit=UNIT)
time.sleep(0.01)
return "on" if rp.bits[0] else "off"
def systemp(self, value):
# result = subprocess.run(["vcgencmd", "measure_temp"], stdout=subprocess.PIPE)
# temperature = result.stdout.partition("temp=")[-1].partition("'C\n")[0]
res = os.popen("vcgencmd measure_temp").readline()
temp = (res.replace("temp=","").replace("'C\n",""))
time.sleep(0.01)
return temp
async def receive(self):
try:
# print("Echoing '%s'" % self.data)
# print("Echoing '%s'" % json.loads(self.data))
message = await self.websocket.recv()
commands = json.loads(message)
new_state = {}
print("Echoing '%s'" % commands)
# print(commands['relais1'])
for key, value in commands.items():
function = self.functions[key]
print("function '%s'" % function)
new_state[key] = function(value)
print("new_state '%s'" % new_state)
print("key '%s'" % key)
print("value '%s'" % value)
print(json.dumps(new_state))
#self.sendMessage(json.dumps(new_state))
await self.websocket.send(json.dumps(new_state))
print("Echoing '%s'" % commands)
# print(commands['relais1'])
# self.sendMessage(json.dumps(commands))
# sampleDict = {'name': 'John', 'age': 30, 'data': 'New York'}
# jsonData = json.dumps(sampleDict)
# self.sendMessage(json.dumps(sampleDict))
except Exception as exception:
logger.exception("handle message")
raise
return await self.websocket.recv()
async def sendMessage(self, message):
await self.websocket.send(message)
async def close(self):
await self._conn.__aexit__(*sys.exc_info())
async def main():
logging.basicConfig()
# echo = await Websocket()
with ModbusClient(host=HOST, port=PORT) as client:
client.connect()
time.sleep(0.025)
print("Websocket server on port %s" % PORTNUM)
# server = SimpleWebSocketServer('', PORTNUM, partial(Echo, client))
# server = await websockets.serve('', PORTNUM, partial(Echo, client))
# async with websockets.serve('', partial(Websocket, client), PORTNUM)
# await asyncio.Future() # run forever
# create_protocol = functools.partial(Websocket(), client)
# async with websockets.serve('', "192.168.0.24", PORTNUM, create_protocol):
async with websockets.serve("192.168.0.24", PORTNUM, functools.partial(Websocket(), client)):
#async with websockets.serve(handler, "", 8001):
await asyncio.Future() # run forever
if __name__ == '__main__':
# main()
asyncio.run(main())The old version:
import signal, sys, json
import os
import shutil
import logging
import time
import sqlite3
from sqlite3 import Error
from functools import partial
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
from SimpleWebSocketServer import WebSocket, SimpleWebSocketServer
logger = logging.getLogger(__name__)
# Adressen
UNIT = 0x1
HOST = '192.168.0.24'
#HOST = '192.168.0.50'
PORT = 5020
#PORT = 502
# Coil Output Base Address
COILBASE = 512
# Portnummer listen
PORTNUM = 8001
# Websocket class to echo received data
class Echo(WebSocket):
def __init__(self, client, server, sock, address):
try:
super(Echo, self).__init__(server, sock, address)
self.modbus = client
self.functions = {
"relais1": partial(self.simple_switch, COILBASE + 16),
"relais2": partial(self.simple_switch, COILBASE + 1),
"relais3": partial(self.simple_switch, COILBASE + 2),
"set_temp1": partial(self.set_temperature1, 524),
"temperatur1": partial(self.read_temperature, 2),
"temperatur2": partial(self.read_temperature, 3),
"heizen1": partial(self.heating, 524),
"heizen2": partial(self.heating, 525),
"heatread": partial(self.simple_read, 524),
"systemp": self.systemp
}
except Exception as exception:
logger.exception("constructor")
raise
def simple_switch(self, address, value):
""" value can be one of "on", "off" or "get" """
if value in ['on', 'off']:
rq = self.modbus.write_coil(address, value == 'on', unit=UNIT)
time.sleep(0.01)
print("rq = self.modbus.write_coil(address, value == on, unit=UNIT)")
elif value != 'get':
raise ValueError(value)
# Relaisregister reading
rp = self.modbus.read_coils(address, unit=UNIT)
time.sleep(0.01)
return "on" if rp.bits[0] else "off"
def set_temperature1(self, address, value):
print("set_temperature1 was triggerd")
# Loop for temperature controll
print("database_reading")
print(self.database_reading()[1][0])
if (self.database_reading()[0][1]) == 524 and (self.database_reading()[1][1]) == 'on':
print("DB-Reading from set_temperature1 'heizen1': ", self.database_reading()[1][1])
while True:
print("Loop!!!!!!!")
rp = self.modbus.read_coils(address, unit=UNIT)
print(rp.bits[0])
time.sleep(0.01)
if rp.bits[0] == False:
rq = self.modbus.write_coil(address, value == "on", unit=UNIT)
time.sleep(0.01)
print("Heating is active!!!!")
print(address)
time.sleep(0.01)
print(type(self.read_temperature( address == 2, 'get')))
print(type(value))
if float(self.read_temperature( address == 2, 'get')) >= float(value):
print("Heating is on!!!!!!!")
time.sleep(0.01)
print(address)
rq = self.modbus.write_coil(address, value == "off", unit=UNIT)
time.sleep(0.01)
print("Heating Temperture is achieved! -> deactivate heating")
break
# Breake müsste bei Heizen1 aus kommen, nicht wenn nur die Soll Temp erreicht wurde
# return NULL
def database_reading(self):
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
# Database state request | If heating button was pressed
sqliteConnection = sqlite3.connect("state")
sqliteConnection.row_factory = dict_factory
zeiger = sqliteConnection.cursor()
zeiger.execute("SELECT * FROM heating_relais_state")
# zeiger.execute("SELECT heating_relais_address = 524 FROM heating_relais_state")
row = zeiger.fetchone()
rowDict = dict(zip([c[0] for c in zeiger.description], row))
# row = json.dumps(row)
# row1 = json.loads(row)
print(type(row))
print("row1")
# print(row1)
# row1 = {}
# print(row[0])
# Convert a dict Array into List[]
result = row.items()
row1 = list(result)
print(row1)
# print(list(row.keya())[1])
# print(list(row.items())[1])
# print(list(row.values())[1])
print(row1[1][1])
# self.sendMessage(json.dumps(row))
# row = zeiger.fetchone()
# print("row2")
# print(row)
# self.sendMessage(json.dumps(row))
# print(rowDict)
# inhalt = zeiger.fetchall()
sqliteConnection.commit()
if sqliteConnection:
sqliteConnection.close()
print("The SQLite connection is closed")
# return json.dumps(row)
return row1
def simple_read(self, address, value):
try:
if value != 'get':
raise ValueError(value)
# Relaisregister reading | If heating wire is physically on
rp = self.modbus.read_coils(address, unit=UNIT)
time.sleep(0.01)
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
# Database state request | If heating button was pressed
sqliteConnection = sqlite3.connect("state")
sqliteConnection.row_factory = dict_factory
zeiger = sqliteConnection.cursor()
zeiger.execute("SELECT * FROM heating_relais_state")
# zeiger.execute("SELECT heating_relais_address = 524 FROM heating_relais_state")
row = zeiger.fetchone()
rowDict = dict(zip([c[0] for c in zeiger.description], row))
print("row12")
print(row)
self.sendMessage(json.dumps(row))
# Jump to the next row
row = zeiger.fetchone()
self.sendMessage(json.dumps(row))
# print(rowDict)
# inhalt = zeiger.fetchall()
sqliteConnection.commit()
if sqliteConnection:
sqliteConnection.close()
print("The SQLite connection is closed")
# json_object = json.dumps(inhalt)
# print(json_object)
time.sleep(0.01)
return "on" if rp.bits[0] else "off"
except AttributeError:
pass
# Send temperature data to client
def read_temperature(self, address, value):
if value != "get":
raise ValueError(value)
response = self.modbus.read_holding_registers(0x00, 8, unit=UNIT)
time.sleep(0.01)
t = response.registers[address]
time.sleep(0.01)
z = t/10
return z
# return response.registers[address]
# return response.registers[address]
# Save heating_state in database
# Get the heating_state with 'get'
def heating(self, address, value):
# Heizen an | Modul an dem der Heizdraht geschaltet wird
if value in ["on", "off"]:
# rq = self.modbus.write_coil(524, value == "on", unit=UNIT)
# time.sleep(0.05)
# rq = self.modbus.write_coil(address, value == "on", unit=UNIT)
# time.sleep(0.01)
# Database Handling | Heating_State = true does not mean Relaise_State = True | this depends on the temperature
sqliteConnection = sqlite3.connect("state")
zeiger = sqliteConnection.cursor()
sql_anweisung = ("""CREATE TABLE IF NOT EXISTS heating_relais_state (
heating_relais_address INTEGER,
state TEXT)""")
zeiger.execute(sql_anweisung)
# print(address)
# zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (address, value))
# zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (524, value))
# zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (525, value))
# zeiger.execute(sql_anweisung)
# if value == "on":
zeiger.execute("UPDATE heating_relais_state SET state =? WHERE heating_relais_address =?", (value, address))
# zeiger.execute(sql_anweisung)
zeiger.execute("SELECT * FROM heating_relais_state")
inhalt = zeiger.fetchall()
for inhalt in inhalt:
print(inhalt)
sqliteConnection.commit()
if sqliteConnection:
sqliteConnection.close()
print("The SQLite connection is closed")
elif value != 'get':
raise ValueError(value)
rp = self.modbus.read_coils(address, unit=UNIT)
time.sleep(0.01)
return "on" if rp.bits[0] else "off"
def systemp(self, value):
# result = subprocess.run(["vcgencmd", "measure_temp"], stdout=subprocess.PIPE)
# temperature = result.stdout.partition("temp=")[-1].partition("'C\n")[0]
res = os.popen("vcgencmd measure_temp").readline()
temp = (res.replace("temp=","").replace("'C\n",""))
time.sleep(0.01)
return temp
def handleMessage(self):
try:
# print("Echoing '%s'" % self.data)
# print("Echoing '%s'" % json.loads(self.data))
commands = json.loads(self.data)
new_state = {}
print("Echoing '%s'" % commands)
# print(commands['relais1'])
for key, value in commands.items():
function = self.functions[key]
print("function '%s'" % function)
new_state[key] = function(value)
print("new_state '%s'" % new_state)
print("key '%s'" % key)
print("value '%s'" % value)
print(json.dumps(new_state))
self.sendMessage(json.dumps(new_state))
print("Echoing '%s'" % commands)
# print(commands['relais1'])
# self.sendMessage(json.dumps(commands))
# sampleDict = {'name': 'John', 'age': 30, 'data': 'New York'}
# jsonData = json.dumps(sampleDict)
# self.sendMessage(json.dumps(sampleDict))
except Exception as exception:
logger.exception("handle message")
raise
def handleConnected(self):
print("Connected")
def handleClose(self):
print("Disconnected")
def main():
logging.basicConfig()
with ModbusClient(host=HOST, port=PORT) as client:
client.connect()
time.sleep(0.02)
print("Websocket server on port %s" % PORTNUM)
# server = SimpleWebSocketServer('', PORTNUM, Echo)
server = SimpleWebSocketServer('', PORTNUM, partial(Echo, client))
try:
server.serveforever()
finally:
server.close()
if __name__ == "__main__":
main()
