w3resource

Rust Producer-Consumer Pattern with Channels

Rust Error Propagation: Exercise-4 with Solution

Write a Rust program that implements a producer-consumer pattern in Rust using channels. Create one producer thread and multiple consumer threads. The producer thread generates data and sends it through the channel, and the consumer threads receive and process the data.

Sample Solution:

Rust Code:

use std::sync::{Arc, Mutex}; // Importing necessary libraries for synchronization
use std::sync::mpsc; // Importing the channel for inter-thread communication
use std::thread; // Importing the threading functionality

const NUM_CONSUMERS: usize = 3; // Defining the number of consumer threads
const NUM_MESSAGES: usize = 10; // Defining the number of messages to be produced

fn main() {
    // Create a channel for communication between producer and consumers
    let (sender, receiver) = mpsc::channel();

    // Wrap the receiver in an Arc and a Mutex for safe sharing among multiple threads
    let receiver = Arc::new(Mutex::new(receiver));

    // Spawn multiple consumer threads
    let mut handles = vec![];
    for i in 0..NUM_CONSUMERS {
        let receiver = Arc::clone(&receiver);
        let handle = thread::spawn(move || {
            consume_messages(i, receiver);
        });
        handles.push(handle);
    }

    // Start the producer thread
    thread::spawn(move || {
        produce_messages(sender);
    });

    // Wait for all consumer threads to finish
    for handle in handles {
        handle.join().unwrap();
    }
}

// Function to simulate message production
fn produce_messages(sender: mpsc::Sender<String>) {
    for i in 0..NUM_MESSAGES {
        let message = format!("Message {}", i);
        sender.send(message.clone()).unwrap(); // Clone the message before sending
        println!("Producer sent: {}", message);
        thread::sleep(std::time::Duration::from_millis(500)); // Simulate message production delay
    }
}

// Function to consume messages received from the channel
fn consume_messages(id: usize, receiver: Arc<Mutex<mpsc::Receiver<String>>>) {
    loop {
        let message = receiver.lock().unwrap().recv(); // Receive message from the channel
        match message {
            Ok(msg) => {
                println!("Consumer {} received: {}", id, msg); // Print the received message
                thread::sleep(std::time::Duration::from_secs(1)); // Simulate message processing
            }
            Err(_) => break, // Break the loop if channel is closed
        }
    }
}

Output:

Producer sent: Message 0
Consumer 0 received: Message 0
Producer sent: Message 1
Consumer 1 received: Message 1
Producer sent: Message 2
Consumer 2 received: Message 2
Producer sent: Message 3
Consumer 0 received: Message 3
Producer sent: Message 4
Consumer 1 received: Message 4
Producer sent: Message 5
Consumer 2 received: Message 5
Producer sent: Message 6
Consumer 0 received: Message 6
Producer sent: Message 7
Consumer 1 received: Message 7
Producer sent: Message 8
Consumer 2 received: Message 8
Producer sent: Message 9
Consumer 0 received: Message 9

Explanation:

In the exercise above,

  • Imports:
    • The "Arc" and "Mutex" are used for safely sharing the receiver among multiple threads.
    • mpsc provides the multi-producer, single-consumer channel for inter-thread communication.
    • thread is used for creating threads.
  • Constants:
    • 'NUM_CONSUMERS' defines the number of consumer threads.
    • 'NUM_MESSAGES' defines the number of messages to be produced.
  • Main Function:
    • Create a channel (sender, receiver) for communication.
    • Wrap the receiver in an 'Arc' and 'Mutex' for safe sharing among threads.
    • Spawns multiple consumer threads, passing a clone of the receiver to each.
    • Starts the producer thread, passing the sender.
    • Waits for all consumer threads to finish.
  • Producer Function (produce_messages):
    • Generates messages and sends them through the channel.
    • Clone the message before sending.
    • Prints the sent message.
    • Introduce a delay to simulate message production.
  • Consumer Function (consume_messages):
    • Receives messages from the channel in a loop.
    • Prints the received message along with the consumer's ID.
    • Simulates message processing with a delay.
    • Breaks the loop if the channel is closed.

Rust Code Editor:

Previous: Rust Multi-Sender, Single-Receiver Communication.
Next: Simulating a Message Passing Network in Rust.

What is the difficulty level of this exercise?

Test your Programming skills with w3resource's quiz.



Follow us on Facebook and Twitter for latest update.