Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import shutil
- import time
- import requests
- import re
- from datetime import datetime, timedelta
- from psd_tools import PSDImage
- from PIL import Image
- from duckduckgo_search import DDGS
- import logging
- from concurrent.futures import ThreadPoolExecutor
# Logging: records at INFO level and above go to process_log.txt (resolved
# against the current working directory), each prefixed with a timestamp
# and the level name.
logging.basicConfig(
    filename="process_log.txt",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
# Root logger; the functions in this script log through this one object.
logger = logging.getLogger()
# Constants and configuration

# Bulgarian month names keyed by month number (1-12); used when composing
# the promo-period text.
MONTHS = dict(
    enumerate(
        (
            "януари", "февруари", "март", "април", "май", "юни",
            "юли", "август", "септември", "октомври", "ноември", "декември",
        ),
        start=1,
    )
)

# Working folder that holds the PSD templates and the helper text files.
FOLDER_PATH = r"/mnt/c/Users/krist/Downloads/Шаблон"

# Holds the start date of the current promo period (format DD.MM.YYYY).
TXT_FILE_PATH = os.path.join(FOLDER_PATH, "Date.txt")
# Receives the generated promo-period sentence.
PROMO_FILE_PATH = os.path.join(FOLDER_PATH, "promo_text.txt")
# Append-only list of layer names for which no image could be downloaded.
LOG_FILE = os.path.join(FOLDER_PATH, "failed_downloads.txt")
# NOTE(review): not referenced by any code visible in this file - confirm
# whether it is still needed.
REMOVED_LAYERS_FILE = os.path.join(FOLDER_PATH, "removed_layers.txt")
def sanitize_filename(filename):
    """Return *filename* with characters that are invalid in file names replaced by ``_``.

    The replaced characters are ``<``, ``>``, ``:``, ``"``, ``/``, the
    backslash, ``|``, ``?``, ``*`` and the ASCII control characters
    (code points 0-31). Control characters are included because the output
    folder lives on a Windows drive, where they are not allowed in names
    either, and a name may contain line breaks or tabs.

    Args:
        filename: The raw name (not a full path; separators are replaced too).

    Returns:
        The sanitized name; its length equals that of the input.
    """
    return re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename)
def download_image_with_retries(image_url, output_path, retries=3, timeout=10):
    """Download *image_url* to *output_path*, trying up to *retries* times.

    Args:
        image_url: URL of the image to fetch.
        output_path: File path the response body is written to.
        retries: Maximum number of attempts; ``0`` or less means no attempt.
        timeout: Timeout in seconds handed to ``requests.get``.

    Returns:
        True as soon as one attempt gets HTTP 200 and the body is written
        completely; False when every attempt fails.
    """
    for attempt in range(1, retries + 1):
        logger.info(f"Attempt {attempt} for downloading: {image_url}")
        started_writing = False
        try:
            # The context manager closes the streamed response on every path;
            # with stream=True the connection otherwise stays open until the
            # body has been fully consumed.
            with requests.get(image_url, stream=True, timeout=timeout) as response:
                if response.status_code != 200:
                    logger.warning(f"Failed to download {image_url}: {response.status_code}")
                    continue
                with open(output_path, "wb") as img_file:
                    started_writing = True
                    for chunk in response.iter_content(1024):
                        img_file.write(chunk)
        except (requests.exceptions.RequestException, OSError) as e:
            # OSError covers local write failures (bad path, disk full) so
            # they count as a failed attempt instead of escaping the loop.
            logger.error(f"Error during download: {e}")
            if started_writing:
                # Do not leave a truncated image behind; only remove the file
                # when this attempt is the one that created it.
                try:
                    os.remove(output_path)
                except OSError:
                    pass
            continue
        logger.info(f"Successfully downloaded: {output_path}")
        return True
    return False
def convert_to_png(image_path):
    """Convert the image at *image_path* to PNG unless its name already ends in ``.png``.

    The PNG is written next to the original using the same base name, and
    the original file is deleted afterwards. Conversion is best-effort: any
    error is logged and the original path is returned.

    Args:
        image_path: Path of the image file to convert.

    Returns:
        The path of the PNG file on success; otherwise *image_path* unchanged
        (also when the name already ends in ``.png``, case-insensitively).
    """
    if image_path.lower().endswith(".png"):
        return image_path

    # Modes the PNG writer accepts directly - per the Pillow PNG format
    # notes; anything else (e.g. CMYK or YCbCr from a JPEG) makes save()
    # raise, so those images are converted to RGB first.
    png_modes = {"1", "L", "LA", "I", "I;16", "P", "RGB", "RGBA"}
    png_path = os.path.splitext(image_path)[0] + ".png"
    try:
        with Image.open(image_path) as img:
            writable = img if img.mode in png_modes else img.convert("RGB")
            writable.save(png_path, "PNG")
        os.remove(image_path)  # Remove the original file
    except Exception as e:
        logger.error(f"Error converting to PNG: {e}")
        return image_path
    logger.info(f"Converted to PNG: {png_path}")
    return png_path
def download_images_from_psd(psd_file, output_directory):
    """Download one image per usable layer name found in a PSD file.

    Layers whose name contains "Layer", "Rectangle" or "Lorem Ipsum" are
    ignored. The remaining names are cleaned (price placeholder and question
    marks removed, whitespace stripped), de-duplicated, and each one is
    passed to ``download_image_for_layer`` on a pool of five worker threads.
    Names that could not be downloaded are appended to ``LOG_FILE``.

    Any exception is logged and swallowed, so this function never raises
    for PSD or download problems.

    Args:
        psd_file: Path of the PSD file to read.
        output_directory: Folder that receives the images (created if missing).
    """
    os.makedirs(output_directory, exist_ok=True)
    failed_images = []
    try:
        psd = PSDImage.open(psd_file)
        skip_markers = ("Layer", "Rectangle", "Lorem Ipsum")
        # Filter out unwanted layers and clean up the names
        cleaned_names = (
            layer.name.replace("? лв/бр", "").replace("?", "").strip()
            for layer in psd.descendants()
            if not any(marker in layer.name for marker in skip_markers)
        )
        # dict.fromkeys keeps first-seen order while dropping duplicates:
        # two workers handling the same name would write the same output
        # file at once. Empty names would produce an empty search query.
        search_terms = [name for name in dict.fromkeys(cleaned_names) if name]

        # Download images for each layer
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(download_image_for_layer, term, output_directory, failed_images)
                for term in search_terms
            ]
            for future in futures:
                future.result()

        if failed_images:
            with open(LOG_FILE, "a", encoding="utf-8") as log:
                log.write(f"Failed downloads for {psd_file}:\n")
                for failed in failed_images:
                    log.write(f"{failed}\n")
            logger.info(f"Failed downloads logged in {LOG_FILE}")
    except Exception as e:
        logger.error(f"Error processing PSD file: {e}")
def download_image_for_layer(layer, output_directory, failed_images):
    """Search for an image matching *layer* and save the first one that downloads.

    Up to 20 search results are tried in order. The file is named after the
    sanitized layer name plus the extension found in the URL path, and is
    then passed to ``convert_to_png``.

    Args:
        layer: The search term (a cleaned layer name); also used as file name.
        output_directory: Folder the image is written to.
        failed_images: List that *layer* is appended to when no result could
            be downloaded or the search itself failed.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    try:
        logger.info(f"Searching image for layer: {layer}...")
        sanitized_title = sanitize_filename(layer)
        with DDGS() as ddgs:
            results = ddgs.images(layer, max_results=20)
            for result in results:
                try:
                    image_url = result['image']
                    # Take the extension from the URL *path* only: a query
                    # string ("img.jpg?w=300") or a dotted directory
                    # ("/v1.2/image") would otherwise leak into the file name.
                    url_path = urlsplit(image_url).path
                    image_extension = sanitize_filename(os.path.splitext(url_path)[1])
                    output_path = os.path.join(output_directory, f"{sanitized_title}{image_extension}")
                    if download_image_with_retries(image_url, output_path):
                        convert_to_png(output_path)
                        break
                except (KeyError, ValueError, OSError) as e:
                    # One unusable result must not abort the layer; move on
                    # to the next candidate.
                    logger.warning(f"Skipping search result for {layer}: {e}")
            else:
                # Loop ended without a successful download.
                logger.warning(f"Failed to download image for {layer}")
                failed_images.append(layer)
    except Exception as e:
        logger.error(f"Error downloading image for {layer}: {e}")
        failed_images.append(layer)
def _format_promo_period(start_date, end_date):
    """Return the promo-period sentence for the given start and end dates."""
    if start_date.year != end_date.year:
        # A period crossing New Year needs the start year to be unambiguous.
        start_part = f"{start_date.day} {MONTHS[start_date.month]} {start_date.year}"
    elif start_date.month != end_date.month:
        start_part = f"{start_date.day} {MONTHS[start_date.month]}"
    else:
        start_part = f"{start_date.day}"
    return (
        f"Период на промоцията: от {start_part} "
        f"до {end_date.day} {MONTHS[end_date.month]} {end_date.year}"
    )


def _advance_promo_period():
    """Write the promo text for the stored start date and store the next start date."""
    # utf-8-sig drops a byte-order mark if an editor added one; a BOM would
    # survive strip() and make strptime fail.
    with open(TXT_FILE_PATH, "r", encoding="utf-8-sig") as file:
        start_date = datetime.strptime(file.read().strip(), "%d.%m.%Y")
    end_date = start_date + timedelta(days=14)
    final_date = end_date + timedelta(days=1)  # start of the following period

    with open(PROMO_FILE_PATH, "w", encoding="utf-8") as promo_file:
        promo_file.write(_format_promo_period(start_date, end_date))
    with open(TXT_FILE_PATH, "w", encoding="utf-8") as file:
        file.write(final_date.strftime("%d.%m.%Y"))
    logger.info(f"Final date written to {TXT_FILE_PATH} and promo text written to {PROMO_FILE_PATH}")


def _process_psd_pair(original_file):
    """Copy one template and its product PSD into their own folder and fetch the images."""
    original_file_path = os.path.join(FOLDER_PATH, original_file)
    if not os.path.exists(original_file_path):
        logger.warning(f"Original file {original_file} not found.")
        return

    new_folder_name = os.path.splitext(original_file)[0]
    new_folder_path = os.path.join(FOLDER_PATH, new_folder_name)
    os.makedirs(new_folder_path, exist_ok=True)
    shutil.copy(original_file_path, os.path.join(new_folder_path, original_file))
    logger.info(f"Original file {original_file} copied to {new_folder_path}")

    # Process the product file
    product_file = original_file.replace(".psd", "_product.psd")
    product_file_path = os.path.join(FOLDER_PATH, product_file)
    if not os.path.exists(product_file_path):
        logger.warning(f"Product file {product_file} not found.")
        return
    shutil.copy(product_file_path, os.path.join(new_folder_path, product_file))
    logger.info(f"Product file {product_file} copied to {new_folder_path}")
    download_images_from_psd(product_file_path, new_folder_path)


def process_files():
    """Generate the promo text, advance the stored date, and process the PSD files.

    Steps, in order:
      1. Read the start date from ``TXT_FILE_PATH``, write the promo-period
         sentence (start date to start + 14 days) to ``PROMO_FILE_PATH``, and
         overwrite ``TXT_FILE_PATH`` with the day after the period ends.
      2. For each known template PSD found in ``FOLDER_PATH``, copy it and
         its ``*_product.psd`` companion into a folder named after the
         template, then download images for the product file's layers.

    Any exception is logged (with traceback) and swallowed; nothing is
    returned.
    """
    original_files = (
        "Artboards 01-12.psd", "Artboards 02-03.psd", "Artboards 04-05.psd",
        "Artboards 06-07.psd", "Artboards 08-09.psd", "Artboards 10-11.psd",
    )
    try:
        _advance_promo_period()
        # Process the PSD files and download images
        for original_file in original_files:
            _process_psd_pair(original_file)
    except Exception as e:
        logger.exception(f"Error processing files: {e}")
def main():
    """Run the file processing once and log the elapsed wall-clock time."""
    started_at = time.time()
    process_files()
    elapsed = time.time() - started_at
    logger.info(f"Execution time: {elapsed:.2f} seconds")


# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement