PostgreSQL
 sql >> база данни >  >> RDS >> PostgreSQL

ВМЪКВАНЕ или АКТУАЛИЗИРАНЕ на групови данни от dataframe/CSV към PostgreSQL база данни

В този конкретен случай е по-добре да паднете до ниво DB-API, защото имате нужда от някои инструменти, които не са изложени дори директно от SQLAlchemy Core, като например copy_expert() . Това може да стане с помощта на raw_connection() . Ако вашите изходни данни са CSV файл, в този случай изобщо не се нуждаете от панди. Започнете със създаване на временна междинна таблица, копирайте данни във временната таблица и вмъкнете в целевата таблица с обработка на конфликти:

conn = engine.raw_connection()

try:
    with conn.cursor() as cur:
        cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
                       ON COMMIT DROP""")

        with open("your_source.csv") as data:
            cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
                               FROM STDIN WITH CSV""", data)

        cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
                       SELECT itemid, title, street, pincode
                       FROM TEST_STAGING
                       ON CONFLICT ( itemid )
                       DO UPDATE SET title = EXCLUDED.title
                                   , street = EXCLUDED.street
                                   , pincode = EXCLUDED.pincode""")

except:
    conn.rollback()
    raise

else:
    conn.commit()

finally:
    conn.close()

Ако от друга страна вашите изходни данни са DataFrame , все още можете да използвате COPY чрез подаване на функция като method= към to_sql() . Функцията може дори да скрие цялата горна логика:

import csv

from io import StringIO
from psycopg2 import sql

def psql_upsert_copy(table, conn, keys, data_iter):
    dbapi_conn = conn.connection

    buf = StringIO()
    writer = csv.writer(buf)
    writer.writerows(data_iter)
    buf.seek(0)

    if table.schema:
        table_name = sql.SQL("{}.{}").format(
            sql.Identifier(table.schema), sql.Identifier(table.name))
    else:
        table_name = sql.Identifier(table.name)

    tmp_table_name = sql.Identifier(table.name + "_staging")
    columns = sql.SQL(", ").join(map(sql.Identifier, keys))

    with dbapi_conn.cursor() as cur:
        # Create the staging table
        stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
        stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
        cur.execute(stmt)

        # Populate the staging table
        stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
        stmt = sql.SQL(stmt).format(tmp_table_name, columns)
        cur.copy_expert(stmt, buf)

        # Upsert from the staging table to the destination. First find
        # out what the primary key columns are.
        stmt = """
               SELECT kcu.column_name
               FROM information_schema.table_constraints tco
               JOIN information_schema.key_column_usage kcu 
               ON kcu.constraint_name = tco.constraint_name
               AND kcu.constraint_schema = tco.constraint_schema
               WHERE tco.constraint_type = 'PRIMARY KEY'
               AND tco.table_name = %s
               """
        args = (table.name,)

        if table.schema:
            stmt += "AND tco.table_schema = %s"
            args += (table.schema,)

        cur.execute(stmt, args)
        pk_columns = {row[0] for row in cur.fetchall()}
        # Separate "data" columns from (primary) key columns
        data_columns = [k for k in keys if k not in pk_columns]
        # Build conflict_target
        pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))

        set_ = sql.SQL(", ").join([
            sql.SQL("{} = EXCLUDED.{}").format(k, k)
            for k in map(sql.Identifier, data_columns)])

        stmt = """
               INSERT INTO {} ( {} )
               SELECT {}
               FROM {}
               ON CONFLICT ( {} )
               DO UPDATE SET {}
               """

        stmt = sql.SQL(stmt).format(
            table_name, columns, columns, tmp_table_name, pk_columns, set_)
        cur.execute(stmt)

След това ще вмъкнете новата DataFrame използвайки

df.to_sql("test_table", engine,
          method=psql_upsert_copy,
          index=False,
          if_exists="append")

Използването на този метод за връщане на ~1 000 000 реда отне около 16 секунди на тази машина с локална база данни.




  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Как да намерите интервала между две дати в PostgreSQL

  2. PostgreSQL - създайте нова база данни чрез pgAdmin UI

  3. Вмъкване на данни в таблици, свързани с външен ключ

  4. PostgreSQL:имена на ограничения по подразбиране

  5. Клониране на таблица на Postgres, включително индекси и данни