Advertisement
WeltEnSTurm

Untitled

Feb 2nd, 2018
3,196
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
D 2.59 KB | None | 0 0
  1. module pipes;
  2.  
  3. import
  4.     core.thread,
  5.     std.parallelism,
  6.     std.concurrency,
  7.     std.functional,
  8.     std.typecons,
  9.     std.traits;
  10.  
  11.  
  12. synchronized class Queue(T) {
  13.     private T[] queue;
  14.  
  15.     void opOpAssign(string op)(T object) if(op == "~") {
  16.         queue ~= object;
  17.     }
  18.  
  19.     auto length(){
  20.         return queue.length;
  21.     }
  22.  
  23.     Nullable!T pop(){
  24.         Nullable!T result;
  25.         if(!queue.length){
  26.             result.nullify;
  27.             return result;
  28.         }
  29.         result = queue[0];
  30.         queue = queue[1..$];
  31.         return result;
  32.     }
  33. }
  34.  
  35.  
  36. synchronized class Flag {
  37.     private bool flag;
  38.     void set(bool flag){
  39.         this.flag = flag;
  40.     }
  41.     bool get(){
  42.         return flag;
  43.     }
  44. }
  45.  
  46.  
  47. auto pipe(Iterable, Fn)(Iterable iterable, Fn callback) if(isIterable!Iterable) {
  48.     shared class RootAction {
  49.         Queue!(Tuple!(ForeachType!Iterable)) results;
  50.         this(Iterable iterable){
  51.             results = new shared Queue!(Tuple!(ForeachType!Iterable));
  52.             foreach(e; iterable){
  53.                 results ~= tuple(e);
  54.             }
  55.         }
  56.         auto previous(){
  57.             return null;
  58.         }
  59.         bool has(){
  60.             return results.length > 0;
  61.         }
  62.         void call(){}
  63.     }
  64.     return pipe(new shared RootAction(iterable), callback);
  65. }
  66.  
  67. auto pipe(I, O)(I fnIn, O fnOut) if(!isIterable!I) {
  68.     shared class PipeAction {
  69.         Flag working;
  70.         O cb;
  71.         Queue!(ReturnType!O) results;
  72.         this(O cb){
  73.             this.cb = cb;
  74.             working = new Flag;
  75.             results = new shared Queue!(ReturnType!O);
  76.         }
  77.         auto previous(){
  78.             return fnIn;
  79.         }
  80.         bool has(){
  81.             return working.get || fnIn.results.length > 0 || fnIn.has;
  82.         }
  83.         void call(){
  84.             working.set(true);
  85.             auto element = fnIn.results.pop;
  86.             if(!element.isNull){
  87.                 results ~= cb(element.expand);
  88.             }
  89.             working.set(false);
  90.             fnIn.call;
  91.         }
  92.     }
  93.     return new shared PipeAction(fnOut);
  94. }
  95.  
  96. auto run(I)(I chain){
  97.     while(chain.has){
  98.         chain.call;
  99.     }
  100. }
  101.  
  102. auto parallel(I)(I action, int workerCount=0){
  103.     if(workerCount == 0){
  104.         workerCount = totalCPUs*2-1;
  105.     }
  106.     class Parallel {
  107.         Thread[] workers;
  108.         void join(){
  109.             foreach(w; workers)
  110.                 w.join;
  111.         }
  112.     }
  113.     auto p = new Parallel;
  114.     foreach(_; 0..workerCount){
  115.         p.workers ~= new Thread({ run(action); }).start();
  116.     }
  117.     return p;
  118. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement