Les streams : des futures en séquence
Rappelez-vous comment nous avons utilisé le récepteur de notre canal async plus tôt dans ce chapitre dans la section [« Passage de messages »][17-02-messages]. La méthode async recv produit une séquence d’éléments au fil du temps. C’est une instance d’un motif beaucoup plus général connu sous le nom de stream. De nombreux concepts sont naturellement représentés sous forme de streams : des éléments devenant disponibles dans une file d’attente, des morceaux de données extraits progressivement du système de fichiers quand l’ensemble complet de données est trop volumineux pour la mémoire de l’ordinateur, ou des données arrivant par le réseau au fil du temps. Comme les streams sont des futures, nous pouvons les utiliser avec tout autre type de future et les combiner de manières intéressantes. Par exemple, nous pouvons regrouper des événements pour éviter de déclencher trop d’appels réseau, définir des timeouts sur des séquences d’opérations longues, ou limiter les événements de l’interface utilisateur pour éviter de faire un travail inutile.
Nous avons vu une séquence d’éléments au chapitre 13, quand nous avons examiné le trait Iterator dans la section [« Le trait Iterator et la méthode next »][iterator-trait], mais il y a deux différences entre les itérateurs et le récepteur de canal async. La première différence est le temps : les itérateurs sont synchrones, tandis que le récepteur de canal est asynchrone. La deuxième différence est l’API. En travaillant directement avec Iterator, nous appelons sa méthode synchrone next. Avec le stream trpl::Receiver en particulier, nous avons appelé une méthode asynchrone recv à la place. Sinon, ces API sont très similaires, et cette similarité n’est pas une coïncidence. Un stream est comme une forme asynchrone d’itération. Alors que trpl::Receiver attend spécifiquement de recevoir des messages, l’API de stream à usage général est beaucoup plus large : elle fournit l’élément suivant comme le fait Iterator, mais de manière asynchrone.
La similarité entre les itérateurs et les streams en Rust signifie que nous pouvons réellement créer un stream à partir de n’importe quel itérateur. Comme avec un itérateur, nous pouvons travailler avec un stream en appelant sa méthode next puis en attendant la sortie, comme dans l’encart 17-21, qui ne compilera pas encore.
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
Nous commençons avec un tableau de nombres, que nous convertissons en itérateur puis sur lequel nous appelons map pour doubler toutes les valeurs. Ensuite, nous convertissons l’itérateur en stream en utilisant la fonction trpl::stream_from_iter. Puis, nous bouclons sur les éléments du stream à mesure qu’ils arrivent avec la boucle while let.
Malheureusement, quand nous essayons d’exécuter le code, il ne compilé pas et signale qu’il n’y a pas de méthode next disponible :
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
Comme cette sortie l’explique, la raison de l’erreur du compilateur est que nous avons besoin du bon trait dans la portée pour pouvoir utiliser la méthode next. Compte tenu de notre discussion jusqu’à présent, vous pourriez raisonnablement vous attendre à ce que ce trait soit Stream, mais c’est en fait StreamExt. Abréviation d’extension, Ext est un motif courant dans la communauté Rust pour étendre un trait avec un autre.
Le trait Stream définit une interface de bas niveau qui combine effectivement les traits Iterator et Future. StreamExt fournit un ensemble d’API de plus haut niveau par-dessus Stream, incluant la méthode next ainsi que d’autres méthodes utilitaires similaires à celles fournies par le trait Iterator. Stream et StreamExt ne font pas encore partie de la bibliothèque standard de Rust, mais la plupart des crates de l’écosystème utilisent des définitions similaires.
La solution à l’erreur du compilateur est d’ajouter une instruction use pour trpl::StreamExt, comme dans l’encart 17-22.
extern crate trpl; // required for mdbook test
use trpl::StreamExt;
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// --snip--
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
Avec tous ces éléments assemblés, ce code fonctionne comme nous le voulons ! De plus, maintenant que nous avons StreamExt dans la portée, nous pouvons utiliser toutes ses méthodes utilitaires, tout comme avec les itérateurs.