Transférer des données entre les tâches avec le passage de messages
Une approche de plus en plus populaire pour assurer une concurrence sûre est le passage de messages, où les threads ou les acteurs communiquent en s’envoyant mutuellement des messages contenant des données. Voici l’idée résumée dans un slogan de la documentation du langage Go : “Ne communiquez pas en partageant de la mémoire ; au contraire, partagez de la mémoire en communiquant.”
Pour accomplir la concurrence par envoi de messages, la bibliothèque standard de Rust fournit une implémentation de canaux (channels). Un canal est un concept de programmation général par lequel des données sont envoyées d’un thread à un autre.
Vous pouvez imaginer un canal en programmation comme un canal d’eau directionnel, tel qu’un ruisseau ou une rivière. Si vous mettez quelque chose comme un canard en caoutchouc dans une rivière, il voyagera en aval jusqu’à la fin du cours d’eau.
Un canal a deux moitiés : un émetteur (transmitter) et un récepteur (receiver). La moitié émettrice est l’emplacement en amont où vous mettez le canard en caoutchouc dans la rivière, et la moitié réceptrice est l’endroit où le canard en caoutchouc finit en aval. Une partie de votre code appelle des méthodes sur l’émetteur avec les données que vous voulez envoyer, et une autre partie vérifie l’extrémité réceptrice pour les messages arrivant. Un canal est dit fermé si l’une où l’autre des moitiés, émettrice ou réceptrice, est libérée.
Ici, nous allons construire un programme qui à un thread pour générer des valeurs et les envoyer dans un canal, et un autre thread qui recevra les valeurs et les affichera. Nous enverrons des valeurs simples entre les threads en utilisant un canal pour illustrer la fonctionnalité. Une fois que vous serez familier avec la technique, vous pourrez utiliser des canaux pour tous les threads qui doivent communiquer entre eux, comme un système de chat ou un système où de nombreux threads effectuent des parties d’un calcul et envoient les parties à un thread qui agrège les résultats.
D’abord, dans l’encart 16-6, nous allons créer un canal mais ne rien faire avec. Notez que cela ne compilera pas encore car Rust ne peut pas déterminer quel type de valeurs nous voulons envoyer dans le canal.
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
tx and rxNous créons un nouveau canal en utilisant la fonction mpsc::channel ; mpsc signifie multiple producer, single consumer (multiples producteurs, consommateur unique). En bref, la façon dont la bibliothèque standard de Rust implémente les canaux signifie qu’un canal peut avoir plusieurs extrémités émettrices qui produisent des valeurs mais une seule extrémité réceptrice qui consomme ces valeurs. Imaginez plusieurs ruisseaux se rejoignant dans une grande rivière : tout ce qui est envoyé dans n’importe lequel des ruisseaux finira dans une seule rivière à la fin. Nous commencerons avec un seul producteur pour l’instant, mais nous ajouterons plusieurs producteurs quand cet exemple fonctionnera.
La fonction mpsc::channel retourné un tuple, dont le premier élément est l’extrémité émettrice – l’émetteur – et le second élément est l’extrémité réceptrice – le récepteur. Les abréviations tx et rx sont traditionnellement utilisées dans de nombreux domaines pour transmitter (émetteur) et receiver (récepteur), respectivement, donc nous nommons nos variables ainsi pour indiquer chaque extrémité. Nous utilisons une instruction let avec un motif qui déstructure les tuples ; nous aborderons l’utilisation des motifs dans les instructions let et la déstructuration au chapitre 19. Pour l’instant, sachez qu’utiliser une instruction let de cette manière est une approche pratique pour extraire les éléments du tuple retourné par mpsc::channel.
Déplaçons l’extrémité émettrice dans un thread créé et faisons-lui envoyer une chaîne de caractères pour que le thread créé communique avec le thread principal, comme montré dans l’encart 16-7. C’est comme mettre un canard en caoutchouc en amont de la rivière ou envoyer un message de chat d’un thread à un autre.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
}
tx to a spawned thread and sending "hi"Encore une fois, nous utilisons thread::spawn pour créer un nouveau thread puis move pour déplacer tx dans la closure afin que le thread créé possède tx. Le thread créé a besoin de posséder l’émetteur pour pouvoir envoyer des messages à travers le canal.
L’émetteur à une méthode send qui prend la valeur que nous voulons envoyer. La méthode send retourné un type Result<T, E>, donc si le récepteur a déjà été libéré et qu’il n’y a nulle part où envoyer une valeur, l’opération d’envoi retournera une erreur. Dans cet exemple, nous appelons unwrap pour paniquer en cas d’erreur. Mais dans une vraie application, nous la gérerions correctement : retournez au chapitre 9 pour revoir les stratégies de gestion d’erreurs appropriées.
Dans l’encart 16-8, nous récupérerons la valeur depuis le récepteur dans le thread principal. C’est comme récupérer le canard en caoutchouc de l’eau à la fin de la rivière ou recevoir un message de chat.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
"hi" in the main thread and printing itLe récepteur a deux méthodes utiles : recv et try_recv. Nous utilisons recv, abréviation de receive (recevoir), qui bloquera l’exécution du thread principal et attendra jusqu’à ce qu’une valeur soit envoyée dans le canal. Une fois qu’une valeur est envoyée, recv la retournera dans un Result<T, E>. Quand l’émetteur se ferme, recv retournera une erreur pour signaler qu’il n’y aura plus de valeurs.
La méthode try_recv ne bloque pas, mais retourné immédiatement un Result<T, E> : une valeur Ok contenant un message s’il y en à un disponible et une valeur Err s’il n’y a pas de messages cette fois. Utiliser try_recv est utile si ce thread a d’autres tâches à effectuer en attendant les messages : nous pourrions écrire une boucle qui appelle try_recv régulièrement, gère un message s’il y en à un disponible, et sinon fait d’autres tâches pendant un petit moment avant de vérifier à nouveau.
Nous avons utilisé recv dans cet exemple par simplicité ; nous n’avons pas d’autre travail à faire pour le thread principal que d’attendre des messages, donc bloquer le thread principal est approprié.
Quand nous exécutons le code de l’encart 16-8, nous verrons la valeur affichée depuis le thread principal :
Got: hi
Parfait !
Transférer la possession via les canaux
Les règles de possession jouent un rôle vital dans l’envoi de messages car elles vous aident à écrire du code concurrent sûr. Prévenir les erreurs dans la programmation concurrente est l’avantage de penser à la possession tout au long de vos programmes Rust. Faisons une expérience pour montrer comment les canaux et la possession fonctionnent ensemble pour prévenir les problèmes : nous essaierons d’utiliser une valeur val dans le thread créé après l’avoir envoyée dans le canal. Essayez de compiler le code de l’encart 16-9 pour voir pourquoi ce code n’est pas autorisé.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
val after we’ve sent it down the channelIci, nous essayons d’afficher val après l’avoir envoyé dans le canal via tx.send. Permettre cela serait une mauvaise idée : une fois que la valeur a été envoyée à un autre thread, ce thread pourrait la modifier ou la libérer avant que nous essayions d’utiliser la valeur à nouveau. Potentiellement, les modifications de l’autre thread pourraient causer des erreurs ou des résultats inattendus en raison de données incohérentes ou inexistantes. Cependant, Rust nous donne une erreur si nous essayons de compiler le code de l’encart 16-9 : console {{#include ../listings/ch16-fearless-concurrency/listing-16-09/output.txt}}
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:27
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
Notre erreur de concurrence a causé une erreur de compilation. La fonction send prend la possession de son paramètre, et quand la valeur est déplacée, le récepteur en prend la possession. Cela nous empêche d’utiliser accidentellement la valeur à nouveau après l’avoir envoyée ; le système de possession vérifie que tout est en ordre.
Envoyer plusieurs valeurs
Le code de l’encart 16-8 a compilé et s’est exécuté, mais il ne nous a pas clairement montré que deux threads séparés communiquaient entre eux via le canal.
Dans l’encart 16-10, nous avons fait quelques modifications qui prouveront que le code de l’encart 16-8 s’exécute de manière concurrente : le thread créé enverra maintenant plusieurs messages et fera une pause d’une seconde entre chaque message.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
Cette fois, le thread créé à un vecteur de chaînes de caractères que nous voulons envoyer au thread principal. Nous itérons dessus, en envoyant chacune individuellement, et faisons une pause entre chacune en appelant la fonction thread::sleep avec une valeur Duration d’une seconde.
Dans le thread principal, nous n’appelons plus la fonction recv explicitement : à la place, nous traitons rx comme un itérateur. Pour chaque valeur reçue, nous l’affichons. Quand le canal est fermé, l’itération se terminera.
Lors de l’exécution du code de l’encart 16-10, vous devriez voir la sortie suivante avec une pause d’une seconde entre chaque ligne :
Got: hi
Got: from
Got: the
Got: thread
Comme nous n’avons aucun code qui fait des pauses ou des délais dans la boucle for du thread principal, nous pouvons en déduire que le thread principal attend de recevoir des valeurs du thread créé.
Créer plusieurs producteurs
Plus tôt, nous avons mentionné que mpsc était un acronyme pour multiple producer, single consumer (multiples producteurs, consommateur unique). Mettons mpsc en pratique et enrichissons le code de l’encart 16-10 pour créer plusieurs threads qui envoient tous des valeurs au même récepteur. Nous pouvons le faire en clonant l’émetteur, comme montré dans l’encart 16-11.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
Cette fois, avant de créer le premier thread, nous appelons clone sur l’émetteur. Cela nous donnera un nouvel émetteur que nous pouvons passer au premier thread créé. Nous passons l’émetteur original à un second thread créé. Cela nous donne deux threads, chacun envoyant des messages différents au même récepteur.
Quand vous exécutez le code, votre sortie devrait ressembler à ceci :
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
Vous pourriez voir les valeurs dans un autre ordre, selon votre système. C’est ce qui rend la concurrence intéressante mais aussi difficile. Si vous expérimentez avec thread::sleep, en lui donnant des valeurs variées dans les différents threads, chaque exécution sera plus non déterministe et créera une sortie différente à chaque fois.
Maintenant que nous avons vu comment les canaux fonctionnent, examinons une méthode différente de concurrence.