Hello All!
My English can be badly, sorry in advance
OS : Windows && ArchLinux
Python version 3.7.2
Task: Need do control by program use HttpServer and don't interfere with program work
Short about code: Has run_mind():
- it get options from DB (example: run task in time, etc)
- next running HttpServer for control this program using GET
- next do infinity loop (using while), where doit any tasks using asyncio
Trable: On the running HttpServer, next executing is stopping on "await asyncio.sleep(1)" in loop "while". I think, what HttpServer - using listening port Freezing, and don't give to running any tasks
Lost sleep for 3 days. Say me please, how can task solve, how do it: Listen HttpServer (or other alternative variants, example: WebSocket, Socket, JsonServer, any..) AND doing Task in while?
Thx!!!
Source:
My English can be badly, sorry in advance
OS : Windows && ArchLinux
Python version 3.7.2
Task: Need do control by program use HttpServer and don't interfere with program work
Short about code: Has run_mind():
- it get options from DB (example: run task in time, etc)
- next running HttpServer for control this program using GET
- next do infinity loop (using while), where doit any tasks using asyncio
Trable: On the running HttpServer, next executing is stopping on "await asyncio.sleep(1)" in loop "while". I think, what HttpServer - using listening port Freezing, and don't give to running any tasks
Lost sleep for 3 days. Say me please, how can task solve, how do it: Listen HttpServer (or other alternative variants, example: WebSocket, Socket, JsonServer, any..) AND doing Task in while?
Thx!!!
Source:
import _functions
import config
import asyncio
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process, Queue
import multiprocessing, logging
import nest_asyncio
nest_asyncio.apply()
class GlobVar:
options = dict()
from urllib.parse import urlparse
import urllib.parse
#import myurl
from http.server import BaseHTTPRequestHandler, HTTPServer # python3
class HandleRequests(BaseHTTPRequestHandler):
def _set_headers(self):
## self.send_response(200)
## self.send_header('Content-type', 'application/json')
## self.end_headers()
pass
def do_GET__old(self):
self._set_headers()
self.wfile.write("received get request")
def do_GET(self):
## self._set_headers()
## print(self.headers)
## content_len = int(self.headers.getheader('content-length', 0))
## post_body = self.rfile.read(content_len)
## print(post_body)
print(self.path)
#query = urlparse(self.path).query
#parsed_query = parse_qs(query)
#print(query)
#print(parsed_query)
self.send_response(200)
self.send_header('content-type','application/json')
self.end_headers()
if self.path == '/':
self.wfile.write(b"{ 0:false, 1:'Enter command', 'type':'error'}")
else:
if urllib.parse.urlparse(self.path).path == '/':
# Обработка здесь уже, что там пришло
q = urlparse(self.path)
print(q)
query = urllib.parse.urlparse(self.path).query
__arr = (dict(urllib.parse.parse_qsl(query)))
print(__arr)
GlobVar.options[4]['__running'] = 5
# print(myurl.parse(self.path))
self.wfile.write(b"{'hello world!'}")
self.wfile.write(b"{'hello world 2!'}")
self.wfile.write(bytes(self.path, 'utf-8') )
else:
self.wfile.write(b"{'hello...!'}")
def do_POST(self):
'''Reads post request body'''
self._set_headers()
content_len = int(self.headers.getheader('content-length', 0))
post_body = self.rfile.read(content_len)
self.wfile.write("received post request:<br>{}".format(post_body))
async def new_proc_wss(arr):
v = 7
if v == 1:
pass
elif v == 7:
print('start http...')
# Параллельно не работает, тормозит и пиздец
host = ''
port = 8002
await HTTPServer((host, port), HandleRequests).serve_forever()
def sub_loop(arr):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(new_proc_wss(arr))
# v 12 (down)
import asyncore
import socket
class EchoHandler(asyncore.dispatcher_with_send):
def handle_read(self):
data = self.recv(1024)
if data == "close":self.close()
self.send(data)
class EchoServer(asyncore.dispatcher):
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(11)
def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
print ('conn', addr)
handler = EchoHandler(sock)
# async def run():
async def run(executor):
print(' run mind()');
GlobVar.options = config.getOptions()
print('options:', GlobVar.options)
# Запуск автозапуска
# a_dict.keys()
for key in GlobVar.options:
arr = GlobVar.options[key]
if arr['mo_mode'] == 1:
if arr['mo_str'] == 'websocket':
#_websocket2.create_websocket()
#print('..1')
number = 5
#__name = '_websocket2'
__name = '_websocket2_async'
#if arr['mo_str_2'] != False:
# __name = arr['mo_str_2']
# default MODE run() v = 4
v = 3
if v == 0:
pass
elif v == 10:
# ERROR: Cannot run the event loop while another loop is running
## medicine - nest_asyncio.apply()
# NEW FUCK: не доходит до next
#
await asyncio.get_event_loop().run_in_executor(executor, sub_loop(arr))
print('next...')
elif v == 11:
# MODE run() v = 5
# Стопарит дальнейшее выполнение и не доходит до finished
task_4 = new_proc_wss(arr)
# tasks_list = [task_1, task_2, task_3, task_4]
tasks_list = [task_4]
finished, unfinished = await asyncio.wait(tasks_list, loop=executor, return_when=asyncio.ALL_COMPLETED)
print(finished, unfinished)
for unfinished_task in unfinished:
unfinished_task.cancel()
print('finished:: ', finished)
elif v == 12:
# НЕ доходит до NEXT
host = ''
port = 2222
server = EchoServer(host, port)
await asyncore.loop()
print('next...')
elif v == 13:
pass
elif v == 1:
proc = multiprocessing.Process(target=_websocket2_async.create_websocket, name=__name, args=(number,))
#proc = Process(target=child, args=())
#print('..2')
procs.append(proc)
proc.daemon=True
print('..3')
#try:
proc.start()
#except OSError as err:
# print(err)
# m = multiprocessing.Manager()
print('..4')
#proc.join()
print('..5, proc = ', proc, ', p.pid = ', proc.pid, ', is_alive = ', proc.is_alive)
elif v == 3:
# Работает, но стопарит дальнейшее выполнение, на await asyncio.sleep(1)
print('load wss 8000')
# async
future = asyncio.ensure_future(new_proc_wss(arr))
def callback_wss(fut):
result = fut.result()
print('finish callback_wss')
#asyncio.ensure_future(new_proc(arr)).add_done_callback(callback)
print(111)
future.add_done_callback(callback_wss)
print(222)
elif v == 4:
# Запускается и закрывается процесс, но это новый процесс, общаться через межпроц взаимодействие - на самый крайний случай
print('v4')
proc = multiprocessing.Process(target=new_proc_wss, name=__name, args=(arr,))
#proc = Process(target=child, args=())
#print('..2')
#procs.append(proc)
#### proc.daemon=True
print('..3')
#try:
proc.start()
print('starting')
elif v == 5:
# Запускает хттп и тормозит на этом, до next не доходит...
await new_proc_wss(arr)
print('next...')
elif v == 6:
# Блокирует дальнейшее выполнеие asyncio, до next не доходит...
task = asyncio.create_task(new_proc_wss(arr))
print('next...')
elif v == 7:
# Блокирует дальнейшее выполнеие, до next не доходит...
results = await asyncio.gather(new_proc_wss(arr))
print(results)
print('next...')
elif v == 8:
# ERROR - Cannot run the event loop while another loop is running
loop = asyncio.new_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)
# Serve requests until CTRL+c is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
elif v == 9:
# До NEXT не доходит
# https://docs.python.org/3.7/library/asyncio-eventloop.html#asyncio.Server.serve_forever
async def client_connected(reader, writer):
# Communicate with the client with
# reader/writer streams. For example:
await reader.readline()
srv = await asyncio.start_server(
client_connected, '127.0.0.1', '8003')
await srv.serve_forever()
print('next...')
# Запуск цикла
boom=1
while boom >0:
print('iteration1...')
await asyncio.sleep(1)
# time.sleep(1)
print('iteration2...')
# Запуск всякой лабуды в указанное время
def run_mind():
v = 4
if v == 1:
pass
elif v == 4:
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run(loop))
loop.run_forever()
elif v == 5:
# https://stackoverflow.com/questions/38193596/asyncio-multiprocessing-unix
executor = ProcessPoolExecutor()
# executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
asyncio.get_event_loop().run_until_complete(run(executor))
if __name__ == '__main__':
run_mind()
