Advertisement
MetonymyQT

Tokio Rust Concurrency Attempt 4

Apr 6th, 2025
473
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 5.03 KB | None | 0 0
  1. use anyhow::{Result, anyhow};
  2. use log::{debug, error, info};
  3. use signal_hook::consts::{SIGINT, SIGTERM};
  4. use signal_hook::iterator::Signals;
  5. use std::ops::Deref;
  6. use std::sync::Arc;
  7. use std::sync::atomic::{AtomicBool, Ordering};
  8. use std::thread;
  9. use std::time::Duration;
  10. use tokio::fs::File;
  11. use tokio::io::AsyncWriteExt;
  12. use tokio::sync::mpsc;
  13. use tokio::sync::mpsc::Sender;
  14. use tokio::sync::oneshot;
  15. use tokio::task::{JoinError, JoinHandle};
  16. use tokio::time::{sleep, timeout};
  17. use tokio_util::task::TaskTracker;
  18.  
  19. fn setup_graceful_shutdown(cancelled: oneshot::Sender<bool>) -> JoinHandle<()> {
  20.     tokio::spawn(async move {
  21.         let signals = Signals::new([SIGINT, SIGTERM]);
  22.         match signals {
  23.             Ok(mut signal_info) => {
  24.                 loop {
  25.                     if signal_info.pending().next().is_some() {
  26.                         info!("user request cancellation");
  27.                         let _ = cancelled.send(true);
  28.                         return;
  29.                     }
  30.                     sleep(Duration::from_millis(250)).await;
  31.                 }
  32.             }
  33.             Err(error) => {
  34.                 error!("Failed to setup signal handler: {error}")
  35.             }
  36.         }
  37.     })
  38. }
  39.  
  40. async fn download_image(tx: Sender<Result<bytes::Bytes, anyhow::Error>>, url: &str) {
  41.     info!("Downloading image for {}", url);
  42.     let resp = reqwest::get(url).await;
  43.     if let Ok(resp) = resp {
  44.         let image_bytes = resp.bytes().await;
  45.         match image_bytes {
  46.             Ok(data) => {
  47.                 tx.send(Ok(data)).await.expect("failed to send message");
  48.             }
  49.             Err(err) => {
  50.                 let send_result = tx.send(Err(anyhow!("failed to get image data: {}", err)))
  51.                     .await;
  52.  
  53.                 debug!("download image ok {:?} for url {}", send_result, url)
  54.             }
  55.         }
  56.     } else {
  57.         let send_result = tx.send(Err(anyhow!("failed to get response")))
  58.             .await;
  59.  
  60.         debug!("download image error {:?}  for url {}", send_result, url)
  61.     }
  62. }
  63.  
  64. #[tokio::main]
  65. async fn main() -> Result<()> {
  66.     env_logger::init();
  67.  
  68.     let task_timeout = Duration::from_millis(60000);
  69.  
  70.     let (tx, mut rx) = mpsc::channel(32);
  71.     let (cancel_tx, mut cancel_rx) = oneshot::channel::<bool>();
  72.     let tracker = TaskTracker::new();
  73.     let signal_wait_handle = setup_graceful_shutdown(cancel_tx);
  74.  
  75.     let urls: Vec<&str> = vec![
  76.         "https://images.unsplash.com/photo-1739117956532-8a37c76093dd?fm=jpg",
  77.         "https://images.unsplash.com/photo-1735930371721-d1243e8122ab?fm=jpg",
  78.         "https://images.unsplash.com/photo-1732204662871-f853329ce638?fm=jpg",
  79.         "https://images.unsplash.com/photo-1712677925320-5f93d7f853f5?fm=jpg",
  80.         "https://images.unsplash.com/photo-1712677925900-12574cb3cf15?fm=jpg",
  81.         "https://images.unsplash.com/photo-1710436000845-bb707af976a6?fm=jpg",
  82.         "https://images.unsplash.com/photo-1700386277812-5ff74eb03650?fm=jpg",
  83.         "https://images.unsplash.com/photo-1700386277415-c9c5270ee8b8?fm=jpg",
  84.         "https://images.unsplash.com/photo-1691947563165-28011f40d786?fm=jpg",
  85.         "https://images.unsplash.com/photo-1691782359338-97fcbcce8ce8?fm=jpg",
  86.     ];
  87.  
  88.     let mut handles: Vec<JoinHandle<Result<(), JoinError>>> = Vec::new();
  89.     for url in urls {
  90.         let tx = tx.clone();
  91.         let handle = tracker.spawn(tokio::spawn(async move {
  92.             let timeout_status = timeout(task_timeout, download_image(tx, url)).await;
  93.             debug!("timeout status = {:?} for url {}", timeout_status, url)
  94.         }));
  95.         handles.push(handle);
  96.     }
  97.  
  98.     let wait_task = tracker.spawn(tokio::spawn(async move {
  99.         for handle in handles {
  100.             let _ = handle.await;
  101.         }
  102.         drop(tx);
  103.     }));
  104.  
  105.     let write_task = tracker.spawn(tokio::spawn(async move {
  106.         let mut counter = 1;
  107.         while let Some(message) = rx.recv().await {
  108.             if let Ok(image_bytes) = message {
  109.                 let result = timeout(
  110.                     task_timeout,
  111.                     tokio::spawn(async move {
  112.                         let file = File::create(format!("{}.jpg", counter)).await;
  113.                         if let Ok(mut file) = file {
  114.                             let _ = file.write_all(&image_bytes).await;
  115.                             info!("Wrote image {}", counter);
  116.                         } else {
  117.                             error!("Failed to write image {}", counter)
  118.                         }
  119.                     }),
  120.                 )
  121.                 .await;
  122.  
  123.                 debug!("write task timeout result {:?}", task_timeout);
  124.                 counter += 1;
  125.             }
  126.         }
  127.     }));
  128.  
  129.     tracker.close();
  130.     tokio::select! {
  131.         _ = cancel_rx => {
  132.             info!("computation cancelled");
  133.             wait_task.abort();
  134.             write_task.abort();
  135.         }
  136.         _ = tracker.wait() => {
  137.             info!("operation completed");
  138.         }
  139.     }
  140.     signal_wait_handle.abort();
  141.  
  142.     Ok(())
  143. }
  144.  
Tags: rust
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement