Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import speedtest
- import git
- from git.remote import RemoteProgress
- import os
- import datetime
- import time
- import shutil
- from pathlib import Path
- import traceback
- import csv
def run_speed_test(max_retries=3, retry_delay=5):
    """Run a speedtest measurement, retrying when an attempt fails.

    Args:
        max_retries: Maximum number of attempts before giving up.
        retry_delay: Base delay in seconds. The pause before the next
            attempt is ``retry_delay * (attempt + 1)``, i.e. it grows
            linearly with the number of failures so far.

    Returns:
        A tuple ``(download_speed, upload_speed, ping)``. The two speeds are
        the values reported by the speedtest client divided by 1,000,000
        (Mbps); ``ping`` is taken unchanged from ``st.results.ping``.

    Raises:
        Exception: When every attempt failed. The message contains the text
            of the last error, and the last underlying exception is attached
            as ``__cause__`` so its traceback is not lost.
    """
    last_exc = None
    for attempt in range(max_retries):
        try:
            st = speedtest.Speedtest(secure=True)  # Force HTTPS
            st.get_servers()  # Get list of servers
            st.get_best_server()  # Get best server
            # Run tests
            download_speed = st.download() / 1_000_000  # Convert to Mbps
            upload_speed = st.upload() / 1_000_000  # Convert to Mbps
            ping = st.results.ping
            return download_speed, upload_speed, ping
        except Exception as e:
            # Keep the exception object itself (not just its text) so the
            # final failure can be chained to it.
            last_exc = e
            if "403" in str(e):
                print(f"Attempt {attempt + 1}/{max_retries}: Speedtest.net rate limit or authentication error")
            else:
                print(f"Attempt {attempt + 1}/{max_retries}: Speed test failed - {str(e)}")
            # No point sleeping after the final attempt.
            if attempt < max_retries - 1:
                time.sleep(retry_delay * (attempt + 1))  # Linear backoff
    # str(None) would also give "None", but be explicit about the case where
    # the loop never ran (max_retries <= 0).
    last_error = str(last_exc) if last_exc is not None else None
    raise Exception(
        f"Speed test failed after {max_retries} attempts. Last error: {last_error}"
    ) from last_exc
def clone_repository(repo_url, target_dir, file_handle):
    """Clone *repo_url* into a newly created *target_dir* and time each stage.

    Progress reported by Git is printed on a single, continuously rewritten
    console line. Each time the reported stage changes, the time spent in the
    previous stage is added to a per-stage total, and a summary of those
    totals is logged once the clone finishes.

    Args:
        repo_url: URL handed to ``git.Repo.clone_from``.
        target_dir: Directory to create and clone into.
        file_handle: Open log file passed through to ``log_message``.

    Returns:
        ``(True, "Repository cloned successfully", repo)`` on success, or
        ``(False, <error text>, None)`` if any exception was raised (the
        traceback is written to the log first).
    """
    try:
        log_message(f"Creating directory {target_dir}", file_handle)
        os.makedirs(target_dir)

        # State shared between this function and the progress callback.
        elapsed_by_stage = {}
        stage_started_at = None
        active_stage = None
        progress_width = 0

        def _bank_elapsed():
            # Add the time spent in the active stage to its running total.
            spent = time.time() - stage_started_at
            elapsed_by_stage[active_stage] = elapsed_by_stage.get(active_stage, 0) + spent

        def progress_printer(op_code, cur_count, max_count=None, message=''):
            nonlocal stage_started_at, active_stage, progress_width

            # op_code is a bit mask; collect a label for every stage bit set.
            stage_flags = (
                (RemoteProgress.COUNTING, "Counting"),
                (RemoteProgress.COMPRESSING, "Compressing"),
                (RemoteProgress.WRITING, "Writing"),
                (RemoteProgress.RECEIVING, "Receiving"),
                (RemoteProgress.RESOLVING, "Resolving"),
                (RemoteProgress.FINDING_SOURCES, "Finding sources"),
            )
            labels = [label for flag, label in stage_flags if op_code & flag]
            reported_stage = " & ".join(labels) if labels else f"Stage {op_code}"

            # On a stage change, close out the previous stage and announce
            # the new one.
            if reported_stage != active_stage:
                if stage_started_at and active_stage:
                    _bank_elapsed()
                    # Blank out the last progress line before logging.
                    print(' ' * progress_width + '\r', end='')
                    log_message(f"Completed {active_stage}", file_handle)
                active_stage = reported_stage
                stage_started_at = time.time()
                print(f"\nStarting {active_stage}...")

            # Rewrite the progress line in place (trailing carriage return).
            if max_count:
                progress_line = f"{active_stage}: {cur_count}/{max_count} ({cur_count/max_count*100:.1f}%)\r"
                print(progress_line, end='')
                progress_width = len(progress_line)

        log_message("Starting Git clone operation...", file_handle)
        clone_began = time.time()
        repo = git.Repo.clone_from(repo_url, target_dir, progress=progress_printer)
        print()  # Move past the in-place progress line

        # The last stage never sees a "stage changed" event, so close it here.
        if stage_started_at and active_stage:
            _bank_elapsed()

        total_time = time.time() - clone_began
        log_message("\nGit clone timing summary:", file_handle)
        for stage_name, seconds in elapsed_by_stage.items():
            log_message(f"{stage_name}: {seconds:.2f} seconds", file_handle)
        log_message(f"Total clone time: {total_time:.2f} seconds", file_handle)
        return True, "Repository cloned successfully", repo
    except Exception as e:
        log_message(f"Clone error details:\n{traceback.format_exc()}", file_handle)
        return False, str(e), None
def log_message(message, file_handle):
    """Print *message* and append it, newline-terminated, to *file_handle*.

    The handle is flushed after each write so the log is current on disk
    even if the script is interrupted.
    """
    print(message)
    line = message + "\n"
    file_handle.write(line)
    file_handle.flush()
def do_speed_test(f):
    """Run one speed test and log its results (or its failure) to *f*.

    A separator line and an empty line are logged afterwards regardless of
    whether the test succeeded. Nothing is returned.
    """
    log_message("Running speed test...", f)
    try:
        download, upload, ping = run_speed_test()
        readings = (
            ("Download Speed", download, "Mbps"),
            ("Upload Speed", upload, "Mbps"),
            ("Ping", ping, "ms"),
        )
        # Format lazily, one reading at a time, so earlier lines are logged
        # even if a later value cannot be formatted.
        for label, value, unit in readings:
            log_message(f"{label}: {value:.2f} {unit}", f)
    except Exception as e:
        log_message(f"Speed test failed: {str(e)}", f)
    separator = "\n" + "-" * 50
    log_message(separator, f)
    log_message("", f)
def cleanup_directory(directory, repo, file_handle):
    """Remove *directory*, retrying up to four times.

    On each attempt the Git resources held by *repo* (if given) are released,
    every file under the directory is made writable and unlinked, and the
    remaining tree is removed with ``shutil.rmtree``. Failures are logged to
    *file_handle* and do not abort the retry loop.

    Args:
        directory: Path of the directory to delete.
        repo: Repository object for the clone, or ``None`` to skip the Git
            resource cleanup.
        file_handle: Open log file passed through to ``log_message``.

    Returns:
        ``True`` if the directory does not exist (either initially or after
        removal), ``False`` if it still exists after all attempts.
    """
    pause_seconds = 2
    for attempt in range(4):
        try:
            # Nothing to remove: already gone, or never created.
            if not os.path.exists(directory):
                return True

            if repo is not None:
                try:
                    for submodule in repo.submodules:
                        submodule.remove()
                    repo.git.clear_cache()
                    repo.close()
                except Exception as git_error:
                    log_message(f"Git cleanup warning: {str(git_error)}\n{traceback.format_exc()}", file_handle)

            # Wait longer on each successive attempt (no wait on the first).
            time.sleep(pause_seconds * attempt)

            try:
                # First pass: delete files individually, forcing permissions,
                # and log each file that cannot be removed.
                for root, _subdirs, filenames in os.walk(directory, topdown=False):
                    for filename in filenames:
                        target = os.path.join(root, filename)
                        try:
                            os.chmod(target, 0o777)
                            os.unlink(target)
                        except Exception as e:
                            log_message(f"Failed to remove file {target}: {str(e)}", file_handle)

                # Second pass: remove what is left; errors are raised so the
                # details end up in the log.
                shutil.rmtree(directory, ignore_errors=False)
                if not os.path.exists(directory):
                    log_message(f"Successfully removed directory at {directory}", file_handle)
                    return True
            except Exception as remove_error:
                log_message(f"Detailed removal error: {str(remove_error)}\n{traceback.format_exc()}", file_handle)
        except Exception as e:
            log_message(f"Cleanup attempt {attempt + 1} failed: {str(e)}\n{traceback.format_exc()}", file_handle)
        time.sleep(pause_seconds)
    return False
def write_metrics_to_csv(metrics, csv_file="network_tests.csv"):
    """Append one row of metrics to a CSV file, adding a header when needed.

    The header row is written when the file does not exist yet *or* exists
    but is empty, so a pre-created or truncated file still ends up with
    column names.

    Args:
        metrics: Sequence of values forming one CSV row, in the same order
            as the header columns below.
        csv_file: Path of the CSV file to append to. Defaults to
            ``network_tests.csv`` in the current working directory.
    """
    header = [
        'Start Time', 'Log File',
        'Initial Ping (ms)', 'Initial Download (Mbps)', 'Initial Upload (Mbps)',
        'Git Clone Status', 'Git Execution Time (s)',
        'Final Ping (ms)', 'Final Download (Mbps)', 'Final Upload (Mbps)',
    ]
    # A missing file raises OSError; a zero-byte file reports size 0. Both
    # mean the header has not been written yet.
    try:
        needs_header = os.path.getsize(csv_file) == 0
    except OSError:
        needs_header = True
    # newline='' lets the csv module control line endings itself.
    with open(csv_file, 'a', newline='') as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(header)
        writer.writerow(metrics)
def _speed_test_phase(phase, f):
    """Run and log one speed test for *phase* ("initial" or "final").

    Returns ``(download, upload, ping)``; each value stays 0 if the test
    raised before producing results. A separator is logged in either case.
    """
    download = upload = ping = 0
    log_message(f"Running {phase} speed test...", f)
    try:
        download, upload, ping = run_speed_test()
        log_message(f"Download Speed: {download:.2f} Mbps", f)
        log_message(f"Upload Speed: {upload:.2f} Mbps", f)
        log_message(f"Ping: {ping:.2f} ms", f)
    except Exception as e:
        log_message(f"Speed test failed: {str(e)}", f)
    log_message("\n" + "-" * 50 + "\n", f)
    return download, upload, ping


def main(repo_url):
    """Benchmark the network around a Git clone of *repo_url*.

    Sequence: speed test, timed clone into ``test_clone_<timestamp>``,
    removal of the clone, second speed test. Everything is logged to
    ``network_test_<timestamp>.log`` and one summary row is handed to
    ``write_metrics_to_csv``.
    """
    start_time = datetime.datetime.now()
    timestamp = start_time.strftime("%Y%m%d_%H%M%S")
    log_file = f"network_test_{timestamp}.log"

    with open(log_file, "w") as f:
        # Log header
        log_message(f"Git Network Test - {start_time}", f)
        log_message("-" * 50, f)
        log_message("", f)

        # Baseline measurement before the clone
        initial_download, initial_upload, initial_ping = _speed_test_phase("initial", f)

        # Timed clone
        clone_began = time.time()
        target_dir = f"test_clone_{timestamp}"
        log_message(f"Attempting to clone repository: {repo_url}", f)
        git_success, message, repo = clone_repository(repo_url, target_dir, f)
        log_message(f"Clone result: {message}", f)
        log_message(f"Clone success: {git_success}", f)
        git_execution_time = time.time() - clone_began
        log_message(f"\nTotal execution time: {git_execution_time:.2f} seconds", f)

        # Remove the clone whether or not it succeeded; the repo object is
        # only passed along when the clone actually produced one.
        log_message(f"Attempting to remove cloned directory at {target_dir}", f)
        repo_to_release = repo if git_success else None
        removed = cleanup_directory(target_dir, repo_to_release, f)
        if not removed:
            log_message(f"WARNING: Failed to remove directory {target_dir}", f)
        log_message("\n" + "-" * 50 + "\n", f)

        # Measurement after the clone
        final_download, final_upload, final_ping = _speed_test_phase("final", f)

    # One CSV row, in the column order expected by write_metrics_to_csv.
    two_decimals = "{:.2f}".format
    metrics = [
        start_time.isoformat(),
        log_file,
        two_decimals(initial_ping),
        two_decimals(initial_download),
        two_decimals(initial_upload),
        "Success" if git_success else "Fail",
        two_decimals(git_execution_time),
        two_decimals(final_ping),
        two_decimals(final_download),
        two_decimals(final_upload),
    ]
    write_metrics_to_csv(metrics)
if __name__ == "__main__":
    # Script entry point: benchmark against a fixed public repository.
    target_repo_url = "https://github.com/skia4delphi/skia4delphi.git"
    main(target_repo_url)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement