Advertisement
MetonymyQT

Tokio Rust Concurrency Attempt 3

Apr 6th, 2025
392
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 4.95 KB | Software | 0 0
  1. use anyhow::{Result, anyhow};
  2. use log::{debug, error, info};
  3. use signal_hook::consts::{SIGINT, SIGTERM};
  4. use signal_hook::iterator::Signals;
  5. use std::ops::Deref;
  6. use std::sync::Arc;
  7. use std::sync::atomic::{AtomicBool, Ordering};
  8. use std::thread;
  9. use std::time::Duration;
  10. use tokio::fs::File;
  11. use tokio::io::AsyncWriteExt;
  12. use tokio::sync::mpsc;
  13. use tokio::sync::mpsc::Sender;
  14. use tokio::sync::oneshot;
  15. use tokio::task::JoinHandle;
  16. use tokio::time::{sleep, timeout};
  17.  
  18. fn setup_graceful_shutdown(cancelled: oneshot::Sender<bool>) -> JoinHandle<()> {
  19.     tokio::spawn(async move {
  20.         let signals = Signals::new([SIGINT, SIGTERM]);
  21.         match signals {
  22.             Ok(mut signal_info) => {
  23.                 loop {
  24.                     if signal_info.pending().next().is_some() {
  25.                         info!("user request cancellation");
  26.                         let _ = cancelled.send(true);
  27.                         return;
  28.                     }
  29.                     sleep(Duration::from_millis(250)).await;
  30.                 }
  31.             }
  32.             Err(error) => {
  33.                 error!("Failed to setup signal handler: {error}")
  34.             }
  35.         }
  36.     })
  37. }
  38.  
  39. async fn download_image(tx: Sender<Result<bytes::Bytes, anyhow::Error>>, url: &str) {
  40.     info!("Downloading image for {}", url);
  41.     let resp = reqwest::get(url).await;
  42.     if let Ok(resp) = resp {
  43.         let image_bytes = resp.bytes().await;
  44.         match image_bytes {
  45.             Ok(data) => {
  46.                 tx.send(Ok(data)).await.expect("failed to send message");
  47.             }
  48.             Err(err) => {
  49.                 let send_result = tx.send(Err(anyhow!("failed to get image data: {}", err)))
  50.                     .await;
  51.  
  52.                 debug!("download image ok {:?} for url {}", send_result, url)
  53.             }
  54.         }
  55.     } else {
  56.         let send_result = tx.send(Err(anyhow!("failed to get response")))
  57.             .await;
  58.  
  59.         debug!("download image error {:?}  for url {}", send_result, url)
  60.     }
  61. }
  62.  
  63. #[tokio::main]
  64. async fn main() -> Result<()> {
  65.     env_logger::init();
  66.  
  67.     let task_timeout = Duration::from_millis(60000);
  68.  
  69.     let (tx, mut rx) = mpsc::channel(32);
  70.     let (cancel_tx, mut cancel_rx) = oneshot::channel::<bool>();
  71.     let (done_tx, mut done_rx) = oneshot::channel::<bool>();
  72.     let signal_wait_handle = setup_graceful_shutdown(cancel_tx);
  73.  
  74.     let urls: Vec<&str> = vec![
  75.         "https://images.unsplash.com/photo-1739117956532-8a37c76093dd?fm=jpg",
  76.         "https://images.unsplash.com/photo-1735930371721-d1243e8122ab?fm=jpg",
  77.         "https://images.unsplash.com/photo-1732204662871-f853329ce638?fm=jpg",
  78.         "https://images.unsplash.com/photo-1712677925320-5f93d7f853f5?fm=jpg",
  79.         "https://images.unsplash.com/photo-1712677925900-12574cb3cf15?fm=jpg",
  80.         "https://images.unsplash.com/photo-1710436000845-bb707af976a6?fm=jpg",
  81.         "https://images.unsplash.com/photo-1700386277812-5ff74eb03650?fm=jpg",
  82.         "https://images.unsplash.com/photo-1700386277415-c9c5270ee8b8?fm=jpg",
  83.         "https://images.unsplash.com/photo-1691947563165-28011f40d786?fm=jpg",
  84.         "https://images.unsplash.com/photo-1691782359338-97fcbcce8ce8?fm=jpg",
  85.     ];
  86.  
  87.     let mut handles: Vec<JoinHandle<()>> = Vec::new();
  88.     for url in urls {
  89.         let tx = tx.clone();
  90.         let handle = tokio::spawn(async move {
  91.             let timeout_status = timeout(task_timeout, download_image(tx, url)).await;
  92.             debug!("timeout status = {:?} for url {}", timeout_status, url)
  93.         });
  94.         handles.push(handle);
  95.     }
  96.  
  97.     let wait_task = tokio::spawn(async move {
  98.         for handle in handles {
  99.             let _ = handle.await;
  100.         }
  101.         drop(tx);
  102.     });
  103.  
  104.     let write_task = tokio::spawn(async move {
  105.         let mut counter = 1;
  106.         while let Some(message) = rx.recv().await {
  107.             if let Ok(image_bytes) = message {
  108.                 let result = timeout(
  109.                     task_timeout,
  110.                     tokio::spawn(async move {
  111.                         let file = File::create(format!("{}.jpg", counter)).await;
  112.                         if let Ok(mut file) = file {
  113.                             let _ = file.write_all(&image_bytes).await;
  114.                             info!("Wrote image {}", counter);
  115.                         } else {
  116.                             error!("Failed to write image {}", counter)
  117.                         }
  118.                     }),
  119.                 )
  120.                 .await;
  121.  
  122.                 debug!("write task timeout result {:?}", task_timeout);
  123.                 counter += 1;
  124.             }
  125.         }
  126.         let _ = done_tx.send(true);
  127.     });
  128.  
  129.     tokio::select! {
  130.         _ = cancel_rx => {
  131.             info!("computation cancelled");
  132.             wait_task.abort();
  133.             write_task.abort();
  134.         }
  135.         _ = done_rx => {
  136.             info!("operation completed");
  137.         }
  138.     }
  139.     signal_wait_handle.abort();
  140.  
  141.     Ok(())
  142. }
  143.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement