Проблемът е във вашия код. Тъй като презаписвате таблица, от която се опитвате да четете, ефективно заличавате всички данни, преди Spark действително да има достъп до тях.
Не забравяйте, че Spark е мързелив. Когато създавате Dataset Spark извлича необходимите метаданни, но не зарежда данните. Така че няма магически кеш, който да запази оригиналното съдържание. Данните ще бъдат заредени, когато действително се изискват. Ето го, когато изпълнявате write действие и когато започнете да пишете, няма повече данни за извличане.
Това, от което се нуждаете, е нещо подобно:
- Създайте
Dataset. -
Приложете необходимите трансформации и запишете данни в междинна MySQL таблица.
-
TRUNCATEоригиналния вход иINSERT INTO ... SELECTот междинната таблица илиDROPоригиналната таблица иRENAMEмеждинна маса.
Алтернативен, но по-малко благоприятен подход би бил:
- Създайте
Dataset. - Приложете необходимите трансформации и запишете данни към постоянна таблица на Spark (
df.write.saveAsTable(...)или еквивалент) TRUNCATEоригиналния вход.- Прочетете обратно данните и запазете (
spark.table(...).write.jdbc(...)) - Пуснете таблицата Spark.
Не можем да подчертаем достатъчно, че използваме cache на Spark / persist не е пътят. Дори и с консервативния StorageLevel (MEMORY_AND_DISK_2 / MEMORY_AND_DISK_SER_2 ) кешираните данни могат да бъдат загубени (отказ на възли), което води до тихи грешки при коректността.