Advertisement
kopyl

Untitled

Dec 9th, 2023
509
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.16 KB | None | 0 0
  1. import pika, sys, os
  2. import requests
  3. from multiprocessing import Pool
  4. from request_retrier import retry_request_till_success
  5. import PIL.Image as Image
  6. import PIL
  7.  
  8. dest_folder = "images/source-512/"  # Folder that download_file joins with each file name when saving an icon.
  9.  
  10. def image_is_invalid(file_path):
  11.     try:
  12.         img = Image.open(file_path)
  13.         img.verify()
  14.         return False
  15.     except PIL.UnidentifiedImageError:
  16.         return True
  17.    
  18.  
  19. def file_exists_and_valid_image(file_path):
  20.     return os.path.exists(file_path) and not image_is_invalid(file_path)
  21.  
  22.  
  23. def download_file(icon_url, channel, method):
  24.     file_name = os.path.basename(icon_url)
  25.     file_path = os.path.join(dest_folder, file_name)
  26.    
  27.     if file_exists_and_valid_image(file_path):
  28.         print(f"File already exists and is valid image: {file_name}")
  29.         channel.basic_ack(delivery_tag=method.delivery_tag)  # REMOVE FROM QUEUE
  30.         return
  31.    
  32.     icon_url_512 = icon_url.replace("/128/", "/512/")
  33.     response = retry_request_till_success(requests.get)(icon_url_512)
  34.     content = response.content
  35.  
  36.     with open(file_path, "wb") as f:
  37.         f.write(content)
  38.         file_size = sys.getsizeof(content)
  39.         print(f"File size: {file_size} bytes, Downloaded: {file_name}")
  40.         channel.basic_ack(delivery_tag=method.delivery_tag)  # REMOVE FROM QUEUE
  41.  
  42. def worker(args):
  43.     try:
  44.         download_file(*args)
  45.     except Exception as e:
  46.         print(f"Error downloading file: {e}")
  47.  
  48. def main():
  49.     connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  50.     channel = connection.channel()
  51.     channel.queue_declare(queue='hello')
  52.  
  53.     pool = Pool(processes=100)
  54.  
  55.     def callback(ch, method, properties, body):
  56.         string = body.decode()
  57.         pool.apply_async(worker, ((string, ch, method),))
  58.  
  59.     channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=False)
  60.  
  61.     print(' [*] Waiting for messages. To exit press CTRL+C')
  62.     channel.start_consuming()
  63.  
  64. if __name__ == '__main__':
  65.     try:
  66.         main()
  67.     except KeyboardInterrupt:
  68.         print('Interrupted')
  69.         try:
  70.             sys.exit(0)
  71.         except SystemExit:
  72.             os._exit(0)
  73.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement