Advertisement
MetonymyQT

Tokio Rust Concurrency Attempt

Apr 6th, 2025 (edited)
619
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 4.08 KB | Source Code | 0 0
  1. use std::ops::Deref;
  2. use std::sync::Arc;
  3. use std::sync::atomic::{AtomicBool, Ordering};
  4. use std::thread;
  5. use std::time::Duration;
  6. use anyhow::{anyhow, Result};
  7. use signal_hook::consts::{SIGINT, SIGTERM};
  8. use tokio::fs::File;
  9. use tokio::io::AsyncWriteExt;
  10. use tokio::sync::mpsc;
  11. use tokio::sync::oneshot;
  12. use tokio::sync::mpsc::Sender;
  13. use tokio::task::JoinHandle;
  14. use signal_hook::iterator::Signals;
  15. use log::{debug, error, info};
  16. use tokio::time::{sleep, timeout};
  17.  
  18. fn setup_graceful_shutdown(cancelled: Arc<AtomicBool>) {
  19.     thread::spawn(move || {
  20.         let signals = Signals::new([SIGINT, SIGTERM]);
  21.         match signals {
  22.             Ok(mut signal_info) => {
  23.                 if signal_info.forever().next().is_some() {
  24.                     info!("user request cancellation");
  25.                     cancelled.store(true, Ordering::Relaxed);
  26.                 }
  27.             }
  28.             Err(error) => {
  29.                 error!("Failed to setup signal handler: {error}")
  30.             }
  31.         }
  32.     });
  33. }
  34.  
  35. async fn download_image(tx: Sender<Result<bytes::Bytes, anyhow::Error>>, url: &str) {
  36.     info!("Downloading image for {}", url);
  37.     let resp = reqwest::get(url)
  38.         .await;
  39.     if let Ok(resp) = resp {
  40.         let image_bytes = resp.bytes().await;
  41.         match image_bytes {
  42.             Ok(data) => {
  43.                 tx.send(Ok(data)).await.expect("failed to send message");
  44.             }
  45.             Err(err) => {
  46.                 tx.send(Err(anyhow!("failed to get image data: {}", err))).await.expect("failed to send message");
  47.             }
  48.         }
  49.     } else {
  50.         tx.send(Err(anyhow!("failed to get response"))).await.expect("failed to send message");
  51.     }
  52. }
  53.  
  54. #[tokio::main]
  55. async fn main() -> Result<()> {
  56.     env_logger::init();
  57.  
  58.     let task_timeout = Duration::from_millis(500);
  59.  
  60.     let (tx, mut rx) = mpsc::channel(32);
  61.     let cancelled = Arc::from(AtomicBool::new(false));
  62.     setup_graceful_shutdown(cancelled.clone());
  63.  
  64.     let urls: Vec<&str> = vec![
  65.         "https://images.unsplash.com/photo-1739117956532-8a37c76093dd?fm=jpg",
  66.         "https://images.unsplash.com/photo-1735930371721-d1243e8122ab?fm=jpg",
  67.         "https://images.unsplash.com/photo-1732204662871-f853329ce638?fm=jpg",
  68.         "https://images.unsplash.com/photo-1712677925320-5f93d7f853f5?fm=jpg",
  69.         "https://images.unsplash.com/photo-1712677925900-12574cb3cf15?fm=jpg",
  70.         "https://images.unsplash.com/photo-1710436000845-bb707af976a6?fm=jpg",
  71.         "https://images.unsplash.com/photo-1700386277812-5ff74eb03650?fm=jpg",
  72.         "https://images.unsplash.com/photo-1700386277415-c9c5270ee8b8?fm=jpg",
  73.         "https://images.unsplash.com/photo-1691947563165-28011f40d786?fm=jpg",
  74.         "https://images.unsplash.com/photo-1691782359338-97fcbcce8ce8?fm=jpg"
  75.     ];
  76.  
  77.     let mut handles: Vec<JoinHandle<()>> = Vec::new();
  78.     for url in urls {
  79.         let tx = tx.clone();
  80.         let handle = tokio::spawn(async move {
  81.             let timeout_status = timeout(task_timeout, download_image(tx, url)).await;
  82.             debug!("timeout status = {:?} for url {}", timeout_status, url)
  83.         });
  84.         handles.push(handle);
  85.     }
  86.  
  87.  
  88.     let wait_cancelled = cancelled.clone();
  89.     let wait_task = tokio::spawn(async move {
  90.        for handle in handles {
  91.            if wait_cancelled.load(Ordering::Relaxed) {
  92.                handle.abort();
  93.            } else {
  94.                let _ = handle.await;
  95.            }
  96.        }
  97.         drop(tx);
  98.     });
  99.  
  100.     let mut counter = 1;
  101.     while let Some(message) = rx.recv().await {
  102.         if cancelled.load(Ordering::Relaxed) {
  103.             break
  104.         }
  105.  
  106.         if let Ok(image_bytes) = message {
  107.             let file = File::create(format!("{}.jpg", counter)).await;
  108.             if let Ok(mut file) = file {
  109.                 let _ = file.write_all(&image_bytes).await;
  110.                 info!("Wrote image {}", counter);
  111.             } else {
  112.                 error!("Failed to write image {}", counter)
  113.             }
  114.  
  115.             counter += 1;
  116.         }
  117.     }
  118.  
  119.     let _ = wait_task.await;
  120.     Ok(())
  121. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement