Python Forum
Code runs perfectly and just stops with zero errors
#1
Good afternoon all,

I've got a really odd issue and I simply can't work out where it is. I've been pulling tickers from MongoDB and then piping them into an async function that pulls live ticker data.

To note, this code works perfectly, except that when it gets to around 500-530 tickers, it stops. It shows no errors, there are no errors in the IB Gateway, and I'm not running out of resources either, as I checked both VMs.

import motor.motor_asyncio
import asyncio
import logging
from ib_async import IB, Stock
import asyncpg
import signal

# -------------------- Logging --------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# -------------------- MongoDB Fetch --------------------
async def fetch_tickers():
    client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://192.168.1.152:27017")
    db = client["tickers"]
    collection = db["nasdaq_tickers"]

    tickers = []
    async for doc in collection.find({}):
        tickers.append({
            "symbol": doc.get("symbol"),
            "currency": "USD",
            "exchange": "NASDAQ"
        })
    return tickers

# -------------------- PostgreSQL --------------------
async def setup_db():
    try:
        return await asyncpg.create_pool(
            user="raggy_sql",
            password='"',
            database="raggy_sql",
            host="192.168.1.152",
            port=5432,
            min_size=1,
            max_size=10,
            timeout=30
        )
    except Exception as e:
        logger.error(f"Failed to connect to DB: {e}")
        raise

async def insert_tick(pool, batch):
    if not batch:
        return
    try:
        async with pool.acquire() as conn:
            await conn.executemany("""
                INSERT INTO ib_realtime_bars(
                    symbol, marketDataType, mintick,
                    bid, bidsize, ask, asksize,
                    last, lastsize, volume, close, marketprice
                ) VALUES (
                    $1, $2, $3,
                    $4, $5, $6, $7,
                    $8, $9, $10, $11, $12
                )
            """, [
                (
                    d["symbol"], d["marketDataType"], d["minTick"],
                    d["bid"], d["bidSize"], d["ask"], d["askSize"],
                    d["last"], d["lastSize"], d["volume"], d["close"], d["marketPrice"]
                )
                for d in batch
            ])
    except Exception as e:
        logger.error(f"Failed batch insert: {e}")

# -------------------- Tick Processor --------------------
async def tick_worker(queue, pool):
    while True:
        try:
            tick = await queue.get()
            await insert_tick(pool, [tick])  # wrap in list for executemany
            queue.task_done()
        except asyncio.CancelledError:
            break


# -------------------- IB Data --------------------
MAX_CONCURRENT_TICKERS = 1  # lowered to 1 per second

async def process_ticker_batch(pool, ib, contracts, queue):
    ticker_objs = [ib.reqMktData(c) for c in contracts]
    await asyncio.sleep(1)

    for contract, ticker in zip(contracts, ticker_objs):
        if ticker.last is not None:
            tick_data = {
                "symbol": contract.symbol,
                "marketDataType": getattr(ticker, "marketDataType", None),
                "minTick": getattr(ticker, "minTick", None),
                "bid": getattr(ticker, "bid", None),
                "bidSize": getattr(ticker, "bidSize", None),
                "ask": getattr(ticker, "ask", None),
                "askSize": getattr(ticker, "askSize", None),
                "last": ticker.last,
                "lastSize": getattr(ticker, "lastSize", None),
                "volume": getattr(ticker, "volume", None),
                "close": ticker.last,
                "marketPrice": ticker.last,
            }
            await queue.put(tick_data)
    # cancel market data to rotate cleanly
    for c in contracts:
        ib.cancelMktData(c)

# -------------------- Main --------------------
async def main():
    ib = IB()
    pool = await setup_db()
    tick_queue = asyncio.Queue()

    # Start worker
    worker_task = asyncio.create_task(tick_worker(tick_queue, pool))

    # Graceful shutdown
    stop_event = asyncio.Event()
    def shutdown():
        stop_event.set()
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, lambda s, f: shutdown())

    # Connect IB
    while True:
        try:
            await ib.connectAsync("127.0.0.1", 4002, clientId=1)
            logger.info("Connected to IB API")
            break
        except Exception as e:
            logger.warning(f"IB connect failed: {e}, retrying in 5s...")
            await asyncio.sleep(5)

    # Fetch tickers from MongoDB
    logger.info("Fetching tickers from MongoDB...")
    tickers = await fetch_tickers()
    logger.info(f"Fetched {len(tickers)} tickers. Example: {tickers[:]}")

    # Create contracts
    contracts = [Stock(t["symbol"], t["exchange"], t["currency"]) for t in tickers]
    contracts = await ib.qualifyContractsAsync(*contracts)
    logger.info(f"Qualified {len(contracts)} contracts")

    # Rotating batch loop
    async def rotating_batches():
        while not stop_event.is_set():
            for i in range(0, len(contracts), MAX_CONCURRENT_TICKERS):
                batch = contracts[i:i + MAX_CONCURRENT_TICKERS]
                await process_ticker_batch(pool, ib, batch, tick_queue)
                await asyncio.sleep(1)

    rotator_task = asyncio.create_task(rotating_batches())

    # Wait for shutdown
    await stop_event.wait()
    logger.info("Shutting down...")
    rotator_task.cancel()
    await asyncio.gather(rotator_task, return_exceptions=True)
    worker_task.cancel()
    await asyncio.gather(worker_task, return_exceptions=True)
    await pool.close()
    ib.disconnect()
    logger.info("Disconnected from IB and DB")

if __name__ == "__main__":
    asyncio.run(main())

I then discussed it with IB. I thought that maybe I couldn't pull that many tickers, but that's not the case. I then asked if I needed to wait for each ticker request to close; nope, not that either. They said that as long as it was no more than 50 per second, it wouldn't matter. So I thought, OK, I'll set my concurrent tickers to 1. This did take effect and took a lot longer, but nonetheless, at 538, boom, it stopped.

It stops at 520 or so whatever the batch size. I set the delay to 1 second and the batch to 50 tickers, and it again hit five-hundred-odd and just stopped. I also tried a much longer sleep with only 40 tickers, and that failed too.
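
One way to narrow down where a loop like this stalls is to put a timeout around each per-item step and log the index, so that a hang or an exception shows up in the log instead of as silence. This is only a sketch under assumptions: `run_with_timeout` and `fake_request` are hypothetical names that are not part of the posted script, and `fake_request` merely simulates a request that hangs or fails.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def run_with_timeout(coro_fn, items, timeout):
    """Await coro_fn(item) for each item in order.

    Returns the indices of items that timed out or raised.
    """
    problems = []
    for index, item in enumerate(items):
        try:
            await asyncio.wait_for(coro_fn(item), timeout=timeout)
        except asyncio.TimeoutError:
            logger.warning("item %d (%r) timed out after %.2fs", index, item, timeout)
            problems.append(index)
        except Exception:
            # logger.exception records the traceback as well as the message
            logger.exception("item %d (%r) failed", index, item)
            problems.append(index)
    return problems


async def fake_request(symbol):
    """Hypothetical stand-in for a per-ticker request (assumption, not IB code)."""
    if symbol == "HANG":
        await asyncio.sleep(10)  # simulates a request that never completes
    if symbol == "BAD":
        raise RuntimeError("rejected")
    await asyncio.sleep(0)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    symbols = ["AAPL", "HANG", "BAD", "MSFT"]
    print(asyncio.run(run_with_timeout(fake_request, symbols, timeout=0.05)))
```

The loop keeps going after a problem item, so one bad ticker is recorded rather than ending the whole run.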

To note, I checked nasdaq_tickers and there are 1300 documents in there, so it's not that :(
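
A document count on its own does not show that every document carries a usable symbol: `doc.get("symbol")` in fetch_tickers() returns None when the field is missing. The sketch below separates usable symbols from the rest before any contracts are built. `split_valid_symbols` is a hypothetical helper, not part of the posted script, and whether missing symbols have anything to do with the stall is not established here.

```python
def split_valid_symbols(docs):
    """Separate documents with a non-empty string 'symbol' from those without."""
    valid, rejected = [], []
    for doc in docs:
        symbol = doc.get("symbol")
        if isinstance(symbol, str) and symbol.strip():
            valid.append(symbol.strip())
        else:
            rejected.append(doc)
    return valid, rejected


# Illustrative documents only; real ones would come from the MongoDB collection.
docs = [
    {"symbol": "AAPL"},
    {"symbol": None},
    {"name": "no symbol field"},
    {"symbol": " MSFT "},
]
valid, rejected = split_valid_symbols(docs)
print(len(valid), "usable,", len(rejected), "rejected")
```

Logging the rejected documents gives a concrete list to inspect, instead of relying on the total count.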

The code is running on a Windows 11 VM; MongoDB and PostgreSQL are both on Ubuntu 24.04. Please ignore my terrible authentication methods, I'll be getting Vault in soon :)

Thanks
Comps
#2
Please delete this, I found my issue.

Thanks
Comps
#3
(Nov-05-2025, 03:45 PM)compuman145 Wrote: Please delete this, I found my issue.

Thanks
Comps
That's good. But for other people who may face a similar problem in the future, it would be very kind of you to briefly describe what your issue was and how you fixed it.

Regards, noisefloor
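
The thread does not record what the actual cause was. As a general note for readers with the same symptom: an exception raised inside a task started with asyncio.create_task() is not reported until the task is awaited or garbage-collected. In a script shaped like the one in post #1, where main() waits on an event while a background task does the work, that task can die and the program simply appears to stop. Below is a minimal sketch of one way to surface such failures, assuming that is the failure mode (nothing in the thread confirms it). `log_task_result` and `flaky_rotator` are hypothetical names used for illustration.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


def log_task_result(task: asyncio.Task) -> None:
    """Done-callback: log the exception, if any, that ended a background task."""
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error("Task %s died: %r", task.get_name(), exc, exc_info=exc)


async def flaky_rotator(items):
    """Hypothetical stand-in for a batch loop that fails partway through."""
    for index, item in enumerate(items):
        if item is None:
            raise ValueError(f"bad item at index {index}")
        await asyncio.sleep(0)


async def demo():
    task = asyncio.create_task(flaky_rotator(["AAPL", None, "MSFT"]), name="rotator")
    task.add_done_callback(log_task_result)
    # Wait without re-raising, much as a main() that only awaits an Event would.
    await asyncio.wait([task])
    return task


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(demo())
```

With the callback attached, the failure is logged at the moment the task ends rather than at shutdown, when the task is finally gathered.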