import os
import face_recognition
from collections import defaultdict
import shutil
import torch
import numpy as np
import time
import logging
import json
# Global settings
LIMITED_FILES_COUNT = 20
THRESHOLDS = [0.43, 0.54, 0.6]
EMBEDDINGS_DIR = "embeddings"

# Logging configuration: write to log.txt and to the console
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s',
                    handlers=[logging.FileHandler("log.txt"), logging.StreamHandler()])
# Extract the grouping prefix from a file name
def get_prefix(filename):
    filename = os.path.basename(filename)  # work on the bare file name, not the full path
    parts = filename.split('-')
    if len(parts) >= 3:
        # Dash-separated names: the first two parts form the prefix
        return '-'.join(parts[:2])
    else:
        # Otherwise take the leading non-digit characters
        prefix = ''
        for char in filename:
            if char.isdigit():
                break
            prefix += char
        return prefix if prefix else filename
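# Illustration with hypothetical file names (not taken from the original data set):
#   get_prefix("alice-smith-0001.jpg") -> "alice-smith"   (dash-separated: first two parts)
#   get_prefix("bob12.jpg")            -> "bob"           (no dashes: leading non-digit characters)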
# Extract a face embedding from an image and cache it on disk as JSON
def extract_embeddings(image_path):
    embedding_path = os.path.join(EMBEDDINGS_DIR, os.path.basename(image_path) + ".json")
    if os.path.exists(embedding_path):
        # Reuse a previously computed embedding
        with open(embedding_path, 'r') as f:
            embedding = json.load(f)
        return np.array(embedding)
    else:
        logging.info(f"Processing file: {image_path}")
        image = face_recognition.load_image_file(image_path)
        face_encodings = face_recognition.face_encodings(image)
        if face_encodings:
            # Use the first detected face and persist its embedding
            embedding = face_encodings[0]
            with open(embedding_path, 'w') as f:
                json.dump(embedding.tolist(), f)
            return embedding
        else:
            # No face found in the image
            return None
# Group files by their name prefix
def group_by_prefix(file_list):
    groups = defaultdict(list)
    for file in file_list:
        prefix = get_prefix(file)
        groups[prefix].append(file)
    return groups
# Extract and cache embeddings for a sample of images from every group
def extract_and_cache_embeddings(grouped_files):
    embeddings_cache = {}
    for key, files in grouped_files.items():
        # Sample files from the start and the end of each group to bound the number of comparisons
        # (for groups smaller than LIMITED_FILES_COUNT the two slices may overlap)
        limited_files = files[:LIMITED_FILES_COUNT // 2] + files[-(LIMITED_FILES_COUNT // 2):]
        embeddings = []
        for file in limited_files:
            embedding = extract_embeddings(file)  # computed once per file; None if no face was found
            if embedding is not None:
                embeddings.append((embedding, file))
        embeddings_cache[key] = embeddings
    return embeddings_cache
# Compute the Euclidean (L2) distance between two face embeddings
def compare_embeddings(embedding1, embedding2):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    embedding1 = torch.tensor(embedding1).to(device)
    embedding2 = torch.tensor(embedding2).to(device)
    distance = torch.nn.functional.pairwise_distance(embedding1.unsqueeze(0), embedding2.unsqueeze(0)).item()
    return distance
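# Context: face_recognition's compare_faces() applies the same Euclidean distance to its 128-d
# encodings with a default tolerance of 0.6, which is why the merge rules below operate on
# distance bands roughly between 0.45 and 0.7.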
# Merge groups whose sampled embeddings are close enough to each other
def merge_groups(groups, embeddings_cache, threshold):
    # NOTE: the merge decision uses the fixed distance bands below; the threshold argument is
    # kept for the multi-pass loop in main() but is not applied directly here.
    merged_groups = []
    group_keys = list(groups.keys())
    while group_keys:
        current_key = group_keys.pop(0)
        current_group = groups[current_key]
        current_embeddings = embeddings_cache[current_key]
        i = 0
        while i < len(group_keys):
            comparison_key = group_keys[i]
            comparison_group = groups[comparison_key]
            comparison_embeddings = embeddings_cache[comparison_key]
            total_comparisons = 0
            distances = []
            # Compare every sampled embedding of the current group with every sampled
            # embedding of the candidate group
            for (emb1, file1) in current_embeddings:
                for (emb2, file2) in comparison_embeddings:
                    total_comparisons += 1
                    distance = compare_embeddings(emb1, emb2)
                    distances.append(distance)
                    logging.info(f"Comparing {file1} and {file2}: Distance = {distance}")
            if total_comparisons == 0:
                i += 1
                continue
            # Fractions of pairwise distances falling into each band
            count_45 = sum(1 for d in distances if d <= 0.45)
            count_50 = sum(1 for d in distances if d <= 0.5)
            count_55 = sum(1 for d in distances if d <= 0.55)
            count_57 = sum(1 for d in distances if d >= 0.57)
            count_65 = sum(1 for d in distances if d > 0.65)
            count_70 = sum(1 for d in distances if d > 0.7)
            percent_45 = count_45 / total_comparisons
            percent_50 = count_50 / total_comparisons
            percent_55 = count_55 / total_comparisons
            percent_57 = count_57 / total_comparisons
            percent_65 = count_65 / total_comparisons
            percent_70 = count_70 / total_comparisons
            logging.info(
                f"Group {current_key} vs Group {comparison_key}: "
                f"{percent_45*100:.2f}% <= 0.45, {percent_50*100:.2f}% <= 0.5, "
                f"{percent_55*100:.2f}% <= 0.55, {percent_57*100:.2f}% >= 0.57, "
                f"{percent_65*100:.2f}% > 0.65, {percent_70*100:.2f}% > 0.7"
            )
            # Merge rules: absorb the candidate group when enough of the distances are small
            if percent_45 >= 0.1 or percent_50 >= 0.4 or percent_55 >= 0.7:
                current_group.extend(comparison_group)
                del groups[comparison_key]
                del embeddings_cache[comparison_key]
                group_keys.pop(i)
            else:
                # None of the merge rules matched; keep the groups separate and move on
                i += 1
        merged_groups.append((current_key, current_group))
    # Rebuild the dictionaries with fresh keys for the next pass
    new_groups = {}
    new_embeddings_cache = {}
    for i, (key, group) in enumerate(merged_groups):
        new_key = f"group_{i}"
        new_groups[new_key] = group
        # Carry over the cached embeddings that belong to the merged group
        new_embeddings_cache[new_key] = []
        for emb, file in embeddings_cache[key]:
            if file in group:
                new_embeddings_cache[new_key].append((emb, file))
    return new_groups, new_embeddings_cache
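# Summary of the merge rules above (fractions are over all pairwise distance comparisons between
# the two groups): two groups are merged when at least 10% of distances are <= 0.45, or at least
# 40% are <= 0.5, or at least 70% are <= 0.55; otherwise they stay separate for this pass.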
# Copy every group into its own subdirectory of the output directory
def copy_files_to_output(merged_groups, output_directory):
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
    for i, (group_name, group) in enumerate(merged_groups.items()):
        group_dir = os.path.join(output_directory, f"group_{i + 1}")
        if not os.path.exists(group_dir):
            os.makedirs(group_dir)
        for file in group:
            shutil.copy(file, group_dir)
# Copy groups that consist of a single prefix into a separate folder
def copy_single_prefix_groups_to_output(groups, output_directory):
    single_prefix_dir = os.path.join(output_directory, "single_prefix_groups")
    if not os.path.exists(single_prefix_dir):
        os.makedirs(single_prefix_dir)
    # Count how many files carry each prefix across all groups
    prefix_counts = defaultdict(int)
    for group in groups.values():
        for file in group:
            prefix = get_prefix(file)
            prefix_counts[prefix] += 1
    for i, (group_name, group) in enumerate(groups.items()):
        group_prefixes = set(get_prefix(file) for file in group)
        # A group qualifies when each of its prefixes has a total file count equal to the group
        # size, i.e. the group is a single prefix that did not merge with anything else
        if all(prefix_counts[prefix] == len(group) for prefix in group_prefixes):
            group_dir = os.path.join(single_prefix_dir, f"group_{i + 1}")
            if not os.path.exists(group_dir):
                os.makedirs(group_dir)
            for file in group:
                shutil.copy(file, group_dir)
# Main entry point
def main(input_directory, output_directory):
    # Require a GPU for the distance computations
    if not torch.cuda.is_available():
        logging.error("GPU is not available. Please ensure you have installed the correct drivers and CUDA toolkit.")
        return
    logging.info(f"Using GPU: {torch.cuda.get_device_name(torch.cuda.current_device())}")
    # Create the embeddings cache directory if it does not exist
    if not os.path.exists(EMBEDDINGS_DIR):
        os.makedirs(EMBEDDINGS_DIR)
    start_time = time.time()
    files = [os.path.join(input_directory, file) for file in os.listdir(input_directory) if file.endswith('.jpg')]
    grouped_files = group_by_prefix(files)
    # Report the number of prefix-based groups
    logging.info(f"Number of groups by prefix: {len(grouped_files)}")
    # Extract and cache the embeddings
    embeddings_cache = extract_and_cache_embeddings(grouped_files)
    # Run several grouping passes with different threshold values
    for threshold in THRESHOLDS:
        logging.info(f"Grouping with threshold: {threshold}")
        grouped_files, embeddings_cache = merge_groups(grouped_files, embeddings_cache, threshold)
        logging.info(f"Number of groups after grouping with threshold {threshold}: {len(grouped_files)}")
    # Copy the files into the output directory
    copy_files_to_output(grouped_files, output_directory)
    # Separate single-prefix groups into their own folder
    copy_single_prefix_groups_to_output(grouped_files, output_directory)
    end_time = time.time()
    logging.info(f"Total execution time: {end_time - start_time} seconds")
    for i, (group_name, group) in enumerate(grouped_files.items()):
        logging.info(f"Group {i + 1}:")
        for file in group:
            logging.info(f"  {file}")