Appliquer la concurrence avec async
Dans cette section, nous appliquerons l’async à certains des mêmes défis de concurrence que nous avons abordés avec les threads au chapitre 16. Comme nous avons déjà discuté de beaucoup d’idées clés là-bas, dans cette section nous nous concentrerons sur ce qui diffère entre les threads et les futures.
Dans de nombreux cas, les API pour travailler avec la concurrence en utilisant l’async sont très similaires à celles utilisant les threads. Dans d’autres cas, elles finissent par être assez différentes. Même quand les API semblent similaires entre les threads et l’async, elles ont souvent un comportement différent — et elles ont presque toujours des caractéristiques de performance différentes.
Créer une nouvelle tâche avec spawn_task
La première opération que nous avons abordée dans la section « Créer un nouveau thread avec spawn » du chapitre 16 était de compter sur deux threads séparés. Faisons la même chose en utilisant async. La crate trpl fournit une fonction spawn_task qui ressemble beaucoup à l’API thread::spawn, et une fonction sleep qui est une version async de l’API thread::sleep. Nous pouvons les utiliser ensemble pour implémenter l’exemple de comptage, comme le montre l’encart 17-6.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
}
Comme point de départ, nous configurons notre fonction main avec trpl::block_on pour que notre fonction de niveau supérieur puisse être async.
Remarque : à partir de maintenant dans le chapitre, chaque exemple inclura ce même code d’enveloppe avec
trpl::block_ondansmain, donc nous l’omettrons souvent comme nous le faisons avecmain. N’oubliez pas de l’inclure dans votre code !
Ensuite, nous écrivons deux boucles dans ce bloc, chacune contenant un appel à trpl::sleep, qui attend une demi-seconde (500 millisecondes) avant d’envoyer le message suivant. Nous mettons une boucle dans le corps d’un trpl::spawn_task et l’autre dans une boucle for de niveau supérieur. Nous ajoutons aussi un await après les appels à sleep.
Ce code se comporte de manière similaire à l’implémentation basée sur les threads — y compris le fait que vous pourriez voir les messages apparaître dans un ordre différent dans votre propre terminal quand vous l’exécutez :
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
Cette version s’arrête dès que la boucle for dans le corps du bloc async principal se terminé, car la tâche lancée par spawn_task est arrêtée quand la fonction main se terminé. Si vous voulez qu’elle s’exécute jusqu’à la fin de la tâche, vous devrez utiliser un handle de jointure pour attendre que la première tâche se terminé. Avec les threads, nous utilisions la méthode join pour « bloquer » jusqu’à ce que le thread ait fini de s’exécuter. Dans l’encart 17-7, nous pouvons utiliser await pour faire la même chose, car le handle de tâche est lui-même une future. Son type Output est un Result, donc nous le déballons aussi après l’avoir attendu.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let handle = trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
handle.await.unwrap();
});
}
await with a join handle to run a task to completionCette version mise à jour s’exécute jusqu’à ce que les deux boucles soient terminées :
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
Jusqu’ici, il semble que l’async et les threads nous donnent des résultats similaires, juste avec une syntaxe différente : utiliser await au lieu d’appeler join sur le handle de jointure, et attendre les appels à sleep.
La plus grande différence est que nous n’avons pas eu besoin de créer un autre thread du système d’exploitation pour faire cela. En fait, nous n’avons même pas besoin de lancer une tâche ici. Comme les blocs async se compilent en futures anonymes, nous pouvons mettre chaque boucle dans un bloc async et laisser le runtime les exécuter toutes les deux jusqu’à leur achèvement en utilisant la fonction trpl::join.
Dans la section « Attendre la fin de tous les threads » du chapitre 16, nous avons utilisé la méthode join pour nous assurer que le programme n’arrêtait pas avant que les threads générés aient terminé. Nous avons besoin du même type de mécanisme ici : nous avons besoin d’un moyen d’attendre que la tâche async se terminé avant de continuer.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let fut1 = async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
let fut2 = async {
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
trpl::join(fut1, fut2).await;
});
}
trpl::join to await two anonymous futuresQuand nous exécutons cela, nous voyons les deux futures s’exécuter jusqu’à leur achèvement :
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
Maintenant, vous verrez exactement le même ordre à chaque fois, ce qui est très différent de ce que nous avons vu avec les threads et avec trpl::spawn_task dans l’encart 17-7. C’est parce que la fonction trpl::join est équitable, ce qui signifie qu’elle vérifie chaque future aussi souvent, en alternant entre elles, et ne laisse jamais l’une prendre de l’avance si l’autre est prête. Avec les threads, le système d’exploitation décide quel thread vérifier et combien de temps le laisser s’exécuter. Avec le Rust async, c’est le runtime qui décide quelle tâche vérifier. (En pratique, les détails se compliquent car un runtime async peut utiliser des threads du système d’exploitation en coulisses dans le cadre de sa gestion de la concurrence, donc garantir l’équité peut demander plus de travail pour un runtime — mais c’est quand même possible !) Les runtimes n’ont pas à garantir l’équité pour une opération donnée, et ils offrent souvent différentes API pour vous permettre de choisir si vous voulez ou non l’équité.
Essayez quelques-unes de ces variations sur l’attente des futures et voyez ce qu’elles font :
- Retirez le bloc async autour de l’une où des deux boucles.
- Attendez chaque bloc async immédiatement après l’avoir défini.
- N’encapsulez que la première boucle dans un bloc async, et attendez la future résultante après le corps de la deuxième boucle.
Pour un défi supplémentaire, essayez de deviner quelle sera la sortie dans chaque cas avant d’exécuter le code !
Envoyer des données entre deux tâches en utilisant le passage de messages
Le partage de données entre futures vous sera aussi familier : nous utiliserons à nouveau le passage de messages, mais cette fois avec des versions async des types et des fonctions. Nous prendrons un chemin légèrement différent de celui de la section [« Transférer des données entre threads avec le passage de messages »][message-passing-threads] du chapitre 16 pour illustrer certaines des différences clés entre la concurrence basée sur les threads et celle basée sur les futures. Dans l’encart 17-9, nous commencerons avec un seul bloc async — sans lancer de tâche séparée comme nous avions lancé un thread séparé.
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let val = String::from("hi");
tx.send(val).unwrap();
let received = rx.recv().await.unwrap();
println!("received '{received}'");
});
}
tx and rxIci, nous utilisons trpl::channel, une version async de l’API de canal multiple-producteurs, unique-consommateur que nous avons utilisée avec les threads au chapitre 16. La version async de l’API est seulement un peu différente de la version basée sur les threads : elle utilise un récepteur rx mutable plutôt qu’immutable, et sa méthode recv produit une future que nous devons attendre au lieu de produire directement la valeur. Maintenant, nous pouvons envoyer des messages de l’émetteur au récepteur. Remarquez que nous n’avons pas besoin de lancer un thread séparé ni même une tâche ; nous avons simplement besoin d’attendre l’appel rx.recv.
La méthode synchrone Receiver::recv de std::mpsc::channel bloque jusqu’à recevoir un message. La méthode trpl::Receiver::recv ne le fait pas, car elle est async. Au lieu de bloquer, elle rend le contrôle au runtime jusqu’à ce qu’un message soit reçu ou que le côté émetteur du canal se ferme. En revanche, nous n’attendons pas l’appel send, car il ne bloque pas. Il n’a pas besoin de le faire, car le canal dans lequel nous envoyons est non borné.
Remarque : comme tout ce code async s’exécute dans un bloc async dans un appel à
trpl::block_on, tout ce qui est à l’intérieur peut éviter de bloquer. Cependant, le code à l’extérieur bloquera en attendant que la fonctionblock_onretourné. C’est tout l’intérêt de la fonctiontrpl::block_on: elle vous permet de choisir où bloquer sur un ensemble de code async, et donc où faire la transition entre le code sync et async.
Remarquez deux choses à propos de cet exemple. Premièrement, le message arrivera immédiatement. Deuxièmement, bien que nous utilisions une future ici, il n’y a pas encore de concurrence. Tout dans le listing se passe en séquence, comme ce serait le cas s’il n’y avait pas de futures impliquées.
Abordons la première partie en envoyant une série de messages et en dormant entre eux, comme montré dans l’encart 17-10.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
});
}
await between each messageEn plus d’envoyer les messages, nous devons les recevoir. Dans ce cas, comme nous savons combien de messages arrivent, nous pourrions le faire manuellement en appelant rx.recv().await quatre fois. Dans le monde réel, cependant, nous attendrons généralement un nombre inconnu de messages, donc nous devons continuer à attendre jusqu’à ce que nous déterminions qu’il n’y a plus de messages.
Dans l’encart 16-10, nous avons utilisé une boucle for pour traiter tous les éléments reçus d’un canal synchrone. Rust n’a pas encore de moyen d’utiliser une boucle for avec une série d’éléments produits de manière asynchrone, donc nous devons utiliser une boucle que nous n’avons pas vue avant : la boucle conditionnelle while let. C’est la version boucle de la construction if let que nous avons vue dans la section [« Flux de contrôle concis avec if let et let...else »][if-let] du chapitre 6. La boucle continuera à s’exécuter tant que le motif qu’elle spécifie continue de correspondre à la valeur.
L’appel rx.recv produit une future, que nous attendons. Le runtime mettra en pause la future jusqu’à ce qu’elle soit prête. Une fois qu’un message arrive, la future se résoudra en Some(message) autant de fois qu’un message arrive. Quand le canal se ferme, que des messages soient arrivés ou non, la future se résoudra plutôt en None pour indiquer qu’il n’y a plus de valeurs et que nous devons donc arrêter d’interroger — c’est-à-dire arrêter d’attendre.
La boucle while let rassemble tout cela. Si le résultat de l’appel à rx.recv().await est Some(message), nous avons accès au message et pouvons l’utiliser dans le corps de la boucle, tout comme nous pourrions le faire avec if let. Si le résultat est None, la boucle se terminé. Chaque fois que la boucle se complète, elle atteint à nouveau le point d’attente, donc le runtime la met à nouveau en pause jusqu’à ce qu’un autre message arrive.
Le code envoie et reçoit maintenant avec succès tous les messages. Malheureusement, il reste encore quelques problèmes. D’une part, les messages n’arrivent pas à intervalles d’une demi-seconde. Ils arrivent tous en même temps, 2 secondes (2 000 millisecondes) après le démarrage du programme. D’autre part, ce programme ne se terminé jamais ! À la place, il attend indéfiniment de nouveaux messages. Vous devrez l’arrêter en utilisant ctrl-C.
Le code dans un seul bloc async s’exécute de manière linéaire
Commençons par examiner pourquoi les messages arrivent tous en même temps après le délai complet, plutôt que d’arriver avec des délais entre chacun. Dans un bloc async donné, l’ordre dans lequel les mots-clés await apparaissent dans le code est aussi l’ordre dans lequel ils sont exécutés quand le programme s’exécute.
Il n’y a qu’un seul bloc async dans l’encart 17-10, donc tout s’exécute linéairement. Il n’y a toujours pas de concurrence. Tous les appels tx.send se produisent, entrecoupés de tous les appels trpl::sleep et de leurs points d’attente associés. Ce n’est qu’ensuite que la boucle while let passe par les points d’attente des appels recv.
Pour obtenir le comportement que nous voulons, où le délai de sommeil se produit entre chaque message, nous devons mettre les opérations tx et rx dans leurs propres blocs async, comme montré dans l’encart 17-11. Ensuite, le runtime peut exécuter chacun d’eux séparément en utilisant trpl::join, comme dans l’encart 17-8. Encore une fois, nous attendons le résultat de l’appel à trpl::join, pas les futures individuelles. Si nous attendions les futures individuelles en séquence, nous nous retrouverions dans un flux séquentiel — exactement ce que nous essayons de ne pas faire.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
send and recv into their own async blocks and awaiting the futures for those blocksAvec le code mis à jour dans l’encart 17-11, les messages s’affichent à intervalles de 500 millisecondes, plutôt que tous en rafale après 2 secondes.
Déplacer la possession dans un bloc async
Le programme ne se terminé cependant toujours jamais, à cause de la façon dont la boucle while let interagit avec trpl::join :
- La future retournée par
trpl::joinne se terminé que quand les deux futures qui lui ont été passées sont terminées. - La future
tx_futse terminé une fois qu’elle a fini de dormir après l’envoi du dernier message dansvals. - La future
rx_futne se terminera pas tant que la bouclewhile letne se terminera pas. - La boucle
while letne se terminera pas tant que l’attente derx.recvne produira pasNone. - L’attente de
rx.recvne retourneraNoneque quand l’autre extrémité du canal sera fermée. - Le canal ne se fermera que si nous appelons
rx.closeou quand le côté émetteur,tx, est droppé. - Nous n’appelons
rx.closenulle part, ettxne sera pas droppé tant que le bloc async le plus externe passé àtrpl::block_onne se terminera pas. - Le bloc ne peut pas se terminer car il est bloqué en attendant que
trpl::joinse terminé, ce qui nous ramène en haut de cette liste.
Actuellement, le bloc async où nous envoyons les messages ne fait qu’emprunter tx car l’envoi d’un message ne nécessite pas la possession, mais si nous pouvions déplacer tx dans ce bloc async, il serait droppé une fois que ce bloc se terminé. Dans la section « Capturer des références ou déplacer la possession » du chapitre 13, vous avez appris à utiliser le mot-clé move avec les closures, et comme discuté dans la section « Utiliser les closures move avec les threads » du chapitre 16, nous devons souvent déplacer des données dans les closures quand nous travaillons avec les threads. Les mêmes dynamiques fondamentales s’appliquent aux blocs async, donc le mot-clé move fonctionne avec les blocs async comme il le fait avec les closures.
Dans l’encart 17-12, nous changeons le bloc utilisé pour envoyer des messages de async à async move.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async move {
// --snip--
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
Quand nous exécutons cette version du code, il se terminé proprement après l’envoi et la réception du dernier message. Voyons maintenant ce qui devrait changer pour envoyer des données depuis plus d’une future.
Joindre un nombre de futures avec la macro join!
Ce canal async est aussi un canal à producteurs multiples, donc nous pouvons appeler clone sur tx si nous voulons envoyer des messages depuis plusieurs futures, comme montré dans l’encart 17-13.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(1500)).await;
}
};
trpl::join!(tx1_fut, tx_fut, rx_fut);
});
}
D’abord, nous clonons tx, créant tx1 en dehors du premier bloc async. Nous déplaçons tx1 dans ce bloc comme nous l’avons fait auparavant avec tx. Puis, plus tard, nous déplaçons le tx original dans un nouveau bloc async, où nous envoyons plus de messages avec un délai légèrement plus lent. Il se trouve que nous mettons ce nouveau bloc async après le bloc async de réception des messages, mais il pourrait tout aussi bien être avant. Ce qui compte, c’est l’ordre dans lequel les futures sont attendues, pas l’ordre dans lequel elles sont créées.
Les deux blocs async pour l’envoi de messages doivent être des blocs async move pour que tx et tx1 soient tous les deux droppés quand ces blocs se terminent. Sinon, nous nous retrouverons dans la même boucle infinie qu’au départ.
Enfin, nous passons de trpl::join à trpl::join! pour gérer la future supplémentaire : la macro join! attend un nombre arbitraire de futures dont nous connaissons le nombre au moment de la compilation. Nous discuterons de l’attente d’une collection d’un nombre inconnu de futures plus loin dans ce chapitre.
Maintenant nous voyons tous les messages des deux futures d’envoi, et comme les futures d’envoi utilisent des délais légèrement différents après l’envoi, les messages sont aussi reçus à ces différents intervalles :
received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'
Nous avons exploré comment utiliser le passage de messages pour envoyer des données entre futures, comment le code dans un bloc async s’exécute séquentiellement, comment déplacer la possession dans un bloc async, et comment joindre plusieurs futures. Voyons maintenant comment et pourquoi indiquer au runtime qu’il peut passer à une autre tâche.