Advertisement
scottish_esquire

Redka Compact Init

Mar 24th, 2025
398
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 8.55 KB | None | 0 0
  1. #include <iostream>
  2. #include <fstream>
  3. #include <string>
  4. #include <vector>
  5. #include <map> // Используем std::map вместо unordered_map для автоматической сортировки
  6. #include <filesystem>
  7. #include <chrono>
  8. #include <csignal>
  9. #include <thread>
  10. #include <zlib.h> // Для сжатия данных
  11.  
  12. namespace fs = std::filesystem;
  13.  
  14. // Конфигурационные параметры (можно вынести в отдельный файл)
  15. // const size_t WAL_SIZE_THRESHOLD = 64 * 1024 * 1024; // 64MB
  16.  
  17. const size_t WAL_SIZE_THRESHOLD = 512; // Уменьшили для тестирования!!!!!!!!!!!!!!!!!!!!!!!!!!!
  18.  
  19. const std::string WAL_FILENAME = "wal.log";
  20. const std::string SST_DIR = "sst_files";
  21. const bool ENABLE_COMPRESSION = true;
  22.  
  23. #pragma pack(push, 1) // Выравнивание структур для бинарной записи
  24. struct BrixHeader {
  25.     uint32_t version = 1;    // Версия формата
  26.     uint32_t entry_count;    // Количество записей
  27.     uint64_t index_offset;   // Смещение индексного блока
  28.     uint8_t flags = 0;       // Флаги (например, сжатие)
  29. };
  30.  
  31. struct BrixIndexEntry {
  32.     uint32_t key_length;     // Длина ключа
  33.     uint64_t data_offset;    // Смещение данных
  34.     uint32_t data_length;    // Длина данных
  35. };
  36. #pragma pack(pop)
  37.  
  38. struct WalEntry {
  39.     std::string id;
  40.     std::string data;
  41. };
  42.  
  43. struct SstEntry {
  44.     std::string id;
  45.     std::string data;
  46.  
  47.     // Для сортировки
  48.     bool operator<(const SstEntry& other) const {
  49.         return id < other.id;
  50.     }
  51. };
  52.  
  53. // Глобальный флаг для graceful shutdown
  54. volatile std::sig_atomic_t g_running = 1;
  55.  
  56. void signal_handler(int) {
  57.     g_running = 0;
  58.     std::cout << "\nReceived shutdown signal, finishing current operation..." << std::endl;
  59. }
  60.  
  61. // Сжатие данных с помощью zlib
  62. std::string compressData(const std::string& data) {
  63.     uLongf compressed_size = compressBound(data.size());
  64.     std::string compressed(compressed_size, '\0');
  65.  
  66.     if (compress2((Bytef*)compressed.data(), &compressed_size,
  67.                  (const Bytef*)data.data(), data.size(),
  68.                  Z_BEST_COMPRESSION) != Z_OK) {
  69.         throw std::runtime_error("Compression failed");
  70.     }
  71.  
  72.     compressed.resize(compressed_size);
  73.     return compressed;
  74. }
  75.  
  76. // Чтение WAL с обработкой ошибок
  77. std::vector<WalEntry> readWalFile(const std::string& filename) {
  78.     std::vector<WalEntry> entries;
  79.  
  80.     try {
  81.         std::ifstream walFile(filename);
  82.         if (!walFile) {
  83.             throw std::runtime_error("Cannot open WAL file");
  84.         }
  85.  
  86.         std::string line;
  87.         while (g_running && std::getline(walFile, line)) {
  88.             try {
  89.                 size_t at_pos = line.find('@');
  90.                 size_t space_pos = line.find(' ', at_pos);
  91.  
  92.                 if (at_pos == std::string::npos || space_pos == std::string::npos) {
  93.                     std::cerr << "Skipping malformed WAL entry: " << line << std::endl;
  94.                     continue;
  95.                 }
  96.  
  97.                 std::string id = line.substr(at_pos + 1, space_pos - at_pos - 1);
  98.                 entries.push_back({id, line});
  99.             } catch (const std::exception& e) {
  100.                 std::cerr << "Error parsing WAL entry: " << e.what() << std::endl;
  101.             }
  102.         }
  103.     } catch (const std::exception& e) {
  104.         std::cerr << "WAL read error: " << e.what() << std::endl;
  105.     }
  106.  
  107.     return entries;
  108. }
  109.  
  110. // Запись в SST с индексом и поддержкой сжатия
  111. void writeSstFile(const std::string& filename, const std::vector<SstEntry>& entries) {
  112.     try {
  113.         std::ofstream sstFile(filename, std::ios::binary);
  114.         if (!sstFile) {
  115.             throw std::runtime_error("Cannot create SST file");
  116.         }
  117.  
  118.         // Заголовок (запишем реальные данные позже)
  119.         BrixHeader header;
  120.         header.entry_count = entries.size();
  121.         sstFile.write(reinterpret_cast<char*>(&header), sizeof(header));
  122.  
  123.         // Индексные записи
  124.         std::vector<BrixIndexEntry> index;
  125.         index.reserve(entries.size());
  126.  
  127.         // Запись данных
  128.         for (const auto& entry : entries) {
  129.             BrixIndexEntry idx;
  130.             idx.key_length = entry.id.size();
  131.             idx.data_offset = sstFile.tellp(); // Текущая позиция
  132.  
  133.             std::string data = entry.data;
  134.             if (ENABLE_COMPRESSION) {
  135.                 try {
  136.                     data = compressData(entry.data);
  137.                     header.flags |= 0x01; // Устанавливаем флаг сжатия
  138.                 } catch (...) {
  139.                     // В случае ошибки сжатия используем оригинальные данные
  140.                     std::cerr << "Compression failed for entry " << entry.id << ", using uncompressed" << std::endl;
  141.                 }
  142.             }
  143.  
  144.             idx.data_length = data.size();
  145.  
  146.             // Запись данных
  147.             uint32_t data_len = data.size();
  148.             sstFile.write(reinterpret_cast<char*>(&data_len), sizeof(data_len));
  149.             sstFile.write(data.data(), data.size());
  150.  
  151.             index.push_back(idx);
  152.         }
  153.  
  154.         // Запись индекса
  155.         header.index_offset = sstFile.tellp();
  156.         for (const auto& idx : index) {
  157.             sstFile.write(reinterpret_cast<const char*>(&idx), sizeof(idx));
  158.         }
  159.  
  160.         // Обновляем заголовок
  161.         sstFile.seekp(0);
  162.         sstFile.write(reinterpret_cast<char*>(&header), sizeof(header));
  163.  
  164.     } catch (const std::exception& e) {
  165.         std::cerr << "SST write error: " << e.what() << std::endl;
  166.         fs::remove(filename); // Удаляем частично записанный файл
  167.         throw;
  168.     }
  169. }
  170.  
  171. void compactProcess() {
  172.     std::signal(SIGINT, signal_handler);
  173.     std::signal(SIGTERM, signal_handler);
  174.  
  175.     // Создаем директорию для SST
  176.     if (!fs::exists(SST_DIR)) {
  177.         fs::create_directory(SST_DIR);
  178.     }
  179.  
  180.     while (g_running) {
  181.         try {
  182.             // Проверяем размер WAL
  183.             if (!fs::exists(WAL_FILENAME) ||
  184.                 fs::file_size(WAL_FILENAME) < WAL_SIZE_THRESHOLD) {
  185.                 std::this_thread::sleep_for(std::chrono::seconds(5));
  186.                 continue;
  187.             }
  188.  
  189.             // Читаем и обрабатываем WAL
  190.             auto walEntries = readWalFile(WAL_FILENAME);
  191.             if (walEntries.empty()) {
  192.                 continue;
  193.             }
  194.  
  195.             // Собираем последние версии (автоматически сортируются по id)
  196.             std::map<std::string, SstEntry> latestEntries;
  197.             for (const auto& entry : walEntries) {
  198.                 latestEntries[entry.id] = {entry.id, entry.data};
  199.             }
  200.  
  201.             // Генерируем имя файла
  202.             auto timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
  203.                 std::chrono::system_clock::now().time_since_epoch()).count();
  204.             std::string sstFilename = SST_DIR + "/" + std::to_string(timestamp) + ".sst";
  205.  
  206.             // Подготавливаем данные для записи
  207.             std::vector<SstEntry> entriesToWrite;
  208.             entriesToWrite.reserve(latestEntries.size());
  209.             for (const auto& [id, entry] : latestEntries) {
  210.                 entriesToWrite.push_back(entry);
  211.             }
  212.  
  213.             // Записываем SST
  214.             writeSstFile(sstFilename, entriesToWrite);
  215.  
  216.             // Очищаем WAL
  217.             std::ofstream walFile(WAL_FILENAME, std::ios::trunc);
  218.  
  219.             std::cout << "Compaction successful. Created: " << sstFilename
  220.                       << " (" << entriesToWrite.size() << " entries)" << std::endl;
  221.  
  222.         } catch (const std::exception& e) {
  223.             std::cerr << "Compaction error: " << e.what() << std::endl;
  224.             std::this_thread::sleep_for(std::chrono::seconds(10)); // Чтобы не зациклиться на ошибках
  225.         }
  226.     }
  227. }
  228.  
  229. int main() {
  230.     try {
  231.         compactProcess();
  232.         return 0;
  233.     } catch (const std::exception& e) {
  234.         std::cerr << "Fatal error: " << e.what() << std::endl;
  235.         return 1;
  236.     }
  237. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement