Jan-30-2025, 03:57 PM
I'm trying to load a PostgreSQL table with information from MSsql. I have 2 tables that are identical on both db's. On the Mssql side there is a column called ImageSource which is defined as Image, and on the PG side I have it as Bytea. The script posted is just trying to pull 1 record to test with, but I'm running into an issue in the DF portion. The 2 tables have 21 fields defined.
This is the error message I'm receiving. I can see it connect, and my column list shows 21 fields. I blanked out all db info stuff but that connection stuff is working as I can see activity(connections) on both DB's
When I get this working, I'll have 50,000 to 100K records to export daily so if you see any speed improvements I can make please denote as I'm very new to Python. hence the BatchSize
Error Message from --> df = pd.DataFrame(batch, columns=columns)
raise ValueError(f"Shape of passed values is {passed}, indices imply {implied}")
ValueError: Shape of passed values is (1, 1), indices imply (1, 21)
How do I fix this error?
Many thanks.
This is the error message I'm receiving. I can see it connect, and my column list shows 21 fields. I blanked out all db info stuff but that connection stuff is working as I can see activity(connections) on both DB's
When I get this working, I'll have 50,000 to 100K records to export daily so if you see any speed improvements I can make please denote as I'm very new to Python. hence the BatchSize
Error Message from --> df = pd.DataFrame(batch, columns=columns)
raise ValueError(f"Shape of passed values is {passed}, indices imply {implied}")
ValueError: Shape of passed values is (1, 1), indices imply (1, 21)
How do I fix this error?
Many thanks.
import pyodbc
import psycopg2
import io
from sqlalchemy import create_engine
import pandas as pd
import time
from tqdm import tqdm
import numpy as np
# MSSQL connection
mssql_conn = pyodbc.connect('DRIVER={SQL Server};SERVER=xxxx;DATABASE=yyyy;UID=xxxx;PWD=xxxx;Trusted_Connection=yes')
mssql_cursor = mssql_conn.cursor()
# PostgreSQL connection
pg_conn = psycopg2.connect(database="hhhh", user="xxxxx", password="xxxxx", host="xxxxx", port="xxxx")
pg_cursor = pg_conn.cursor()
# Create SQLAlchemy engine for PostgreSQL
engine = create_engine('postgresql://postgres:xxxxx@xxxxx/xxxxx')
# Query to fetch data from MSSQL
mssql_query = "select id,imagename,imagetype,imagesource,receiveddatetime,imagepath,site,machinenbr,linenbr,takeupnbr,spoolnbr,imageindex,cameranbr,spoolstartdt,left((convert(time(0),SpoolStartTime)),8) as SpoolStartTime,DefectDate,left((convert(time(0),DefectTime)),8) as DefectTime,defectnbr,defectclass,reviewer,UserDefectInput FROM Image_Classification_Master(nolock) WHERE id = 13604630"
mssql_cursor.execute(mssql_query)
# Fetch column names
columns = [column[0] for column in mssql_cursor.description]
# Process and insert data in batches
batch_size = 5000
while True:
batch = mssql_cursor.fetchmany(batch_size)
if not batch:
break
# Convert batch to DataFrame
df = pd.DataFrame(batch, columns=columns)
# Process Image field
if 'ImageSource' in df.columns:
df['ImageSource'] = df['ImageSource'].apply(lambda x: psycopg2.Binary(x) if x else None)
# Insert batch into PostgreSQL
df.to_sql('Image_Classification_Master', engine, if_exists='append', index=False, method='multi', chunksize=batch_size)
total_rows = len(df)
rows_inserted = 0
start_time = time.time()
with tqdm(total=total_rows, desc="Inserting data") as pbar:
for i in range(0, total_rows, batch_size):
batch = df.iloc[i:i+batch_size]
batch.to_sql('Image_Classification_Master', engine, if_exists='append', index=False, method='multi')
rows_inserted += len(batch)
pbar.update(len(batch))
# Calculate and display progress
progress = (rows_inserted / total_rows) * 100
elapsed_time = time.time() - start_time
estimated_total_time = (elapsed_time / rows_inserted) * total_rows
remaining_time = estimated_total_time - elapsed_time
print(f"Progress: {progress:.2f}% | Rows inserted: {rows_inserted}/{total_rows}")
print(f"Elapsed time: {elapsed_time:.2f}s | Estimated time remaining: {remaining_time:.2f}s")
print("Insertion complete!")
# Close connections
mssql_cursor.close()
mssql_conn.close()
pg_cursor.close()
pg_conn.close()
