Jun-06-2017, 11:46 AM
I am using Python 2.7 and the Pika library to publish messages to a RabbitMQ server. The script below will read the last line of a text file every 30 seconds, then publish the last line to the rabbitmq server.
My problem is that the internet connection is very unstable and has a high latency (600ms - 800ms). The code will run fine for a couple minutes, but then it starts throwing different exceptions. The most recent exception is:
The idea behind this software is as follows: I have a piece of industrial hardware in a remote location that outputs a string of values once per second to a specific TCP port. I then run TeraTerm and connect to this port, and then log/dump the data to a text file. Finally, I run this python script to read the last line every 30 seconds, then upload it to rabbitmq. With the highly unstable internet connection, I dont need every 30th sample. In other words, if a message fails, the software does not need to re-try. It can move on to the next message 30 seconds later.
Anyone have any suggestions?
My problem is that the internet connection is very unstable and has a high latency (600ms - 800ms). The code will run fine for a couple minutes, but then it starts throwing different exceptions. The most recent exception is:
ERROR:pika.adapters.base_connection:Connection to 61.21.223.233:5672 failed: timeout
WARNING:pika.connection:Could not connect, 2 attempts left
Traceback (most recent call last):
File "C:\Users\CF-30 ASCI\Desktop\ascii.py", line 50, in <module>
dmsPublish(csample)
File "C:\Users\CF-30 ASCI\Desktop\ascii.py", line 27, in dmsPublish
connection = pika.BlockingConnection(params) # Connect to CloudAMQP
File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 339, in __init__
self._process_io_for_connection_setup()
File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 374, in _process_io_for_connection_setup
self._open_error_result.is_ready)
File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 410, in _flush_output
self._impl.ioloop.poll()
File "C:\Python27\lib\site-packages\pika\adapters\select_connection.py", line 400, in poll
self.get_next_deadline())
error: (10022, 'An invalid argument was supplied')To get around all these random exceptions, I used a catch all exception, and it seems to be working (see code below). I know that isnt right however, and am wondering if anyone has any experience or thoughts as to how to make this more reliable. The idea behind this software is as follows: I have a piece of industrial hardware in a remote location that outputs a string of values once per second to a specific TCP port. I then run TeraTerm and connect to this port, and then log/dump the data to a text file. Finally, I run this python script to read the last line every 30 seconds, then upload it to rabbitmq. With the highly unstable internet connection, I dont need every 30th sample. In other words, if a message fails, the software does not need to re-try. It can move on to the next message 30 seconds later.
Anyone have any suggestions?
import os
import csv
import time
import pika
import logging
import datetime
logging.basicConfig()
def getLastFile(filename):
distance = 1024
with open(filename,'rb') as f:
lastline = f.readlines()[-1]
return lastline
def dmsPublish(message):
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:[email protected]/%2f?connection_attempts=3&heartbeat_interval=3600')
params = pika.URLParameters(url)
params.socket_timeout = 5
params.connection_attempts = 3
params.retry_delay = 3
print time.ctime() + " [x] Connecting to RabbitMQ server"
connection = pika.BlockingConnection(params) # Connect to CloudAMQP
channel = connection.channel() # start a channel
channel.queue_declare(queue='LastLine') # Declare a queue
channel.basic_publish(exchange='', routing_key='LastLine', body=message)
print time.ctime() + " [x] Message sent to rabbitMQ"
connection.close()
loop = 1
while True:
lastline = getLastFile("teraterm.log")
sampletime = datetime.datetime.utcnow().isoformat() + ","
csample = sampletime + lastline
print " [x] Most Recent Sample: " + csample
try:
dmsPublish(csample)
except:
print " [!] Error... trying again."
print time.ctime() + " [x] Sleeping 30 seconds ....."
time.sleep(30)
