Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- use std::ops::Deref;
- use std::sync::Arc;
- use std::sync::atomic::{AtomicBool, Ordering};
- use std::thread;
- use std::time::Duration;
- use anyhow::{anyhow, Result};
- use signal_hook::consts::{SIGINT, SIGTERM};
- use tokio::fs::File;
- use tokio::io::AsyncWriteExt;
- use tokio::sync::mpsc;
- use tokio::sync::oneshot;
- use tokio::sync::mpsc::Sender;
- use tokio::task::JoinHandle;
- use signal_hook::iterator::Signals;
- use log::{debug, error, info};
- use tokio::time::{sleep, timeout};
- fn setup_graceful_shutdown(cancelled: Arc<AtomicBool>) {
- thread::spawn(move || {
- let signals = Signals::new([SIGINT, SIGTERM]);
- match signals {
- Ok(mut signal_info) => {
- if signal_info.forever().next().is_some() {
- info!("user request cancellation");
- cancelled.store(true, Ordering::Relaxed);
- }
- }
- Err(error) => {
- error!("Failed to setup signal handler: {error}")
- }
- }
- });
- }
- async fn download_image(tx: Sender<Result<bytes::Bytes, anyhow::Error>>, url: &str) {
- info!("Downloading image for {}", url);
- let resp = reqwest::get(url)
- .await;
- if let Ok(resp) = resp {
- let image_bytes = resp.bytes().await;
- match image_bytes {
- Ok(data) => {
- tx.send(Ok(data)).await.expect("failed to send message");
- }
- Err(err) => {
- tx.send(Err(anyhow!("failed to get image data: {}", err))).await.expect("failed to send message");
- }
- }
- } else {
- tx.send(Err(anyhow!("failed to get response"))).await.expect("failed to send message");
- }
- }
- #[tokio::main]
- async fn main() -> Result<()> {
- env_logger::init();
- let task_timeout = Duration::from_millis(500);
- let (tx, mut rx) = mpsc::channel(32);
- let cancelled = Arc::from(AtomicBool::new(false));
- setup_graceful_shutdown(cancelled.clone());
- let urls: Vec<&str> = vec![
- "https://images.unsplash.com/photo-1739117956532-8a37c76093dd?fm=jpg",
- "https://images.unsplash.com/photo-1735930371721-d1243e8122ab?fm=jpg",
- "https://images.unsplash.com/photo-1732204662871-f853329ce638?fm=jpg",
- "https://images.unsplash.com/photo-1712677925320-5f93d7f853f5?fm=jpg",
- "https://images.unsplash.com/photo-1712677925900-12574cb3cf15?fm=jpg",
- "https://images.unsplash.com/photo-1710436000845-bb707af976a6?fm=jpg",
- "https://images.unsplash.com/photo-1700386277812-5ff74eb03650?fm=jpg",
- "https://images.unsplash.com/photo-1700386277415-c9c5270ee8b8?fm=jpg",
- "https://images.unsplash.com/photo-1691947563165-28011f40d786?fm=jpg",
- "https://images.unsplash.com/photo-1691782359338-97fcbcce8ce8?fm=jpg"
- ];
- let mut handles: Vec<JoinHandle<()>> = Vec::new();
- for url in urls {
- let tx = tx.clone();
- let handle = tokio::spawn(async move {
- let timeout_status = timeout(task_timeout, download_image(tx, url)).await;
- debug!("timeout status = {:?} for url {}", timeout_status, url)
- });
- handles.push(handle);
- }
- let wait_cancelled = cancelled.clone();
- let wait_task = tokio::spawn(async move {
- for handle in handles {
- if wait_cancelled.load(Ordering::Relaxed) {
- handle.abort();
- } else {
- let _ = handle.await;
- }
- }
- drop(tx);
- });
- let mut counter = 1;
- while let Some(message) = rx.recv().await {
- if cancelled.load(Ordering::Relaxed) {
- break
- }
- if let Ok(image_bytes) = message {
- let file = File::create(format!("{}.jpg", counter)).await;
- if let Ok(mut file) = file {
- let _ = file.write_all(&image_bytes).await;
- info!("Wrote image {}", counter);
- } else {
- error!("Failed to write image {}", counter)
- }
- counter += 1;
- }
- }
- let _ = wait_task.await;
- Ok(())
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement