В този конкретен случай е по-добре да паднете до ниво DB-API, защото имате нужда от някои инструменти, които не са изложени дори директно от SQLAlchemy Core, като например copy_expert()
. Това може да стане с помощта на raw_connection()
. Ако вашите изходни данни са CSV файл, в този случай изобщо не се нуждаете от панди. Започнете със създаване на временна междинна таблица, копирайте данни във временната таблица и вмъкнете в целевата таблица с обработка на конфликти:
conn = engine.raw_connection()
try:
with conn.cursor() as cur:
cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
ON COMMIT DROP""")
with open("your_source.csv") as data:
cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
FROM STDIN WITH CSV""", data)
cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
SELECT itemid, title, street, pincode
FROM TEST_STAGING
ON CONFLICT ( itemid )
DO UPDATE SET title = EXCLUDED.title
, street = EXCLUDED.street
, pincode = EXCLUDED.pincode""")
except:
conn.rollback()
raise
else:
conn.commit()
finally:
conn.close()
Ако от друга страна вашите изходни данни са DataFrame
, все още можете да използвате COPY
чрез подаване на функция като method=
към to_sql()
. Функцията може дори да скрие цялата горна логика:
import csv
from io import StringIO
from psycopg2 import sql
def psql_upsert_copy(table, conn, keys, data_iter):
dbapi_conn = conn.connection
buf = StringIO()
writer = csv.writer(buf)
writer.writerows(data_iter)
buf.seek(0)
if table.schema:
table_name = sql.SQL("{}.{}").format(
sql.Identifier(table.schema), sql.Identifier(table.name))
else:
table_name = sql.Identifier(table.name)
tmp_table_name = sql.Identifier(table.name + "_staging")
columns = sql.SQL(", ").join(map(sql.Identifier, keys))
with dbapi_conn.cursor() as cur:
# Create the staging table
stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
cur.execute(stmt)
# Populate the staging table
stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
stmt = sql.SQL(stmt).format(tmp_table_name, columns)
cur.copy_expert(stmt, buf)
# Upsert from the staging table to the destination. First find
# out what the primary key columns are.
stmt = """
SELECT kcu.column_name
FROM information_schema.table_constraints tco
JOIN information_schema.key_column_usage kcu
ON kcu.constraint_name = tco.constraint_name
AND kcu.constraint_schema = tco.constraint_schema
WHERE tco.constraint_type = 'PRIMARY KEY'
AND tco.table_name = %s
"""
args = (table.name,)
if table.schema:
stmt += "AND tco.table_schema = %s"
args += (table.schema,)
cur.execute(stmt, args)
pk_columns = {row[0] for row in cur.fetchall()}
# Separate "data" columns from (primary) key columns
data_columns = [k for k in keys if k not in pk_columns]
# Build conflict_target
pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))
set_ = sql.SQL(", ").join([
sql.SQL("{} = EXCLUDED.{}").format(k, k)
for k in map(sql.Identifier, data_columns)])
stmt = """
INSERT INTO {} ( {} )
SELECT {}
FROM {}
ON CONFLICT ( {} )
DO UPDATE SET {}
"""
stmt = sql.SQL(stmt).format(
table_name, columns, columns, tmp_table_name, pk_columns, set_)
cur.execute(stmt)
След това ще вмъкнете новата DataFrame
използвайки
df.to_sql("test_table", engine,
method=psql_upsert_copy,
index=False,
if_exists="append")
Използването на този метод за връщане на ~1 000 000 реда отне около 16 секунди на тази машина с локална база данни.