Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- use std::sync::Arc;
- use tokio::sync::{mpsc, Mutex};
- use tokio::time::Duration;
/// A single unit of work handled by the worker pool.
#[derive(Debug)]
struct Task {
    /// Numeric identifier; shown in log output and used by `execute`
    /// to decide whether the simulated run succeeds.
    id: u64,
    /// Free-form payload text printed when the task is processed.
    data: String,
}
- impl Task {
- pub fn new(id: u64, data: String) -> Self {
- Task { id, data }
- }
- pub async fn execute(&self) -> Result<(), String> {
- println!("Processing task {}: {}", self.id, self.data);
- // Process the task (simulate with sleep)
- tokio::time::sleep(Duration::from_millis(100)).await;
- // For demonstration, we will consider odd task IDs to be successful and even ones to fail.
- if self.id % 2 == 0 {
- Err(format!("Task {} failed", self.id))
- } else {
- Ok(())
- }
- }
- }
- struct TaskQueue {
- tasks: Mutex<Vec<Task>>,
- dead_tasks: Mutex<Vec<Task>>,
- notifier: mpsc::Sender<()>,
- }
- impl TaskQueue {
- pub fn new() -> (Self, mpsc::Receiver<()>) {
- let (tx, rx) = mpsc::channel(100);
- (
- TaskQueue {
- tasks: Mutex::new(Vec::new()),
- dead_tasks: Mutex::new(Vec::new()),
- notifier: tx,
- },
- rx,
- )
- }
- pub async fn enqueue(&self, task: Task) {
- let mut tasks = self.tasks.lock().await;
- tasks.push(task);
- self.notifier.send(()).await.unwrap();
- }
- pub async fn dequeue(&self) -> Option<Task> {
- let mut tasks = self.tasks.lock().await;
- tasks.pop()
- }
- pub async fn add_dead_task(&self, task: Task) {
- let mut dead_tasks = self.dead_tasks.lock().await;
- dead_tasks.push(task);
- }
- }
- #[tokio::main]
- async fn main() {
- let (queue, mut task_receiver) = TaskQueue::new();
- let queue = Arc::new(queue);
- // Spawn 3 worker tasks
- let mut workers = vec![];
- for worker_id in 1..=3 {
- let queue_clone = Arc::clone(&queue);
- let mut task_receiver_clone = task_receiver.clone();
- let worker = tokio::spawn(async move {
- while let Some(_) = task_receiver_clone.recv().await {
- if let Some(task) = queue_clone.dequeue().await {
- let result = task.execute().await;
- if let Err(_) = result {
- println!("Worker {}: Task {} failed, adding to dead_tasks", worker_id, task.id);
- queue_clone.add_dead_task(task).await;
- } else {
- println!("Worker {}: Task {} completed", worker_id, task.id);
- }
- }
- }
- println!("Worker {} stopped.", worker_id);
- });
- workers.push(worker);
- }
- // Add tasks to the queue
- for i in 1..=10 {
- queue.enqueue(Task::new(i, format!("Task {}", i))).await;
- tokio::time::sleep(Duration::from_millis(
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement