Advertisement
NLinker

Simple async service

Feb 2nd, 2020
964
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 2.39 KB | None | 0 0
  1. // https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=fef597285e9b04c52512daf255ec988c
  2.  
  3. use num_cpus::get;
  4. use std::fmt::Display;
  5. use std::net::{TcpStream, ToSocketAddrs};
  6. use std::sync::mpsc::{Receiver, SyncSender};
  7. use std::sync::{mpsc, Arc};
  8. use std::thread::spawn;
  9. use tokio; // 0.2.9
  10. use tokio::runtime::Builder;
  11. use tokio::sync::Semaphore;
  12.  
  13. #[derive(Debug, Clone)]
  14. struct Response {
  15.     result: String,
  16.     hostname: String,
  17.     process_time: String,
  18.     status: bool,
  19. }
  20. async fn process_host<A>(
  21.     hostname: A,
  22.     tx: SyncSender<Response>,
  23.     connection_pool: Arc<Semaphore>,
  24. ) -> Response
  25. where
  26.     A: ToSocketAddrs + Display,
  27. {
  28.     let guard = connection_pool.acquire().await;
  29.     let res = Response {
  30.         result: "none".to_string(),
  31.         hostname: hostname.to_string(),
  32.         process_time: "sometime".to_string(),
  33.         status: false,
  34.     };
  35.     let tcp = match TcpStream::connect(&hostname) {
  36.         Ok(a) => a,
  37.         Err(e) => {
  38.             let res = Response {
  39.                 result: e.to_string(),
  40.                 hostname: hostname.to_string(),
  41.                 process_time: "sometime".to_string(),
  42.                 status: false,
  43.             };
  44.             tx.send(res.clone()).unwrap();
  45.             return res;
  46.         }
  47.     };
  48.     // chain of async calls.
  49.     res
  50. }
  51.  
  52. fn main() {
  53.     let hosts = vec!["1.1.1.1:22", "8.8.8.8:22" , "8.8.4.4:22"];
  54.     let num_of_threads = Arc::new(Semaphore::new(10));
  55.     let mut reactor = Builder::new()
  56.         .enable_all()
  57.         .threaded_scheduler()
  58.         .core_threads(get())
  59.         .build()
  60.         .unwrap();
  61.     let (tx, rx): (SyncSender<Response>, Receiver<Response>) = mpsc::sync_channel(0);
  62.     let queue_len =100;
  63.     spawn(move || incremental_save(rx,  queue_len));
  64.  
  65.     let tasks: Vec<_> = hosts.into_iter().map(|host| {
  66.         reactor.spawn(process_host(
  67.             host,
  68.             tx.clone(),
  69.             num_of_threads.clone(),
  70.         ))
  71.     }).collect();
  72.    
  73.     reactor.block_on(futures::future::join_all(tasks));
  74. }
  75. fn incremental_save(rx: Receiver<Response>, queue_len: u64) {
  76.     for _ in 0..=queue_len {
  77.         let received = match rx.recv() {
  78.             Ok(a) => a,
  79.             Err(e) => {
  80.                 eprintln!("incremental_save: {}", e);
  81.                 break;
  82.             }
  83.         };
  84.         println! {"{:?}", received};
  85.     }
  86. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement