Проблемът е във вашия код. Тъй като презаписвате таблица, от която се опитвате да четете, ефективно заличавате всички данни, преди Spark действително да има достъп до тях.
Не забравяйте, че Spark е мързелив. Когато създавате Dataset
Spark извлича необходимите метаданни, но не зарежда данните. Така че няма магически кеш, който да запази оригиналното съдържание. Данните ще бъдат заредени, когато действително се изискват. Ето го, когато изпълнявате write
действие и когато започнете да пишете, няма повече данни за извличане.
Това, от което се нуждаете, е нещо подобно:
- Създайте
Dataset
. -
Приложете необходимите трансформации и запишете данни в междинна MySQL таблица.
-
TRUNCATE
оригиналния вход иINSERT INTO ... SELECT
от междинната таблица илиDROP
оригиналната таблица иRENAME
междинна маса.
Алтернативен, но по-малко благоприятен подход би бил:
- Създайте
Dataset
. - Приложете необходимите трансформации и запишете данни към постоянна таблица на Spark (
df.write.saveAsTable(...)
или еквивалент) TRUNCATE
оригиналния вход.- Прочетете обратно данните и запазете (
spark.table(...).write.jdbc(...)
) - Пуснете таблицата Spark.
Не можем да подчертаем достатъчно, че използваме cache
на Spark / persist
не е пътят. Дори и с консервативния StorageLevel
(MEMORY_AND_DISK_2
/ MEMORY_AND_DISK_SER_2
) кешираните данни могат да бъдат загубени (отказ на възли), което води до тихи грешки при коректността.