D’un serveur monothread à un serveur multithreadé
Pour l’instant, le serveur traitera chaque requête à tour de rôle, ce qui signifie qu’il ne traitera pas une deuxième connexion tant que la première connexion n’aura pas fini d’être traitée. Si le serveur recevait de plus en plus de requêtes, cette exécution séquentielle serait de moins en moins optimale. Si le serveur reçoit une requête qui prend beaucoup de temps à traiter, les requêtes suivantes devront attendre que la longue requête soit terminée, même si les nouvelles requêtes peuvent être traitées rapidement. Nous devons corriger cela, mais d’abord nous allons observer le problème en action.
Simuler une requête lente
Nous allons voir comment une requête traitée lentement peut affecter les autres requêtes faites à notre implémentation actuelle du serveur. L’encart 21-10 implémente la gestion d’une requête vers /sleep avec une réponse lente simulée qui fera dormir le serveur pendant cinq secondes avant de répondre.
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
// --snip--
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
// --snip--
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
// --snip--
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r
Content-Length: {length}\r
\r
{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Nous sommes passés de if à match maintenant que nous avons trois cas. Nous devons faire correspondre explicitement sur une slice de request_line pour faire du pattern matching avec les valeurs littérales de chaîne ; match ne fait pas de référencement et déréférencement automatique, contrairement à la méthode d’égalité.
Le premier bras est le même que le bloc if de l’encart 21-9. Le deuxième bras correspond à une requête vers /sleep. Quand cette requête est reçue, le serveur dormira pendant cinq secondes avant d’afficher la page HTML de succès. Le troisième bras est le même que le bloc else de l’encart 21-9.
Vous pouvez voir à quel point notre serveur est primitif : de vraies bibliothèques géreraient la reconnaissance de multiples requêtes de manière bien moins verbeuse !
Démarrez le serveur en utilisant cargo run. Ensuite, ouvrez deux fenêtres de navigateur : une pour http://127.0.0.1:7878 et l’autre pour http://127.0.0.1:7878/sleep. Si vous entrez l’URI / quelques fois, comme avant, vous verrez qu’il répond rapidement. Mais si vous entrez /sleep puis chargez /, vous verrez que / attend que sleep ait dormi pendant ses cinq secondes complètes avant de se charger.
Il existe plusieurs techniques que nous pourrions utiliser pour éviter que les requêtes ne s’accumulent derrière une requête lente, notamment l’utilisation d’async comme nous l’avons fait au chapitre 17 ; celle que nous allons implémenter est un groupe de threads (thread pool).
Améliorer le débit avec un groupe de threads
Un thread pool (groupe de threads) est un groupe de threads créés qui sont prêts et en attente de traiter une tâche. Quand le programme reçoit une nouvelle tâche, il assigne l’un des threads du pool à la tâche, et ce thread traitera la tâche. Les threads restants dans le pool sont disponibles pour gérer toute autre tâche qui arrive pendant que le premier thread est en cours de traitement. Quand le premier thread a terminé de traiter sa tâche, il est renvoyé dans le pool de threads inactifs, prêt à traiter une nouvelle tâche. Un thread pool vous permet de traiter des connexions de manière concurrente, augmentant le débit de votre serveur.
Nous limiterons le nombre de threads dans le pool à un petit nombre pour nous protéger des attaques DoS ; si notre programme créait un nouveau thread pour chaque requête entrante, quelqu’un faisant 10 millions de requêtes à notre serveur pourrait semer le chaos en épuisant toutes les ressources de notre serveur et en paralysant le traitement des requêtes.
Plutôt que de créer un nombre illimité de threads, nous aurons donc un nombre fixe de threads en attente dans le pool. Les requêtes entrantes sont envoyées au pool pour traitement. Le pool maintiendra une file d’attente de requêtes entrantes. Chaque thread du pool retirera une requête de cette file, traitera la requête, puis demandera une autre requête à la file. Avec cette conception, nous pouvons traiter jusqu’à N requêtes de manière concurrente, où N est le nombre de threads. Si chaque thread répond à une requête de longue durée, les requêtes suivantes peuvent toujours s’accumuler dans la file, mais nous avons augmenté le nombre de requêtes de longue durée que nous pouvons gérer avant d’atteindre ce point.
Cette technique n’est qu’un des nombreux moyens d’améliorer le débit d’un serveur web. D’autres options que vous pourriez explorer sont le modèle fork/join, le modèle d’E/S async monothread et le modèle d’E/S async multithreadé. Si ce sujet vous intéresse, vous pouvez en lire davantage sur les autres solutions et essayer de les implémenter ; avec un langage de bas niveau comme Rust, toutes ces options sont possibles.
Avant de commencer à implémenter un thread pool, parlons de ce à quoi l’utilisation du pool devrait ressembler. Quand vous essayez de concevoir du code, écrire l’interface client en premier peut aider à guider votre conception. Écrivez l’API du code de sorte qu’elle soit structurée de la manière dont vous voulez l’appeler ; ensuite, implémentez la fonctionnalité au sein de cette structure plutôt que d’implémenter la fonctionnalité puis de concevoir l’API publique.
De manière similaire à la façon dont nous avons utilisé le développement piloté par les tests dans le projet du chapitre 12, nous utiliserons ici le développement piloté par le compilateur. Nous écrirons le code qui appelle les fonctions que nous voulons, puis nous examinerons les erreurs du compilateur pour déterminer ce que nous devons changer ensuite pour que le code fonctionne. Avant de faire cela, cependant, nous explorerons la technique que nous n’allons pas utiliser comme point de départ.
Créer un thread pour chaque requête
D’abord, explorons à quoi notre code pourrait ressembler s’il créait un nouveau thread pour chaque connexion. Comme mentionné précédemment, ce n’est pas notre plan final en raison des problèmes liés à la création potentiellement illimitée de threads, mais c’est un point de départ pour obtenir d’abord un serveur multithreadé fonctionnel. Ensuite, nous ajouterons le thread pool comme amélioration, et comparer les deux solutions sera plus facile.
L’encart 21-11 montre les modifications à apporter à main pour créer un nouveau thread afin de gérer chaque flux dans la boucle for.
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r
Content-Length: {length}\r
\r
{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Comme vous l’avez appris au chapitre 16, thread::spawn créera un nouveau thread puis exécutera le code de la closure dans le nouveau thread. Si vous exécutez ce code et chargez /sleep dans votre navigateur, puis / dans deux autres onglets du navigateur, vous verrez effectivement que les requêtes vers / n’ont pas besoin d’attendre que /sleep se terminé. Cependant, comme nous l’avons mentionné, cela finira par submerger le système car vous créeriez de nouveaux threads sans aucune limite.
Vous vous souvenez peut-être aussi du chapitre 17 que c’est exactement le type de situation où async et await brillent vraiment ! Gardez cela à l’esprit pendant que nous construisons le thread pool et réfléchissez à ce qui serait différent ou identique avec async.
Créer un nombre fini de threads
Nous voulons que notre thread pool fonctionne de manière similaire et familière afin que passer des threads à un thread pool ne nécessite pas de grands changements dans le code qui utilise notre API. L’encart 21-12 montre l’interface hypothétique d’une structure ThreadPool que nous voulons utiliser à la place de thread::spawn.
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r
Content-Length: {length}\r
\r
{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
ThreadPool interfaceNous utilisons ThreadPool::new pour créer un nouveau thread pool avec un nombre configurable de threads, dans ce cas quatre. Ensuite, dans la boucle for, pool.execute à une interface similaire à thread::spawn en ce qu’il prend une closure que le pool devrait exécuter pour chaque flux. Nous devons implémenter pool.execute de sorte qu’il prenne la closure et la donne à un thread du pool pour l’exécuter. Ce code ne compilera pas encore, mais nous allons essayer pour que le compilateur puisse nous guider dans la correction.
Construire ThreadPool en utilisant le développement piloté par le compilateur
Effectuez les modifications de l’encart 21-12 dans src/main.rs, puis utilisons les erreurs du compilateur de cargo check pour piloter notre développement. Voici la première erreur que nous obtenons : console {{#include ../listings/ch21-web-server/listing-21-12/output.txt}}
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Parfait ! Cette erreur nous dit que nous avons besoin d’un type ou module ThreadPool, donc nous allons en construire un maintenant. Notre implémentation de ThreadPool sera indépendante du type de travail que notre serveur web effectue. Donc, transformons le crate hello d’un crate binaire en un crate de bibliothèque pour héberger notre implémentation de ThreadPool. Après être passé à un crate de bibliothèque, nous pourrons aussi utiliser la bibliothèque de thread pool séparée pour tout travail que nous voulons faire en utilisant un thread pool, pas seulement pour servir des requêtes web.
Créez un fichier src/lib.rs qui contient ce qui suit, qui est la définition la plus simple d’une structure ThreadPool que nous puissions avoir pour le moment :
pub struct ThreadPool;
Ensuite, modifiez le fichier main.rs pour importer ThreadPool dans la portée depuis le crate de bibliothèque en ajoutant le code suivant en haut de src/main.rs :
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r
Content-Length: {length}\r
\r
{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Ce code ne fonctionnera toujours pas, mais vérifions-le à nouveau pour obtenir la prochaine erreur que nous devons traiter : console {{#include ../listings/ch21-web-server/no-listing-01-define-threadpool-struct/output.txt}}
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Cette erreur indique qu’ensuite nous devons créer une fonction associée nommée new pour ThreadPool. Nous savons aussi que new doit avoir un paramètre qui peut accepter 4 comme argument et devrait retourner une instance de ThreadPool. Implémentons la fonction new la plus simple qui aura ces caractéristiques :
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
Nous avons choisi usize comme type du paramètre size car nous savons qu’un nombre négatif de threads n’à aucun sens. Nous savons aussi que nous utiliserons ce 4 comme nombre d’éléments dans une collection de threads, ce qui est la raison d’être du type usize, comme discuté dans la section [« Les types d’entiers »][integer-types] du chapitre 3.
Vérifions le code à nouveau : console {{#include ../listings/ch21-web-server/no-listing-02-impl-threadpool-new/output.txt}}
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Maintenant l’erreur survient parce que nous n’avons pas de méthode execute sur ThreadPool. Rappelez-vous de la section « Créer un nombre fini de threads » que nous avions décidé que notre thread pool devrait avoir une interface similaire à thread::spawn. De plus, nous allons implémenter la fonction execute de sorte qu’elle prenne la closure qu’on lui donne et la transmette à un thread inactif du pool pour l’exécuter.
Nous allons définir la méthode execute sur ThreadPool pour prendre une closure comme paramètre. Rappelez-vous de la section [« Déplacer les valeurs capturées hors des closures »][moving-out-of-closures] du chapitre 13 que nous pouvons prendre des closures comme paramètres avec trois traits différents : Fn, FnMut et FnOnce. Nous devons décider quel type de closure utiliser ici. Nous savons que nous finirons par faire quelque chose de similaire à l’implémentation de thread::spawn de la bibliothèque standard, donc nous pouvons regarder quelles contraintes la signature de thread::spawn a sur son paramètre. La documentation nous montre ce qui suit :
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Le paramètre de type F est celui qui nous intéresse ici ; le paramètre de type T est lié à la valeur de retour, et cela ne nous concerne pas. Nous pouvons voir que spawn utilise FnOnce comme contrainte de trait sur F. C’est probablement ce que nous voulons aussi, car nous finirons par passer l’argument que nous recevons dans execute à spawn. Nous pouvons être encore plus confiants que FnOnce est le trait que nous voulons utiliser parce que le thread exécutant une requête n’exécutera la closure de cette requête qu’une seule fois, ce qui correspond au Once de FnOnce.
Le paramètre de type F a aussi la contrainte de trait Send et la contrainte de durée de vie 'static, qui sont utiles dans notre situation : nous avons besoin de Send pour transférer la closure d’un thread à un autre et de 'static parce que nous ne savons pas combien de temps le thread prendra pour s’exécuter. Créons une méthode execute sur ThreadPool qui prendra un paramètre générique de type F avec ces contraintes :
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
Nous utilisons toujours () après FnOnce parce que ce FnOnce représente une closure qui ne prend pas de paramètres et retourné le type unitaire (). Tout comme les définitions de fonctions, le type de retour peut être omis de la signature, mais même si nous n’avons pas de paramètres, nous avons toujours besoin des parenthèses.
Encore une fois, c’est l’implémentation la plus simple de la méthode execute : elle ne fait rien, mais nous essayons seulement de faire compiler notre code. Vérifions-le à nouveau : console {{#include ../listings/ch21-web-server/no-listing-03-define-execute/output.txt}}
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
Ça compilé ! Mais notez que si vous essayez cargo run et faites une requête dans le navigateur, vous verrez les erreurs dans le navigateur que nous avons vues au début du chapitre. Notre bibliothèque n’appelle pas encore réellement la closure passée à execute !
Remarque : un dicton que vous pourriez entendre à propos des langages avec des compilateurs stricts, comme Haskell et Rust, est « Si le code compilé, il fonctionne. » Mais ce dicton n’est pas universellement vrai. Notre projet compilé, mais il ne fait absolument rien ! Si nous construisions un vrai projet complet, ce serait le bon moment pour commencer à écrire des tests unitaires pour vérifier que le code compilé et à le comportement que nous voulons.
Réfléchissez : qu’est-ce qui serait différent ici si nous allions exécuter une future au lieu d’une closure ?
Valider le nombre de threads dans new
Nous ne faisons rien avec les paramètres de new et execute. Implémentons les corps de ces fonctions avec le comportement que nous voulons. Pour commencer, pensons à new. Plus tôt, nous avons choisi un type non signé pour le paramètre size parce qu’un pool avec un nombre négatif de threads n’à aucun sens. Cependant, un pool avec zéro thread n’a pas de sens non plus, pourtant zéro est une valeur usize parfaitement valide. Nous ajouterons du code pour vérifier que size est supérieur à zéro avant de retourner une instance de ThreadPool, et nous ferons paniquer le programme s’il reçoit zéro en utilisant la macro assert!, comme montré dans l’encart 21-13.
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool::new to panic if size is zeroNous avons aussi ajouté de la documentation pour notre ThreadPool avec des commentaires de documentation. Notez que nous avons suivi les bonnes pratiques de documentation en ajoutant une section qui signale les situations dans lesquelles notre fonction peut paniquer, comme discuté au chapitre 14. Essayez d’exécuter cargo doc --open et de cliquer sur la structure ThreadPool pour voir à quoi ressemble la documentation générée pour new !
Au lieu d’ajouter la macro assert! comme nous l’avons fait ici, nous pourrions transformer new en build et retourner un Result comme nous l’avons fait avec Config::build dans le projet d’E/S de l’encart 12-9. Mais nous avons décidé dans ce cas qu’essayer de créer un thread pool sans aucun thread devrait être une erreur irrécupérable. Si vous êtes ambitieux, essayez d’écrire une fonction nommée build avec la signature suivante pour la comparer avec la fonction new :
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
Créer l’espace pour stocker les threads
Maintenant que nous avons un moyen de savoir que nous avons un nombre valide de threads à stocker dans le pool, nous pouvons créer ces threads et les stocker dans la structure ThreadPool avant de retourner la structure. Mais comment « stocker » un thread ? Regardons à nouveau la signature de thread::spawn :
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
La fonction spawn retourné un JoinHandle<T>, où T est le type que la closure retourné. Essayons d’utiliser JoinHandle aussi et voyons ce qui se passe. Dans notre cas, les closures que nous passons au thread pool géreront la connexion et ne retourneront rien, donc T sera le type unitaire ().
Le code de l’encart 21-14 compilera, mais il ne crée pas encore de threads. Nous avons changé la définition de ThreadPool pour contenir un vecteur d’instances thread::JoinHandle<()>, initialisé le vecteur avec une capacité de size, mis en place une boucle for qui exécutera du code pour créer les threads, et retourné une instance de ThreadPool les contenant.
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
ThreadPool to hold the threadsNous avons importé std::thread dans la portée du crate de bibliothèque car nous utilisons thread::JoinHandle comme type des éléments du vecteur dans ThreadPool.
Une fois qu’une taille valide est reçue, notre ThreadPool crée un nouveau vecteur qui peut contenir size éléments. La fonction with_capacity effectue la même tâche que Vec::new mais avec une différence importante : elle pré-alloue l’espace dans le vecteur. Parce que nous savons que nous devons stocker size éléments dans le vecteur, faire cette allocation à l’avance est légèrement plus efficace que d’utiliser Vec::new, qui se redimensionne au fur et à mesure que les éléments sont insérés.
Quand vous exécutez cargo check à nouveau, cela devrait réussir.
Envoyer du code du ThreadPool vers un thread
Nous avons laissé un commentaire dans la boucle for de l’encart 21-14 concernant la création de threads. Ici, nous allons voir comment nous créons réellement les threads. La bibliothèque standard fournit thread::spawn comme moyen de créer des threads, et thread::spawn s’attend à recevoir du code que le thread devrait exécuter dès que le thread est créé. Cependant, dans notre cas, nous voulons créer les threads et les faire attendre du code que nous enverrons plus tard. L’implémentation des threads de la bibliothèque standard n’inclut aucun moyen de faire cela ; nous devons l’implémenter manuellement.
Nous allons implémenter ce comportement en introduisant une nouvelle structure de données entre le ThreadPool et les threads qui gérera ce nouveau comportement. Nous appellerons cette structure de données Worker, qui est un terme courant dans les implémentations de pools. Le Worker récupère le code qui doit être exécuté et l’exécute dans son thread.
Pensez aux personnes travaillant en cuisine dans un restaurant : les workers attendent que les commandes arrivent des clients, puis ils sont responsables de prendre ces commandes et de les exécuter.
Au lieu de stocker un vecteur d’instances JoinHandle<()> dans le thread pool, nous stockerons des instances de la structure Worker. Chaque Worker stockera une seule instance de JoinHandle<()>. Ensuite, nous implémenterons une méthode sur Worker qui prendra une closure de code à exécuter et l’enverra au thread déjà en cours d’exécution pour traitement. Nous donnerons aussi à chaque Worker un id pour que nous puissions distinguer les différentes instances de Worker dans le pool lors de la journalisation ou du débogage.
Voici le nouveau processus qui se produira quand nous créerons un ThreadPool. Nous implémenterons le code qui envoie la closure au thread après avoir mis en place le Worker de cette manière : 1. Définir une structure Worker qui contient un id et un JoinHandle<()>. 2. Modifier ThreadPool pour contenir un vecteur d’instances de Worker. 3. Définir une fonction Worker::new qui prend un numéro d’id et retourné une instance de Worker qui contient l’id et un thread créé avec une closure vide. 4. Dans ThreadPool::new, utiliser le compteur de la boucle for pour générer un id, créer un nouveau Worker avec cet id, et stocker le Worker dans le vecteur.
- Définir une structure
Workerqui contient unidet unJoinHandle<()>. - Modifier
ThreadPoolpour contenir un vecteur d’instances deWorker. - Définir une fonction
Worker::newqui prend un numéro d’idet retourné une instance deWorkerqui contient l’idet un thread créé avec une closure vide. - Dans
ThreadPool::new, utiliser le compteur de la boucleforpour générer unid, créer un nouveauWorkeravec cetid, et stocker leWorkerdans le vecteur.
Si vous aimez les défis, essayez d’implémenter ces changements par vous-même avant de regarder le code de l’encart 21-15.
Prêt ? Voici l’encart 21-15 avec une façon de faire les modifications précédentes.
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool to hold Worker instances instead of holding threads directlyNous avons changé le nom du champ sur ThreadPool de threads à workers car il contient maintenant des instances de Worker au lieu d’instances de JoinHandle<()>. Nous utilisons le compteur dans la boucle for comme argument pour Worker::new, et nous stockons chaque nouveau Worker dans le vecteur nommé workers.
Le code externe (comme notre serveur dans src/main.rs) n’a pas besoin de connaître les détails d’implémentation concernant l’utilisation d’une structure Worker au sein de ThreadPool, donc nous rendons la structure Worker et sa fonction new privées. La fonction Worker::new utilise l’id que nous lui donnons et stocké une instance de JoinHandle<()> qui est créée en lançant un nouveau thread avec une closure vide.
Remarque : si le système d’exploitation ne peut pas créer un thread parce qu’il n’y a pas assez de ressources système,
thread::spawnpaniquera. Cela fera paniquer tout notre serveur, même si la création de certains threads aurait pu réussir. Par souci de simplicité, ce comportement est acceptable, mais dans une implémentation de thread pool en production, vous voudriez probablement utiliser [std::thread::Builder][builder] et sa méthode [spawn][builder-spawn] qui retourné unResultà la place.
Ce code compilera et stockera le nombre d’instances de Worker que nous avons spécifié comme argument de ThreadPool::new. Mais nous ne traitons toujours pas la closure que nous recevons dans execute. Voyons comment faire cela ensuite.
Envoyer des requêtes aux threads via des canaux
Le prochain problème que nous allons aborder est que les closures données à thread::spawn ne font absolument rien. Actuellement, nous recevons la closure que nous voulons exécuter dans la méthode execute. Mais nous devons donner à thread::spawn une closure à exécuter quand nous créons chaque Worker lors de la création du ThreadPool.
Nous voulons que les structures Worker que nous venons de créer récupèrent le code à exécuter depuis une file d’attente détenue par le ThreadPool et envoient ce code à leur thread pour l’exécuter.
Les canaux que nous avons appris au chapitre 16 – un moyen simple de communiquer entre deux threads – seraient parfaits pour ce cas d’utilisation. Nous utiliserons un canal pour fonctionner comme la file d’attente de tâches, et execute enverra une tâche du ThreadPool aux instances de Worker, qui enverront la tâche à leur thread. Voici le plan : 1. Le ThreadPool créera un canal et conservera l’émetteur (sender). 2. Chaque Worker conservera le récepteur (receiver). 3. Nous créerons une nouvelle structure Job qui contiendra les closures que nous voulons envoyer à travers le canal. 4. La méthode execute enverra la tâche qu’elle veut exécuter à travers l’émetteur. 5. Dans son thread, le Worker bouclera sur son récepteur et exécutera les closures de toutes les tâches qu’il reçoit.
- Le
ThreadPoolcréera un canal et conservera l’émetteur (sender). - Chaque
Workerconservera le récepteur (receiver). - Nous créerons une nouvelle structure
Jobqui contiendra les closures que nous voulons envoyer à travers le canal. - La méthode
executeenverra la tâche qu’elle veut exécuter à travers l’émetteur. - Dans son thread, le
Workerbouclera sur son récepteur et exécutera les closures de toutes les tâches qu’il reçoit.
Commençons par créer un canal dans ThreadPool::new et en conservant l’émetteur dans l’instance de ThreadPool, comme montré dans l’encart 21-16. La structure Job ne contient rien pour le moment mais sera le type d’élément que nous enverrons à travers le canal.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
ThreadPool to store the sender of a channel that transmits Job instancesDans ThreadPool::new, nous créons notre nouveau canal et faisons en sorte que le pool conserve l’émetteur. Cela compilera avec succès.
Essayons de passer un récepteur du canal à chaque Worker au moment où le thread pool crée le canal. Nous savons que nous voulons utiliser le récepteur dans le thread que les instances de Worker créent, donc nous référencerons le paramètre receiver dans la closure. Le code de l’encart 21-17 ne compilera pas tout à fait encore.
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
WorkerNous avons fait quelques petits changements simples : nous passons le récepteur à Worker::new, puis nous l’utilisons à l’intérieur de la closure.
Quand nous essayons de vérifier ce code, nous obtenons cette erreur : console {{#include ../listings/ch21-web-server/listing-21-17/output.txt}}
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
Le code essaie de passer receiver à plusieurs instances de Worker. Cela ne fonctionnera pas, comme vous vous en souviendrez du chapitre 16 : l’implémentation de canal que Rust fournit est à producteurs multiples, consommateur unique. Cela signifie que nous ne pouvons pas simplement cloner l’extrémité consommatrice du canal pour corriger ce code. Nous ne voulons pas non plus envoyer un message plusieurs fois à plusieurs consommateurs ; nous voulons une liste de messages avec plusieurs instances de Worker de sorte que chaque message soit traité une seule fois.
De plus, retirer une tâche de la file d’attente du canal implique de muter le receiver, donc les threads ont besoin d’un moyen sûr de partager et modifier receiver ; sinon, nous pourrions obtenir des conditions de concurrence (comme abordé au chapitre 16).
Rappelez-vous les pointeurs intelligents thread-safe discutés au chapitre 16 : pour partager la propriété entre plusieurs threads et permettre aux threads de muter la valeur, nous devons utiliser Arc<Mutex<T>>. Le type Arc permettra à plusieurs instances de Worker de posséder le récepteur, et Mutex garantira qu’un seul Worker obtient une tâche du récepteur à la fois. L’encart 21-18 montre les changements que nous devons effectuer.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Worker instances using Arc and MutexDans ThreadPool::new, nous plaçons le récepteur dans un Arc et un Mutex. Pour chaque nouveau Worker, nous clonons l’Arc pour incrémenter le compteur de références afin que les instances de Worker puissent partager la propriété du récepteur.
Avec ces changements, le code compilé ! Nous y arrivons !
Implémenter la méthode execute
Implémentons enfin la méthode execute sur ThreadPool. Nous allons aussi changer Job d’une structure vers un alias de type pour un objet trait qui contient le type de closure que execute reçoit. Comme discuté dans la section [« Synonymes de types et alias de types »][type-aliases] du chapitre 20, les alias de types nous permettent de raccourcir les types longs pour faciliter leur utilisation. Regardez l’encart 21-19.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
Job type alias for a Box that holds each closure and then sending the job down the channelAprès avoir créé une nouvelle instance de Job en utilisant la closure que nous recevons dans execute, nous envoyons cette tâche par l’extrémité émettrice du canal. Nous appelons unwrap sur send pour le cas où l’envoi échouerait. Cela pourrait se produire si, par exemple, nous arrêtions tous nos threads, ce qui signifierait que l’extrémité réceptrice a cessé de recevoir de nouveaux messages. Pour le moment, nous ne pouvons pas arrêter nos threads : nos threads continuent de s’exécuter tant que le pool existe. La raison pour laquelle nous utilisons unwrap est que nous savons que le cas d’échec ne se produira pas, mais le compilateur ne le sait pas.
Mais nous n’avons pas tout à fait fini ! Dans le Worker, notre closure passée à thread::spawn ne fait encore que référencer l’extrémité réceptrice du canal. Au lieu de cela, nous avons besoin que la closure boucle indéfiniment, en demandant à l’extrémité réceptrice du canal une tâche et en exécutant la tâche quand elle en obtient une. Effectuons la modification montrée dans l’encart 21-20 sur Worker::new.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
Worker instance’s threadIci, nous appelons d’abord lock sur le receiver pour acquérir le mutex, puis nous appelons unwrap pour paniquer en cas d’erreur. L’acquisition d’un verrou peut échouer si le mutex est dans un état empoisonné (poisoned), ce qui peut se produire si un autre thread a paniqué tout en détenant le verrou au lieu de le libérer. Dans cette situation, appeler unwrap pour faire paniquer ce thread est l’action correcte à prendre. N’hésitez pas à remplacer cet unwrap par un expect avec un message d’erreur qui a du sens pour vous.
Si nous obtenons le verrou sur le mutex, nous appelons recv pour recevoir un Job du canal. Un dernier unwrap passe outre toute erreur ici aussi, qui pourrait survenir si le thread détenant l’émetteur s’est arrêté, de manière similaire à la façon dont la méthode send retourné Err si le récepteur s’arrête.
L’appel à recv bloque, donc s’il n’y a pas encore de tâche, le thread courant attendra qu’une tâche devienne disponible. Le Mutex<T> garantit qu’un seul thread Worker à la fois essaie de demander une tâche.
Notre thread pool est maintenant en état de fonctionnement ! Lancez un cargo run et faites quelques requêtes :
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
--> src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec<Worker>,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--> src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle<()>,
| ^^^^^^
warning: `hello` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Succès ! Nous avons maintenant un thread pool qui exécute les connexions de manière asynchrone. Il n’y a jamais plus de quatre threads créés, donc notre système ne sera pas surchargé si le serveur reçoit beaucoup de requêtes. Si nous faisons une requête vers /sleep, le serveur pourra servir d’autres requêtes en faisant exécuter celles-ci par un autre thread.
Remarque : si vous ouvrez /sleep dans plusieurs fenêtres de navigateur simultanément, elles pourraient se charger une à la fois à des intervalles de cinq secondes. Certains navigateurs web exécutent séquentiellement plusieurs instances de la même requête pour des raisons de mise en cache. Cette limitation n’est pas causée par notre serveur web.
C’est un bon moment pour faire une pause et considérer comment le code des encarts 21-18, 21-19 et 21-20 serait différent si nous utilisions des futures au lieu d’une closure pour le travail à effectuer. Quels types changeraient ? Comment les signatures des méthodes seraient-elles différentes, si elles l’étaient ? Quelles parties du code resteraient les mêmes ?
Après avoir appris la boucle while let aux chapitres 17 et 19, vous vous demandez peut-être pourquoi nous n’avons pas écrit le code du thread Worker comme montré dans l’encart 21-21.
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
Worker::new using while letCe code compilé et s’exécute mais ne produit pas le comportement de threading souhaité : une requête lente fera toujours attendre les autres requêtes pour être traitées. La raison est assez subtile : la structure Mutex n’a pas de méthode publique unlock car la propriété du verrou est basée sur la durée de vie du MutexGuard<T> au sein du LockResult<MutexGuard<T>> que la méthode lock retourné. Au moment de la compilation, le vérificateur d’emprunt peut alors appliquer la règle selon laquelle une ressource protégée par un Mutex ne peut pas être accédée à moins que nous détenions le verrou. Cependant, cette implémentation peut aussi faire en sorte que le verrou soit détenu plus longtemps que prévu si nous ne sommes pas attentifs à la durée de vie du MutexGuard<T>.
Le code de l’encart 21-20 qui utilise let job = receiver.lock().unwrap().recv().unwrap(); fonctionne car avec let, toutes les valeurs temporaires utilisées dans l’expression du côté droit du signé égal sont immédiatement libérées quand l’instruction let se terminé. Cependant, while let (et if let et match) ne libère pas les valeurs temporaires avant la fin du bloc associé. Dans l’encart 21-21, le verrou reste détenu pendant toute la durée de l’appel à job(), ce qui signifie que les autres instances de Worker ne peuvent pas recevoir de tâches.