Реших проблема си. Причината за непоследователните преброявания беше MongoDefaultPartitioner който обвива MongoSamplePartitioner който използва произволна извадка. Честно казано, това е доста странно по подразбиране за мен. Аз лично бих предпочел вместо това да имам бавен, но последователен разделител. Подробностите за опциите за дялове могат да бъдат намерени в официалните опции за конфигуриране документация.
код:
val df = spark.read
.format("com.mongodb.spark.sql.DefaultSource")
.option("uri", "mongodb://127.0.0.1/enron_mail.messages")
.option("partitioner", "spark.mongodb.input.partitionerOptions.MongoPaginateBySizePartitioner ")
.load()