NLinker

Working variant of the commands tree

Aug 2nd, 2020 (edited)
424
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 37.55 KB | None | 0 0
  1. // Copyright (c) 2020 DDN. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4.  
  5. use crate::diff::calculate_diff;
  6. use crate::error::ImlManagerCliError;
  7. use crate::gen_tree::{apply_diff, Item, Node, State, Tree};
  8. use futures::{future, FutureExt, TryFutureExt};
  9. use iml_api_utils::dependency_tree::{build_direct_dag, DependencyDAG, Deps, Rich};
  10. use iml_wire_types::{ApiList, AvailableAction, Command, EndpointName, FlatQuery, Host, Job, Step};
  11. use indicatif::{MultiProgress, ProgressBar, ProgressStyle, ProgressDrawTarget};
  12. use lazy_static::lazy_static;
  13. use regex::{Captures, Regex};
  14. use serde::export::Formatter;
  15. use std::collections::HashSet;
  16. use std::fmt::Display;
  17. use std::sync::atomic::AtomicBool;
  18. use std::sync::{Arc, Mutex};
  19. use std::{collections::HashMap, fmt::Debug, iter, time::Duration};
  20. use std::{fmt, fs};
  21. use tokio::task::JoinError;
  22. use tokio::{task::spawn_blocking, time::delay_for};
  23. use crate::var::kit::{Rng, get_action, Action};
  24. use std::cell::Cell;
  25. use itertools::Itertools;
  26. use console::style;
  27.  
  28. const ARROW: &'_ str = " ═➤ "; // variants: = ═ - ▬ > ▷ ▶ ► ➤
  29. const SPACE: &'_ str = "   ";
  30. const FETCH_DELAY_MS: u64 = 1000;
  31. const SHOW_DELAY_MS: u64 = 150;
  32.  
  33. type Job0 = Job<Option<serde_json::Value>>;
  34. type RichCommand = Rich<i32, Arc<Command>>;
  35. type RichJob = Rich<i32, Arc<Job0>>;
  36. type RichStep = Rich<i32, Arc<Step>>;
  37.  
  38. #[derive(Copy, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Debug)]
  39. pub struct CmdId(i32);
  40.  
  41. #[derive(Copy, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Debug)]
  42. pub struct JobId(i32);
  43.  
  44. // region declaration of types TypeId, State, Item<K>
  45. #[derive(Copy, Clone, Hash, PartialEq, Eq, Debug)]
  46. pub enum TypedId {
  47.     Cmd(i32),
  48.     Job(i32),
  49.     Step(i32),
  50. }
  51.  
  52. impl Default for TypedId {
  53.     fn default() -> Self {
  54.         TypedId::Cmd(0)
  55.     }
  56. }
  57.  
  58. impl Display for TypedId {
  59.     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
  60.        match self {
  61.            TypedId::Cmd(i) => write!(f, "c{}", i),
  62.            TypedId::Job(i) => write!(f, "j{}", i),
  63.            TypedId::Step(i) => write!(f, "s{}", i),
  64.        }
  65.    }
  66. }
  67.  
  68. #[derive(Clone, Eq, PartialEq, Debug)]
  69. pub struct Specific {
  70.    pub msg: String,
  71.    pub console: String,
  72.    pub backtrace: String,
  73. }
  74.  
  75. impl Display for Specific {
  76.    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
  77.         write!(f, "{}", self.msg)
  78.     }
  79. }
  80.  
  81. /// It is pretty expensive to set the style on the progress bar on each iteration,
  82. /// so we need to keep track what the style and whether it has been set for the progress bar.
  83. /// See [`set_progress_bar_message`] function.
  84. #[derive(Clone, Debug)]
  85. pub struct ProgressBarIndicator {
  86.     pub progress_bar: ProgressBar,
  87.     pub active_style: Cell<Option<bool>>,
  88. }
  89. // endregion
  90.  
  91. #[derive(Clone, Debug)]
  92. pub struct TreeState {
  93.     pub has_read: bool,
  94.     pub index: usize,
  95.     pub commands: Vec<Command>,
  96.     pub jobs: Vec<Job0>,
  97.     pub steps: Vec<Step>,
  98. }
  99.  
  100. lazy_static! {
  101.     static ref RNG: Mutex<Rng> = Mutex::new(Rng::new());
  102.     static ref TREE_STATE: Mutex<TreeState> = {
  103.         let str = fs::read_to_string("ops/commands/cmd-37_38.json").unwrap();
  104.         let mut command_list = serde_json::from_str::<ApiList<Command>>(&str).unwrap();
  105.         for cmd in &mut command_list.objects {
  106.             set_cmd_state(cmd, State::Progressing);
  107.         }
  108.         let str = fs::read_to_string("ops/jobs/jobs-227-242.json").unwrap();
  109.         let mut job_list = serde_json::from_str::<ApiList<Job0>>(&str).unwrap();
  110.         for job in &mut job_list.objects {
  111.             set_job_state(job, State::Progressing);
  112.         }
  113.         let str = fs::read_to_string("ops/steps/steps-400-x.json").unwrap();
  114.         let mut step_list = serde_json::from_str::<ApiList<Step>>(&str).unwrap();
  115.         for step in &mut step_list.objects {
  116.             set_step_state(step, State::Progressing);
  117.         }
  118.         Mutex::new(TreeState {
  119.             has_read: false,
  120.             index: 0,
  121.             commands: command_list.objects,
  122.             jobs: job_list.objects,
  123.             steps: step_list.objects,
  124.         })
  125.     };
  126. }
  127.  
  128. #[derive(serde::Serialize)]
  129. pub struct SendJob<T> {
  130.     pub class_name: String,
  131.     pub args: T,
  132. }
  133.  
  134. #[derive(serde::Serialize)]
  135. pub struct SendCmd<T> {
  136.     pub jobs: Vec<SendJob<T>>,
  137.     pub message: String,
  138. }
  139.  
  140. pub async fn create_command<T: serde::Serialize>(
  141.     cmd_body: SendCmd<T>,
  142. ) -> Result<Command, ImlManagerCliError> {
  143.     let resp = post(Command::endpoint_name(), cmd_body)
  144.         .await?
  145.         .error_for_status()?;
  146.  
  147.     let cmd = resp.json().await?;
  148.  
  149.     tracing::debug!("Resp JSON is {:?}", cmd);
  150.  
  151.     Ok(cmd)
  152. }
  153.  
  154. fn cmd_state(cmd: &Command) -> State {
  155.     if cmd.cancelled {
  156.         State::Cancelled
  157.     } else if cmd.errored {
  158.         State::Errored
  159.     } else if cmd.complete {
  160.         State::Completed
  161.     } else {
  162.         State::Progressing
  163.     }
  164. }
  165.  
  166. fn set_cmd_state(cmd: &mut Command, state: State) {
  167.     cmd.complete = false;
  168.     cmd.errored = false;
  169.     cmd.cancelled = false;
  170.     match state {
  171.         State::Cancelled => cmd.cancelled = true,
  172.         State::Errored => cmd.errored = true,
  173.         State::Completed => cmd.complete = true,
  174.         State::Progressing => {},
  175.     }
  176. }
  177.  
  178. fn job_state(job: &Job0) -> State {
  179.     // job.state can be "pending", "tasked" or "complete"
  180.     // if a job is errored or cancelled, it is also complete
  181.     if job.cancelled {
  182.         State::Cancelled
  183.     } else if job.errored {
  184.         State::Errored
  185.     } else if job.state == "complete" {
  186.         State::Completed
  187.     } else {
  188.         State::Progressing
  189.     }
  190. }
  191.  
  192. fn set_job_state(job: &mut Job0, state: State) {
  193.     job.cancelled = false;
  194.     job.errored = false;
  195.     job.state = "incomplete".to_string();
  196.     match state {
  197.         State::Cancelled => job.cancelled = true,
  198.         State::Errored => job.errored = true,
  199.         State::Completed => job.state = "complete".to_string(),
  200.         State::Progressing => {},
  201.     }
  202. }
  203.  
  204. fn step_state(step: &Step) -> State {
  205.     // step.state can be "success", "failed" or "incomplete"
  206.     match &step.state[..] {
  207.         "cancelled" => State::Cancelled,
  208.         "failed" => State::Errored,
  209.         "success" => State::Completed,
  210.         _ /* "incomplete" */ => State::Progressing,
  211.     }
  212. }
  213.  
  214. fn set_step_state(step: &mut Step, state: State) {
  215.     match state {
  216.         State::Cancelled => step.state = "cancelled".to_string(),
  217.         State::Errored => step.state = "failed".to_string(),
  218.         State::Completed => step.state = "success".to_string(),
  219.         State::Progressing => step.state = "incomplete".to_string(),
  220.     }
  221. }
  222.  
  223. fn cmd_finished(cmd: &Command) -> bool {
  224.     cmd_state(cmd) == State::Completed
  225. }
  226.  
  227. fn job_finished(job: &Job0) -> bool {
  228.     job_state(job) == State::Completed
  229. }
  230.  
  231. fn step_finished(step: &Step) -> bool {
  232.     step_state(step) != State::Progressing
  233. }
  234.  
  235. pub async fn wait_for_command(cmd: Command) -> Result<Command, ImlManagerCliError> {
  236.     loop {
  237.         if cmd_finished(&cmd) {
  238.             return Ok(cmd);
  239.         }
  240.  
  241.         delay_for(Duration::from_millis(1000)).await;
  242.  
  243.         let client = iml_manager_client::get_client()?;
  244.  
  245.         let cmd = iml_manager_client::get(
  246.             client,
  247.             &format!("command/{}", cmd.id),
  248.             Vec::<(String, String)>::new(),
  249.         )
  250.         .await?;
  251.  
  252.         if cmd_finished(&cmd) {
  253.             return Ok(cmd);
  254.         }
  255.     }
  256. }
  257.  
  258. pub async fn fetch_api_list<T>(ids: Vec<i32>) -> Result<ApiList<T>, ImlManagerCliError>
  259. where
  260.     T: EndpointName + serde::de::DeserializeOwned + std::fmt::Debug,
  261. {
  262.     let query: Vec<_> = ids
  263.         .into_iter()
  264.         .map(|x| ["id__in".into(), x.to_string()])
  265.         .chain(iter::once(["limit".into(), "0".into()]))
  266.         .collect();
  267.     get(T::endpoint_name(), query).await
  268. }
  269.  
  270. /// Waits for command completion and prints progress messages
  271. /// This *does not* error on command failure, it only tracks command
  272. /// completion
  273. pub async fn wait_for_commands(commands: &[Command]) -> Result<Vec<Command>, ImlManagerCliError> {
  274.     let multi_progress = Arc::new(MultiProgress::new());
  275.     multi_progress.set_draw_target(ProgressDrawTarget::stdout());
  276.     let sty_main = ProgressStyle::default_bar().template("{bar:60.green/yellow} {pos:>4}/{len:4}");
  277.     let main_pb = multi_progress.add(ProgressBar::new(commands.len() as u64));
  278.     main_pb.set_style(sty_main);
  279.     main_pb.tick();
  280.  
  281.     // `current_items` will have only commands at first
  282.     // and then will be extended after `fetch_and_update` succeeds
  283.     let (cmd_ids, cmds) = build_initial_commands(commands);
  284.     let tree = build_fresh_tree(&cmd_ids, &cmds, &HashMap::new(), &HashMap::new());
  285.     let mut fresh_items = tree.render();
  286.     let mut current_items_vec = Vec::new();
  287.     calculate_and_apply_diff(
  288.         &mut current_items_vec,
  289.         &mut fresh_items,
  290.         &tree,
  291.         &multi_progress,
  292.         &main_pb,
  293.     );
  294.  
  295.     let is_done = Arc::new(AtomicBool::new(false));
  296.     let current_items = Arc::new(tokio::sync::Mutex::new(current_items_vec));
  297.  
  298.     // multi-progress waiting loop
  299.     // fut1: ErrInto<Map<JoinHandle<Result<()>>, fn(Result<Result<(), Error>, JoinError>)
  300.     let fut1 = {
  301.         let multi_progress = Arc::clone(&multi_progress);
  302.         spawn_blocking(move || multi_progress.join()).map(
  303.             |r: Result<Result<(), std::io::Error>, JoinError>| {
  304.                 r.map_err(|e: JoinError| e.into())
  305.                     .and_then(std::convert::identity)
  306.             },
  307.         ).err_into()
  308.     };
  309.  
  310.     // updating loop
  311.     // fut2: impl Future<Output=Result<Vec<Command>, JoinError>>
  312.     let fut2 = {
  313.         let is_done = Arc::clone(&is_done);
  314.         let multi_progress = Arc::clone(&multi_progress);
  315.         let current_items = Arc::clone(&current_items);
  316.         async move {
  317.             let mut cmds: HashMap<i32, RichCommand> = cmds;
  318.             let mut jobs: HashMap<i32, RichJob> = HashMap::new();
  319.             let mut steps: HashMap<i32, RichStep> = HashMap::new();
  320.  
  321.             loop {
  322.                 if cmds.values().all(|cmd| cmd_state(cmd) != State::Progressing) {
  323.                     tracing::debug!("All commands complete. Returning");
  324.                     is_done.store(true, std::sync::atomic::Ordering::SeqCst);
  325.  
  326.                     // Unfortunately, there is no easy unsafe way to move out from Arc, so `clone`
  327.                     // may be needed.
  328.                     let mut commands: Vec<Command> = Vec::with_capacity(cmds.len());
  329.                     for id in cmd_ids {
  330.                         if let Some(rich_cmd) = cmds.remove(&id) {
  331.                             match Arc::try_unwrap(rich_cmd.inner) {
  332.                                 Ok(cmd) => commands.push(cmd),
  333.                                 Err(arc_cmd) => commands.push((*arc_cmd).clone()),
  334.                             }
  335.                         }
  336.                     }
  337.                     return Ok::<_, ImlManagerCliError>(commands);
  338.                 }
  339.  
  340.                 // network call goes here
  341.                 fetch_and_update(&cmd_ids, &mut cmds, &mut jobs, &mut steps).await?;
  342.  
  343.                 let tree = build_fresh_tree(&cmd_ids, &cmds, &HashMap::new(), &HashMap::new());
  344.                 let mut fresh_items = tree.render();
  345.                 calculate_and_apply_diff(
  346.                     &mut *current_items.lock().await,
  347.                     &mut fresh_items,
  348.                     &tree,
  349.                     &multi_progress,
  350.                     &main_pb,
  351.                 );
  352.  
  353.                 main_pb.set_length(tree.len() as u64);
  354.                 main_pb.set_position(tree.count_node_keys(|n| n.state != State::Progressing) as u64);
  355.  
  356.                 delay_for(Duration::from_millis(FETCH_DELAY_MS)).await;
  357.             }
  358.         }
  359.     };
  360.  
  361.     // showing loop
  362.     // fut3: impl Future<Output=Result<(), Error>>
  363.     let fut3 = {
  364.         let is_done = Arc::clone(&is_done);
  365.         let current_items = Arc::clone(&current_items);
  366.         async move {
  367.             while !is_done.load(std::sync::atomic::Ordering::SeqCst) {
  368.                 for it in current_items.lock().await.iter() {
  369.                     if it.state == State::Progressing {
  370.                         if let Some(ic) = &it.indicator {
  371.                             ic.progress_bar.inc(1);
  372.                         }
  373.                     }
  374.                 }
  375.                 delay_for(Duration::from_millis(SHOW_DELAY_MS)).await;
  376.             }
  377.             Ok(())
  378.         }
  379.     };
  380.  
  381.     let (_, cmds, _) = future::try_join3(fut1, fut2, fut3).await?;
  382.     Ok(cmds)
  383. }
  384.  
  385. /// wrap each command and build `cmd_ids` to maintain the order of the commands
  386. fn build_initial_commands(commands: &[Command]) -> (Vec<i32>, HashMap<i32, RichCommand>) {
  387.     let mut cmd_ids = Vec::new();
  388.     let mut cmds = HashMap::new();
  389.     for command in commands.iter() {
  390.         let (id, deps) = extract_children_from_cmd(command);
  391.         let inner = Arc::new(command.clone());
  392.         cmds.insert(id, Rich { id, deps, inner });
  393.         cmd_ids.push(id);
  394.     }
  395.     (cmd_ids, cmds)
  396. }
  397.  
  398. async fn fetch_and_update(
  399.     cmd_ids: &[i32],
  400.     commands: &mut HashMap<i32, RichCommand>,
  401.     jobs: &mut HashMap<i32, RichJob>,
  402.     steps: &mut HashMap<i32, RichStep>,
  403. ) -> Result<(), ImlManagerCliError> {
  404.     if !TREE_STATE.lock().unwrap().has_read {
  405.         TREE_STATE.lock().unwrap().has_read = true;
  406.         update_commands(commands, TREE_STATE.lock().unwrap().commands.clone());
  407.         update_jobs(jobs, TREE_STATE.lock().unwrap().jobs.clone());
  408.         update_steps(steps, TREE_STATE.lock().unwrap().steps.clone());
  409.     } else {
  410.         let trees = build_trees(cmd_ids, commands, jobs, steps);
  411.         for tree in trees {
  412.             let action: Option<Action> = get_action(&mut RNG.lock().unwrap(), &tree);
  413.             match action {
  414.                 None => {},
  415.                 Some(Action::KeepProgress) => {},
  416.                 Some(Action::AddNode(typed_id, name)) => match typed_id {
  417.                     TypedId::Job(j) => {
  418.                         let new_job_id = jobs.keys().copied().max().unwrap_or(300) + 1;
  419.                         let mut parent_job = (*jobs[&j].inner).clone();
  420.                         parent_job.wait_for.push(format!("/api/job/{}/", new_job_id));
  421.                         let new_job = Job0 {
  422.                             id: new_job_id,
  423.                             resource_uri: format!("/api/job/{}/", new_job_id),
  424.                             cancelled: false,
  425.                             errored: false,
  426.                             state: "incomplete".to_string(),
  427.                             class_name: format!("{}-Class", name.to_uppercase()),
  428.                             commands: parent_job.commands.clone(),
  429.                             description: name.to_string(),
  430.                             created_at: parent_job.created_at.clone(),
  431.                             modified_at: parent_job.modified_at.clone(),
  432.                             step_results: Default::default(),
  433.                             wait_for: vec![],
  434.                             steps: vec![],
  435.                             available_transitions: vec![],
  436.                             read_locks: vec![],
  437.                             write_locks: vec![]
  438.                         };
  439.                         update_jobs(jobs, vec![parent_job, new_job]);
  440.                     },
  441.                     _ => {},
  442.                 },
  443.                 Some(Action::CompleteNode(typed_id)) => match typed_id {
  444.                     TypedId::Cmd(x) => {
  445.                         let mut cmd = (*commands[&x].inner).clone();
  446.                         set_cmd_state(&mut cmd, State::Completed);
  447.                         update_commands(commands, vec![cmd]);
  448.                     },
  449.                     TypedId::Job(x) => {
  450.                         let mut job = (*jobs[&x].inner).clone();
  451.                         set_job_state(&mut job, State::Completed);
  452.                         update_jobs(jobs, vec![job]);
  453.                     },
  454.                     TypedId::Step(x) => {
  455.                         let mut step = (*steps[&x].inner).clone();
  456.                         set_step_state(&mut step, State::Completed);
  457.                         update_steps(steps, vec![step]);
  458.                     },
  459.                 },
  460.                 Some(Action::FailNode(typed_id)) => match typed_id {
  461.                     TypedId::Cmd(x) => {
  462.                         let mut cmd = (*commands[&x].inner).clone();
  463.                         set_cmd_state(&mut cmd, State::Errored);
  464.                         update_commands(commands, vec![cmd]);
  465.                     },
  466.                     TypedId::Job(x) => {
  467.                         let mut job = (*jobs[&x].inner).clone();
  468.                         set_job_state(&mut job, State::Errored);
  469.                         update_jobs(jobs, vec![job]);
  470.                     },
  471.                     TypedId::Step(x) => {
  472.                         let mut step = (*steps[&x].inner).clone();
  473.                         set_step_state(&mut step, State::Errored);
  474.                         step.console = CONSOLE.to_owned();
  475.                         step.backtrace = BACKTRACE.to_owned();
  476.                         update_steps(steps, vec![step]);
  477.                     },
  478.                 },
  479.             }
  480.         }
  481.     }
  482.     // let (load_cmd_ids, load_job_ids, load_step_ids) = extract_ids_to_load(&commands, &jobs, &steps);
  483.     // let loaded_cmds: ApiList<Command> = fetch_api_list(load_cmd_ids).await?;
  484.     // let loaded_jobs: ApiList<Job0> = fetch_api_list(load_job_ids).await?;
  485.     // let loaded_steps: ApiList<Step> = fetch_api_list(load_step_ids).await?;
  486.     // update_commands(commands, loaded_cmds.objects);
  487.     // update_jobs(jobs, loaded_jobs.objects);
  488.     // update_steps(steps, loaded_steps.objects);
  489.     Ok(())
  490. }
  491.  
  492. fn update_commands(commands: &mut HashMap<i32, RichCommand>, loaded_cmds: Vec<Command>) {
  493.     let new_commands = loaded_cmds
  494.         .into_iter()
  495.         .map(|t| {
  496.             let (id, deps) = extract_children_from_cmd(&t);
  497.             let inner = Arc::new(t);
  498.             (id, Rich { id, deps, inner })
  499.         })
  500.         .collect::<HashMap<i32, RichCommand>>();
  501.     commands.extend(new_commands);
  502. }
  503.  
  504. fn update_jobs(jobs: &mut HashMap<i32, RichJob>, loaded_jobs: Vec<Job0>) {
  505.     let new_jobs = loaded_jobs
  506.         .into_iter()
  507.         .map(|t| {
  508.             let (id, deps) = extract_children_from_job(&t);
  509.             let inner = Arc::new(t);
  510.             (id, Rich { id, deps, inner })
  511.         })
  512.         .collect::<HashMap<i32, RichJob>>();
  513.     jobs.extend(new_jobs);
  514. }
  515.  
  516. fn update_steps(steps: &mut HashMap<i32, RichStep>, loaded_steps: Vec<Step>) {
  517.     let new_steps = loaded_steps
  518.         .into_iter()
  519.         .map(|t| {
  520.             let (id, deps) = extract_children_from_step(&t);
  521.             let inner = Arc::new(t);
  522.             (id, Rich { id, deps, inner })
  523.         })
  524.         .collect::<HashMap<i32, RichStep>>();
  525.     steps.extend(new_steps);
  526. }
  527.  
  528. fn extract_sorted_keys<T>(hm: &HashMap<i32, T>) -> Vec<i32> {
  529.     let mut ids = hm.keys().copied().collect::<Vec<_>>();
  530.     ids.sort();
  531.     ids
  532. }
  533.  
  534. fn extract_ids_to_load(
  535.     commands: &HashMap<i32, RichCommand>,
  536.     jobs: &HashMap<i32, RichJob>,
  537.     steps: &HashMap<i32, RichStep>,
  538. ) -> (Vec<i32>, Vec<i32>, Vec<i32>) {
  539.     let load_cmd_ids = extract_sorted_keys(&commands)
  540.         .into_iter()
  541.         .filter(|c| {
  542.             commands
  543.                 .get(c)
  544.                 .map(|cmd| !cmd_finished(cmd))
  545.                 .unwrap_or(true)
  546.         })
  547.         .collect::<Vec<i32>>();
  548.     let load_job_ids = load_cmd_ids
  549.         .iter()
  550.         .filter(|c| commands.contains_key(c))
  551.         .flat_map(|c| commands[c].deps())
  552.         .filter(|j| jobs.get(j).map(|job| !job_finished(job)).unwrap_or(true))
  553.         .copied()
  554.         .collect::<Vec<i32>>();
  555.     let load_step_ids = load_job_ids
  556.         .iter()
  557.         .filter(|j| jobs.contains_key(j))
  558.         .flat_map(|j| jobs[j].deps())
  559.         .filter(|s| {
  560.             steps
  561.                 .get(s)
  562.                 .map(|step| !step_finished(step))
  563.                 .unwrap_or(true)
  564.         })
  565.         .copied()
  566.         .collect::<Vec<i32>>();
  567.     (load_cmd_ids, load_job_ids, load_step_ids)
  568. }
  569.  
  570. pub fn print_error(tree: &Tree<TypedId, Specific>, id: TypedId, print: impl Fn(&str)) {
  571.     let path = tree.get_path_from_root(id);
  572.     let caption = path
  573.         .iter()
  574.         .filter_map(|id| tree.get_node(*id))
  575.         .map(|n| n.inner.msg.clone())
  576.         .join(ARROW);
  577.     print(&caption);
  578.     if let Some(node) = tree.get_node(id) {
  579.         if !node.inner.console.is_empty() {
  580.             print("Console:");
  581.             for line in node.inner.console.lines() {
  582.                 print(&format!("{}{}", SPACE, style(line).red()));
  583.             }
  584.         }
  585.         if !node.inner.backtrace.is_empty() {
  586.             print("Backtrace:");
  587.             for line in node.inner.backtrace.lines() {
  588.                 print(&format!("{}{}", SPACE, style(line).red()));
  589.             }
  590.         }
  591.     }
  592. }
  593.  
  594. /// Waits for command completion and prints progress messages.
  595. /// This will error on command failure and print failed commands in the error message.
  596. pub async fn wait_for_cmds_success(cmds: &[Command]) -> Result<Vec<Command>, ImlManagerCliError> {
  597.     let cmds = wait_for_commands(cmds).await?;
  598.  
  599.     let (failed, passed): (Vec<_>, Vec<_>) =
  600.         cmds.into_iter().partition(|x| x.errored || x.cancelled);
  601.  
  602.     if !failed.is_empty() {
  603.         Err(failed.into())
  604.     } else {
  605.         Ok(passed)
  606.     }
  607. }
  608.  
  609. pub async fn get_available_actions(
  610.     id: u32,
  611.     content_type_id: u32,
  612. ) -> Result<ApiList<AvailableAction>, ImlManagerCliError> {
  613.     get(
  614.         AvailableAction::endpoint_name(),
  615.         vec![
  616.             (
  617.                 "composite_ids",
  618.                 format!("{}:{}", content_type_id, id).as_ref(),
  619.             ),
  620.             ("limit", "0"),
  621.         ],
  622.     )
  623.     .await
  624. }
  625.  
  626. /// Given an `ApiList`, this fn returns the first item or errors.
  627. pub fn first<T: EndpointName>(x: ApiList<T>) -> Result<T, ImlManagerCliError> {
  628.     x.objects
  629.         .into_iter()
  630.         .next()
  631.         .ok_or_else(|| ImlManagerCliError::DoesNotExist(T::endpoint_name()))
  632. }
  633.  
  634. /// Wrapper for a `GET` to the Api.
  635. pub async fn get<T: serde::de::DeserializeOwned + std::fmt::Debug>(
  636.     endpoint: &str,
  637.     query: impl serde::Serialize,
  638. ) -> Result<T, ImlManagerCliError> {
  639.     let client = iml_manager_client::get_client()?;
  640.  
  641.     iml_manager_client::get(client, endpoint, query)
  642.         .await
  643.         .map_err(|e| e.into())
  644. }
  645.  
  646. /// Wrapper for a `POST` to the Api.
  647. pub async fn post(
  648.     endpoint: &str,
  649.     body: impl serde::Serialize,
  650. ) -> Result<iml_manager_client::Response, ImlManagerCliError> {
  651.     let client = iml_manager_client::get_client()?;
  652.  
  653.     iml_manager_client::post(client, endpoint, body)
  654.         .await
  655.         .map_err(|e| e.into())
  656. }
  657.  
  658. /// Wrapper for a `PUT` to the Api.
  659. pub async fn put(
  660.     endpoint: &str,
  661.     body: impl serde::Serialize,
  662. ) -> Result<iml_manager_client::Response, ImlManagerCliError> {
  663.     let client = iml_manager_client::get_client()?;
  664.     iml_manager_client::put(client, endpoint, body)
  665.         .await
  666.         .map_err(|e| e.into())
  667. }
  668.  
  669. /// Wrapper for a `DELETE` to the Api.
  670. pub async fn delete(
  671.     endpoint: &str,
  672.     query: impl serde::Serialize,
  673. ) -> Result<iml_manager_client::Response, ImlManagerCliError> {
  674.     let client = iml_manager_client::get_client().expect("Could not create API client");
  675.     iml_manager_client::delete(client, endpoint, query)
  676.         .await
  677.         .map_err(|e| e.into())
  678. }
  679.  
  680. pub async fn get_hosts() -> Result<ApiList<Host>, ImlManagerCliError> {
  681.     get(Host::endpoint_name(), Host::query()).await
  682. }
  683.  
  684. pub async fn get_all<T: EndpointName + FlatQuery + Debug + serde::de::DeserializeOwned>(
  685. ) -> Result<ApiList<T>, ImlManagerCliError> {
  686.     get(T::endpoint_name(), T::query()).await
  687. }
  688.  
  689. pub async fn get_one<T: EndpointName + FlatQuery + Debug + serde::de::DeserializeOwned>(
  690.     query: Vec<(&str, &str)>,
  691. ) -> Result<T, ImlManagerCliError> {
  692.     let mut q = T::query();
  693.     q.extend(query);
  694.     first(get(T::endpoint_name(), q).await?)
  695. }
  696.  
  697. pub async fn get_influx<T: serde::de::DeserializeOwned + std::fmt::Debug>(
  698.     db: &str,
  699.     influxql: &str,
  700. ) -> Result<T, ImlManagerCliError> {
  701.     let client = iml_manager_client::get_client()?;
  702.     iml_manager_client::get_influx(client, db, influxql)
  703.         .await
  704.         .map_err(|e| e.into())
  705. }
  706.  
  707. // TODO debug only
  708. fn build_trees(
  709.     cmd_ids: &[i32],
  710.     commands: &HashMap<i32, RichCommand>,
  711.     jobs: &HashMap<i32, RichJob>,
  712.     steps: &HashMap<i32, RichStep>,
  713. ) -> Vec<Tree<TypedId, Specific>> {
  714.     let mut trees = Vec::with_capacity(cmd_ids.len());
  715.     for c in cmd_ids {
  716.         let cmd = &commands[&c];
  717.         if cmd.deps().iter().all(|j| jobs.contains_key(j)) {
  718.             let extract_fun = |job: &Arc<Job0>| extract_wait_fors_from_job(job, &jobs);
  719.             let jobs_graph_data = cmd
  720.                 .deps()
  721.                 .iter()
  722.                 .map(|k| RichJob::new(Arc::clone(&jobs[k].inner), extract_fun))
  723.                 .collect::<Vec<RichJob>>();
  724.             let dag = build_direct_dag(&jobs_graph_data);
  725.             let tree = build_gen_tree(cmd, &dag, &steps);
  726.             trees.push(tree);
  727.         }
  728.     }
  729.     trees
  730. }
  731.  
  732. // region functions build_fresh_items / build_gen_tree
  733. fn build_fresh_tree(
  734.     cmd_ids: &[i32],
  735.     commands: &HashMap<i32, RichCommand>,
  736.     jobs: &HashMap<i32, RichJob>,
  737.     steps: &HashMap<i32, RichStep>,
  738. ) -> Tree<TypedId, Specific> {
  739.     let mut full_tree = Tree::new();
  740.     for c in cmd_ids {
  741.         let cmd = &commands[&c];
  742.         if cmd.deps().iter().all(|j| jobs.contains_key(j)) {
  743.             let extract_fun = |job: &Arc<Job0>| extract_wait_fors_from_job(job, &jobs);
  744.             let jobs_graph_data = cmd
  745.                 .deps()
  746.                 .iter()
  747.                 .map(|k| RichJob::new(Arc::clone(&jobs[k].inner), extract_fun))
  748.                 .collect::<Vec<RichJob>>();
  749.             let dag = build_direct_dag(&jobs_graph_data);
  750.             let mut tree = build_gen_tree(cmd, &dag, &steps);
  751.             // The collapsing is needed to reduce some deep levels of the
  752.             // tree so that all the trees fit into terminal screens.
  753.             let pairs = tree.calculate_states_to_level(2);
  754.             for (id, s) in pairs {
  755.                 tree.get_node_mut(id).map(|n| {
  756.                     n.collapsed = true;
  757.                     n.state = s;
  758.                 });
  759.             }
  760.             full_tree.push(&mut tree);
  761.         } else {
  762.             let mut tree = Tree::new();
  763.             let node = Node {
  764.                 key: TypedId::Cmd(cmd.id),
  765.                 parent: None,
  766.                 deps: Vec::with_capacity(cmd.deps.len()),
  767.                 collapsed: false,
  768.                 state: cmd_state(cmd),
  769.                 inner: Specific {
  770.                     msg: cmd.message.clone(),
  771.                     console: String::new(),
  772.                     backtrace: String::new(),
  773.                 },
  774.             };
  775.             tree.add_child_node(None, node);
  776.             full_tree.push(&mut tree);
  777.         }
  778.     }
  779.     full_tree
  780. }
  781.  
  782. fn build_gen_tree(
  783.     cmd: &RichCommand,
  784.     graph: &DependencyDAG<i32, RichJob>,
  785.     steps: &HashMap<i32, RichStep>,
  786. ) -> Tree<TypedId, Specific> {
  787.     fn traverse(
  788.         graph: &DependencyDAG<i32, RichJob>,
  789.         job: Arc<RichJob>,
  790.         steps: &HashMap<i32, RichStep>,
  791.         parent: Option<TypedId>,
  792.         visited: &mut HashSet<TypedId>,
  793.         tree: &mut Tree<TypedId, Specific>,
  794.     ) {
  795.         let is_new = visited.insert(TypedId::Job(job.id));
  796.         let node = Node {
  797.             key: TypedId::Job(job.id),
  798.             parent: None,
  799.             deps: Vec::with_capacity(job.deps.len()),
  800.             collapsed: false,
  801.             state: job_state(&job),
  802.             inner: Specific {
  803.                 msg: job.description.clone(),
  804.                 console: String::new(),
  805.                 backtrace: String::new(),
  806.             },
  807.         };
  808.         let pk = tree.add_child_node(parent, node);
  809.         let new_parent = Some(pk);
  810.  
  811.         // add child jobs to the tree
  812.         if let Some(deps) = graph.links.get(&job.id()) {
  813.             if is_new {
  814.                 for d in deps {
  815.                     traverse(graph, Arc::clone(d), steps, new_parent, visited, tree);
  816.                 }
  817.             }
  818.         }
  819.         // add steps if any
  820.         for step_id in &job.steps {
  821.             if let Some(step_id) = extract_uri_id::<Step>(step_id) {
  822.                 if let Some(step) = steps.get(&step_id) {
  823.                     let node = Node {
  824.                         key: TypedId::Step(step_id),
  825.                         parent: None,
  826.                         collapsed: false,
  827.                         deps: Vec::new(),
  828.                         state: step_state(step),
  829.                         inner: Specific {
  830.                             msg: step.class_name.clone(),
  831.                             console: step.console.clone(),
  832.                             backtrace: step.backtrace.clone(),
  833.                         },
  834.                     };
  835.                     tree.add_child_node(new_parent, node);
  836.                 }
  837.             }
  838.         }
  839.     }
  840.     let mut tree = Tree::new();
  841.     let p = tree.add_child_node(
  842.         None,
  843.         Node {
  844.             key: TypedId::Cmd(cmd.id),
  845.             parent: None,
  846.             collapsed: false,
  847.             deps: vec![],
  848.             state: cmd_state(cmd),
  849.             inner: Specific {
  850.                 msg: cmd.message.clone(),
  851.                 console: String::new(),
  852.                 backtrace: String::new(),
  853.             },
  854.         },
  855.     );
  856.     tree.roots = vec![p];
  857.     let mut visited = HashSet::new();
  858.     for r in &graph.roots {
  859.         traverse(
  860.             graph,
  861.             Arc::clone(r),
  862.             steps,
  863.             Some(p),
  864.             &mut visited,
  865.             &mut tree,
  866.         );
  867.     }
  868.     tree
  869. }
  870.  
  871. pub fn calculate_and_apply_diff(
  872.     current_items: &mut Vec<Item<TypedId, Specific, ProgressBarIndicator>>,
  873.     fresh_items: &mut Vec<Item<TypedId, Specific, ProgressBarIndicator>>,
  874.     tree: &Tree<TypedId, Specific>,
  875.     multi_progress: &MultiProgress,
  876.     main_pb: &ProgressBar,
  877. ) {
  878.     let diff = calculate_diff(current_items, fresh_items);
  879.     let mut error_ids = Vec::new();
  880.     apply_diff(
  881.         current_items,
  882.         fresh_items,
  883.         &diff,
  884.         |i, y| {
  885.             let indi = ProgressBarIndicator {
  886.                 progress_bar: multi_progress.insert(i, ProgressBar::new(1_000_000)),
  887.                 active_style: Cell::new(None),
  888.             };
  889.             if y.state == State::Errored {
  890.                 error_ids.push(y.id);
  891.             }
  892.             set_progress_bar_message(&indi, y);
  893.             indi
  894.         },
  895.         |_, pb, y| set_progress_bar_message(pb, y),
  896.         |_, pb| multi_progress.remove(&pb.progress_bar),
  897.     );
  898.     // show errors, it is done with `progress_bar.println()`, just find the most upper one
  899.     if let Some(maybe_indi) = current_items.first().map(|it| &it.indicator) {
  900.         let pbi = ProgressBarIndicator {
  901.             progress_bar: main_pb.clone(),
  902.             active_style: Cell::new(None),
  903.         };
  904.         let pb = maybe_indi.as_ref().unwrap_or(&pbi);
  905.         for eid in error_ids {
  906.             if tree.contains_key(eid) {
  907.                 print_error(&tree, eid, |s| pb.progress_bar.println(s));
  908.             }
  909.         }
  910.     }
  911. }
  912.  
  913. fn set_progress_bar_message(
  914.     ind: &ProgressBarIndicator,
  915.     item: &Item<TypedId, Specific, ProgressBarIndicator>,
  916. ) {
  917.     // two styles are applied because indicatif doesn't able to set the spinner symbol
  918.     // after the progress bar completed.
  919.     let sty_aux = ProgressStyle::default_bar().template("{prefix} {spinner:.green} {msg}");
  920.     let sty_aux_finish = ProgressStyle::default_bar().template("{prefix} {msg}");
  921.  
  922.     match item.state {
  923.         State::Progressing => {
  924.             if ind.active_style.get() != Some(true) {
  925.                 ind.progress_bar.set_style(sty_aux.clone());
  926.                 ind.active_style.set(Some(true));
  927.             }
  928.             ind.progress_bar.set_prefix(&item.indent);
  929.             ind.progress_bar.set_message(&format!("{}", item.outer));
  930.         }
  931.         _ => {
  932.             if ind.active_style.get() != Some(false) {
  933.                 ind.progress_bar.set_style(sty_aux_finish.clone());
  934.                 ind.active_style.set(Some(false));
  935.             }
  936.             ind.progress_bar.set_prefix(&item.indent);
  937.             ind.progress_bar.set_message(&format!("{} {}", item.state, item.outer));
  938.         }
  939.     }
  940. }
  941. // endregion
  942.  
  943. pub fn extract_uri_id<T: EndpointName>(input: &str) -> Option<i32> {
  944.     lazy_static::lazy_static! {
  945.         static ref RE: Regex = Regex::new(r"/api/(\w+)/(\d+)/").unwrap();
  946.     }
  947.     RE.captures(input).and_then(|cap: Captures| {
  948.         let s = cap.get(1).unwrap().as_str();
  949.         let t = cap.get(2).unwrap().as_str();
  950.         if s == T::endpoint_name() {
  951.             t.parse::<i32>().ok()
  952.         } else {
  953.             None
  954.         }
  955.     })
  956. }
  957.  
  958. pub fn extract_children_from_cmd(cmd: &Command) -> (i32, Vec<i32>) {
  959.     let mut deps = cmd
  960.         .jobs
  961.         .iter()
  962.         .filter_map(|s| extract_uri_id::<Job0>(s))
  963.         .collect::<Vec<i32>>();
  964.     deps.sort();
  965.     (cmd.id, deps)
  966. }
  967.  
  968. pub fn extract_children_from_job(job: &Job0) -> (i32, Vec<i32>) {
  969.     let mut deps = job
  970.         .steps
  971.         .iter()
  972.         .filter_map(|s| extract_uri_id::<Step>(s))
  973.         .collect::<Vec<i32>>();
  974.     deps.sort();
  975.     (job.id, deps)
  976. }
  977.  
  978. pub fn extract_children_from_step(step: &Step) -> (i32, Vec<i32>) {
  979.     (step.id, Vec::new()) // steps have no descendants
  980. }
  981.  
  982. pub fn extract_wait_fors_from_job(job: &Job0, jobs: &HashMap<i32, RichJob>) -> (i32, Vec<i32>) {
  983.     // Extract the interdependencies between jobs.
  984.     // See [command_modal::tests::test_jobs_ordering]
  985.     let mut deps = job
  986.         .wait_for
  987.         .iter()
  988.         .filter_map(|s| extract_uri_id::<Job0>(s))
  989.         .collect::<Vec<i32>>();
  990.     let triple = |id: &i32| {
  991.         jobs
  992.             .get(id)
  993.             .map(|arj| (-(arj.deps.len() as i32), &arj.description[..], arj.id))
  994.             .unwrap_or((0, "", *id))
  995.     };
  996.     deps.sort_by(|i1, i2| {
  997.         let t1 = triple(i1);
  998.         let t2 = triple(i2);
  999.         t1.cmp(&t2)
  1000.     });
  1001.     (job.id, deps)
  1002. }
  1003.  
  1004. mod tests {
  1005.     use super::*;
  1006.     use crate::gen_tree::{iterate_items, is_valid};
  1007.  
  1008.     fn convert_items_to_string<K, U: Display, B>(items: &[Item<K, U, B>]) -> String {
  1009.         let mut acc = String::with_capacity(64);
  1010.         iterate_items(items, |_, s| {
  1011.             acc.push_str(&s);
  1012.             acc.push('\n');
  1013.         });
  1014.         acc
  1015.     }
  1016.  
  1017.     #[test]
  1018.     fn test_job_tree() {
  1019.         let mut commands = HashMap::new();
  1020.         let mut jobs = HashMap::new();
  1021.         let mut steps = HashMap::new();
  1022.         update_commands(&mut commands, TREE_STATE.lock().unwrap().commands.clone());
  1023.         update_jobs(&mut jobs, TREE_STATE.lock().unwrap().jobs.clone());
  1024.         update_steps(&mut steps, TREE_STATE.lock().unwrap().steps.clone());
  1025.         let cmd = commands.get(&37).unwrap();
  1026.  
  1027.         let extract_fun = |job: &Arc<Job0>| extract_wait_fors_from_job(job, &jobs);
  1028.         let jobs_graph_data = cmd
  1029.             .deps()
  1030.             .iter()
  1031.             .map(|k| RichJob::new(Arc::clone(&jobs[k].inner), extract_fun))
  1032.             .collect::<Vec<RichJob>>();
  1033.         let dag = build_direct_dag(&jobs_graph_data);
  1034.  
  1035.         let mut items: Vec<Item<TypedId, Specific, ()>> = Vec::new();
  1036.  
  1037.         for cmd in commands.values() {
  1038.             let tree = build_gen_tree(cmd, &dag, &steps);
  1039.             assert_eq!(is_valid(&tree), true);
  1040.             let cmd_items = tree.render::<Specific, ()>();
  1041.             cmd_items.into_iter().for_each(|it| items.push(it));
  1042.         }
  1043.         let output = convert_items_to_string(&items);
  1044.         println!("{}", output);
  1045.     }
  1046. }
  1047.  
  1048. const CONSOLE: &'_ str = r#"-- Logs begin at Wed 2019-07-10 16:12:42 UTC, end at Wed 2019-07-10 16:52:46 UTC. --
  1049. Jul 10 16:12:50 adm.local systemd[1]: Started IML Agent Comms Service.
  1050. Jul 10 16:12:50 adm.local iml-agent-comms[3069]: [INFO  iml_rabbit] creating client
  1051. Jul 10 16:12:50 adm.local iml-agent-comms[3069]: [INFO  iml_agent_comms] Starting iml-agent-comms on V4(127.0.0.1:8003)
  1052. Jul 10 16:12:50 adm.local iml-agent-comms[3069]: [INFO  iml_rabbit] creating client
  1053. Jul 10 16:12:50 adm.local iml-agent-comms[3069]: [ERROR iml_agent_comms] Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
  1054. Jul 10 16:12:50 adm.local iml-agent-comms[3069]: [ERROR iml_rabbit] There was an error connecting to rabbit: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
  1055. Jul 10 16:12:50 adm.local systemd[1]: iml-agent-comms.service holdoff time over, scheduling restart.
  1056. Jul 10 16:12:50 adm.local systemd[1]: Stopped IML Agent Comms Service.
  1057. Jul 10 16:12:53 adm.local systemd[1]: Started IML Agent Comms Service.
  1058. Jul 10 16:12:53 adm.local iml-agent-comms[3191]: [INFO  iml_rabbit] creating client"#;
  1059.  
  1060. const BACKTRACE: &'_ str = r#"Traceback (most recent call last):
  1061.  File "greetings.py", line 10, in greet_many
  1062.    greet(person)
  1063.  File "greetings.py", line 5, in greet
  1064.    print(greeting + ', ' + who_to_greet(someone))
  1065. TypeError: must be str, not int
  1066. During handling of the above exception, another exception occurred:
  1067. Traceback (most recent call last):
  1068.  File "greetings.py", line 14, in <module>
  1069.    greet_many(['Chad', 'Dan', 1])
  1070.  File "greetings.py", line 12, in greet_many
  1071.    print('hi, ' + person)
  1072. TypeError: must be str, not int"#;
  1073.  
  1074.  
Add Comment
Please, Sign In to add comment