Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- Naparviti klasu mb (message_box) koja sadrzi n komunikacionih kanala.
- (n se odredjuje u trenutku instanciranja objekta klase).
- Komunikacioni kanal (sanduce) omogucava komunikaciju izmedju proizvodjaca i
- potrosaca nazavisnu od ostalih komunikacija.
- Svaki kanal moze da sadrzi neogranicen broj poruka.
- mb ima dve operacije.
- mb::send() je neblokirajuca operacija sa dva parametra:
- vrednosti (objekt) koja se salje i
- indeksom kanala u koji se salje.
- mb::receive() je blokirajuca operacija koja prihvata indeks kanala iz kojeg
- ocekuje poruku, a vraca objekt poruke.
- Jednu poruku je moguce preuzeti samo jednom (pri preuzimanju, poruka se i izbacuje iz sanduceta)
- Ako u kanalu nema poruke, nit koja je pozvala receive() ceka poruku.
- Niti koje pozovu receive moraju da dobiju poruke iz odgovarajuceg kanala,
- ali ne moraju da dobiju poruke u redosledu u kojem su one (poruke) poslate.
- Operacije send() i receive() bacaju izuzetak ako im se prosledi kanal koji ne postoji.
- Operacije ove klase su thread safe.
- */
- #include <iostream>
- #include <thread>
- #include <queue>
- #include <vector>
- using namespace std;
- template<typename T, size_t N>
- class mb {
- vector<queue<T>> data_;
- vector<condition_variable*> cv_;
- mutex mx_;
- public:
- mb();
- ~mb();
- void send(T data, size_t channel);
- T receive(size_t channel);
- };
- template<typename T, size_t N>
- mb<T, N>::mb()
- {
- for(size_t i=0; i<N; ++i)
- {
- data_.push_back(queue<T>{});
- cv_.push_back(new condition_variable);
- }
- }
- template<typename T, size_t N>
- mb<T, N>::~mb()
- {
- for(size_t i=0; i<N; ++i)
- {
- data_.pop_back();
- delete cv_.back();
- cv_.pop_back();
- }
- }
- template<typename T, size_t N>
- void mb<T, N>::send(T data, size_t channel)
- {
- if (channel >= data_.size())
- throw out_of_range("Invalid channel index.");
- lock_guard<mutex> l{mx_};
- data_[channel].push(data); // Ubaci poruku u kanal
- cv_[channel]->notify_one(); // Probudi potrosaca ako postoji
- }
- template<typename T, size_t N>
- T mb<T, N>::receive(size_t channel)
- {
- if (channel >= data_.size())
- throw out_of_range("Invalid channel index.");
- unique_lock<mutex> l{mx_};
- while(data_[channel].empty()) // Dok nema poruke u kanalu -> cekaj
- {
- cv_[channel]->wait(l);
- }
- T t = data_[channel].front();
- data_[channel].pop();
- return t;
- }
- mb<char, 3> mb3;
- // proizvodjac salje tri uzastopna karaktera pocevsi od prosledjenog karaktera c
- void producer(char c, size_t channel)
- {
- this_thread::sleep_for(chrono::seconds(1));
- mb3.send(c, channel);
- this_thread::sleep_for(chrono::seconds(1));
- mb3.send(c+1, channel);
- this_thread::sleep_for(chrono::seconds(1));
- mb3.send(c+2, channel);
- }
- void consumer(size_t channel)
- {
- static mutex mx;
- this_thread::sleep_for(chrono::seconds(1));
- char c = mb3.receive(channel);
- lock_guard<mutex> l(mx);
- cout << "[" << channel << "]= " << c << endl;
- }
- const size_t PROD=3;
- const size_t CONS=9;
- int main()
- {
- thread prod[PROD];
- thread cons[CONS];
- // posto svaki proizvodjac posalje tri uzastopna karaktera
- // pocevsi od prosledjenog, znaci da pri ispisu preuzeti karakteri
- // treba da budu (u proizvoljnom redosledu):
- // a,b,c, g,h,i, m,n,o
- char msg[] = {'a', 'g', 'm'};
- for(size_t i=0;i<PROD;++i)
- prod[i]=thread(producer, msg[i], i%3);
- for(size_t i=0;i<CONS;++i)
- cons[i]=thread(consumer, i%3);
- for(size_t i=0;i<PROD;++i)
- prod[i].join();
- for(size_t i=0;i<CONS;++i)
- cons[i].join();
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement