Advertisement
MetonymyQT

Tokio Rust Concurrency Attempt 2

Apr 6th, 2025
432
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 4.76 KB | Software | 0 0
  1. use anyhow::{Result, anyhow};
  2. use log::{debug, error, info};
  3. use signal_hook::consts::{SIGINT, SIGTERM};
  4. use signal_hook::iterator::Signals;
  5. use std::ops::Deref;
  6. use std::sync::Arc;
  7. use std::sync::atomic::{AtomicBool, Ordering};
  8. use std::thread;
  9. use std::time::Duration;
  10. use tokio::fs::File;
  11. use tokio::io::AsyncWriteExt;
  12. use tokio::sync::mpsc;
  13. use tokio::sync::mpsc::Sender;
  14. use tokio::sync::oneshot;
  15. use tokio::task::JoinHandle;
  16. use tokio::time::{sleep, timeout};
  17.  
  18. fn setup_graceful_shutdown(cancelled: oneshot::Sender<bool>)  {
  19.     thread::spawn(move || {
  20.         let signals = Signals::new([SIGINT, SIGTERM]);
  21.         match signals {
  22.             Ok(mut signal_info) => {
  23.                 if signal_info.forever().next().is_some() {
  24.                     info!("user request cancellation");
  25.                     let _ = cancelled.send(true);
  26.                     return;
  27.                 }
  28.             }
  29.             Err(error) => {
  30.                 error!("Failed to setup signal handler: {error}")
  31.             }
  32.         }
  33.     });
  34. }
  35.  
  36. async fn download_image(tx: Sender<Result<bytes::Bytes, anyhow::Error>>, url: &str) {
  37.     info!("Downloading image for {}", url);
  38.     let resp = reqwest::get(url).await;
  39.     if let Ok(resp) = resp {
  40.         let image_bytes = resp.bytes().await;
  41.         match image_bytes {
  42.             Ok(data) => {
  43.                 tx.send(Ok(data)).await.expect("failed to send message");
  44.             }
  45.             Err(err) => {
  46.                 let send_result = tx.send(Err(anyhow!("failed to get image data: {}", err)))
  47.                     .await;
  48.  
  49.                 debug!("download image ok {:?} for url {}", send_result, url)
  50.             }
  51.         }
  52.     } else {
  53.         let send_result = tx.send(Err(anyhow!("failed to get response")))
  54.             .await;
  55.  
  56.         debug!("download image error {:?}  for url {}", send_result, url)
  57.     }
  58. }
  59.  
  60. #[tokio::main]
  61. async fn main() -> Result<()> {
  62.     env_logger::init();
  63.  
  64.     let task_timeout = Duration::from_millis(500);
  65.  
  66.     let (tx, mut rx) = mpsc::channel(32);
  67.     let (cancel_tx, mut cancel_rx) = oneshot::channel::<bool>();
  68.     let (done_tx, mut done_rx) = oneshot::channel::<bool>();
  69.     setup_graceful_shutdown(cancel_tx);
  70.  
  71.     let urls: Vec<&str> = vec![
  72.         "https://images.unsplash.com/photo-1739117956532-8a37c76093dd?fm=jpg",
  73.         "https://images.unsplash.com/photo-1735930371721-d1243e8122ab?fm=jpg",
  74.         "https://images.unsplash.com/photo-1732204662871-f853329ce638?fm=jpg",
  75.         "https://images.unsplash.com/photo-1712677925320-5f93d7f853f5?fm=jpg",
  76.         "https://images.unsplash.com/photo-1712677925900-12574cb3cf15?fm=jpg",
  77.         "https://images.unsplash.com/photo-1710436000845-bb707af976a6?fm=jpg",
  78.         "https://images.unsplash.com/photo-1700386277812-5ff74eb03650?fm=jpg",
  79.         "https://images.unsplash.com/photo-1700386277415-c9c5270ee8b8?fm=jpg",
  80.         "https://images.unsplash.com/photo-1691947563165-28011f40d786?fm=jpg",
  81.         "https://images.unsplash.com/photo-1691782359338-97fcbcce8ce8?fm=jpg",
  82.     ];
  83.  
  84.     let mut handles: Vec<JoinHandle<()>> = Vec::new();
  85.     for url in urls {
  86.         let tx = tx.clone();
  87.         let handle = tokio::spawn(async move {
  88.             let timeout_status = timeout(task_timeout, download_image(tx, url)).await;
  89.             debug!("timeout status = {:?} for url {}", timeout_status, url)
  90.         });
  91.         handles.push(handle);
  92.     }
  93.  
  94.     let wait_task = tokio::spawn(async move {
  95.         for handle in handles {
  96.             let _ = handle.await;
  97.         }
  98.         drop(tx);
  99.         let _ = done_tx.send(true);
  100.     });
  101.  
  102.     let write_task = tokio::spawn(async move {
  103.         let mut counter = 1;
  104.         while let Some(message) = rx.recv().await {
  105.             if let Ok(image_bytes) = message {
  106.  
  107.                 let result = timeout(
  108.                     task_timeout,
  109.                     tokio::spawn(async move {
  110.                         let file = File::create(format!("{}.jpg", counter)).await;
  111.                         if let Ok(mut file) = file {
  112.                             let _ = file.write_all(&image_bytes).await;
  113.                             info!("Wrote image {}", counter);
  114.                         } else {
  115.                             error!("Failed to write image {}", counter)
  116.                         }
  117.                     }),
  118.                 )
  119.                 .await;
  120.  
  121.                 debug!("write task timeout result {:?}", task_timeout);
  122.  
  123.                 counter += 1;
  124.             }
  125.         }
  126.     });
  127.  
  128.     tokio::select! {
  129.         _ = cancel_rx => {
  130.             info!("computation cancelled");
  131.             wait_task.abort();
  132.             write_task.abort();
  133.         }
  134.         _ = done_rx => {
  135.             info!("operation completed");
  136.         }
  137.     }
  138.  
  139.     Ok(())
  140. }
  141.  
Tags: rust tokio
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement