Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import subprocess
- import click
- import tomli
- import toml
- from rich.console import Console
- from rich.table import Table
- console = Console()
- COLORS = {
- "red": "\033[31m",
- "green": "\033[32m",
- "yellow": "\033[33m",
- "blue": "\033[34m",
- "reset": "\033[0m",
- }
- SORTABLE_FIELDS = ["name", "description", "latest_version"]
- class ManagerPool:
- ALLOWED_EXTRA_OPTION = [
- "--verbose",
- "--all",
- ]
- class PackageManager:
- def __init__(self, manager_pool):
- self._manager_pool = manager_pool
- def get_pool(self):
- raise NotImplementedError()
- class PythonPackageManager(PackageManager):
- def __init__(self, manager_pool):
- super().__init__(manager_pool)
- def get_pool(self):
- command = ["pip", "search", "-v"]
- result = self._manager_pool.run_command(command)
- package_data = result.stdout.strip().split("\n")
- packages = []
- for package_str in package_data:
- package_str = package_str.strip()
- package_data = package_str.split(" ")
- name, description, version = (
- package_data[0],
- " ".join(package_data[1:-1]),
- package_data[-1],
- )
- packages.append({"name": name, "description": description, "latest_version": version})
- return packages
- class ManagerPool:
- ALLOWED_EXTRA_OPTION = [
- "--verbose",
- "--all",
- ]
- def run_command(self, command, capture_output=True):
- try:
- process = subprocess.run(
- command, check=True, capture_output=capture_output, text=True
- )
- return process
- except subprocess.CalledProcessError as e:
- console.print(f"Error: {e}\n{e.stderr}", style="red")
- raise click.Abort()
- def print_table(results, fields=None):
- if not fields:
- fields = results[0].keys()
- table = Table(show_header=True, header_style="bold")
- for field in fields:
- table.add_column(field)
- for result in results:
- row = []
- for field in fields:
- row.append(str(result.get(field, "")))
- table.add_row(*row)
- console.print(table)
- manager_pool = ManagerPool()
- @click.group()
- def cli():
- pass
- @cli.command()
- @click.argument("name")
- @click.option(
- "--version",
- help="Version to search for.",
- required=True,
- )
- def precise_search_cli(name, version, verbose=False):
- console.print(f"Searching for package {name} version {version}...")
- command = ["mpm", "search", name, "--version", version]
- if verbose:
- console.print(f"Running command {' '.join(command)}")
- result = manager_pool.run_command(command)
- if result.returncode == 0:
- print_table([{"name": name, "description": result.stdout}], fields=["name", "description"])
- else:
- console.print(f"Could not find package {name} version {version}", style="red")
- @cli.command()
- @click.option("-v", "--verbose", count=True, help="Increase verbosity of output")
- @click.argument("package")
- def install(verbose, package):
- """
- Install a package
- Example usage: `mpm install <package>`
- """
- console.print(f"Installing {package}...")
- command = ["mpm", "install", package]
- if verbose:
- console.print("Running the following command:\n", style="cyan")
- console.print(f" {' '.join(command)}", style="cyan")
- console.print("\n")
- @cli.command()
- @click.argument("name")
- @click.option(
- "--version",
- help="Version to search for.",
- required=True,
- )
- def precise_search_cli(name, version, verbose=False):
- console.print(f"Searching for package {name} version {version}...")
- command = ["mpm", "search", name, "--version", version]
- if verbose:
- console.print("Running the following command:\n", style="cyan")
- console.print(f" {' '.join(command)}", style="cyan")
- console.print("\n")
- run_command(command)
- @cli.command()
- @click.option(
- "--overwrite",
- is_flag=True,
- help="Overwrite the existing TOML file.",
- )
- @click.option(
- "--merge",
- is_flag=True,
- help="Merge the snapshot with the existing TOML file.",
- )
- @click.option(
- "--update-version",
- is_flag=True,
- help="Update the version numbers in the TOML file.",
- )
- @click.option(
- "-v",
- "--verbose",
- is_flag=True,
- help="Increase verbosity of output.",
- )
- @click.argument("toml_path", default="")
- def snapshot(toml_path, overwrite, merge, update_version, verbose=False):
- if not toml_path:
- toml_path = os.path.expanduser("~/.config/mpm/packages.toml")
- # If the TOML file does not exist or is empty, prompt the user to create a new snapshot or correct the TOML file manually.
- if not os.path.exists(toml_path) or os.path.getsize(toml_path) == 0:
- console.print(f"Error: {toml_path} does not exist or is empty. Create a new snapshot or correct the TOML file manually.", style="red")
- return
- try:
- with open(toml_path, "rb") as toml_file:
- tomli.load(toml_file)
- except tomli.TOMLDecodeError as e:
- console.print(f"Error: Invalid TOML data in {toml_path}. {str(e)}", style="red")
- return
- command = ["mpm", "snapshot"]
- if overwrite:
- command.append("--overwrite")
- if merge:
- command.append("--merge")
- if update_version:
- command.append("--update-version")
- command.append(toml_path)
- run_command(command, verbose=verbose)
- @cli.command()
- def remove_duplicates():
- command = ["mpm", "duplicates"]
- run_command(command)
- @cli.command()
- @click.argument("name")
- def search(name):
- command = ["mpm", "search", name]
- run_command(command)
- @cli.command()
- @click.argument("toml_path", default="")
- @click.option(
- "--overwrite",
- is_flag=True,
- help="Overwrite existing snapshot TOML file.",
- )
- @click.option(
- "--merge",
- is_flag=True,
- help="Merge existing snapshot TOML file with new snapshot.",
- )
- @click.option(
- "--update-version",
- is_flag=True,
- help="Update the package version of an existing package in the snapshot.",
- )
- @click.option(
- "--verbose",
- is_flag=True,
- help="Increase verbosity of output.",
- )
- def snapshot(toml_path, overwrite, merge, update_version, verbose):
- """
- Create or update a TOML file containing installed packages using mpm.
- Example usage: `mpm snapshot`
- """
- if not toml_path:
- toml_path = os.path.expanduser
- @cli.command()
- @click.option(
- "--overwrite",
- "-o",
- is_flag=True,
- default=False,
- help="Overwrite existing snapshot TOML file.",
- )
- @click.option(
- "--merge",
- "-m",
- is_flag=True,
- default=False,
- help="Merge existing snapshot TOML file with new snapshot.",
- )
- @click.option(
- "--update-version",
- "-u",
- is_flag=True,
- default=False,
- help="Update the package version of an existing package in the snapshot.",
- )
- @click.argument("toml_path", default="")
- @click.option(
- "--verbose",
- "-v",
- is_flag=True,
- default=False,
- help="Increase verbosity of output.",
- )
- def snapshot(toml_path, overwrite, merge, update_version, verbose):
- """
- Create or update a TOML file containing installed packages using mpm.
- Example usage: `mpm snapshot`
- """
- if not toml_path:
- toml_path = os.path.expanduser("~/.config/mpm/packages.toml")
- # If the TOML file does not exist or is empty, prompt the user to create a new snapshot or correct the TOML file manually.
- if not os.path.exists(toml_path) or os.path.getsize(toml_path) == 0:
- console.print(
- f"{COLORS['red']}Error: {toml_path} does not exist or is empty. Create a new snapshot or correct the TOML file manually.{COLORS['reset']}"
- )
- return
- try:
- with open(toml_path, "rb") as toml_file:
- tomli.load(toml_file)
- except tomli.TOMLDecodeError as e:
- console.print(
- f"{COLORS['red']}Error: Invalid TOML data in {toml_path}. {str(e)}{COLORS['reset']}"
- )
- return
- command = ["mpm", "snapshot"]
- if overwrite:
- command.append("--overwrite")
- if merge:
- command.append("--merge")
- if update_version:
- command.append("--update-version")
- command.append(toml_path)
- if verbose:
- console.print(click.style("Running the following command:", fg="cyan"))
- console.print(click.style(f" {' '.join(command)}", fg="cyan"))
- console.print("\n")
- run_command(command)
- @cli.command()
- def extra_options():
- """
- Run a command with extra options using mpm.
- Example usage: `mpm extra-options`
- """
- allowed_extra_options = ManagerPool.ALLOWED_EXTRA_OPTION
- console.print("Allowed extra options:")
- for option in allowed_extra_options:
- console.print(f"- {option}")
- selected_options = input("Enter the extra options you want to run (separated by spaces): ")
- extra_options = selected_options.split()
- for option in extra_options:
- if option not in allowed_extra_options:
- console.print(f"Invalid extra option: {option}")
- return
- command = ["mpm", "list", "--output-format", "table"]
- command.extend(extra_options)
- run_command(command)
- @cli.command()
- def console_print():
- """
- List all installed packages using mpm.
- Example usage: `mpm console-print`
- """
- command = ["mpm", "installed"]
- run_command(command)
- @cli.command()
- def precise_search():
- """
- Search for a specific package using mpm.
- Example usage: `mpm precise-search`
- """
- manager_pool = ManagerPool()
- package_manager = PackageManager(manager_pool)
- pool = package_manager.get_pool()
- package_name = input("Enter package name to search for: ")
- results = pool.search(package_name)
- print_table(results, fields=SORTABLE_FIELDS)
- if name == "main":
- cli()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement