Според грешката вече имате низ (вече сте направили df.selectExpr("CAST(value AS STRING)")
), така че трябва да опитате да получите събитието Row като String
, а не Array[Byte]
Започнете с промяна на
val valueStr = new String(record.getAs[Array[Byte]]("value"))
към
val valueStr = record.getAs[String]("value")
Разбирам, че може вече да имате клъстер за изпълнение на код на Spark, но бих предложил да разгледате Конектор за мивка Kafka Connect Mongo така че да не се налага да пишете и поддържате свой собствен Mongo писател в код на Spark.
Или можете да запишете набори от данни на Spark също директно в mongo