Advertisement
WhosYourDaddySec

OPSWAT AUTOMATED FILE SCANNER

Dec 17th, 2023
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.84 KB | None | 0 0
  1. # OPSWAT File Scanner
  2. # Tool and Tutorial by Michael Errington
  3.  
  4. # Step-by-Step Installation Guide
  5.  
  6. # Step 1: Install Python
  7. # Ensure that Python is installed on your system. If not, download the latest version from [Python Downloads](https://www.python.org/downloads/) and follow the installation instructions.
  8.  
  9. # Step 2: Install Dependencies
  10. # Open a terminal, navigate to the tool's directory, and install the required Python packages:
  11.  
  12. ```bash
  13. pip install requests tqdm prompt_toolkit
  14. ```
  15.  
  16. # Step 3: Acquire OPSWAT API Key
  17. # If you don't have an OPSWAT API key, sign up on the OPSWAT Portal to obtain one.
  18.  
  19. # Step 4: Run the Scanner
  20. # Open a terminal and run the scanner using the following command:
  21.  
  22. ```bash
  23. python scanner.py
  24. ```
  25.  
  26. # Step 5: Enter Your OPSWAT API Key
  27. # The tool will prompt you to enter your OPSWAT API key. Copy and paste it from the OPSWAT Portal and press Enter.
  28.  
  29. # Step 6: Enter the Path to the File
  30. # Provide the full path to the file you want to scan and press Enter. For best results, use the absolute path.
  31.  
  32. # Step 7: View Scan Results
  33. # The tool will upload the file for scanning and display the results. If the file has been scanned before, previous results will be shown; otherwise, a new scan will be initiated.
  34.  
  35. # Additional Options:
  36.  
  37. # - Retry Limit:** Adjust the number of retries when polling for scan status using the `--retry-limit` option:
  38.  
  39. ```bash
  40. python scanner.py FILE_PATH --api-key API_KEY --retry-limit RETRY_LIMIT
  41. ```
  42.  
  43. # Replace `FILE_PATH` with your file's path and `API_KEY` with your OPSWAT API key.
  44.  
  45. ---
  46.  
  47. # This step-by-step guide, accompanied by the OPSWAT File Scanner, has been created and provided by Michael Errington to facilitate a seamless user experience in utilizing the tool for file scanning purposes.
  48.  
  49. import os
  50. import hashlib
  51. import requests
  52. import time
  53. import argparse
  54. from tqdm import tqdm
  55. from requests.adapters import HTTPAdapter
  56. from requests.packages.urllib3.util.retry import Retry
  57. from prompt_toolkit import prompt
  58. from prompt_toolkit.shortcuts import radiolist_dialog
  59.  
  60. class MetadefenderScanner:
  61. def __init__(self, api_key, retry_limit = 3):
  62. self.api_key = api_key
  63. self.api_url = "https://api.metadefender.com/v4"
  64. self.headers = {
  65. "apikey": api_key
  66. }
  67. self.chunk_size = 4096
  68. self.retry_limit = retry_limit
  69. self.session = self._create_session()
  70.  
  71. def _create_session(self):
  72. """Create a requests session with retry logic."""
  73. session = requests.Session()
  74. retries = Retry(total = self.retry_limit, backoff_factor = 0.1, status_forcelist = [500, 502, 503, 504])
  75. adapter = HTTPAdapter(max_retries = retries)
  76. session.mount('http://', adapter)
  77. session.mount('https://', adapter)
  78. return session
  79.  
  80. def _calculate_checksum(self, file_path, algorithm = "md5"):
  81. """Calculate checksum (default: md5) of a file."""
  82. hasher = hashlib.new(algorithm)
  83. with open(file_path, "rb") as file:
  84. for chunk in iter(lambda: file.read(self.chunk_size), b""):
  85. hasher.update(chunk)
  86. return hasher.hexdigest()
  87.  
  88. def scan_file(self, file_path):
  89. """Scan a file using OPSWAT Metadefender API."""
  90. try:
  91. # Check if the file exists
  92. if not os.path.isfile(file_path):
  93. raise FileNotFoundError(f"File not found: {
  94. file_path
  95. }")
  96.  
  97. # Calculate MD5 checksum of the file
  98. md5_checksum = self._calculate_checksum(file_path)
  99.  
  100. # Check if the file has been previously scanned
  101. scan_results = self.get_scan_results(md5_checksum)
  102. if scan_results:
  103. self._display_scan_results(scan_results)
  104. else :
  105. # If not scanned before, upload the file for scanning
  106. response = self._upload_file(file_path)
  107. data_id = response["data_id"]
  108.  
  109. # Poll for scan results with progress bar
  110. scan_results = self._poll_scan_status(data_id)
  111. self._display_scan_results(scan_results)
  112.  
  113. except Exception as e:
  114. print(f"Error: {
  115. e
  116. }")
  117.  
  118. def _upload_file(self, file_path):
  119. """Upload a file for scanning."""
  120. api_endpoint = f" {
  121. self.api_url
  122. }/file"
  123. with open(file_path, "rb") as file:
  124. files = {
  125. "file": (os.path.basename(file_path), file)}
  126. response = self.session.post(api_endpoint, headers = self.headers, files = files)
  127. response.raise_for_status()
  128. return response.json()
  129.  
  130. def get_scan_results(self, md5_checksum):
  131. """Check if scan results for a file already exist."""
  132. api_endpoint = f" {
  133. self.api_url
  134. }/file/ {
  135. md5_checksum
  136. }"
  137. response = self.session.get(api_endpoint, headers = self.headers)
  138. if response.status_code == 200:
  139. return response.json()["scan_results"]
  140. elif response.status_code == 404:
  141. return None
  142. else :
  143. response.raise_for_status()
  144.  
  145. def _poll_scan_status(self, data_id):
  146. """Poll the scan status until it's completed with retry logic and progress bar."""
  147. api_endpoint = f" {
  148. self.api_url
  149. }/file/ {
  150. data_id
  151. }"
  152. with tqdm(total = 100, desc = "Scanning", unit = "%", leave = False) as pbar:
  153. retries = 0
  154. while retries < self.retry_limit:
  155. response = self.session.get(api_endpoint, headers = self.headers)
  156. response_data = response.json()
  157. if response_data["scan_results"]["progress_percentage"] == 100:
  158. pbar.update(100)
  159. return response_data["scan_results"]
  160. pbar.update(response_data["scan_results"]["progress_percentage"] - pbar.n)
  161. time.sleep(2)
  162. retries += 1
  163. raise TimeoutError("Scan timed out. Retry limit exceeded.")
  164.  
  165. def _display_scan_results(self, scan_results):
  166. """Display scan results."""
  167. print("\nScan Results:")
  168. print(f"Scan Result: {
  169. scan_results['scan_result_i']}")
  170. print(f"Def detected: {
  171. scan_results['def_time']}")
  172.  
  173. def prompt_for_api_key():
  174. api_key = prompt("Enter your OPSWAT API key: ")
  175. return api_key.strip()
  176.  
  177. def prompt_for_file_path():
  178. file_path = prompt("Enter the path to the file for scanning: ")
  179. return file_path.strip()
  180.  
  181. def main():
  182. api_key = prompt_for_api_key()
  183. file_path = prompt_for_file_path()
  184.  
  185. scanner = MetadefenderScanner(api_key)
  186. scanner.scan_file(file_path)
  187.  
  188. if __name__ == "__main__":
  189. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement