Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # OPSWAT File Scanner
- # Tool and Tutorial by Michael Errington
- # Step-by-Step Installation Guide
- # Step 1: Install Python
- # Ensure that Python is installed on your system. If not, download the latest version from [Python Downloads](https://www.python.org/downloads/) and follow the installation instructions.
- # Step 2: Install Dependencies
- # Open a terminal, navigate to the tool's directory, and install the required Python packages:
- ```bash
- pip install requests tqdm prompt_toolkit
- ```
- # Step 3: Acquire OPSWAT API Key
- # If you don't have an OPSWAT API key, sign up on the OPSWAT Portal to obtain one.
- # Step 4: Run the Scanner
- # Open a terminal and run the scanner using the following command:
- ```bash
- python scanner.py
- ```
- # Step 5: Enter Your OPSWAT API Key
- # The tool will prompt you to enter your OPSWAT API key. Copy and paste it from the OPSWAT Portal and press Enter.
- # Step 6: Enter the Path to the File
- # Provide the full path to the file you want to scan and press Enter. For best results, use the absolute path.
- # Step 7: View Scan Results
- # The tool will upload the file for scanning and display the results. If the file has been scanned before, previous results will be shown; otherwise, a new scan will be initiated.
- # Additional Options:
- # - Retry Limit: Adjust the number of retries when polling for scan status using the `--retry-limit` option:
- ```bash
- python scanner.py FILE_PATH --api-key API_KEY --retry-limit RETRY_LIMIT
- ```
- # Replace `FILE_PATH` with your file's path, `API_KEY` with your OPSWAT API key, and `RETRY_LIMIT` with the number of retries to allow when polling for scan status.
- ---
- # This step-by-step guide, accompanied by the OPSWAT File Scanner, has been created and provided by Michael Errington to facilitate a seamless user experience in utilizing the tool for file scanning purposes.
- import os
- import hashlib
- import requests
- import time
- import argparse
- from tqdm import tqdm
- from requests.adapters import HTTPAdapter
- from requests.packages.urllib3.util.retry import Retry
- from prompt_toolkit import prompt
- from prompt_toolkit.shortcuts import radiolist_dialog
class MetadefenderScanner:
    """Client that scans a local file through the OPSWAT MetaDefender Cloud API.

    The scanner first looks the file up by its checksum; when no previous
    report exists it uploads the file and polls until the scan completes.

    Args:
        api_key: OPSWAT API key, sent in the ``apikey`` header of every request.
        retry_limit: Used twice: as the number of automatic HTTP retries on
            5xx responses, and as the maximum number of status polls (two
            seconds apart) before a scan is reported as timed out.
    """

    def __init__(self, api_key, retry_limit=3):
        self.api_key = api_key
        self.api_url = "https://api.metadefender.com/v4"
        self.headers = {"apikey": api_key}
        # Number of bytes read per iteration while hashing a file.
        self.chunk_size = 4096
        self.retry_limit = retry_limit
        self.session = self._create_session()

    def _create_session(self):
        """Create a requests session that retries on transient 5xx responses."""
        session = requests.Session()
        retries = Retry(
            total=self.retry_limit,
            backoff_factor=0.1,
            status_forcelist=[500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retries)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _build_url(self, *segments):
        """Join ``self.api_url`` and path segments into a URL without stray whitespace."""
        parts = [self.api_url.rstrip("/")]
        parts.extend(str(segment).strip().strip("/") for segment in segments)
        return "/".join(parts)

    def _calculate_checksum(self, file_path, algorithm="md5"):
        """Return the hex digest of a file, read in ``self.chunk_size`` pieces.

        Args:
            file_path: Path of the file to hash.
            algorithm: Any algorithm name accepted by ``hashlib.new``.
        """
        hasher = hashlib.new(algorithm)
        with open(file_path, "rb") as file:
            for chunk in iter(lambda: file.read(self.chunk_size), b""):
                hasher.update(chunk)
        return hasher.hexdigest()

    def scan_file(self, file_path):
        """Scan a file and print the result.

        Existing results are reused when the file's MD5 is already known to
        the service; otherwise the file is uploaded and polled. Any failure
        is reported on stdout as ``Error: ...`` instead of being raised, since
        this is the user-facing entry point of the tool.
        """
        try:
            if not os.path.isfile(file_path):
                raise FileNotFoundError(f"File not found: {file_path}")

            md5_checksum = self._calculate_checksum(file_path)

            # Reuse a previous report when the service already knows this hash.
            scan_results = self.get_scan_results(md5_checksum)
            if not scan_results:
                # Unknown file: upload it, then wait for the scan to finish.
                response = self._upload_file(file_path)
                scan_results = self._poll_scan_status(response["data_id"])
            self._display_scan_results(scan_results)
        except Exception as e:
            print(f"Error: {e}")

    def _upload_file(self, file_path):
        """Upload a file for scanning and return the decoded JSON response.

        Raises:
            requests.HTTPError: If the service answers with an error status.
        """
        api_endpoint = self._build_url("file")
        with open(file_path, "rb") as file:
            # NOTE(review): sent as multipart form data, as in the original;
            # confirm against the API docs that this upload form is accepted.
            files = {"file": (os.path.basename(file_path), file)}
            response = self.session.post(api_endpoint, headers=self.headers, files=files)
        response.raise_for_status()
        return response.json()

    def get_scan_results(self, md5_checksum):
        """Return existing scan results for a file hash, or None if unknown.

        Raises:
            requests.HTTPError: For error statuses other than 404.
        """
        # Hash lookups use the /hash/ endpoint; /file/ expects a data_id.
        api_endpoint = self._build_url("hash", md5_checksum)
        response = self.session.get(api_endpoint, headers=self.headers)
        if response.status_code == 200:
            return response.json()["scan_results"]
        if response.status_code == 404:
            return None
        response.raise_for_status()
        return None

    def _poll_scan_status(self, data_id):
        """Poll a submitted scan until it reaches 100% and return its results.

        Polls at most ``self.retry_limit`` times, sleeping two seconds after
        each incomplete poll, while driving a progress bar.

        Raises:
            requests.HTTPError: If a poll returns an error status.
            TimeoutError: If the scan is not complete after the last poll.
        """
        api_endpoint = self._build_url("file", data_id)
        with tqdm(total=100, desc="Scanning", unit="%", leave=False) as pbar:
            for _ in range(self.retry_limit):
                response = self.session.get(api_endpoint, headers=self.headers)
                # Fail with the HTTP error rather than a KeyError on the body.
                response.raise_for_status()
                scan_results = response.json().get("scan_results") or {}
                progress = scan_results.get("progress_percentage", 0)
                # Advance by the delta so the bar never runs past its total.
                pbar.update(max(0, progress - pbar.n))
                if progress >= 100:
                    return scan_results
                time.sleep(2)
        raise TimeoutError("Scan timed out. Retry limit exceeded.")

    def _display_scan_results(self, scan_results):
        """Print a short summary of a scan-results mapping.

        Prefers the ``scan_result_i`` / ``def_time`` keys; when they are
        absent it falls back to the aggregate ``scan_all_result_i`` code and
        ``N/A`` so that a report lacking them does not raise ``KeyError``.
        """
        result_code = scan_results.get(
            "scan_result_i", scan_results.get("scan_all_result_i", "N/A")
        )
        def_time = scan_results.get("def_time", "N/A")
        print("\nScan Results:")
        print(f"Scan Result: {result_code}")
        print(f"Def detected: {def_time}")
def prompt_for_api_key():
    """Ask the user for their OPSWAT API key and return it without surrounding whitespace."""
    return prompt("Enter your OPSWAT API key: ").strip()
def prompt_for_file_path():
    """Ask the user for the path of the file to scan and return it without surrounding whitespace."""
    return prompt("Enter the path to the file for scanning: ").strip()
def main(argv=None):
    """Run the scanner from the command line.

    Supports the invocation described in the usage guide::

        python scanner.py FILE_PATH --api-key API_KEY --retry-limit RETRY_LIMIT

    Every argument is optional. A missing API key or file path is requested
    interactively, so running the script with no arguments prompts for both
    (API key first) and uses the default retry limit of 3.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description="Scan a file with the OPSWAT MetaDefender API."
    )
    parser.add_argument(
        "file_path",
        nargs="?",
        help="path to the file to scan (prompted for when omitted)",
    )
    parser.add_argument(
        "--api-key",
        dest="api_key",
        help="OPSWAT API key (prompted for when omitted)",
    )
    parser.add_argument(
        "--retry-limit",
        dest="retry_limit",
        type=int,
        default=3,
        help="number of retries when polling for scan status (default: 3)",
    )
    args = parser.parse_args(argv)

    # Keep the original prompt order: API key first, then file path.
    api_key = args.api_key.strip() if args.api_key else prompt_for_api_key()
    file_path = args.file_path.strip() if args.file_path else prompt_for_file_path()

    scanner = MetadefenderScanner(api_key, retry_limit=args.retry_limit)
    scanner.scan_file(file_path)


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement