В Spark функциите на RDD
s (като map
тук) се сериализират и изпращат на изпълнителите за обработка. Това означава, че всички елементи, съдържащи се в тези операции, трябва да могат да бъдат сериализирани.
Връзката Redis тук не може да се сериализира, тъй като отваря TCP връзки към целевата DB, които са свързани с машината, където е създадена.
Решението е да се създадат тези връзки на изпълнителите в контекста на локалното изпълнение. Има няколко начина да направите това. Две, които идват на ум са:
rdd.mapPartitions
:ви позволява да обработвате цял дял наведнъж и следователно да амортизирате разходите за създаване на връзки)- Единични мениджъри на връзки:Създайте връзката веднъж на изпълнител
mapPartitions
е по-лесно, тъй като всичко, което изисква, е малка промяна в структурата на програмата:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
Мениджърът на единични връзки може да бъде моделиран с обект, който съдържа мързелива препратка към връзка (забележка:променлива референтна връзка също ще работи).
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
След това този обект може да се използва за създаване на 1 връзка на работещ JVM и се използва като Serializable
обект в затваряне на операция.
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
Предимството на използването на singleton обекта е по-малко излишно, тъй като връзките се създават само веднъж от JVM (за разлика от 1 на RDD дял)
Има и някои недостатъци:
- почистването на връзките е трудно (кука за изключване/таймери)
- трябва да се гарантира безопасност на нишките на споделените ресурси
(*) код, предоставен за илюстрация. Не е компилиран или тестван.