Python Forum

Full Version: Python and Pika stability issues
You're currently viewing a stripped down version of our content. View the full version with proper formatting.
I am using Python 2.7 and the Pika library to publish messages to a RabbitMQ server. The script below will read the last line of a text file every 30 seconds, then publish the last line to the rabbitmq server.

My problem is that the internet connection is very unstable and has a high latency (600ms - 800ms). The code will run fine for a couple minutes, but then it starts throwing different exceptions. The most recent exception is:

ERROR:pika.adapters.base_connection:Connection to 61.21.223.233:5672 failed: timeout
WARNING:pika.connection:Could not connect, 2 attempts left

Traceback (most recent call last):
  File "C:\Users\CF-30 ASCI\Desktop\ascii.py", line 50, in <module>
    dmsPublish(csample)
  File "C:\Users\CF-30 ASCI\Desktop\ascii.py", line 27, in dmsPublish
    connection = pika.BlockingConnection(params) # Connect to CloudAMQP
  File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 339, in __init__
    self._process_io_for_connection_setup()
  File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 374, in _process_io_for_connection_setup
    self._open_error_result.is_ready)
  File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 410, in _flush_output
    self._impl.ioloop.poll()
  File "C:\Python27\lib\site-packages\pika\adapters\select_connection.py", line 400, in poll
    self.get_next_deadline())
error: (10022, 'An invalid argument was supplied')
To get around all these random exceptions, I used a catch all exception, and it seems to be working (see code below). I know that isnt right however, and am wondering if anyone has any experience or thoughts as to how to make this more reliable.

The idea behind this software is as follows: I have a piece of industrial hardware in a remote location that outputs a string of values once per second to a specific TCP port. I then run TeraTerm and connect to this port, and then log/dump the data to a text file. Finally, I run this python script to read the last line every 30 seconds, then upload it to rabbitmq. With the highly unstable internet connection, I dont need every 30th sample. In other words, if a message fails, the software does not need to re-try. It can move on to the next message 30 seconds later.

Anyone have any suggestions?

 import os
import csv
import time
import pika
import logging
import datetime

logging.basicConfig()

def getLastFile(filename):
    distance = 1024
    with open(filename,'rb') as f:
        lastline = f.readlines()[-1]
    return lastline

def dmsPublish(message):

    url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:[email protected]/%2f?connection_attempts=3&heartbeat_interval=3600')
    params = pika.URLParameters(url)
    params.socket_timeout = 5
    params.connection_attempts = 3
    params.retry_delay = 3
    print time.ctime() + " [x] Connecting to RabbitMQ server"
    connection = pika.BlockingConnection(params) # Connect to CloudAMQP
    channel = connection.channel() # start a channel
    channel.queue_declare(queue='LastLine') # Declare a queue    
    channel.basic_publish(exchange='', routing_key='LastLine', body=message)
    print time.ctime() + " [x] Message sent to rabbitMQ"
    connection.close()


loop = 1
while True:
        lastline = getLastFile("teraterm.log")
        sampletime = datetime.datetime.utcnow().isoformat() + ","
        csample = sampletime + lastline
        print " [x] Most Recent Sample: " + csample
        try:
            dmsPublish(csample)
        except:
            print " [!] Error... trying again."
        print time.ctime() + " [x] Sleeping 30 seconds ....."
        time.sleep(30)