Advertisement
kopyl

Untitled

Dec 9th, 2023
437
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.03 KB | None | 0 0
  1. import pika, sys, os
  2. import requests
  3. from multiprocessing import Pool
  4. from request_retrier import retry_request_till_success
  5. import PIL.Image as Image
  6. import PIL
  7.  
  8. dest_folder = "images/source-512/"
  9.  
  10.  
  11. def image_is_invalid(file_path):
  12.     try:
  13.         img = Image.open(file_path)
  14.         img.verify()
  15.         return False
  16.     except PIL.UnidentifiedImageError:
  17.         return True
  18.    
  19.  
  20. def file_exists_and_valid_image(file_path):
  21.     return os.path.exists(file_path) and not image_is_invalid(file_path)
  22.  
  23.  
  24. def download_file(icon_url):
  25.     file_name = os.path.basename(icon_url)
  26.     file_path = os.path.join(dest_folder, file_name)
  27.    
  28.     if file_exists_and_valid_image(file_path):
  29.         print(f"File already exists and is valid image: {file_name}")
  30.         # REMOVE FROM QUEUE
  31.         return
  32.    
  33.     icon_url_512 = icon_url.replace("/128/", "/512/")
  34.     response = retry_request_till_success(requests.get)(icon_url_512)
  35.     content = response.content
  36.  
  37.     with open(file_path, "wb") as f:
  38.         f.write(content)
  39.         file_size = sys.getsizeof(content)
  40.         print(f"File size: {file_size} bytes, Downloaded: {file_name}")
  41.         # REMOVE FROM QUEUE
  42.  
  43. def worker(icon_url):
  44.     try:
  45.         download_file(icon_url)
  46.     except Exception as e:
  47.         print(f"Error downloading file: {e}")
  48.  
  49. def main():
  50.     connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  51.     channel = connection.channel()
  52.     channel.queue_declare(queue='hello')
  53.  
  54.     pool = Pool(processes=100)
  55.  
  56.     def callback(ch, method, properties, body):
  57.         string = body.decode()
  58.         pool.apply_async(worker, (string,))
  59.  
  60.     channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
  61.  
  62.     print(' [*] Waiting for messages. To exit press CTRL+C')
  63.     channel.start_consuming()
  64.  
  65. if __name__ == '__main__':
  66.     try:
  67.         main()
  68.     except KeyboardInterrupt:
  69.         print('Interrupted')
  70.         try:
  71.             sys.exit(0)
  72.         except SystemExit:
  73.             os._exit(0)
  74.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement