# Psd image forensics
# Pastebin paste by xosski, Dec 4th, 2024 (plain text, 8.05 KB).
import logging
import os
import re
import shutil
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from urllib.parse import urlparse

import requests
from duckduckgo_search import DDGS
from PIL import Image
from psd_tools import PSDImage
  12.  
  13. # Setup logging
  14. logging.basicConfig(filename="process_log.txt", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
  15. logger = logging.getLogger()
  16.  
  17. # Constants and Configuration
  18. MONTHS = {
  19. 1: "януари", 2: "февруари", 3: "март", 4: "април", 5: "май", 6: "юни",
  20. 7: "юли", 8: "август", 9: "септември", 10: "октомври", 11: "ноември", 12: "декември"
  21. }
  22. FOLDER_PATH = r"/mnt/c/Users/krist/Downloads/Шаблон"
  23. TXT_FILE_PATH = os.path.join(FOLDER_PATH, "Date.txt")
  24. PROMO_FILE_PATH = os.path.join(FOLDER_PATH, "promo_text.txt")
  25. LOG_FILE = os.path.join(FOLDER_PATH, "failed_downloads.txt")
  26. REMOVED_LAYERS_FILE = os.path.join(FOLDER_PATH, "removed_layers.txt")
  27.  
  28.  
  29. def sanitize_filename(filename):
  30. """Sanitize filename by removing forbidden characters."""
  31. return re.sub(r'[<>:"/\\|?*]', '_', filename)
  32.  
  33.  
  34. def download_image_with_retries(image_url, output_path, retries=3, timeout=10):
  35. """Download image with retries in case of failure."""
  36. attempt = 0
  37. while attempt < retries:
  38. try:
  39. logger.info(f"Attempt {attempt + 1} for downloading: {image_url}")
  40. response = requests.get(image_url, stream=True, timeout=timeout)
  41. if response.status_code == 200:
  42. with open(output_path, "wb") as img_file:
  43. for chunk in response.iter_content(1024):
  44. img_file.write(chunk)
  45. logger.info(f"Successfully downloaded: {output_path}")
  46. return True
  47. else:
  48. logger.warning(f"Failed to download {image_url}: {response.status_code}")
  49. except requests.exceptions.RequestException as e:
  50. logger.error(f"Error during download: {e}")
  51. attempt += 1
  52. return False
  53.  
  54.  
  55. def convert_to_png(image_path):
  56. """Convert image to PNG format if it's not already PNG."""
  57. if not image_path.lower().endswith(".png"):
  58. try:
  59. with Image.open(image_path) as img:
  60. png_path = os.path.splitext(image_path)[0] + ".png"
  61. img.save(png_path, "PNG")
  62. os.remove(image_path) # Remove the original file
  63. logger.info(f"Converted to PNG: {png_path}")
  64. return png_path
  65. except Exception as e:
  66. logger.error(f"Error converting to PNG: {e}")
  67. return image_path
  68.  
  69.  
  70. def download_images_from_psd(psd_file, output_directory):
  71. """Process PSD file, download images, and log removed layers."""
  72. os.makedirs(output_directory, exist_ok=True)
  73. failed_images = []
  74. try:
  75. psd = PSDImage.open(psd_file)
  76. all_layers = list(psd.descendants())
  77.  
  78. # Filter out unwanted layers and sanitize names
  79. filtered_layers = [
  80. layer.name.replace("? лв/бр", "").replace("?", "").strip()
  81. for layer in all_layers
  82. if "Layer" not in layer.name and "Rectangle" not in layer.name and "Lorem Ipsum" not in layer.name
  83. ]
  84.  
  85. # Download images for each layer
  86. with ThreadPoolExecutor(max_workers=5) as executor:
  87. futures = [executor.submit(download_image_for_layer, layer, output_directory, failed_images) for layer in filtered_layers]
  88. for future in futures:
  89. future.result()
  90.  
  91. if failed_images:
  92. with open(LOG_FILE, "a", encoding="utf-8") as log:
  93. log.write(f"Failed downloads for {psd_file}:\n")
  94. for failed in failed_images:
  95. log.write(f"{failed}\n")
  96. logger.info(f"Failed downloads logged in {LOG_FILE}")
  97.  
  98. except Exception as e:
  99. logger.error(f"Error processing PSD file: {e}")
  100.  
  101.  
  102. def download_image_for_layer(layer, output_directory, failed_images):
  103. """Helper function to download an image for a given layer."""
  104. try:
  105. logger.info(f"Searching image for layer: {layer}...")
  106. with DDGS() as ddgs:
  107. results = ddgs.images(layer, max_results=20)
  108. success = False
  109. for result in results:
  110. image_url = result['image']
  111. sanitized_title = sanitize_filename(layer)
  112. image_extension = os.path.splitext(image_url)[1]
  113. output_path = os.path.join(output_directory, f"{sanitized_title}{image_extension}")
  114.  
  115. if download_image_with_retries(image_url, output_path):
  116. output_path = convert_to_png(output_path)
  117. success = True
  118. break
  119.  
  120. if not success:
  121. logger.warning(f"Failed to download image for {layer}")
  122. failed_images.append(layer)
  123.  
  124. except Exception as e:
  125. logger.error(f"Error downloading image for {layer}: {e}")
  126. failed_images.append(layer)
  127.  
  128.  
  129. def process_files():
  130. """Main function to process files and generate the necessary promo text."""
  131. try:
  132. # Read the start date from Date.txt
  133. with open(TXT_FILE_PATH, "r") as file:
  134. start_date_str = file.read().strip()
  135. start_date = datetime.strptime(start_date_str, "%d.%m.%Y")
  136.  
  137. end_date = start_date + timedelta(days=14)
  138. final_date = end_date + timedelta(days=1)
  139.  
  140. # Create promotional text
  141. if start_date.month == end_date.month:
  142. promo_text = f"Период на промоцията: от {start_date.day} до {end_date.day} {MONTHS[end_date.month]} {end_date.year}"
  143. else:
  144. promo_text = f"Период на промоцията: от {start_date.day} {MONTHS[start_date.month]} до {end_date.day} {MONTHS[end_date.month]} {end_date.year}"
  145.  
  146. with open(PROMO_FILE_PATH, "w", encoding="utf-8") as promo_file:
  147. promo_file.write(promo_text)
  148.  
  149. with open(TXT_FILE_PATH, "w") as file:
  150. file.write(final_date.strftime("%d.%m.%Y"))
  151.  
  152. logger.info(f"Final date written to {TXT_FILE_PATH} and promo text written to {PROMO_FILE_PATH}")
  153.  
  154. # Process the PSD files and download images
  155. original_files = [
  156. "Artboards 01-12.psd", "Artboards 02-03.psd", "Artboards 04-05.psd",
  157. "Artboards 06-07.psd", "Artboards 08-09.psd", "Artboards 10-11.psd"
  158. ]
  159.  
  160. for original_file in original_files:
  161. original_file_path = os.path.join(FOLDER_PATH, original_file)
  162. if os.path.exists(original_file_path):
  163. new_folder_name = os.path.splitext(original_file)[0]
  164. new_folder_path = os.path.join(FOLDER_PATH, new_folder_name)
  165. os.makedirs(new_folder_path, exist_ok=True)
  166. shutil.copy(original_file_path, os.path.join(new_folder_path, original_file))
  167.  
  168. logger.info(f"Original file {original_file} copied to {new_folder_path}")
  169.  
  170. # Process the product file
  171. product_file = original_file.replace(".psd", "_product.psd")
  172. product_file_path = os.path.join(FOLDER_PATH, product_file)
  173. if os.path.exists(product_file_path):
  174. shutil.copy(product_file_path, os.path.join(new_folder_path, product_file))
  175. logger.info(f"Product file {product_file} copied to {new_folder_path}")
  176. download_images_from_psd(product_file_path, new_folder_path)
  177. else:
  178. logger.warning(f"Product file {product_file} not found.")
  179. else:
  180. logger.warning(f"Original file {original_file} not found.")
  181.  
  182. except Exception as e:
  183. logger.error(f"Error processing files: {e}")
  184.  
  185.  
  186. def main():
  187. start_time = time.time()
  188. process_files()
  189. end_time = time.time()
  190. logger.info(f"Execution time: {end_time - start_time:.2f} seconds")
  191.  
  192.  
  193. if __name__ == "__main__":
  194. main()
# End of paste.