Advertisement
killtdj

Untitled

Nov 20th, 2024
56
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 13.65 KB | None | 0 0
  1. use mysql::*;
  2. use mysql::prelude::*;
  3. use std::collections::{HashMap, HashSet, VecDeque};
  4. use std::sync::{Arc, Mutex};
  5. use tokio::sync::Semaphore;
  6. use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
  7. use chrono::NaiveDateTime;
  8. use std::error::Error;
  9. use futures::stream::{StreamExt, FuturesUnordered};
  10. use dialoguer::{Input, Select, MultiSelect, theme::ColorfulTheme};
  11. use std::time::{Instant, Duration};
  12.  
  13. const BATCH_SIZE: usize = 10000;
  14. const BUFFER_SIZE: usize = 1000;
  15. const MAX_CONCURRENT_TABLES: usize = 4;
  16. const RETRY_ATTEMPTS: u32 = 3;
  17. const RETRY_DELAY: Duration = Duration::from_secs(2);
  18. const DROP_EXISTING: bool = true;
  19.  
  20. #[derive(Debug)]
  21. struct TableReference {
  22.     table_name: String,
  23.     referenced_tables: Vec<String>,
  24. }
  25.  
  26. #[derive(Debug)]
  27. struct TableStats {
  28.     row_count: i64,
  29.     size_mb: f64,
  30.     estimated_time: Duration,
  31. }
  32.  
  33. struct InsertBuffer {
  34.     rows: VecDeque<Vec<Value>>,
  35.     capacity: usize,
  36. }
  37.  
  38. impl InsertBuffer {
  39.     fn new(capacity: usize) -> Self {
  40.         Self {
  41.             rows: VecDeque::with_capacity(capacity),
  42.             capacity,
  43.         }
  44.     }
  45.  
  46.     fn push(&mut self, row: Vec<Value>) -> bool {
  47.         self.rows.push_back(row);
  48.         self.rows.len() >= self.capacity
  49.     }
  50.  
  51.     fn flush(&mut self) -> Vec<Vec<Value>> {
  52.         let mut rows = VecDeque::with_capacity(self.capacity);
  53.         std::mem::swap(&mut self.rows, &mut rows);
  54.         rows.into_iter().collect()
  55.     }
  56. }
  57.  
  58. #[tokio::main]
  59. async fn main() -> Result<(), Box<dyn Error>> {
  60.     println!("MySQL Database Copier v2.0");
  61.     println!("-------------------------");
  62.  
  63.     let source_url = Input::<String>::new()
  64.         .with_prompt("Source database URL")
  65.         .default("mysql://root:password@localhost:3306/source_db".to_string())
  66.         .interact()?;
  67.    
  68.     let target_url = Input::<String>::new()
  69.         .with_prompt("Target database URL")
  70.         .default("mysql://root:password@localhost:3306/target_db".to_string())
  71.         .interact()?;
  72.    
  73.     let source_pool = Pool::new(&source_url)?;
  74.     let target_pool = Pool::new(&target_url)?;
  75.    
  76.     let all_tables = get_sorted_tables(&source_pool)?;
  77.    
  78.     println!("\nAvailable tables (in dependency order):");
  79.     println!("-------------------------------------");
  80.    
  81.     let mut total_size = 0.0;
  82.     for (i, table) in all_tables.iter().enumerate() {
  83.         let stats = get_table_stats(&source_pool, table)?;
  84.         total_size += stats.size_mb;
  85.         println!("{}. {} ({:.2} MB, {} rows, est. time: {:.1}s)",
  86.             i + 1, table, stats.size_mb, stats.row_count, stats.estimated_time.as_secs_f64());
  87.     }
  88.    
  89.     println!("\nTotal size: {:.2} MB", total_size);
  90.  
  91.     let options = vec!["Copy all tables", "Select specific tables"];
  92.     let selection = Select::with_theme(&ColorfulTheme::default())
  93.         .with_prompt("\nSelect operation mode")
  94.         .items(&options)
  95.         .default(0)
  96.         .interact()?;
  97.    
  98.     let tables_to_copy = if selection == 0 {
  99.         all_tables.clone()
  100.     } else {
  101.         let selections = MultiSelect::with_theme(&ColorfulTheme::default())
  102.             .with_prompt("Select tables to copy")
  103.             .items(&all_tables)
  104.             .interact()?;
  105.            
  106.         selections.into_iter()
  107.             .map(|i| all_tables[i].clone())
  108.             .collect()
  109.     };
  110.    
  111.     let start_time = Instant::now();
  112.     let multi_progress = Arc::new(MultiProgress::new());
  113.    
  114.     let total_tables = tables_to_copy.len();
  115.     let overall_progress = multi_progress.add(ProgressBar::new(total_tables as u64));
  116.     overall_progress.set_style(ProgressStyle::default_bar()
  117.         .template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} tables ({percent}%) {msg}")
  118.         .progress_chars("##-"));
  119.     overall_progress.set_message("Overall progress");
  120.    
  121.     let buffer = Arc::new(Mutex::new(InsertBuffer::new(BUFFER_SIZE)));
  122.    
  123.     let result = copy_tables_parallel(
  124.         &source_pool,
  125.         &target_pool,
  126.         &tables_to_copy,
  127.         multi_progress.clone(),
  128.         overall_progress.clone(),
  129.         buffer
  130.     ).await;
  131.    
  132.     match result {
  133.         Ok(_) => {
  134.             let duration = start_time.elapsed();
  135.             println!("\nCopy completed successfully!");
  136.             println!("Total time: {:.2} seconds", duration.as_secs_f64());
  137.             println!("Average speed: {:.2} MB/s", total_size / duration.as_secs_f64());
  138.             println!("Tables copied: {}", total_tables);
  139.         },
  140.         Err(e) => {
  141.             println!("\nError during copy: {}", e);
  142.             println!("Check logs for details");
  143.         }
  144.     }
  145.    
  146.     Ok(())
  147. }
  148.  
  149. // ... [Previous functions: copy_tables_parallel, copy_table, handle_datetime_value, etc.]
  150. async fn copy_tables_parallel(
  151.     source_pool: &Pool,
  152.     target_pool: &Pool,
  153.     tables: &[String],
  154.     multi_progress: Arc<MultiProgress>,
  155.     overall_progress: ProgressBar,
  156.     buffer: Arc<Mutex<InsertBuffer>>,
  157. ) -> Result<(), Box<dyn Error>> {
  158.     let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_TABLES));
  159.     let mut tasks = FuturesUnordered::new();
  160.  
  161.     for table in tables {
  162.         let table = table.clone();
  163.         let source_pool = source_pool.clone();
  164.         let target_pool = target_pool.clone();
  165.         let semaphore = Arc::clone(&semaphore);
  166.         let multi_progress = Arc::clone(&multi_progress);
  167.         let buffer = Arc::clone(&buffer);
  168.  
  169.         let task = tokio::spawn(async move {
  170.             let _permit = semaphore.acquire().await?;
  171.             let result = copy_table_with_retry(
  172.                 &source_pool,
  173.                 &target_pool,
  174.                 &table,
  175.                 &multi_progress,
  176.                 &buffer
  177.             ).await;
  178.             result
  179.         });
  180.  
  181.         tasks.push(task);
  182.     }
  183.  
  184.     while let Some(result) = tasks.next().await {
  185.         result??;
  186.         overall_progress.inc(1);
  187.     }
  188.  
  189.     overall_progress.finish_with_message("All tables copied successfully!");
  190.     Ok(())
  191. }
  192.  
  193. async fn copy_table_with_retry(
  194.     source_pool: &Pool,
  195.     target_pool: &Pool,
  196.     table: &str,
  197.     multi_progress: &MultiProgress,
  198.     buffer: &Arc<Mutex<InsertBuffer>>,
  199. ) -> Result<(), Box<dyn Error>> {
  200.     let mut attempts = 0;
  201.     loop {
  202.         match copy_table(source_pool, target_pool, table, multi_progress, buffer).await {
  203.             Ok(_) => return Ok(()),
  204.             Err(e) => {
  205.                 attempts += 1;
  206.                 if attempts >= RETRY_ATTEMPTS {
  207.                     return Err(e);
  208.                 }
  209.                 println!("Retrying table {} ({}/{}): {}",
  210.                     table, attempts, RETRY_ATTEMPTS, e);
  211.                 tokio::time::sleep(RETRY_DELAY).await;
  212.             }
  213.         }
  214.     }
  215. }
  216.  
  217. async fn copy_table(
  218.     source_pool: &Pool,
  219.     target_pool: &Pool,
  220.     table: &str,
  221.     multi_progress: &MultiProgress,
  222.     buffer: &Arc<Mutex<InsertBuffer>>,
  223. ) -> Result<(), Box<dyn Error>> {
  224.     let mut source_conn = source_pool.get_conn()?;
  225.     let mut target_conn = target_pool.get_conn()?;
  226.  
  227.     // Handle existing table
  228.     let table_exists: bool = target_conn
  229.         .query_first(format!(
  230.             "SELECT 1 FROM information_schema.tables
  231.             WHERE table_schema = DATABASE()
  232.             AND table_name = '{}'",
  233.             table
  234.         ))?
  235.         .unwrap_or(0);
  236.  
  237.     if table_exists {
  238.         if DROP_EXISTING {
  239.             target_conn.query_drop(format!("DROP TABLE {}", table))?;
  240.         } else {
  241.             target_conn.query_drop(format!("TRUNCATE TABLE {}", table))?;
  242.         }
  243.     }
  244.  
  245.     // Create table
  246.     let create_stmt: String = source_conn
  247.         .query_first(format!("SHOW CREATE TABLE {}", table))?
  248.         .unwrap_or_default();
  249.     target_conn.query_drop(&create_stmt)?;
  250.  
  251.     let count: i64 = source_conn
  252.         .query_first(format!("SELECT COUNT(*) FROM {}", table))?
  253.         .unwrap_or(0);
  254.  
  255.     let progress = multi_progress.add(ProgressBar::new(count as u64));
  256.     progress.set_style(ProgressStyle::default_bar()
  257.         .template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} ({percent}%) {msg}")
  258.         .progress_chars("##-"));
  259.     progress.set_message(format!("Copying {}", table));
  260.  
  261.     let mut offset = 0;
  262.     loop {
  263.         let batch: Vec<Row> = source_conn.query(format!(
  264.             "SELECT * FROM {} LIMIT {}, {}",
  265.             table, offset, BATCH_SIZE
  266.         ))?;
  267.  
  268.         if batch.is_empty() {
  269.             break;
  270.         }
  271.  
  272.         for row in batch {
  273.             let mut row_values = Vec::new();
  274.             for (i, column) in row.columns().iter().enumerate() {
  275.                 let value = match row.get_raw(i) {
  276.                     Some(val) => {
  277.                         if column.column_type() == ColumnType::DATETIME {
  278.                             handle_datetime_value(val)?
  279.                         } else {
  280.                             val.clone()
  281.                         }
  282.                     }
  283.                     None => Value::NULL,
  284.                 };
  285.                 row_values.push(value);
  286.             }
  287.  
  288.             let mut buffer = buffer.lock().unwrap();
  289.             if buffer.push(row_values) {
  290.                 flush_buffer(&mut target_conn, table, buffer.flush())?;
  291.             }
  292.         }
  293.  
  294.         offset += BATCH_SIZE;
  295.         progress.set_position(offset as u64);
  296.     }
  297.  
  298.     // Flush remaining rows
  299.     let mut buffer = buffer.lock().unwrap();
  300.     if !buffer.rows.is_empty() {
  301.         flush_buffer(&mut target_conn, table, buffer.flush())?;
  302.     }
  303.  
  304.     progress.finish_with_message(format!("Completed {}", table));
  305.     Ok(())
  306. }
  307.  
  308. fn handle_datetime_value(val: Value) -> Result<Value, Box<dyn Error>> {
  309.     match val {
  310.         Value::Date(y, m, d, h, i, s, _) => {
  311.             if y == 0 || m == 0 || d == 0 {
  312.                 Ok(Value::NULL)
  313.             } else {
  314.                 match NaiveDateTime::parse_from_str(
  315.                     &format!("{:04}-{:02}-{:02} {:02}:{:02}:{:02}", y, m, d, h, i, s),
  316.                     "%Y-%m-%d %H:%M:%S"
  317.                 ) {
  318.                     Ok(_) => Ok(val),
  319.                     Err(_) => Ok(Value::NULL),
  320.                 }
  321.             }
  322.         }
  323.         _ => Ok(val),
  324.     }
  325. }
  326.  
  327. fn get_sorted_tables(pool: &Pool) -> Result<Vec<String>, Box<dyn Error>> {
  328.     let mut conn = pool.get_conn()?;
  329.     let mut table_refs: Vec<TableReference> = Vec::new();
  330.    
  331.     let tables: Vec<String> = conn.query("SHOW TABLES")?;
  332.    
  333.     for table in &tables {
  334.         let refs: Vec<String> = conn.query(format!(
  335.             "SELECT REFERENCED_TABLE_NAME
  336.             FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
  337.             WHERE TABLE_SCHEMA = DATABASE()
  338.             AND TABLE_NAME = '{}'
  339.             AND REFERENCED_TABLE_NAME IS NOT NULL", table))?;
  340.        
  341.         table_refs.push(TableReference {
  342.             table_name: table.clone(),
  343.             referenced_tables: refs,
  344.         });
  345.     }
  346.    
  347.     Ok(topological_sort(table_refs)?)
  348. }
  349.  
  350. fn topological_sort(table_refs: Vec<TableReference>) -> Result<Vec<String>, Box<dyn Error>> {
  351.     let mut result = Vec::new();
  352.     let mut visited = HashSet::new();
  353.     let mut temp_visited = HashSet::new();
  354.    
  355.     let ref_map: HashMap<_, _> = table_refs.iter()
  356.         .map(|r| (r.table_name.clone(), r.referenced_tables.clone()))
  357.         .collect();
  358.    
  359.     fn visit(
  360.         table: &str,
  361.         ref_map: &HashMap<String, Vec<String>>,
  362.         visited: &mut HashSet<String>,
  363.         temp_visited: &mut HashSet<String>,
  364.         result: &mut Vec<String>,
  365.     ) -> Result<(), Box<dyn Error>> {
  366.         if temp_visited.contains(table) {
  367.             return Err("Circular dependency detected".into());
  368.         }
  369.         if visited.contains(table) {
  370.             return Ok(());
  371.         }
  372.        
  373.         temp_visited.insert(table.to_string());
  374.        
  375.         if let Some(refs) = ref_map.get(table) {
  376.             for ref_table in refs {
  377.                 visit(ref_table, ref_map, visited, temp_visited, result)?;
  378.             }
  379.         }
  380.        
  381.         temp_visited.remove(table);
  382.         visited.insert(table.to_string());
  383.         result.push(table.to_string());
  384.        
  385.         Ok(())
  386.     }
  387.    
  388.     for table in ref_map.keys() {
  389.         if !visited.contains(table.as_str()) {
  390.             visit(table, &ref_map, &mut visited, &mut temp_visited, &mut result)?;
  391.         }
  392.     }
  393.    
  394.     Ok(result)
  395. }
  396.  
  397. fn get_table_stats(pool: &Pool, table: &str) -> Result<TableStats, Box<dyn Error>> {
  398.     let mut conn = pool.get_conn()?;
  399.     let (row_count, size_bytes): (i64, i64) = conn.query_first(format!(
  400.         "SELECT table_rows, data_length + index_length
  401.         FROM information_schema.tables
  402.         WHERE table_schema = DATABASE()
  403.         AND table_name = '{}'", table
  404.     ))?.unwrap_or((0, 0));
  405.  
  406.     let size_mb = size_bytes as f64 / (1024.0 * 1024.0);
  407.     let estimated_time = Duration::from_secs((size_mb * 0.5) as u64); // Rough estimate
  408.  
  409.     Ok(TableStats {
  410.         row_count,
  411.         size_mb,
  412.         estimated_time,
  413.     })
  414. }
  415.  
  416. fn flush_buffer(
  417.     conn: &mut Conn,
  418.     table: &str,
  419.     rows: Vec<Vec<Value>>,
  420. ) -> Result<(), Box<dyn Error>> {
  421.     if rows.is_empty() {
  422.         return Ok(());
  423.     }
  424.  
  425.     let placeholders = format!(
  426.         "({})",
  427.         (0..rows[0].len())
  428.             .map(|_| "?")
  429.             .collect::<Vec<_>>()
  430.             .join(", ")
  431.     );
  432.  
  433.     let values_str = format!(
  434.         "{}",
  435.         vec![placeholders.as_str(); rows.len()].join(", ")
  436.     );
  437.  
  438.     let insert_query = format!(
  439.         "INSERT INTO {} VALUES {}",
  440.         table, values_str
  441.     );
  442.  
  443.     let flat_values: Vec<Value> = rows.into_iter().flatten().collect();
  444.     conn.exec_batch(&insert_query, flat_values)?;
  445.  
  446.     Ok(())
  447. }
  448.  
  449.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement