Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <iostream>
- #include <fstream>
- #include <string>
- #include <vector>
- #include <map> // Используем std::map вместо unordered_map для автоматической сортировки
- #include <filesystem>
- #include <chrono>
- #include <csignal>
- #include <thread>
- #include <zlib.h> // Для сжатия данных
- namespace fs = std::filesystem;
- // Конфигурационные параметры (можно вынести в отдельный файл)
- // const size_t WAL_SIZE_THRESHOLD = 64 * 1024 * 1024; // 64MB
- const size_t WAL_SIZE_THRESHOLD = 512; // Уменьшили для тестирования!!!!!!!!!!!!!!!!!!!!!!!!!!!
- const std::string WAL_FILENAME = "wal.log";
- const std::string SST_DIR = "sst_files";
- const bool ENABLE_COMPRESSION = true;
- #pragma pack(push, 1) // Выравнивание структур для бинарной записи
- struct BrixHeader {
- uint32_t version = 1; // Версия формата
- uint32_t entry_count; // Количество записей
- uint64_t index_offset; // Смещение индексного блока
- uint8_t flags = 0; // Флаги (например, сжатие)
- };
- struct BrixIndexEntry {
- uint32_t key_length; // Длина ключа
- uint64_t data_offset; // Смещение данных
- uint32_t data_length; // Длина данных
- };
- #pragma pack(pop)
- struct WalEntry {
- std::string id;
- std::string data;
- };
- struct SstEntry {
- std::string id;
- std::string data;
- // Для сортировки
- bool operator<(const SstEntry& other) const {
- return id < other.id;
- }
- };
- // Глобальный флаг для graceful shutdown
- volatile std::sig_atomic_t g_running = 1;
- void signal_handler(int) {
- g_running = 0;
- std::cout << "\nReceived shutdown signal, finishing current operation..." << std::endl;
- }
- // Сжатие данных с помощью zlib
- std::string compressData(const std::string& data) {
- uLongf compressed_size = compressBound(data.size());
- std::string compressed(compressed_size, '\0');
- if (compress2((Bytef*)compressed.data(), &compressed_size,
- (const Bytef*)data.data(), data.size(),
- Z_BEST_COMPRESSION) != Z_OK) {
- throw std::runtime_error("Compression failed");
- }
- compressed.resize(compressed_size);
- return compressed;
- }
- // Чтение WAL с обработкой ошибок
- std::vector<WalEntry> readWalFile(const std::string& filename) {
- std::vector<WalEntry> entries;
- try {
- std::ifstream walFile(filename);
- if (!walFile) {
- throw std::runtime_error("Cannot open WAL file");
- }
- std::string line;
- while (g_running && std::getline(walFile, line)) {
- try {
- size_t at_pos = line.find('@');
- size_t space_pos = line.find(' ', at_pos);
- if (at_pos == std::string::npos || space_pos == std::string::npos) {
- std::cerr << "Skipping malformed WAL entry: " << line << std::endl;
- continue;
- }
- std::string id = line.substr(at_pos + 1, space_pos - at_pos - 1);
- entries.push_back({id, line});
- } catch (const std::exception& e) {
- std::cerr << "Error parsing WAL entry: " << e.what() << std::endl;
- }
- }
- } catch (const std::exception& e) {
- std::cerr << "WAL read error: " << e.what() << std::endl;
- }
- return entries;
- }
- // Запись в SST с индексом и поддержкой сжатия
- void writeSstFile(const std::string& filename, const std::vector<SstEntry>& entries) {
- try {
- std::ofstream sstFile(filename, std::ios::binary);
- if (!sstFile) {
- throw std::runtime_error("Cannot create SST file");
- }
- // Заголовок (запишем реальные данные позже)
- BrixHeader header;
- header.entry_count = entries.size();
- sstFile.write(reinterpret_cast<char*>(&header), sizeof(header));
- // Индексные записи
- std::vector<BrixIndexEntry> index;
- index.reserve(entries.size());
- // Запись данных
- for (const auto& entry : entries) {
- BrixIndexEntry idx;
- idx.key_length = entry.id.size();
- idx.data_offset = sstFile.tellp(); // Текущая позиция
- std::string data = entry.data;
- if (ENABLE_COMPRESSION) {
- try {
- data = compressData(entry.data);
- header.flags |= 0x01; // Устанавливаем флаг сжатия
- } catch (...) {
- // В случае ошибки сжатия используем оригинальные данные
- std::cerr << "Compression failed for entry " << entry.id << ", using uncompressed" << std::endl;
- }
- }
- idx.data_length = data.size();
- // Запись данных
- uint32_t data_len = data.size();
- sstFile.write(reinterpret_cast<char*>(&data_len), sizeof(data_len));
- sstFile.write(data.data(), data.size());
- index.push_back(idx);
- }
- // Запись индекса
- header.index_offset = sstFile.tellp();
- for (const auto& idx : index) {
- sstFile.write(reinterpret_cast<const char*>(&idx), sizeof(idx));
- }
- // Обновляем заголовок
- sstFile.seekp(0);
- sstFile.write(reinterpret_cast<char*>(&header), sizeof(header));
- } catch (const std::exception& e) {
- std::cerr << "SST write error: " << e.what() << std::endl;
- fs::remove(filename); // Удаляем частично записанный файл
- throw;
- }
- }
- void compactProcess() {
- std::signal(SIGINT, signal_handler);
- std::signal(SIGTERM, signal_handler);
- // Создаем директорию для SST
- if (!fs::exists(SST_DIR)) {
- fs::create_directory(SST_DIR);
- }
- while (g_running) {
- try {
- // Проверяем размер WAL
- if (!fs::exists(WAL_FILENAME) ||
- fs::file_size(WAL_FILENAME) < WAL_SIZE_THRESHOLD) {
- std::this_thread::sleep_for(std::chrono::seconds(5));
- continue;
- }
- // Читаем и обрабатываем WAL
- auto walEntries = readWalFile(WAL_FILENAME);
- if (walEntries.empty()) {
- continue;
- }
- // Собираем последние версии (автоматически сортируются по id)
- std::map<std::string, SstEntry> latestEntries;
- for (const auto& entry : walEntries) {
- latestEntries[entry.id] = {entry.id, entry.data};
- }
- // Генерируем имя файла
- auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch()).count();
- std::string sstFilename = SST_DIR + "/" + std::to_string(timestamp) + ".sst";
- // Подготавливаем данные для записи
- std::vector<SstEntry> entriesToWrite;
- entriesToWrite.reserve(latestEntries.size());
- for (const auto& [id, entry] : latestEntries) {
- entriesToWrite.push_back(entry);
- }
- // Записываем SST
- writeSstFile(sstFilename, entriesToWrite);
- // Очищаем WAL
- std::ofstream walFile(WAL_FILENAME, std::ios::trunc);
- std::cout << "Compaction successful. Created: " << sstFilename
- << " (" << entriesToWrite.size() << " entries)" << std::endl;
- } catch (const std::exception& e) {
- std::cerr << "Compaction error: " << e.what() << std::endl;
- std::this_thread::sleep_for(std::chrono::seconds(10)); // Чтобы не зациклиться на ошибках
- }
- }
- }
- int main() {
- try {
- compactProcess();
- return 0;
- } catch (const std::exception& e) {
- std::cerr << "Fatal error: " << e.what() << std::endl;
- return 1;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement