1) Въведение
Здравейте всички! Много хора знаят какво е Redis и ако не знаете, официалният сайт може да ви информира.
За повечето Redis е кеш и понякога опашка от съобщения.
Но какво ще стане, ако се побъркаме и се опитаме да проектираме цяло приложение, използвайки само Redis като съхранение на данни? Какви задачи можем да решим с Redis?
Ще се опитаме да отговорим на тези въпроси в тази статия.
Какво няма да видим тук?
- Всяка структура от данни на Redis в детайли няма да бъде тук. За какви цели трябва да прочетете специални статии или документация.
- Тук също няма да има готов за производство код, който бихте могли да използвате в работата си.
Какво ще видим тук?
- Ще използваме различни структури от данни на Redis, за да реализираме различни задачи на приложение за запознанства.
- Тук ще бъдат примери за Kotlin + Spring Boot.
2) Научете се да създавате и заявявате потребителски профили.
-
Първо, нека се научим как да създаваме потребителски профили с техните имена, харесвания и т.н.
За да направим това, се нуждаем от просто хранилище ключ-стойност. Как да го направя?
- Просто. Redis има структура от данни - хеш. По същество това е просто позната хеш карта за всички нас.
Командите на езика за заявки Redis могат да бъдат намерени тук и тук.
Документацията дори има интерактивен прозорец за изпълнение на тези команди направо на страницата. И целият списък с команди може да бъде намерен тук.
Подобни връзки работят за всички следващи команди, които ще разгледаме.
В кода използваме RedisTemplate почти навсякъде. Това е основно нещо за работа с Redis в екосистемата Spring.
Единствената разлика от картата тук е, че предаваме "поле" като първи аргумент. „Полето“ е името на нашия хеш.
fun addUser(user: User) {
val hashOps: HashOperations<String, String, User> = userRedisTemplate.opsForHash()
hashOps.put(Constants.USERS, user.name, user)
}
fun getUser(userId: String): User {
val userOps: HashOperations<String, String, User> = userRedisTemplate.opsForHash()
return userOps.get(Constants.USERS, userId)?: throw NotFoundException("Not found user by $userId")
}
По-горе е пример за това как може да изглежда в Kotlin с помощта на библиотеките на Spring.
Всички части от кода от тази статия можете да намерите в Github.
3) Актуализиране на харесванията на потребителите чрез Redis списъци.
-
Страхотен!. Имаме потребители и информация за харесвания.
Сега трябва да намерим начин как да актуализираме това харесване.
Предполагаме, че събитията могат да се случват много често. Така че нека използваме асинхронен подход с някаква опашка. И ще четем информацията от опашката по график.
- Redis има структура на списъчни данни с такъв набор от команди. Можете да използвате Redis списъци както като FIFO опашка, така и като LIFO стек.
През пролетта използваме същия подход за получаване на ListOperations от RedisTemplate.
Трябва да пишем вдясно. Защото тук симулираме FIFO опашка от дясно на ляво.
fun putUserLike(userFrom: String, userTo: String, like: Boolean) {
val userLike = UserLike(userFrom, userTo, like)
val listOps: ListOperations<String, UserLike> = userLikeRedisTemplate.opsForList()
listOps.rightPush(Constants.USER_LIKES, userLike)
}
Сега ще изпълняваме работата си по график.
Ние просто прехвърляме информация от една структура от данни на Redis в друга. Това ни е достатъчно за пример.
fun processUserLikes() {
val userLikes = getUserLikesLast(USERS_BATCH_LIMIT).filter{ it.isLike}
userLikes.forEach{updateUserLike(it)}
}
Тук актуализирането на потребителя е наистина лесно. Поздравете HashOperation от предишната част.
private fun updateUserLike(userLike: UserLike) {
val userOps: HashOperations<String, String, User> = userLikeRedisTemplate.opsForHash()
val fromUser = userOps.get(Constants.USERS, userLike.fromUserId)?: throw UserNotFoundException(userLike.fromUserId)
fromUser.fromLikes.add(userLike)
val toUser = userOps.get(Constants.USERS, userLike.toUserId)?: throw UserNotFoundException(userLike.toUserId)
toUser.fromLikes.add(userLike)
userOps.putAll(Constants.USERS, mapOf(userLike.fromUserId to fromUser, userLike.toUserId to toUser))
}
И сега показваме как да получите данни от списъка. Получаваме това отляво. За да получим куп данни от списъка, ще използваме range
метод.
И има един важен момент. Методът на диапазон ще получи само данни от списъка, но не и ще ги изтрие.
Така че трябва да използваме друг метод за изтриване на данни. trim
направи го. (И можете да имате някои въпроси там).
private fun getUserLikesLast(number: Long): List<UserLike> {
val listOps: ListOperations<String, UserLike> = userLikeRedisTemplate.opsForList()
return (listOps.range(Constants.USER_LIKES, 0, number)?:mutableListOf()).filterIsInstance(UserLike::class.java)
.also{
listOps.trim(Constants.USER_LIKES, number, -1)
}
}
А въпросите са:
- Как да прехвърля данни от списъка в няколко нишки?
- И как да гарантирам, че данните няма да загубят в случай на грешка? От кутията - нищо. Трябва да получите данни от списъка в една нишка. И трябва да се справяте с всички нюанси, които възникват сами.
4) Изпращане на push известия до потребители чрез pub/sub
-
Продължавам напред!
Вече имаме потребителски профили. Разбрахме как да се справим с потока от харесвания от тези потребители.Но представете си случая, когато искате да изпратите push известие до потребител в момента, в който получим харесване.
Какво ще правиш?
- Вече имаме асинхронен процес за обработка на харесвания, така че нека просто вградим там изпращане на push известия. Ще използваме WebSocket за тази цел, разбира се. И можем просто да го изпратим чрез WebSocket, където получаваме харесване. Но какво ще стане, ако искаме да изпълним дългосрочен код, преди да изпратим? Или какво, ако искаме да делегираме работата с WebSocket на друг компонент?
- Ще вземем и ще прехвърлим данните си отново от една структура от данни на Redis (списък) в друга (pub/sub).
fun processUserLikes() {
val userLikes = getUserLikesLast(USERS_BATCH_LIMIT).filter{ it.isLike}
pushLikesToUsers(userLikes)
userLikes.forEach{updateUserLike(it)}
}
private fun pushLikesToUsers(userLikes: List<UserLike>) {
GlobalScope.launch(Dispatchers.IO){
userLikes.forEach {
pushProducer.publish(it)
}
}
}
@Component
class PushProducer(val redisTemplate: RedisTemplate<String, String>, val pushTopic: ChannelTopic, val objectMapper: ObjectMapper) {
fun publish(userLike: UserLike) {
redisTemplate.convertAndSend(pushTopic.topic, objectMapper.writeValueAsString(userLike))
}
}
Обвързването на слушателя към темата се намира в конфигурацията.
Сега можем просто да вземем нашия слушател в отделна услуга.
@Component
class PushListener(val objectMapper: ObjectMapper): MessageListener {
private val log = KotlinLogging.logger {}
override fun onMessage(userLikeMessage: Message, pattern: ByteArray?) {
// websocket functionality would be here
log.info("Received: ${objectMapper.readValue(userLikeMessage.body, UserLike::class.java)}")
}
}
5) Намиране на най-близките потребители чрез географски операции.
- Приключихме с харесванията. Но какво да кажем за възможността за намиране на най-близките потребители до дадена точка.
- GeoOperations ще ни помогне с това. Ще съхраняваме двойките ключ-стойност, но сега нашата стойност е потребителска координата. За да намерим, ще използваме
[radius](https://redis.io/commands/georadius)
метод. Предаваме потребителския идентификатор за намиране и самия радиус на търсене.
Redis връща резултат, включително нашия потребителски идентификатор.
fun getNearUserIds(userId: String, distance: Double = 1000.0): List<String> {
val geoOps: GeoOperations<String, String> = stringRedisTemplate.opsForGeo()
return geoOps.radius(USER_GEO_POINT, userId, Distance(distance, RedisGeoCommands.DistanceUnit.KILOMETERS))
?.content?.map{ it.content.name}?.filter{ it!= userId}?:listOf()
}
6) Актуализиране на местоположението на потребителите чрез потоци
-
Реализирахме почти всичко, от което се нуждаем. Но сега отново имаме ситуация, когато трябва да актуализираме данни, които могат да се променят бързо.
Така че трябва отново да използваме опашка, но би било хубаво да имаме нещо по-мащабируемо.
- Redis потоците могат да помогнат за решаването на този проблем.
- Вероятно знаете за Kafka и вероятно дори знаете за потоците на Kafka, но това не е същото като потоците на Redis. Но самият Кафка е нещо доста подобно на Redis потоците. Това също е структура от данни напред, която има потребителска група и изместване. Това е по-сложна структура от данни, но ни позволява да получаваме данни успоредно и с помощта на реактивен подход.
Вижте документацията за Redis поток за подробности.
Spring има ReactiveRedisTemplate и RedisTemplate за работа със структури от данни Redis. За нас би било по-удобно да използваме RedisTemplate за запис на стойността и ReactiveRedisTemplate за четене. Ако говорим за потоци. Но в такива случаи нищо няма да работи.
Ако някой знае защо работи по този начин, заради Spring или Redis, да пише в коментарите.
fun publishUserPoint(userPoint: UserPoint) {
val userPointRecord = ObjectRecord.create(USER_GEO_STREAM_NAME, userPoint)
reactiveRedisTemplate
.opsForStream<String, Any>()
.add(userPointRecord)
.subscribe{println("Send RecordId: $it")}
}
Нашият метод за слушател ще изглежда така:
@Service
class UserPointsConsumer(
private val userGeoService: UserGeoService
): StreamListener<String, ObjectRecord<String, UserPoint>> {
override fun onMessage(record: ObjectRecord<String, UserPoint>) {
userGeoService.addUserPoint(record.value)
}
}
Просто преместваме нашите данни в структура от географски данни.
7) Пребройте уникалните сесии с помощта на HyperLogLog.
- И накрая, нека си представим, че трябва да изчислим колко потребители са влезли в приложението на ден.
- Освен това, нека имаме предвид, че можем да имаме много потребители. Така че една проста опция с помощта на хеш карта не е подходяща за нас, защото ще консумира твърде много памет. Как можем да направим това, използвайки по-малко ресурси?
- Там влиза в действие вероятностна структура от данни HyperLogLog. Можете да прочетете повече за това на страницата в Уикипедия. Ключова характеристика е, че тази структура от данни ни позволява да решим проблема, използвайки значително по-малко памет от опцията с хеш карта.
fun uniqueActivitiesPerDay(): Long {
val hyperLogLogOps: HyperLogLogOperations<String, String> = stringRedisTemplate.opsForHyperLogLog()
return hyperLogLogOps.size(Constants.TODAY_ACTIVITIES)
}
fun userOpenApp(userId: String): Long {
val hyperLogLogOps: HyperLogLogOperations<String, String> = stringRedisTemplate.opsForHyperLogLog()
return hyperLogLogOps.add(Constants.TODAY_ACTIVITIES, userId)
}
8) Заключение
В тази статия разгледахме различните структури от данни на Redis. Включително не толкова популярни гео операции и HyperLogLog.
Използвахме ги за решаване на реални проблеми.
Почти проектирахме Tinder, възможно е във FAANG след това)))
Също така, ние подчертахме основните нюанси и проблеми, които могат да се срещнат при работа с Redis.
Redis е много функционално съхранение на данни. И ако вече го имате във вашата инфраструктура, може да си струва да погледнете Redis като инструмент за решаване на другите ви задачи с него без ненужни усложнения.
PS:
Всички примери за код могат да бъдат намерени на github.
Пишете в коментарите, ако забележите грешка.
Оставете коментар по-долу за такъв начин за описание с помощта на някаква технология. Харесва ли ви или не?
И ме последвайте в Twitter:🐦@de____ro