Advertisement
umuro

Untitled

Apr 10th, 2023
2,274
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 2.83 KB | Software | 0 0
  1. use std::sync::Arc;
  2. use tokio::sync::{mpsc, Mutex};
  3. use tokio::time::Duration;
  4.  
  5. #[derive(Debug)]
  6. struct Task {
  7.     id: u64,
  8.     data: String,
  9. }
  10.  
  11. impl Task {
  12.     pub fn new(id: u64, data: String) -> Self {
  13.         Task { id, data }
  14.     }
  15.  
  16.     pub async fn execute(&self) -> Result<(), String> {
  17.         println!("Processing task {}: {}", self.id, self.data);
  18.         // Process the task (simulate with sleep)
  19.         tokio::time::sleep(Duration::from_millis(100)).await;
  20.  
  21.         // For demonstration, we will consider odd task IDs to be successful and even ones to fail.
  22.         if self.id % 2 == 0 {
  23.             Err(format!("Task {} failed", self.id))
  24.         } else {
  25.             Ok(())
  26.         }
  27.     }
  28. }
  29.  
  30. struct TaskQueue {
  31.     tasks: Mutex<Vec<Task>>,
  32.     dead_tasks: Mutex<Vec<Task>>,
  33.     notifier: mpsc::Sender<()>,
  34. }
  35.  
  36. impl TaskQueue {
  37.     pub fn new() -> (Self, mpsc::Receiver<()>) {
  38.         let (tx, rx) = mpsc::channel(100);
  39.         (
  40.             TaskQueue {
  41.                 tasks: Mutex::new(Vec::new()),
  42.                 dead_tasks: Mutex::new(Vec::new()),
  43.                 notifier: tx,
  44.             },
  45.             rx,
  46.         )
  47.     }
  48.  
  49.     pub async fn enqueue(&self, task: Task) {
  50.         let mut tasks = self.tasks.lock().await;
  51.         tasks.push(task);
  52.         self.notifier.send(()).await.unwrap();
  53.     }
  54.  
  55.     pub async fn dequeue(&self) -> Option<Task> {
  56.         let mut tasks = self.tasks.lock().await;
  57.         tasks.pop()
  58.     }
  59.  
  60.     pub async fn add_dead_task(&self, task: Task) {
  61.         let mut dead_tasks = self.dead_tasks.lock().await;
  62.         dead_tasks.push(task);
  63.     }
  64. }
  65.  
  66. #[tokio::main]
  67. async fn main() {
  68.     let (queue, mut task_receiver) = TaskQueue::new();
  69.     let queue = Arc::new(queue);
  70.  
  71.     // Spawn 3 worker tasks
  72.     let mut workers = vec![];
  73.  
  74.     for worker_id in 1..=3 {
  75.         let queue_clone = Arc::clone(&queue);
  76.         let mut task_receiver_clone = task_receiver.clone();
  77.         let worker = tokio::spawn(async move {
  78.             while let Some(_) = task_receiver_clone.recv().await {
  79.                 if let Some(task) = queue_clone.dequeue().await {
  80.                     let result = task.execute().await;
  81.                     if let Err(_) = result {
  82.                         println!("Worker {}: Task {} failed, adding to dead_tasks", worker_id, task.id);
  83.                         queue_clone.add_dead_task(task).await;
  84.                     } else {
  85.                         println!("Worker {}: Task {} completed", worker_id, task.id);
  86.                     }
  87.                 }
  88.             }
  89.             println!("Worker {} stopped.", worker_id);
  90.         });
  91.         workers.push(worker);
  92.     }
  93.  
  94.     // Add tasks to the queue
  95.     for i in 1..=10 {
  96.         queue.enqueue(Task::new(i, format!("Task {}", i))).await;
  97.         tokio::time::sleep(Duration::from_millis(
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement