Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <algorithm>
#include <atomic>
#include <chrono>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <limits>
#include <map>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>
namespace fs = std::filesystem;

// Configuration
// WAL size in bytes at or above which the background loop runs a compaction.
const size_t WAL_SIZE_THRESHOLD{512};
// Path of the write-ahead log file.
const std::string WAL_FILENAME{"wal.log"};
// Directory that holds the SST files.
const std::string SST_DIR{"sst_files"};
// Path of the single merged SST file inside SST_DIR.
const std::string MAIN_SST{SST_DIR + "/main.sst"};
// On-disk structures. They are read and written as raw bytes, so they are
// packed to 1-byte alignment and their sizes are pinned with static_asserts.
#pragma pack(push, 1)
// File header stored at offset 0 of an SST file.
struct BrixHeader {
    uint32_t version = 1;       // format version
    uint32_t entry_count = 0;   // number of records (and of index entries)
    uint64_t index_offset = 0;  // file offset of the first BrixIndexEntry
};

// One entry of the index table stored at BrixHeader::index_offset.
struct BrixIndexEntry {
    uint32_t key_length = 0;   // length of the key in bytes
    uint64_t data_offset = 0;  // file offset of the record's 4-byte length prefix
    uint32_t data_length = 0;  // record length as stored in that prefix
    uint32_t reserved = 0;     // unused, always written as 0
};

static_assert(sizeof(BrixHeader) == 16, "BrixHeader must match the on-disk layout");
static_assert(sizeof(BrixIndexEntry) == 20, "BrixIndexEntry must match the on-disk layout");
#pragma pack(pop)
// One record parsed from the write-ahead log.
struct WalEntry {
    std::string id;    // key: the text between '@' and the following space
    std::string data;  // value: the complete WAL line the key was taken from
};
// Key/value pair as it is written into an SST file.
struct SstEntry {
    std::string id;    // key
    std::string data;  // value bytes stored after the key

    // Orders entries by key only; the value does not take part in the comparison.
    bool operator<(const SstEntry& other) const {
        return id.compare(other.id) < 0;
    }
};
- struct SstFileIndex {
- std::string filename;
- std::vector<std::pair<std::string, BrixIndexEntry>> sorted_index;
- BrixHeader header;
- };
// Process-wide run flag: 1 while the program should keep working, 0 once a
// shutdown has been requested. It is written from the signal handler and from
// main(), and read by the compaction thread, so it must be an atomic
// (volatile alone gives no inter-thread guarantees). The static_assert keeps
// it lock-free, which is what makes it usable from a signal handler.
static_assert(std::atomic<std::sig_atomic_t>::is_always_lock_free,
              "g_running must be lock-free to be used in a signal handler");
std::atomic<std::sig_atomic_t> g_running{1};
- std::vector<SstFileIndex> sst_indices;
- // Сигналы
- void signal_handler(int) {
- g_running = 0;
- std::cout << "\nShutting down..." << std::endl;
- }
- // Чтение WAL
- std::vector<WalEntry> readWalFile(const std::string& filename) {
- std::vector<WalEntry> entries;
- try {
- std::ifstream walFile(filename);
- if (!walFile) throw std::runtime_error("Can't open WAL");
- std::string line;
- while (g_running && std::getline(walFile, line)) {
- size_t at_pos = line.find('@');
- size_t space_pos = line.find(' ', at_pos);
- if (at_pos == std::string::npos || space_pos == std::string::npos) {
- std::cerr << "Skipping malformed WAL entry: " << line << std::endl;
- continue;
- }
- entries.push_back({
- line.substr(at_pos + 1, space_pos - at_pos - 1),
- line
- });
- }
- } catch (const std::exception& e) {
- std::cerr << "WAL error: " << e.what() << std::endl;
- }
- return entries;
- }
- // Загрузка индекса SST
- void loadSstIndex(const std::string& filename, SstFileIndex& file_index) {
- std::ifstream file(filename, std::ios::binary);
- if (!file) throw std::runtime_error("Can't open SST: " + filename);
- file.read(reinterpret_cast<char*>(&file_index.header), sizeof(BrixHeader));
- file.seekg(file_index.header.index_offset);
- file_index.sorted_index.reserve(file_index.header.entry_count);
- for (uint32_t i = 0; i < file_index.header.entry_count; ++i) {
- BrixIndexEntry entry;
- file.read(reinterpret_cast<char*>(&entry), sizeof(BrixIndexEntry));
- auto saved_pos = file.tellg();
- file.seekg(entry.data_offset);
- uint32_t data_len;
- file.read(reinterpret_cast<char*>(&data_len), sizeof(data_len));
- std::string key(entry.key_length, '\0');
- file.read(&key[0], entry.key_length);
- file_index.sorted_index.emplace_back(std::move(key), entry);
- file.seekg(saved_pos);
- }
- std::sort(file_index.sorted_index.begin(), file_index.sorted_index.end(),
- [](const auto& a, const auto& b) { return a.first < b.first; });
- file_index.filename = filename;
- }
- // Поиск по ключу (сначала в WAL, затем в main.sst)
- std::string findKey(const std::string& key) {
- // 1. Проверяем WAL (новые данные)
- if (fs::exists(WAL_FILENAME)) {
- auto walEntries = readWalFile(WAL_FILENAME);
- for (auto it = walEntries.rbegin(); it != walEntries.rend(); ++it) {
- if (it->id == key) return it->data;
- }
- }
- // 2. Ищем в основном SST
- if (fs::exists(MAIN_SST)) {
- SstFileIndex main_index;
- loadSstIndex(MAIN_SST, main_index);
- auto comp = [](const auto& a, const std::string& b) { return a.first < b; };
- auto found = std::lower_bound(main_index.sorted_index.begin(),
- main_index.sorted_index.end(), key, comp);
- if (found != main_index.sorted_index.end() && found->first == key) {
- std::ifstream file(MAIN_SST, std::ios::binary);
- file.seekg(found->second.data_offset);
- uint32_t data_len;
- file.read(reinterpret_cast<char*>(&data_len), sizeof(data_len));
- file.seekg(found->second.key_length, std::ios::cur);
- std::string data(data_len - found->second.key_length, '\0');
- file.read(&data[0], data.size());
- return data;
- }
- }
- return "";
- }
- // Запись SST
- void writeSstFile(const std::string& filename, const std::vector<SstEntry>& entries) {
- std::ofstream sstFile(filename, std::ios::binary);
- if (!sstFile) throw std::runtime_error("Can't create SST");
- BrixHeader header;
- header.entry_count = entries.size();
- sstFile.write(reinterpret_cast<char*>(&header), sizeof(header));
- std::vector<BrixIndexEntry> index;
- for (const auto& entry : entries) {
- BrixIndexEntry idx;
- idx.key_length = entry.id.size();
- idx.data_offset = sstFile.tellp();
- uint32_t total_len = sizeof(uint32_t) + entry.id.size() + entry.data.size();
- sstFile.write(reinterpret_cast<char*>(&total_len), sizeof(total_len));
- sstFile.write(entry.id.data(), entry.id.size());
- sstFile.write(entry.data.data(), entry.data.size());
- idx.data_length = total_len;
- index.push_back(idx);
- }
- header.index_offset = sstFile.tellp();
- for (const auto& idx : index) {
- sstFile.write(reinterpret_cast<const char*>(&idx), sizeof(idx));
- }
- sstFile.seekp(0);
- sstFile.write(reinterpret_cast<char*>(&header), sizeof(header));
- }
- // Мерж всех данных в main.sst
- void compactToMainSst() {
- // 1. Собираем все данные (WAL + текущий main.sst)
- std::map<std::string, std::string> latest_data;
- // Читаем WAL (новые данные)
- auto wal_entries = readWalFile(WAL_FILENAME);
- for (const auto& entry : wal_entries) {
- latest_data[entry.id] = entry.data; // Всегда перезаписываем последнюю версию
- }
- // Читаем текущий main.sst (если есть)
- if (fs::exists(MAIN_SST)) {
- SstFileIndex main_index;
- loadSstIndex(MAIN_SST, main_index);
- std::ifstream file(MAIN_SST, std::ios::binary);
- for (const auto& [key, entry] : main_index.sorted_index) {
- // Пропускаем ключи, которые уже есть в WAL (они новее)
- if (latest_data.count(key)) continue;
- file.seekg(entry.data_offset);
- uint32_t data_len;
- file.read(reinterpret_cast<char*>(&data_len), sizeof(data_len));
- file.seekg(entry.key_length, std::ios::cur);
- std::string data(data_len - entry.key_length, '\0');
- file.read(&data[0], data.size());
- latest_data[key] = data;
- }
- }
- // 2. Записываем только последние версии в новый main.sst
- std::vector<SstEntry> entries;
- for (const auto& [id, data] : latest_data) {
- entries.push_back({id, data});
- }
- // Атомарная замена файла
- std::string tmp_file = MAIN_SST + ".tmp";
- writeSstFile(tmp_file, entries);
- fs::rename(tmp_file, MAIN_SST);
- // 3. Очищаем WAL
- std::ofstream walFile(WAL_FILENAME, std::ios::trunc);
- }
- // Фоновый процесс компактизации
- void compactProcess() {
- std::signal(SIGINT, signal_handler);
- std::signal(SIGTERM, signal_handler);
- if (!fs::exists(SST_DIR)) fs::create_directory(SST_DIR);
- while (g_running) {
- try {
- if (fs::exists(WAL_FILENAME) && fs::file_size(WAL_FILENAME) >= WAL_SIZE_THRESHOLD) {
- compactToMainSst();
- std::cout << "Compaction complete. Main SST updated." << std::endl;
- }
- std::this_thread::sleep_for(std::chrono::seconds(5));
- } catch (const std::exception& e) {
- std::cerr << "Compaction failed: " << e.what() << std::endl;
- std::this_thread::sleep_for(std::chrono::seconds(10));
- }
- }
- }
- // Тест поиска
- void testSearch() {
- std::string key;
- while (g_running) {
- std::cout << "Enter key (or 'exit'): ";
- std::getline(std::cin, key);
- if (key == "exit") break;
- auto start = std::chrono::high_resolution_clock::now();
- std::string value = findKey(key);
- auto end = std::chrono::high_resolution_clock::now();
- std::cout << (value.empty() ? "Not found" : value)
- << " (" << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
- << " ms)" << std::endl;
- }
- }
- int main() {
- try {
- std::thread compact_thread(compactProcess);
- testSearch();
- g_running = 0;
- compact_thread.join();
- return 0;
- } catch (const std::exception& e) {
- std::cerr << "Fatal error: " << e.what() << std::endl;
- return 1;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement