Redis
 sql >> база данни >  >> NoSQL >> Redis

Как да внедрите поток от фючърси за блокиращо обаждане с помощта на futures.rs и Redis PubSub?

Тежко предупреждение Никога преди не съм използвал тази библиотека и познанията ми на ниско ниво за някои от концепциите са малко... липсват. Предимно чета урока. Почти съм сигурен, че всеки, който е правил асинхронна работа, ще прочете това и ще се смее, но може да е полезна отправна точка за други хора. Предупреждение!

Нека започнем с нещо малко по-просто, като демонстрираме как Stream върши работа. Можем да преобразуваме итератор на Result s в поток:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

Това ни показва един начин да консумираме потока. Използваме and_then да направите нещо за всеки полезен товар (тук просто го разпечатвате) и след това for_each за да конвертирате Stream обратно в Future . След това можем да стартираме бъдещето, като извикаме странното име forget метод.

Следващото е да свържете библиотеката Redis в микса, като обработвате само едно съобщение. Тъй като get_message() методът блокира, трябва да въведем някои нишки в микса. Не е добра идея да извършвате голямо количество работа в този тип асинхронна система, тъй като всичко останало ще бъде блокирано. Например:

Освен ако не е уредено по друг начин, трябва да се гарантира, че внедряването на тази функция завършва много бързо .

В един идеален свят кутията за redis ще бъде изградена върху библиотека като фючърси и ще изложи всичко това естествено.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Тук разбирането ми става по-мътно. В отделна нишка блокираме съобщението и го избутваме в канала, когато го получим. Това, което не разбирам, е защо трябва да държим дръжката на конеца. Бих очаквал това foo.forget ще се блокира, чакайки, докато потокът се изпразни.

При telnet връзка със сървъра Redis, изпратете това:

publish rust awesome

И ще видите, че работи. Добавянето на оператори за печат показва, че (за мен) foo.forget операторът се изпълнява преди нишката да се създаде.

Множеството съобщения е по-сложно. Sender консумира себе си, за да попречи на генериращата страна да изпревари твърде далече от страната-консуматор. Това се постига чрез връщане на друго бъдеще от send ! Трябва да го върнем оттам, за да го използваме повторно за следващата итерация на цикъла:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

Сигурен съм, че с течение на времето ще има повече екосистема за този тип взаимодействие. Например, кутията на futures-cpupool вероятно да бъде разширен, за да поддържа подобен случай на употреба.




  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Персонализирани команди на Redis

  2. Redis запазва ли данните?

  3. не може да убие redis-сървър на linux

  4. Прескачащи ключове за сканиране на Redis

  5. blpop спира обработката на опашката след известно време