PostgreSQL
 sql >> база данни >  >> RDS >> PostgreSQL

Първични ключове с Apache Spark

Scala :

Ако всичко, от което се нуждаете, са уникални номера, можете да използвате zipWithUniqueId и пресъздайте DataFrame. Първо малко импортирани и фиктивни данни:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Извличане на схема за по-нататъшно използване:

val schema = df.schema

Добавяне на поле за идентификация:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Създаване на DataFrame:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

Същото нещо в Python :

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

Ако предпочитате последователен номер, можете да замените zipWithUniqueId с zipWithIndex но е малко по-скъпо.

Директно с DataFrame API :

(универсална Scala, Python, Java, R с почти същия синтаксис)

По-рано съм пропуснал monotonicallyIncreasingId функция, която трябва да работи добре, стига да не се нуждаете от последователни числа:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

Макар и полезен monotonicallyIncreasingId е недетерминиран. Не само идентификаторите може да са различни от изпълнение до изпълнение, но без допълнителни трикове не могат да се използват за идентифициране на редове, когато следващите операции съдържат филтри.

Забележка :

Възможно е също да използвате rowNumber функция на прозореца:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

За съжаление:

ПРЕДУПРЕЖДЕНИЕ Прозорец:Няма дефиниран дял за работа с прозорец! Преместването на всички данни в един дял може да доведе до сериозно влошаване на производителността.

Така че освен ако нямате естествен начин да разделите данните си и да гарантирате уникалността, не е особено полезно в този момент.



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Apache Spark:JDBC връзката не работи

  2. PostgreSQL - извличане на реда, който има максимална стойност за колона

  3. Postgres:дефиниране на стойност по подразбиране за CAST неуспехи?

  4. psql:ФАТАЛНО:ролята postgres не съществува

  5. Има ли начин да деактивирате претоварването на функциите в Postgres