Advertisement
scottish_esquire

Redka Compact Init ver 2.0

Mar 25th, 2025
423
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 9.46 KB | None | 0 0
  1. #include <iostream>
  2. #include <fstream>
  3. #include <string>
  4. #include <vector>
  5. #include <map>
  6. #include <filesystem>
  7. #include <chrono>
  8. #include <csignal>
  9. #include <thread>
  10. #include <algorithm>
  11.  
  12. namespace fs = std::filesystem;
  13.  
  14. // Конфигурация
  15. const size_t WAL_SIZE_THRESHOLD = 512;
  16. const std::string WAL_FILENAME = "wal.log";
  17. const std::string SST_DIR = "sst_files";
  18. const std::string MAIN_SST = SST_DIR + "/main.sst";
  19.  
  20. #pragma pack(push, 1)
  21. struct BrixHeader {
  22.     uint32_t version = 1;
  23.     uint32_t entry_count;
  24.     uint64_t index_offset;
  25. };
  26.  
  27. struct BrixIndexEntry {
  28.     uint32_t key_length;
  29.     uint64_t data_offset;
  30.     uint32_t data_length;
  31.     uint32_t reserved = 0;
  32. };
  33. #pragma pack(pop)
  34.  
  35. struct WalEntry {
  36.     std::string id;
  37.     std::string data;
  38. };
  39.  
  40. struct SstEntry {
  41.     std::string id;
  42.     std::string data;
  43.  
  44.     bool operator<(const SstEntry& other) const {
  45.         return id < other.id;
  46.     }
  47. };
  48.  
  49. struct SstFileIndex {
  50.     std::string filename;
  51.     std::vector<std::pair<std::string, BrixIndexEntry>> sorted_index;
  52.     BrixHeader header;
  53. };
  54.  
  55. volatile std::sig_atomic_t g_running = 1;
  56. std::vector<SstFileIndex> sst_indices;
  57.  
  58. // Сигналы
  59. void signal_handler(int) {
  60.     g_running = 0;
  61.     std::cout << "\nShutting down..." << std::endl;
  62. }
  63.  
  64. // Чтение WAL
  65. std::vector<WalEntry> readWalFile(const std::string& filename) {
  66.     std::vector<WalEntry> entries;
  67.     try {
  68.         std::ifstream walFile(filename);
  69.         if (!walFile) throw std::runtime_error("Can't open WAL");
  70.  
  71.         std::string line;
  72.         while (g_running && std::getline(walFile, line)) {
  73.             size_t at_pos = line.find('@');
  74.             size_t space_pos = line.find(' ', at_pos);
  75.             if (at_pos == std::string::npos || space_pos == std::string::npos) {
  76.                 std::cerr << "Skipping malformed WAL entry: " << line << std::endl;
  77.                 continue;
  78.             }
  79.             entries.push_back({
  80.                 line.substr(at_pos + 1, space_pos - at_pos - 1),
  81.                 line
  82.             });
  83.         }
  84.     } catch (const std::exception& e) {
  85.         std::cerr << "WAL error: " << e.what() << std::endl;
  86.     }
  87.     return entries;
  88. }
  89.  
  90. // Загрузка индекса SST
  91. void loadSstIndex(const std::string& filename, SstFileIndex& file_index) {
  92.     std::ifstream file(filename, std::ios::binary);
  93.     if (!file) throw std::runtime_error("Can't open SST: " + filename);
  94.  
  95.     file.read(reinterpret_cast<char*>(&file_index.header), sizeof(BrixHeader));
  96.     file.seekg(file_index.header.index_offset);
  97.  
  98.     file_index.sorted_index.reserve(file_index.header.entry_count);
  99.     for (uint32_t i = 0; i < file_index.header.entry_count; ++i) {
  100.         BrixIndexEntry entry;
  101.         file.read(reinterpret_cast<char*>(&entry), sizeof(BrixIndexEntry));
  102.  
  103.         auto saved_pos = file.tellg();
  104.         file.seekg(entry.data_offset);
  105.        
  106.         uint32_t data_len;
  107.         file.read(reinterpret_cast<char*>(&data_len), sizeof(data_len));
  108.        
  109.         std::string key(entry.key_length, '\0');
  110.         file.read(&key[0], entry.key_length);
  111.        
  112.         file_index.sorted_index.emplace_back(std::move(key), entry);
  113.         file.seekg(saved_pos);
  114.     }
  115.  
  116.     std::sort(file_index.sorted_index.begin(), file_index.sorted_index.end(),
  117.         [](const auto& a, const auto& b) { return a.first < b.first; });
  118.  
  119.     file_index.filename = filename;
  120. }
  121.  
  122. // Поиск по ключу (сначала в WAL, затем в main.sst)
  123. std::string findKey(const std::string& key) {
  124.     // 1. Проверяем WAL (новые данные)
  125.     if (fs::exists(WAL_FILENAME)) {
  126.         auto walEntries = readWalFile(WAL_FILENAME);
  127.         for (auto it = walEntries.rbegin(); it != walEntries.rend(); ++it) {
  128.             if (it->id == key) return it->data;
  129.         }
  130.     }
  131.  
  132.     // 2. Ищем в основном SST
  133.     if (fs::exists(MAIN_SST)) {
  134.         SstFileIndex main_index;
  135.         loadSstIndex(MAIN_SST, main_index);
  136.        
  137.         auto comp = [](const auto& a, const std::string& b) { return a.first < b; };
  138.         auto found = std::lower_bound(main_index.sorted_index.begin(),
  139.                                      main_index.sorted_index.end(), key, comp);
  140.        
  141.         if (found != main_index.sorted_index.end() && found->first == key) {
  142.             std::ifstream file(MAIN_SST, std::ios::binary);
  143.             file.seekg(found->second.data_offset);
  144.            
  145.             uint32_t data_len;
  146.             file.read(reinterpret_cast<char*>(&data_len), sizeof(data_len));
  147.             file.seekg(found->second.key_length, std::ios::cur);
  148.            
  149.             std::string data(data_len - found->second.key_length, '\0');
  150.             file.read(&data[0], data.size());
  151.             return data;
  152.         }
  153.     }
  154.  
  155.     return "";
  156. }
  157.  
  158. // Запись SST
  159. void writeSstFile(const std::string& filename, const std::vector<SstEntry>& entries) {
  160.     std::ofstream sstFile(filename, std::ios::binary);
  161.     if (!sstFile) throw std::runtime_error("Can't create SST");
  162.  
  163.     BrixHeader header;
  164.     header.entry_count = entries.size();
  165.     sstFile.write(reinterpret_cast<char*>(&header), sizeof(header));
  166.  
  167.     std::vector<BrixIndexEntry> index;
  168.     for (const auto& entry : entries) {
  169.         BrixIndexEntry idx;
  170.         idx.key_length = entry.id.size();
  171.         idx.data_offset = sstFile.tellp();
  172.  
  173.         uint32_t total_len = sizeof(uint32_t) + entry.id.size() + entry.data.size();
  174.         sstFile.write(reinterpret_cast<char*>(&total_len), sizeof(total_len));
  175.         sstFile.write(entry.id.data(), entry.id.size());
  176.         sstFile.write(entry.data.data(), entry.data.size());
  177.  
  178.         idx.data_length = total_len;
  179.         index.push_back(idx);
  180.     }
  181.  
  182.     header.index_offset = sstFile.tellp();
  183.     for (const auto& idx : index) {
  184.         sstFile.write(reinterpret_cast<const char*>(&idx), sizeof(idx));
  185.     }
  186.  
  187.     sstFile.seekp(0);
  188.     sstFile.write(reinterpret_cast<char*>(&header), sizeof(header));
  189. }
  190.  
  191. // Мерж всех данных в main.sst
  192. void compactToMainSst() {
  193.     // 1. Собираем все данные (WAL + текущий main.sst)
  194.     std::map<std::string, std::string> latest_data;
  195.  
  196.     // Читаем WAL (новые данные)
  197.     auto wal_entries = readWalFile(WAL_FILENAME);
  198.     for (const auto& entry : wal_entries) {
  199.         latest_data[entry.id] = entry.data; // Всегда перезаписываем последнюю версию
  200.     }
  201.  
  202.     // Читаем текущий main.sst (если есть)
  203.     if (fs::exists(MAIN_SST)) {
  204.         SstFileIndex main_index;
  205.         loadSstIndex(MAIN_SST, main_index);
  206.  
  207.         std::ifstream file(MAIN_SST, std::ios::binary);
  208.         for (const auto& [key, entry] : main_index.sorted_index) {
  209.             // Пропускаем ключи, которые уже есть в WAL (они новее)
  210.             if (latest_data.count(key)) continue;
  211.  
  212.             file.seekg(entry.data_offset);
  213.             uint32_t data_len;
  214.             file.read(reinterpret_cast<char*>(&data_len), sizeof(data_len));
  215.             file.seekg(entry.key_length, std::ios::cur);
  216.  
  217.             std::string data(data_len - entry.key_length, '\0');
  218.             file.read(&data[0], data.size());
  219.             latest_data[key] = data;
  220.         }
  221.     }
  222.  
  223.     // 2. Записываем только последние версии в новый main.sst
  224.     std::vector<SstEntry> entries;
  225.     for (const auto& [id, data] : latest_data) {
  226.         entries.push_back({id, data});
  227.     }
  228.  
  229.     // Атомарная замена файла
  230.     std::string tmp_file = MAIN_SST + ".tmp";
  231.     writeSstFile(tmp_file, entries);
  232.     fs::rename(tmp_file, MAIN_SST);
  233.  
  234.     // 3. Очищаем WAL
  235.     std::ofstream walFile(WAL_FILENAME, std::ios::trunc);
  236. }
  237.  
  238. // Фоновый процесс компактизации
  239. void compactProcess() {
  240.     std::signal(SIGINT, signal_handler);
  241.     std::signal(SIGTERM, signal_handler);
  242.  
  243.     if (!fs::exists(SST_DIR)) fs::create_directory(SST_DIR);
  244.  
  245.     while (g_running) {
  246.         try {
  247.             if (fs::exists(WAL_FILENAME) && fs::file_size(WAL_FILENAME) >= WAL_SIZE_THRESHOLD) {
  248.                 compactToMainSst();
  249.                 std::cout << "Compaction complete. Main SST updated." << std::endl;
  250.             }
  251.             std::this_thread::sleep_for(std::chrono::seconds(5));
  252.         } catch (const std::exception& e) {
  253.             std::cerr << "Compaction failed: " << e.what() << std::endl;
  254.             std::this_thread::sleep_for(std::chrono::seconds(10));
  255.         }
  256.     }
  257. }
  258.  
  259. // Тест поиска
  260. void testSearch() {
  261.     std::string key;
  262.     while (g_running) {
  263.         std::cout << "Enter key (or 'exit'): ";
  264.         std::getline(std::cin, key);
  265.         if (key == "exit") break;
  266.        
  267.         auto start = std::chrono::high_resolution_clock::now();
  268.         std::string value = findKey(key);
  269.         auto end = std::chrono::high_resolution_clock::now();
  270.        
  271.         std::cout << (value.empty() ? "Not found" : value)
  272.                   << " (" << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
  273.                   << " ms)" << std::endl;
  274.     }
  275. }
  276.  
  277. int main() {
  278.     try {
  279.         std::thread compact_thread(compactProcess);
  280.         testSearch();
  281.        
  282.         g_running = 0;
  283.         compact_thread.join();
  284.         return 0;
  285.     } catch (const std::exception& e) {
  286.         std::cerr << "Fatal error: " << e.what() << std::endl;
  287.         return 1;
  288.     }
  289. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement