Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import requests
- import smtplib
- from email.mime.multipart import MIMEMultipart
- from email.mime.text import MIMEText
- from email.mime.base import MIMEBase
- from email import encoders
- import logging
- from datetime import datetime, timedelta
- import random
- import json
- import uuid
- from PIL import Image
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load sensitive data from environment variables.
# Each setting falls back to the placeholder literal below when its variable is
# unset, so the script behaves as before on a machine with no configuration.
#   IMMICH_SERVER_URL -> SERVER_URL     base URL used to build the /api/... endpoints
#   IMMICH_EMAIL      -> EMAIL          login e-mail; also the SMTP sender/login address
#   IMMICH_PASSWORD   -> PASSWORD       immich password
#   SMTP_PASSWORD     -> SMTP_PASSWORD  password passed to the SMTP login
#   IMMICH_PERSON_ID  -> PERSON_ID      person whose assets are fetched
SERVER_URL = os.getenv("IMMICH_SERVER_URL", "http://default.server.url")
EMAIL = os.getenv("IMMICH_EMAIL", "default_email@example.com")
PASSWORD = os.getenv("IMMICH_PASSWORD", "default_password")  # immich password
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "default_smtp_password")
TRACKED_IMAGES_FILE = 'tracked_images.txt'  # file to keep track of what has been sent
PERSON_ID = os.getenv("IMMICH_PERSON_ID", "default_person_id")

# Everyone listed here receives each e-mailed photo.
recipient_emails = [
    "recipient1@example.com", "recipient2@example.com",
    "recipient3@example.com", "recipient4@example.com",
    "recipient5@example.com",
]
def compress_image(file_path, quality=85, output_folder='compressed_images'):
    """Re-encode an oversized image as JPEG and return the path to use.

    Files of at most 25 MB are returned untouched. Larger files are re-saved
    as JPEG into ``output_folder`` (created on demand); on success the
    original file is deleted and the new path is returned.

    Args:
        file_path: Path of the image file on disk.
        quality: JPEG quality passed to the encoder.
        output_folder: Directory that receives the re-encoded copy.

    Returns:
        The path of the re-encoded file, or ``file_path`` itself when the file
        was small enough or could not be re-encoded.

    Note:
        A single re-encode at ``quality`` does not guarantee that the result
        is below the 25 MB threshold.
    """
    max_size = 25 * 1024 * 1024  # 25 MB
    if os.path.getsize(file_path) <= max_size:
        return file_path

    os.makedirs(output_folder, exist_ok=True)
    # The output is always JPEG data, so give it a matching extension instead
    # of reusing whatever extension the source file had.
    stem = os.path.splitext(os.path.basename(file_path))[0]
    new_file_path = os.path.join(output_folder, f"compressed_{stem}.jpg")

    try:
        # The context manager releases the underlying file handle.
        with Image.open(file_path) as image:
            # JPEG cannot store alpha or palette modes; saving them raises.
            if image.mode in ('RGB', 'L'):
                encodable = image
            else:
                encodable = image.convert('RGB')
            encodable.save(new_file_path, format='JPEG', quality=quality)
    except OSError as e:
        # Unreadable/unsupported image: fall back to the original file rather
        # than aborting the caller.
        logging.error(f"Could not compress {file_path}: {e}")
        return file_path

    if os.path.exists(new_file_path):
        os.remove(file_path)
        return new_file_path
    return file_path
def download_asset(server_url, photo_id, token, query_key=None):
    """Download an asset's original file and return its local path.

    If ``{photo_id}.jpg`` already exists in the working directory it is reused
    instead of downloading again. Otherwise the file is fetched from
    ``{server_url}/api/assets/{photo_id}/original`` and saved as
    ``{photo_id}.<subtype>``, where the subtype comes from the response's
    Content-Type header. Image files are passed through ``compress_image``.

    Args:
        server_url: Base URL of the server.
        photo_id: Identifier of the asset to download.
        token: Bearer token sent in the Authorization header.
        query_key: Optional value sent as the ``key`` query parameter.

    Returns:
        Path of the local (possibly re-encoded) file, or ``None`` when the
        download failed.
    """
    file_path = f'{photo_id}.jpg'
    if os.path.exists(file_path):
        logging.info(f"Image already exists: {photo_id}")
        # No response headers exist on this path; the cached file carries a
        # .jpg name, so treat it as an image.
        return compress_image(file_path)

    logging.info(f"Downloading image: {photo_id}")
    download_url = f"{server_url}/api/assets/{photo_id}/original"
    headers = {'Authorization': f'Bearer {token}', 'Accept': 'application/octet-stream'}
    params = {'key': query_key} if query_key else {}
    try:
        # stream=True lets iter_content() write the body chunk by chunk rather
        # than holding the whole file in memory first.
        with requests.get(download_url, headers=headers, params=params, stream=True) as response:
            if response.status_code != 200:
                logging.error(f"Failed to download image. Status code: {response.status_code}")
                return None
            # The header may be absent or carry parameters ("image/jpeg; ...").
            raw_type = response.headers.get('Content-Type') or 'application/octet-stream'
            content_type = raw_type.split(';', 1)[0].strip().lower()
            extension = content_type.split('/')[-1]
            file_path = f"{photo_id}.{extension}"
            with open(file_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
    except requests.exceptions.RequestException as e:
        logging.error(f"Error during image download: {e}")
        # Do not leave a truncated file behind; a later run would otherwise
        # mistake it for a finished download.
        if os.path.exists(file_path):
            os.remove(file_path)
        return None

    logging.info(f"Image downloaded: {photo_id}")
    if 'image' in content_type:
        return compress_image(file_path)
    return file_path
def login_to_api(server_url, email, password):
    """Log in with e-mail and password and return the server's JSON reply.

    The credentials are posted as a JSON document to
    ``{server_url}/api/auth/login``.

    Args:
        server_url: Base URL of the server.
        email: Account e-mail address.
        password: Account password.

    Returns:
        The decoded JSON body when the server answers 200 or 201; ``None``
        for any other status code or when the request itself fails (both
        cases are logged).
    """
    logging.info("Attempting to log in...")
    credentials = json.dumps({"email": email, "password": password})
    try:
        response = requests.post(
            f"{server_url}/api/auth/login",
            headers={'Content-Type': 'application/json'},
            data=credentials,
        )
        # Guard clause: anything other than 200/201 counts as a failed login.
        if response.status_code not in (200, 201):
            logging.error(f"Failed to login. Status code: {response.status_code}")
            return None
        logging.info("Login successful")
        return response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f"Error during login: {e}")
        return None
def validate_token(server_url, token):
    """Ask the server whether a bearer token is still valid.

    Sends a POST to ``{server_url}/api/auth/validateToken`` with the token in
    the Authorization header.

    Args:
        server_url: Base URL of the server.
        token: Bearer token to check.

    Returns:
        The ``authStatus`` value from the JSON reply (``False`` when the key
        is missing) on HTTP 200; ``False`` for any other status code or when
        the request fails.
    """
    endpoint = f"{server_url}/api/auth/validateToken"
    auth_headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/json',
    }
    try:
        response = requests.post(endpoint, headers=auth_headers)
        if response.status_code != 200:
            logging.error(f"Failed to validate token. Status code: {response.status_code}")
            return False
        reply = response.json()
        return reply.get('authStatus', False)
    except requests.exceptions.RequestException as e:
        logging.error(f"Error during token validation: {e}")
        return False
def get_time_buckets(server_url, token, user_id, size='MONTH'):
    """Fetch the timeline buckets for a user.

    Issues a GET to ``{server_url}/api/timeline/buckets`` with ``userId`` and
    ``size`` as query parameters.

    Args:
        server_url: Base URL of the server.
        token: Bearer token sent in the Authorization header.
        user_id: Value sent as the ``userId`` query parameter.
        size: Value sent as the ``size`` query parameter.

    Returns:
        The decoded JSON reply on HTTP 200; an empty list for any other
        status code or when the request fails.
    """
    logging.info("Fetching time buckets...")
    endpoint = f"{server_url}/api/timeline/buckets"
    query = {'userId': user_id, 'size': size}
    auth_headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/json',
    }
    try:
        response = requests.get(endpoint, headers=auth_headers, params=query)
        if response.status_code != 200:
            logging.error(f"Failed to fetch time buckets. Status code: {response.status_code}")
            return []
        logging.info("Time buckets fetched successfully.")
        return response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching time buckets: {e}")
        return []
def _parse_asset_timestamp(value):
    """Parse a ``...Z`` timestamp string into an aware UTC datetime, or return None."""
    # Local import: the file header only imports datetime and timedelta.
    from datetime import timezone

    if not isinstance(value, str):
        return None
    # Accept timestamps both with and without fractional seconds.
    for fmt in ('%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ'):
        try:
            # The trailing "Z" designates UTC, so attach that zone explicitly.
            return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    return None


def fetch_assets(server_url, token, person_id):
    """Fetch a person's assets and keep those created in the last 30 days.

    Issues a GET to ``{server_url}/api/people/{person_id}/assets`` and filters
    the returned assets on their ``fileCreatedAt`` timestamp.

    Args:
        server_url: Base URL of the server.
        token: Bearer token sent in the Authorization header.
        person_id: Identifier of the person whose assets are requested.

    Returns:
        A list of the assets whose ``fileCreatedAt`` is within the last 30
        days. Assets with a missing or unparseable timestamp are skipped with
        a warning. An empty list is returned for a non-200 status code or a
        failed request.
    """
    # Local import: the file header only imports datetime and timedelta.
    from datetime import timezone

    logging.info("Fetching assets...")
    url = f"{server_url}/api/people/{person_id}/assets"
    headers = {'Authorization': f'Bearer {token}', 'Accept': 'application/json'}
    try:
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            logging.error(f"Failed to fetch assets. Status code: {response.status_code}")
            return []
        logging.info("Assets fetched successfully")
        assets = response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching assets: {e}")
        return []

    # Compare in UTC: the timestamps are UTC, so a naive local "now" would
    # shift the 30-day window by the machine's UTC offset.
    cutoff = datetime.now(timezone.utc) - timedelta(days=30)
    recent_assets = []
    for asset in assets:
        created_at = _parse_asset_timestamp(asset.get('fileCreatedAt'))
        if created_at is None:
            logging.warning(f"Skipping asset with unreadable fileCreatedAt: {asset.get('id')}")
            continue
        if created_at >= cutoff:
            recent_assets.append(asset)
    return recent_assets
def record_downloaded_image(image_id):
    """Append an image id to the tracking file so it is not sent again.

    Args:
        image_id: Identifier to record; written as one line of
            ``TRACKED_IMAGES_FILE``.
    """
    entry = f"{image_id}\n"
    # Append mode creates the tracking file on first use.
    with open(TRACKED_IMAGES_FILE, 'a') as tracker:
        tracker.write(entry)
    logging.info(f"Recorded image {image_id} as downloaded")
def has_image_been_downloaded(image_id):
    """Report whether an image id is already listed in the tracking file.

    Args:
        image_id: Identifier to look for; compared against each
            whitespace-stripped line of ``TRACKED_IMAGES_FILE``.

    Returns:
        ``True`` if the id is listed, ``False`` if it is not or if the
        tracking file does not exist yet.
    """
    try:
        with open(TRACKED_IMAGES_FILE, 'r') as tracker:
            # Stop at the first match instead of loading every id first.
            return any(line.strip() == image_id for line in tracker)
    except FileNotFoundError:
        # Nothing has been recorded yet.
        return False
def send_email_with_attachment(subject, body, recipient_emails, attachment_path):
    """E-mail a file as an attachment to a list of recipients.

    Builds a multipart message with a plain-text body and the file at
    ``attachment_path`` attached as ``application/octet-stream``, then sends
    it via ``smtp.gmail.com:587`` with STARTTLS, logging in as ``EMAIL`` with
    ``SMTP_PASSWORD``.

    Args:
        subject: Subject line of the message.
        body: Plain-text body of the message.
        recipient_emails: Addresses placed in the ``To`` header.
        attachment_path: Path of the file to attach.

    Returns:
        ``True`` when the message was handed to the SMTP server, ``False``
        when sending failed (the error is logged).

    Raises:
        OSError: If the attachment file cannot be read.
    """
    msg = MIMEMultipart()
    msg['From'] = EMAIL
    msg['To'] = ', '.join(recipient_emails)
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    with open(attachment_path, 'rb') as file:
        part = MIMEBase('application', 'octet-stream')
        part.set_payload(file.read())
    encoders.encode_base64(part)
    # Pass the filename as a header parameter so the library quotes/encodes
    # it, instead of hand-building "attachment; filename= name".
    part.add_header('Content-Disposition', 'attachment',
                    filename=os.path.basename(attachment_path))
    msg.attach(part)

    try:
        with smtplib.SMTP('smtp.gmail.com', 587) as server:
            server.starttls()
            server.login(EMAIL, SMTP_PASSWORD)
            server.send_message(msg)
    except (smtplib.SMTPException, OSError) as e:
        # OSError also covers connection-level failures (DNS, refused
        # connection, timeouts), not only SMTP protocol errors.
        logging.error(f"Error sending email: {e}")
        return False
    logging.info(f"Email sent to {', '.join(recipient_emails)}")
    return True
def cleanup_file(file_path):
    """Delete a file from disk and log the deletion.

    Args:
        file_path: Path of the file to remove.

    Raises:
        OSError: Propagated from ``os.remove`` if the file cannot be deleted.
    """
    os.remove(file_path)
    logging.info(f"Deleted file {file_path}")
# Example usage
def main():
    """Log in, pick one not-yet-sent recent photo at random, and e-mail it.

    Steps: log in, confirm the timeline has buckets, fetch the person's recent
    assets, drop the ones already listed in the tracking file, download one of
    the remainder, e-mail it to ``recipient_emails``, delete the local file,
    and record the asset id as sent.
    """
    login_data = login_to_api(SERVER_URL, EMAIL, PASSWORD)
    if not login_data:
        logging.error("Unable to login and perform operations")
        return

    token = login_data.get('accessToken')
    user_id = login_data.get('userId')

    # The bucket list is only used as a "timeline is not empty" check.
    if not get_time_buckets(SERVER_URL, token, user_id):
        logging.info("No time buckets available; nothing to send")
        return

    assets = fetch_assets(SERVER_URL, token, PERSON_ID)
    logging.info(f"Assets fetched successfully. Number of assets: {len(assets)}")

    # Filter before choosing, so a run does not end empty-handed just because
    # the random pick happened to be a photo that was already sent.
    unsent_assets = [asset for asset in assets if not has_image_been_downloaded(asset['id'])]
    if not unsent_assets:
        logging.info("No new images to send")
        return

    selected_asset = random.choice(unsent_assets)
    file_path = download_asset(SERVER_URL, selected_asset['id'], token)
    if not file_path:
        logging.error("Failed to process the image due to download issues")
        return

    sent = send_email_with_attachment("Random Photo", "Here's a random photo.", recipient_emails, file_path)
    cleanup_file(file_path)
    # Only an explicit False counts as a failed send; a result of None (no
    # status reported) is treated as sent.
    if sent is False:
        logging.error("Email was not sent; image will not be recorded as sent")
        return
    record_downloaded_image(selected_asset['id'])


if __name__ == "__main__":
    main()
Add Comment
Please, Sign In to add comment