alaestor

[Go] p3 multi-distributor-consumer

Aug 31st, 2020
203
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 2.09 KB | None | 0 0
  1. package main
  2. //2020  Aion Algos#9218 http://discord.futuregadgetlab.net
  3. import (
  4.   "fmt"
  5.   "time"
  6.   "sync"
  7.   "container/list"
  8. )
  9.  
  10. type Stack struct {
  11.   stack *list.List
  12. }
  13.  
  14. func (s *Stack) Push(value int) {
  15.   s.stack.PushFront(value)
  16. }
  17.  
  18. func (s *Stack) Pop() (int, error) {
  19.   if (s.stack.Len() > 0) {
  20.     ele := s.stack.Front()
  21.     s.stack.Remove(ele)
  22.     return ele.Value.(int),nil
  23.   }
  24.   return 0,fmt.Errorf("Error: Nothing to Pop")
  25. }
  26.  
  27. func (s *Stack) Empty() bool {
  28.     return s.stack.Len() == 0
  29. }
  30.  
  31. func main() {
  32.   fmt.Println("Multi-Distributor-Consumer example starting.")
  33.   distributors := 5
  34.   consumers := 20
  35.   buffer := 1
  36.  
  37.   var stack_mutex sync.Mutex
  38.   stack := &Stack { stack: list.New() }
  39.   for i := 0; i < 256; i++ {
  40.     stack.Push(i)
  41.   }
  42.  
  43.   queue := make(chan int, buffer)
  44.  
  45.   var dwg sync.WaitGroup
  46.   dwg.Add(distributors)
  47.   for i := 0; i < distributors; i++ {
  48.     go distributor(i, &dwg, &stack_mutex, stack, queue)
  49.   }
  50.  
  51.   var cwg sync.WaitGroup
  52.   cwg.Add(consumers)
  53.   for i := 0; i < consumers; i++ {
  54.     go consumer(i, &cwg, queue)
  55.   }
  56.  
  57.   dwg.Wait()
  58.   close(queue)
  59.   cwg.Wait()
  60.   if (stack.Empty() != true) {
  61.     fmt.Println("Something went wrong! Items are still on the stack!")
  62.   } else { fmt.Println("Multi-Distributor-Consumer example complete.") }
  63. }
  64.  
  65. func consumer(id int, wg *sync.WaitGroup, queue <-chan int) {
  66.   fmt.Println("Consumer", id, "started.")
  67.   defer wg.Done()
  68.   for q := range queue {
  69.     fmt.Println("Consumer", id, "recieved:", q)
  70.     time.Sleep(time.Millisecond * 200)
  71.   }
  72.   fmt.Println("Consumer", id, "done.")
  73. }
  74.  
  75. func distributor(id int, wg *sync.WaitGroup, sm *sync.Mutex, s *Stack, queue chan<- int) {
  76.   defer wg.Done()
  77.   fmt.Println("Distributor", id, "started.")
  78.   for {
  79.     if s.Empty() {
  80.       break;
  81.     }
  82.     // enter critical section
  83.     sm.Lock()
  84.     i,e := s.Pop()
  85.     sm.Unlock()
  86.     // leave critical section
  87.     if e != nil {
  88.       fmt.Println("Distributor", id, e)
  89.       break;
  90.     }
  91.     queue<- i
  92.     fmt.Println("Distributor", id, "produced:", i)
  93.   }
  94.   fmt.Println("Distributor", id, "done.")
  95. }
  96.  
Add Comment
Please, Sign In to add comment