Nov-05-2025, 03:08 PM
(This post was last modified: Nov-05-2025, 03:08 PM by compuman145.)
Good afternoon all,
I've got a really odd issue and I simply don't know where it is, I've been working with pulling tickers from MongoDB and then piping that into an async def to then pull live ticker data.
To note, this code works perfectly, other than the fact when it gets to around 500 - 530, it stops. There are no errors that it shows, there are no errors in the IB gateway and I'm not running out of resources either as I checked both VMs.
It stops at 520+ or so on any batch number, I set it to 1 second on the delay and 50 on the tickers and it again, hit 5** odd and again just stopped. I also did a much longer sleep and only 40 tickers and that failed too.
To note, I checked nasdaq_tickers and there are 1300 objects in there so it's not that :(
The code is running on a windows 11 VM, mongo, postgres is all on ubuntu 24.04. Please ignore my terrible authentication methods, I'll be getting vault in soon :)
Thanks
Comps
I've got a really odd issue and I simply don't know where it is, I've been working with pulling tickers from MongoDB and then piping that into an async def to then pull live ticker data.
To note, this code works perfectly, other than the fact when it gets to around 500 - 530, it stops. There are no errors that it shows, there are no errors in the IB gateway and I'm not running out of resources either as I checked both VMs.
import motor.motor_asyncio
import asyncio
import logging
from ib_async import IB, Stock
import asyncpg
import signal
# -------------------- Logging --------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# -------------------- MongoDB Fetch --------------------
async def fetch_tickers():
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://192.168.1.152:27017")
db = client["tickers"]
collection = db["nasdaq_tickers"]
tickers = []
async for doc in collection.find({}):
tickers.append({
"symbol": doc.get("symbol"),
"currency": "USD",
"exchange": "NASDAQ"
})
return tickers
# -------------------- PostgreSQL --------------------
async def setup_db():
try:
return await asyncpg.create_pool(
user="raggy_sql",
password='"',
database="raggy_sql",
host="192.168.1.152",
port=5432,
min_size=1,
max_size=10,
timeout=30
)
except Exception as e:
logger.error(f"Failed to connect to DB: {e}")
raise
async def insert_tick(pool, batch):
if not batch:
return
try:
async with pool.acquire() as conn:
await conn.executemany("""
INSERT INTO ib_realtime_bars(
symbol, marketDataType, mintick,
bid, bidsize, ask, asksize,
last, lastsize, volume, close, marketprice
) VALUES (
$1, $2, $3,
$4, $5, $6, $7,
$8, $9, $10, $11, $12
)
""", [
(
d["symbol"], d["marketDataType"], d["minTick"],
d["bid"], d["bidSize"], d["ask"], d["askSize"],
d["last"], d["lastSize"], d["volume"], d["close"], d["marketPrice"]
)
for d in batch
])
except Exception as e:
logger.error(f"Failed batch insert: {e}")
# -------------------- Tick Processor --------------------
async def tick_worker(queue, pool):
while True:
try:
tick = await queue.get()
await insert_tick(pool, [tick]) # wrap in list for executemany
queue.task_done()
except asyncio.CancelledError:
break
# -------------------- IB Data --------------------
MAX_CONCURRENT_TICKERS = 1 # lowered to 1 per second
async def process_ticker_batch(pool, ib, contracts, queue):
ticker_objs = [ib.reqMktData(c) for c in contracts]
await asyncio.sleep(1)
for contract, ticker in zip(contracts, ticker_objs):
if ticker.last is not None:
tick_data = {
"symbol": contract.symbol,
"marketDataType": getattr(ticker, "marketDataType", None),
"minTick": getattr(ticker, "minTick", None),
"bid": getattr(ticker, "bid", None),
"bidSize": getattr(ticker, "bidSize", None),
"ask": getattr(ticker, "ask", None),
"askSize": getattr(ticker, "askSize", None),
"last": ticker.last,
"lastSize": getattr(ticker, "lastSize", None),
"volume": getattr(ticker, "volume", None),
"close": ticker.last,
"marketPrice": ticker.last,
}
await queue.put(tick_data)
# cancel market data to rotate cleanly
for c in contracts:
ib.cancelMktData(c)
# -------------------- Main --------------------
async def main():
ib = IB()
pool = await setup_db()
tick_queue = asyncio.Queue()
# Start worker
worker_task = asyncio.create_task(tick_worker(tick_queue, pool))
# Graceful shutdown
stop_event = asyncio.Event()
def shutdown():
stop_event.set()
for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, lambda s, f: shutdown())
# Connect IB
while True:
try:
await ib.connectAsync("127.0.0.1", 4002, clientId=1)
logger.info("Connected to IB API")
break
except Exception as e:
logger.warning(f"IB connect failed: {e}, retrying in 5s...")
await asyncio.sleep(5)
# Fetch tickers from MongoDB
logger.info("Fetching tickers from MongoDB...")
tickers = await fetch_tickers()
logger.info(f"Fetched {len(tickers)} tickers. Example: {tickers[:]}")
# Create contracts
contracts = [Stock(t["symbol"], t["exchange"], t["currency"]) for t in tickers]
contracts = await ib.qualifyContractsAsync(*contracts)
logger.info(f"Qualified {len(contracts)} contracts")
# Rotating batch loop
async def rotating_batches():
while not stop_event.is_set():
for i in range(0, len(contracts), MAX_CONCURRENT_TICKERS):
batch = contracts[i:i + MAX_CONCURRENT_TICKERS]
await process_ticker_batch(pool, ib, batch, tick_queue)
await asyncio.sleep(1)
rotator_task = asyncio.create_task(rotating_batches())
# Wait for shutdown
await stop_event.wait()
logger.info("Shutting down...")
rotator_task.cancel()
await asyncio.gather(rotator_task, return_exceptions=True)
worker_task.cancel()
await asyncio.gather(worker_task, return_exceptions=True)
await pool.close()
ib.disconnect()
logger.info("Disconnected from IB and DB")
if __name__ == "__main__":
asyncio.run(main())I then discussed it with IB, It thought that maybe I can't pull too many tickers, not the case, I then asked if I needed to wait for the closure of the ticker request, nope not that either. They said as long as it was only 50 per second, it wouldn't matter. So I thought ok I'll set my concurrents to 1, this does take affect and took a lot longer but none the less, 538 and boom it stopped.It stops at 520+ or so on any batch number, I set it to 1 second on the delay and 50 on the tickers and it again, hit 5** odd and again just stopped. I also did a much longer sleep and only 40 tickers and that failed too.
To note, I checked nasdaq_tickers and there are 1300 objects in there so it's not that :(
The code is running on a windows 11 VM, mongo, postgres is all on ubuntu 24.04. Please ignore my terrible authentication methods, I'll be getting vault in soon :)
Thanks
Comps
