Advertisement
lichenran1234

Concurrent KV store

Apr 11th, 2021
251
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.07 KB | None | 0 0
  1. #include <thread>
  2. #include <memory>
  3.  
  4. using namespace std;
  5.  
  6. class KeyValueStore {
  7.  public:
  8.     KeyValueStore() {
  9.         // If there is no snapshotting:
  10.         // write_ahead_log_helper_.locateToBeginning();
  11.         // int key;
  12.         // int value;
  13.         // while (write_ahead_log_helper_.hasNext()) {
  14.         //     write_ahead_log_helper_.next(&key, &value);
  15.         //     map_[key] = value;
  16.         // }
  17.        
  18.         // If there is snapshotting:
  19.         // map_snapshotter_.recoverFromLatestSnapshot(&map_, &current_version_id_);
  20.         // write_ahead_log_helper_.locateToVersion(current_version_id_);
  21.         // int key;
  22.         // int value;
  23.         // while (write_ahead_log_helper_.hasNext()) {
  24.         //     write_ahead_log_helper_.next(&key, &value);
  25.         //     map_[key] = value;
  26.         //     ++current_version_id_;
  27.         //     ++num_writes_since_last_snapshot_;
  28.         // }
  29.     }
  30.     bool put(int key, int value) {
  31.         acquireWriteLock();
  32.         // write_ahead_log_helper_.append(current_version_id_, "put," + key + "," + value);
  33.         // ++current_version_id_;
  34.         // ++num_writes_since_last_snapshot_;
  35.         // Snapshot the map once every 1000 writes.
  36.         // bool should_create_snapshot = false;
  37.         // if (num_writes_since_last_snapshot_ >= 1000) {
  38.         //     should_create_snapshot = true;
  39.         // }
  40.         map_[key] = value;
  41.         releaseWriteLock();
  42.        
  43.         // if (should_create_snapshot) {
  44.         //     acquireReadLock();
  45.         //     num_writes_since_last_snapshot_ = 0;
  46.         //     map_snapshotter_.createSnapshot(map_, current_version_id_);
  47.         //     releaseReadLock();
  48.         // }
  49.         return true;
  50.     }
  51.     bool get(int key, int* value) {
  52.         bool is_successful = false;
  53.         acquireReadLock();
  54.         auto it = map_.find(key);
  55.         if (it != map_.end()) {
  56.             is_successful = true;
  57.             *value = it->second;
  58.         }
  59.         releaseReadLock();
  60.         return is_successful;
  61.     }
  62.  private:
  63.     void acquireReadLock() {
  64.         unique_lock<mutex> lock(mutex_);
  65.         cv_.wait(lock, [this] {return !has_write_thread_;});
  66.         num_read_threads_ += 1;
  67.     }
  68.     void releaseReadLock() {
  69.         unique_lock<mutex> lock(mutex_);
  70.         num_read_threads_ -= 1;
  71.         if (num_read_threads_ == 0) {
  72.             cv_.notify_all();
  73.         }
  74.     }
  75.     void acquireWriteLock() {
  76.         unique_lock<mutex> lock(mutex_);
  77.         cv_.wait(lock, [this] {return !has_write_thread_ && num_read_threads_ == 0;});
  78.         has_write_thread_ = true;
  79.     }
  80.     void releaseWriteLock() {
  81.         unique_lock<mutex> lock(mutex_);
  82.         has_write_thread_ = false;
  83.         cv_.notify_all();
  84.     }
  85.    
  86.     map<int, int> map_;
  87.    
  88.     // WriteAheadLogHelper write_ahead_log_helper_;
  89.     // MapSnapshotter map_snapshotter_;
  90.     // int current_version_id_ = 0;
  91.     // int num_writes_since_last_snapshot_ = 0;
  92.    
  93.     bool has_write_thread_ = false;
  94.     bool num_read_threads_ = 0;
  95.    
  96.     mutex mutex_;
  97.     condition_variable cv_;
  98. };
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement