Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Arrêt gracieux et nettoyage

Le code de l’encart 21-20 répond aux requêtes de manière asynchrone grâce à l’utilisation d’un thread pool, comme nous le souhaitions. Nous recevons des avertissements concernant les champs workers, id et thread que nous n’utilisons pas directement, ce qui nous rappelle que nous ne nettoyons rien. Quand nous utilisons la méthode peu élégante ctrl-C pour arrêter le thread principal, tous les autres threads sont arrêtés immédiatement aussi, même s’ils sont en train de traiter une requête.

Ensuite, nous allons implémenter le trait Drop pour appeler join sur chacun des threads du pool afin qu’ils puissent terminer les requêtes sur lesquelles ils travaillent avant de se fermer. Puis, nous implémenterons un moyen de dire aux threads qu’ils devraient arrêter d’accepter de nouvelles requêtes et s’arrêter. Pour voir ce code en action, nous modifierons notre serveur pour n’accepter que deux requêtes avant d’arrêter proprement son thread pool.

Une chose à remarquer au passage : rien de tout cela n’affecte les parties du code qui gèrent l’exécution des closures, donc tout ce qui est ici serait identique si nous utilisions un thread pool pour un runtime async.

Implémenter le trait Drop sur ThreadPool

Commençons par implémenter Drop sur notre thread pool. Quand le pool est libéré, tous nos threads devraient se joindre (join) pour s’assurer qu’ils terminent leur travail. L’encart 21-22 montre une première tentative d’implémentation de Drop ; ce code ne fonctionnera pas tout à fait encore.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-22: Joining each thread when the thread pool goes out of scope

D’abord, nous bouclons sur chaque worker du thread pool. Nous utilisons &mut pour cela car self est une référence mutable, et nous devons aussi pouvoir muter worker. Pour chaque worker, nous affichons un message disant que cette instance particulière de Worker s’arrête, puis nous appelons join sur le thread de cette instance de Worker. Si l’appel à join échoue, nous utilisons unwrap pour faire paniquer Rust et entrer dans un arrêt non propre.

Voici l’erreur que nous obtenons quand nous compilons ce code : console {{#include ../listings/ch21-web-server/listing-21-22/output.txt}}

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
   |             |
   |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
   |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
  --> /rustc/1159e78c4747b02ef996e55082b704c09b970588/library/std/src/thread/mod.rs:1921:17

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error

L’erreur nous dit que nous ne pouvons pas appeler join parce que nous n’avons qu’un emprunt mutable de chaque worker et que join prend la propriété de son argument. Pour résoudre ce problème, nous devons déplacer le thread hors de l’instance de Worker qui possède thread afin que join puisse consommer le thread. Une façon de faire cela est de prendre la même approche que celle de l’encart 18-15. Si Worker contenait un Option<thread::JoinHandle<()>>, nous pourrions appeler la méthode take sur l’Option pour déplacer la valeur hors de la variante Some et laisser une variante None à sa place. En d’autres termes, un Worker en cours d’exécution aurait une variante Some dans thread, et quand nous voudrions nettoyer un Worker, nous remplacerions Some par None pour que le Worker n’ait plus de thread à exécuter.

Cependant, le seul moment où cela se produirait serait lors de la libération du Worker. En contrepartie, nous devrions gérer un Option<thread::JoinHandle<()>> partout où nous accéderions à worker.thread. Le Rust idiomatique utilise beaucoup Option, mais quand vous vous retrouvez à envelopper quelque chose que vous savez toujours présent dans un Option comme solution de contournement comme celle-ci, c’est une bonne idée de chercher des approches alternatives pour rendre votre code plus propre et moins sujet aux erreurs.

Dans ce cas, une meilleure alternative existe : la méthode Vec::drain. Elle accepte un paramètre de plage pour spécifier quels éléments retirer du vecteur et retourné un itérateur de ces éléments. Passer la syntaxe de plage .. retirera chaque valeur du vecteur.

Donc, nous devons mettre à jour l’implémentation de drop du ThreadPool comme ceci :

Filename: src/lib.rs
#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
}

Cela résout l’erreur du compilateur et ne nécessite aucune autre modification de notre code. Notez que, comme drop peut être appelé lors d’un panic, le unwrap pourrait aussi paniquer et causer un double panic, ce qui fait immédiatement planter le programme et met fin à tout nettoyage en cours. C’est acceptable pour un programme d’exemple, mais ce n’est pas recommandé pour du code en production.

Signaler aux threads d’arrêter d’écouter les tâches

Avec toutes les modifications que nous avons faites, notre code compilé sans aucun avertissement. Cependant, la mauvaise nouvelle est que ce code ne fonctionne pas encore comme nous le voulons. La clé est la logique dans les closures exécutées par les threads des instances de Worker : pour le moment, nous appelons join, mais cela n’arrêtera pas les threads, car ils bouclent (loop) indéfiniment à la recherche de tâches. Si nous essayons de libérer notre ThreadPool avec notre implémentation actuelle de drop, le thread principal se bloquera indéfiniment, en attendant que le premier thread se terminé.

Pour corriger ce problème, nous aurons besoin d’un changement dans l’implémentation de drop du ThreadPool puis d’un changement dans la boucle du Worker.

D’abord, nous allons modifier l’implémentation de drop du ThreadPool pour libérer explicitement le sender avant d’attendre que les threads se terminent. L’encart 21-23 montre les modifications apportées au ThreadPool pour libérer explicitement sender. Contrairement au thread, ici nous avons besoin d’utiliser un Option pour pouvoir déplacer sender hors du ThreadPool avec Option::take.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        // --snip--

        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
Listing 21-23: Explicitly dropping sender before joining the Worker threads

Libérer sender ferme le canal, ce qui indique qu’aucun autre message ne sera envoyé. Quand cela se produit, tous les appels à recv que les instances de Worker font dans la boucle infinie retourneront une erreur. Dans l’encart 21-24, nous modifions la boucle du Worker pour sortir proprement de la boucle dans ce cas, ce qui signifie que les threads se termineront quand l’implémentation de drop du ThreadPool appellera join sur eux.

Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker { id, thread }
    }
}
Listing 21-24: Explicitly breaking out of the loop when recv returns an error

Pour voir ce code en action, modifions main pour n’accepter que deux requêtes avant d’arrêter proprement le serveur, comme montré dans l’encart 21-25.

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r
Content-Length: {length}\r
\r
{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-25: Shutting down the server after serving two requests by exiting the loop

Vous ne voudriez pas qu’un serveur web réel s’arrête après avoir servi seulement deux requêtes. Ce code démontre simplement que l’arrêt propre et le nettoyage fonctionnent correctement.

La méthode take est définie dans le trait Iterator et limite l’itération aux deux premiers éléments au maximum. Le ThreadPool sortira de la portée à la fin de main, et l’implémentation de drop s’exécutera.

Démarrez le serveur avec cargo run et faites trois requêtes. La troisième requête devrait échouer, et dans votre terminal, vous devriez voir une sortie similaire à ceci :

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

Vous pourriez voir un ordre différent des identifiants de Worker et des messages affichés. Nous pouvons voir comment ce code fonctionne d’après les messages : les instances de Worker 0 et 3 ont reçu les deux premières requêtes. Le serveur a arrêté d’accepter les connexions après la deuxième connexion, et l’implémentation de Drop sur ThreadPool commence à s’exécuter avant même que Worker 3 ne commence sa tâche. La libération du sender déconnecte toutes les instances de Worker et leur dit de s’arrêter. Les instances de Worker affichent chacune un message quand elles se déconnectent, puis le thread pool appelle join pour attendre que chaque thread Worker se terminé.

Remarquez un aspect intéressant de cette exécution particulière : le ThreadPool a libéré le sender, et avant qu’aucun Worker n’ait reçu d’erreur, nous avons essayé de joindre Worker 0. Worker 0 n’avait pas encore reçu d’erreur de recv, donc le thread principal s’est bloqué, en attendant que Worker 0 se terminé. Entre-temps, Worker 3 a reçu une tâche et ensuite tous les threads ont reçu une erreur. Quand Worker 0 a terminé, le thread principal a attendu que le reste des instances de Worker se terminé. À ce moment-là, elles avaient toutes quitté leurs boucles et s’étaient arrêtées.

Félicitations ! Nous avons maintenant terminé notre projet ; nous avons un serveur web basique qui utilise un thread pool pour répondre de manière asynchrone. Nous sommes capables d’effectuer un arrêt propre du serveur, qui nettoie tous les threads du pool.

Voici le code complet pour référence :

Filename: src/main.rs
use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r
Content-Length: {length}\r
\r
{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
Filename: src/lib.rs
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Nous pourrions faire plus ici ! Si vous voulez continuer à améliorer ce projet, voici quelques idées :

  • Ajouter plus de documentation à ThreadPool et à ses méthodes publiques.
  • Ajouter des tests de la fonctionnalité de la bibliothèque.
  • Remplacer les appels à unwrap par une gestion d’erreurs plus robuste.
  • Utiliser ThreadPool pour effectuer une tâche autre que servir des requêtes web.
  • Trouver une crate de thread pool sur crates.io et implémenter un serveur web similaire en utilisant cette crate à la place. Ensuite, comparer son API et sa robustesse au thread pool que nous avons implémenté.

Résumé

Bien joué ! Vous êtes arrivé à la fin du livre ! Nous voulons vous remercier de nous avoir accompagnés dans cette visite de Rust. Vous êtes maintenant prêt à implémenter vos propres projets Rust et à aider dans les projets des autres. Gardez à l’esprit qu’il existe une communauté accueillante d’autres Rustacés qui seraient ravis de vous aider avec tous les défis que vous rencontrerez dans votre parcours Rust.