Тежко предупреждение Никога преди не съм използвал тази библиотека и познанията ми на ниско ниво за някои от концепциите са малко... липсват. Предимно чета урока. Почти съм сигурен, че всеки, който е правил асинхронна работа, ще прочете това и ще се смее, но може да е полезна отправна точка за други хора. Предупреждение!
Нека започнем с нещо малко по-просто, като демонстрираме как Stream
върши работа. Можем да преобразуваме итератор на Result
s в поток:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
Това ни показва един начин да консумираме потока. Използваме and_then
да направите нещо за всеки полезен товар (тук просто го разпечатвате) и след това for_each
за да конвертирате Stream
обратно в Future
. След това можем да стартираме бъдещето, като извикаме странното име forget
метод.
Следващото е да свържете библиотеката Redis в микса, като обработвате само едно съобщение. Тъй като get_message()
методът блокира, трябва да въведем някои нишки в микса. Не е добра идея да извършвате голямо количество работа в този тип асинхронна система, тъй като всичко останало ще бъде блокирано. Например:
Освен ако не е уредено по друг начин, трябва да се гарантира, че внедряването на тази функция завършва много бързо .
В един идеален свят кутията за redis ще бъде изградена върху библиотека като фючърси и ще изложи всичко това естествено.
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Тук разбирането ми става по-мътно. В отделна нишка блокираме съобщението и го избутваме в канала, когато го получим. Това, което не разбирам, е защо трябва да държим дръжката на конеца. Бих очаквал това foo.forget
ще се блокира, чакайки, докато потокът се изпразни.
При telnet връзка със сървъра Redis, изпратете това:
publish rust awesome
И ще видите, че работи. Добавянето на оператори за печат показва, че (за мен) foo.forget
операторът се изпълнява преди нишката да се създаде.
Множеството съобщения е по-сложно. Sender
консумира себе си, за да попречи на генериращата страна да изпревари твърде далече от страната-консуматор. Това се постига чрез връщане на друго бъдеще от send
! Трябва да го върнем оттам, за да го използваме повторно за следващата итерация на цикъла:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Сигурен съм, че с течение на времето ще има повече екосистема за този тип взаимодействие. Например, кутията на futures-cpupool вероятно да бъде разширен, за да поддържа подобен случай на употреба.