Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- module pipes;
- import
- core.thread,
- std.parallelism,
- std.concurrency,
- std.functional,
- std.typecons,
- std.traits;
- synchronized class Queue(T) {
- private T[] queue;
- void opOpAssign(string op)(T object) if(op == "~") {
- queue ~= object;
- }
- auto length(){
- return queue.length;
- }
- Nullable!T pop(){
- Nullable!T result;
- if(!queue.length){
- result.nullify;
- return result;
- }
- result = queue[0];
- queue = queue[1..$];
- return result;
- }
- }
- synchronized class Flag {
- private bool flag;
- void set(bool flag){
- this.flag = flag;
- }
- bool get(){
- return flag;
- }
- }
- auto pipe(Iterable, Fn)(Iterable iterable, Fn callback) if(isIterable!Iterable) {
- shared class RootAction {
- Queue!(Tuple!(ForeachType!Iterable)) results;
- this(Iterable iterable){
- results = new shared Queue!(Tuple!(ForeachType!Iterable));
- foreach(e; iterable){
- results ~= tuple(e);
- }
- }
- auto previous(){
- return null;
- }
- bool has(){
- return results.length > 0;
- }
- void call(){}
- }
- return pipe(new shared RootAction(iterable), callback);
- }
- auto pipe(I, O)(I fnIn, O fnOut) if(!isIterable!I) {
- shared class PipeAction {
- Flag working;
- O cb;
- Queue!(ReturnType!O) results;
- this(O cb){
- this.cb = cb;
- working = new Flag;
- results = new shared Queue!(ReturnType!O);
- }
- auto previous(){
- return fnIn;
- }
- bool has(){
- return working.get || fnIn.results.length > 0 || fnIn.has;
- }
- void call(){
- working.set(true);
- auto element = fnIn.results.pop;
- if(!element.isNull){
- results ~= cb(element.expand);
- }
- working.set(false);
- fnIn.call;
- }
- }
- return new shared PipeAction(fnOut);
- }
- auto run(I)(I chain){
- while(chain.has){
- chain.call;
- }
- }
- auto parallel(I)(I action, int workerCount=0){
- if(workerCount == 0){
- workerCount = totalCPUs*2-1;
- }
- class Parallel {
- Thread[] workers;
- void join(){
- foreach(w; workers)
- w.join;
- }
- }
- auto p = new Parallel;
- foreach(_; 0..workerCount){
- p.workers ~= new Thread({ run(action); }).start();
- }
- return p;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement