Canales

Un canal es una forma fácil de utilizar muchos hilos que envían información a un único punto. Son muy populares porque son simples de usar. En Rust, se puede crear un canal mediante std::sync::mpsc. mpsc significa "múltiple productor, solo un consumidor". Es decir, "muchos hilos enviando a un único lugar". Para iniciar un canal se utiliza channel(), que sirve para rear un Sender (un proceso que envía) y un Receiver (un proceso que recibe) que están unidos. Se puede observar en la declaración de la función:


#![allow(unused)]
fn main() {
// 🚧
pub fn channel<T>() -> (Sender<T>, Receiver<T>)
}

Normalmente, se asignan a unas variables cada elemento de la tupla con nombres como sender y receiver. O, en español, emisor y receptor. Como se trata de una función genérica, si se escribe solo lo siguiente:

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel(); // ⚠️
}

Rust no conocerá el tipo que se envía y recibe. Así que el compilador indica:

error[E0282]: type annotations needed for `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`
  --> src\main.rs:30:30
   |
30 |     let (sender, receiver) = channel();
   |         ------------------   ^^^^^^^ cannot infer type for type parameter `T` declared on the function `channel`
   |         |
   |         consider giving this pattern the explicit type `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`, where
the type parameter `T` is specified

Que indica que es necesario añadir un tipo para el Sender y el Receiver. Se puede hacer de la siguiente forma:

use std::sync::mpsc::{channel, Sender, Receiver};

fn main() {
    let (sender, receiver): (Sender<i32>, Receiver<i32>) = channel();
}

o

use std::sync::mpsc::{channel, Sender, Receiver};

fn main() {
    let (sender, receiver) = channel::<i32>();
}

o simplemente, se envía algo a través del Sender en el código, para que Rust pueda inferir su tipo:

use std::sync::mpsc::{channel, Sender, Receiver};

fn main() {
    let (sender, receiver) = channel();
    sender.send(5);
    receiver.recv(); //recv es la función que sirve para recibir el valor. En este caso, no se usa.
}

Así, el compilador conoce el tipo. sender es de tipo Result<(), SendError<i32>> y receiver es de tipo Result<(), RecvError>. Se puede usar unwrap() para ver si el envío ha funcionado. Por ejemplo, se el siguiente código comprueba si ha funcionado el envío:

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();

    sender.send(5).unwrap();
    println!("{}", receiver.recv().unwrap());
}

Que imprime 5.

Un canal es como un Arc ya que se puede clonar y enviar los clones a otros hilos. En el siguiente ejemplo se crean dos hilos que envían valores a un receiver. Este código funcionará, aunque no exactamente como se pretende:

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();

    std::thread::spawn(move|| { // mueve un sender 
        sender.send("Envía un &str esta vez").unwrap();
    });

    std::thread::spawn(move|| { // mueve un sender_clone
        sender_clone.send("Y aquí otro &str").unwrap();
    });

    println!("{}", receiver.recv().unwrap());   
}

Los dos hilos comienzan a enviar (en cualquier orden) y después se imprime lo primero que se recibe. Por lo tanto, el resultado de la impresión puede variar cada vez, dependiendo del hilo que envíe primero.

Para ver el resultado completo de cada hilo, resulta conveniente guardar los manejadores de ejecutar tantas recepciones como sean necesarias.

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();
    let mut handle_vec = vec![]; // Se guardan los manejadores

    handle_vec.push(std::thread::spawn(move|| {  // push del primero
        sender.send("Envía un &str esta vez").unwrap();
    }));

    handle_vec.push(std::thread::spawn(move|| {  // y push del segundo
        sender_clone.send("Y aquí otro &str").unwrap();
    }));

    for _ in handle_vec { // ahora handle_vec tiene 2 elementos. Como cada uno envía un dato, se hacen tantos print como manejadores
        println!("{:?}", receiver.recv().unwrap());
    }
}

El orden de los str puede variar en función del orden en que se ejecutan los envíos.

En el siguiente ejemplo, se crea un vector con los resultados, en lugar de imprimirlos directamente:

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();
    let mut handle_vec = vec![]; // Se guardan los manejadores
    let mut results_vec = vec![];

    handle_vec.push(std::thread::spawn(move|| {  // push del primero
        sender.send("Envía un &str esta vez").unwrap();
    }));

    handle_vec.push(std::thread::spawn(move|| {  // y push del segundo
        sender_clone.send("Y aquí otro &str").unwrap();
    }));

    for _ in handle_vec { // ahora handle_vec tiene 2 elementos. Como cada uno envía un dato, alamcenan tantos como manejadores
        results_vec.push(receiver.recv().unwrap());
    }

    println!("{:?}", results_vec);
}

El resultado del vector es ["Envía un &str esta vez", "Y aquí otro &str"].

El siguiente ejemplo utiliza diez hilos para modificar un vector de un millón de elementos. Al comienzo, todos los elementos del vector valen cero. Se modifican todos los valores a 1. Cada hilo hará la décima parte del trabjo. Se creará un nuevo vector y se usará .extend() para guardar el resultado de cada hilo.

use std::sync::mpsc::channel;
use std::thread::spawn;

fn main() {
    let (sender, receiver) = channel();
    let hugevec = vec![0; 1_000_000];
    let mut newvec = vec![];
    let mut handle_vec = vec![];

    for i in 0..10 {
        let sender_clone = sender.clone();
        let mut work: Vec<u8> = Vec::with_capacity(hugevec.len() / 10); // el nuevo vec en el que guardar el resultado de la décima parte del trabajo
        work.extend(&hugevec[i*100_000..(i+1)*100_000]); // el primero trabaja de 0..100_000, el siguiente de 100_000..200_000, etc.
        let handle =spawn(move || { // crea un manejador

            for number in work.iter_mut() { // efectúa el trabajo
                *number += 1;
            };
            sender_clone.send(work).unwrap(); // utiliza el sender_clone para enviar el trabajo al receptor
        });
        handle_vec.push(handle);
    }
    
    for handle in handle_vec { // espera a que todos los hilos terminen
        handle.join().unwrap();
    }
    
    while let Ok(results) = receiver.try_recv() {
        newvec.push(results); // guarda los resultados en receiver.recv() en el vector final
    }

    // Ahora se tiene un Vec<Vec<u8>>. se debe aplanar con .flatten()
    let newvec = newvec.into_iter().flatten().collect::<Vec<u8>>(); // Ahora es un vector de 1_000_000 u8 números. El orden de cada décima parte puede no coincidir aunque en este caso no se nota debido a que todos valen cero inicialmente
    
    println!("{:?}, {:?}, longitud total: {}", // se imprimen algunos para comprobar que contienen un 1
        &newvec[0..10], &newvec[newvec.len()-10..newvec.len()], newvec.len() // y se muestra que la longitud es de  1_000_000 de elementos
    );
    
    for number in newvec { // Si algúnvalor no fuese 1 se entra en pánico
        if number != 1 {
            panic!();
        }
    }
}

Para que este ejemplo mantuviese el orden de los elementos originales (en caso de no fuesen todos con el mismo valor), sería necesario que también se trabajara con un índice de cada sección para reordenar en el resultado antes de aplanar el vector final.