Advertisement
Lauda

Untitled

Apr 13th, 2013
117
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.46 KB | None | 0 0
  1. /*
  2. Naparviti klasu mb (message_box) koja sadrzi n komunikacionih kanala.
  3. (n se odredjuje u trenutku instanciranja objekta klase).
  4. Komunikacioni kanal (sanduce) omogucava komunikaciju izmedju proizvodjaca i
  5. potrosaca nazavisnu od ostalih komunikacija.
  6. Svaki kanal moze da sadrzi neogranicen broj poruka.
  7.  
  8. mb ima dve operacije.
  9. mb::send() je neblokirajuca operacija sa dva parametra:
  10.   vrednosti (objekt) koja se salje i
  11.   indeksom kanala u koji se salje.
  12.  
  13. mb::receive() je blokirajuca operacija koja prihvata indeks kanala iz kojeg
  14. ocekuje poruku, a vraca objekt poruke.
  15. Jednu poruku je moguce preuzeti samo jednom (pri preuzimanju, poruka se i izbacuje iz sanduceta)
  16. Ako u kanalu nema poruke, nit koja je pozvala receive() ceka poruku.
  17. Niti koje pozovu receive moraju da dobiju poruke iz odgovarajuceg kanala,
  18. ali ne moraju da dobiju poruke u redosledu u kojem su one (poruke) poslate.
  19.  
  20. Operacije send() i receive() bacaju izuzetak ako im se prosledi kanal koji ne postoji.
  21.  
  22. Operacije ove klase su thread safe.
  23. */
  24. #include <iostream>
  25. #include <thread>
  26. #include <queue>
  27. #include <vector>
  28.  
  29. using namespace std;
  30.  
  31. template<typename T, size_t N>
  32. class mb {
  33.     vector<queue<T>> data_;
  34.     vector<condition_variable*> cv_;
  35.     mutex mx_;
  36.    
  37.     public:
  38.         mb();
  39.         ~mb();
  40.        
  41.     void send(T data, size_t channel);
  42.     T receive(size_t channel);
  43. };
  44.  
  45. template<typename T, size_t N>
  46. mb<T, N>::mb()
  47. {
  48.     for(size_t i=0; i<N; ++i)
  49.     {
  50.         data_.push_back(queue<T>{});
  51.         cv_.push_back(new condition_variable);
  52.     }
  53. }
  54.  
  55. template<typename T, size_t N>
  56. mb<T, N>::~mb()
  57. {
  58.     for(size_t i=0; i<N; ++i)
  59.     {
  60.         data_.pop_back();
  61.         delete cv_.back();
  62.         cv_.pop_back();
  63.     }
  64. }
  65.  
  66. template<typename T, size_t N>
  67. void mb<T, N>::send(T data, size_t channel)
  68. {
  69.     if (channel >= data_.size())
  70.          throw out_of_range("Invalid channel index.");
  71.  
  72.     lock_guard<mutex> l{mx_};
  73.     data_[channel].push(data);      // Ubaci poruku u kanal
  74.     cv_[channel]->notify_one();   // Probudi potrosaca ako postoji
  75. }
  76.  
  77. template<typename T, size_t N>
  78. T mb<T, N>::receive(size_t channel)
  79. {
  80.     if (channel >= data_.size())
  81.         throw out_of_range("Invalid channel index.");
  82.  
  83.     unique_lock<mutex> l{mx_};
  84.     while(data_[channel].empty()) // Dok nema poruke u kanalu -> cekaj
  85.     {  
  86.         cv_[channel]->wait(l);
  87.     }
  88.     T t = data_[channel].front();
  89.     data_[channel].pop();
  90.     return t;
  91. }
  92.  
  93. mb<char, 3> mb3;
  94.  
  95. // proizvodjac salje tri uzastopna karaktera pocevsi od prosledjenog karaktera c
  96. void producer(char c, size_t channel)
  97. {
  98.     this_thread::sleep_for(chrono::seconds(1));
  99.     mb3.send(c, channel);
  100.     this_thread::sleep_for(chrono::seconds(1));
  101.     mb3.send(c+1, channel);
  102.     this_thread::sleep_for(chrono::seconds(1));
  103.     mb3.send(c+2, channel);
  104. }
  105.  
  106. void consumer(size_t channel)
  107. {
  108.     static mutex mx;
  109.     this_thread::sleep_for(chrono::seconds(1));
  110.     char c = mb3.receive(channel);
  111.     lock_guard<mutex> l(mx);
  112.     cout << "[" << channel << "]= " << c << endl;
  113. }
  114.  
  115. const size_t PROD=3;
  116. const size_t CONS=9;
  117.  
  118. int main()
  119. {
  120.     thread prod[PROD];
  121.     thread cons[CONS];
  122.  
  123.     // posto svaki proizvodjac posalje tri uzastopna karaktera
  124.     // pocevsi od prosledjenog, znaci da pri ispisu preuzeti karakteri
  125.     // treba da budu (u proizvoljnom redosledu):
  126.     // a,b,c, g,h,i, m,n,o
  127.     char msg[] = {'a', 'g', 'm'};
  128.  
  129.     for(size_t i=0;i<PROD;++i)
  130.         prod[i]=thread(producer, msg[i], i%3);
  131.        
  132.     for(size_t i=0;i<CONS;++i)
  133.         cons[i]=thread(consumer, i%3);
  134.        
  135.     for(size_t i=0;i<PROD;++i)
  136.         prod[i].join();
  137.        
  138.     for(size_t i=0;i<CONS;++i)
  139.         cons[i].join();
  140.        
  141.     return 0;
  142. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement