1 /// Mechanism for asynchronously passing data from one place to another.
2 module fluid.future.pipe;
3 
4 import std.meta;
5 import std.traits;
6 import std.typecons;
7 
8 @safe:
9 
10 /// Set up a pipe from a delegate.
11 /// Params:
12 ///     dg = Function to transform the process data the pipe outputs.
13 /// Returns:
14 ///     A pipe that processes data using the function.
15 auto pipe(Ret, Args...)(Ret delegate(Args) @safe dg) {
16 
17     return new Pipe!(Ret, Args)(dg);
18 
19 }
20 
21 /// Pass plain data between pipes.
22 @("`then() can accept plain data")
23 unittest {
24 
25     import std.conv;
26 
27     string result;
28 
29     auto myPipe = pipe(() => 1);
30     myPipe
31         .then(number => number + 2)
32         .then(number => number.to!string)
33         .then(text => text ~ ".0")
34         .then(text => result = text);
35 
36     myPipe();
37     assert(result == "3.0");
38 
39 
40 }
41 
42 /// `then` will resolve pipes it returns.
43 @("`then` will resolve pipes it returns")
44 unittest {
45 
46     auto pipe1 = pipe({ });
47     auto pipe2 = pipe((int number) => 1 + number);
48 
49     int result;
50 
51     pipe1
52         .then(() => pipe2)
53         .then(value => result = value);
54 
55     pipe1();
56     assert(result == 0);
57     pipe2(10);
58     assert(result == 11);
59 
60 }
61 
62 /// Pipes can accept multiple arguments.
63 @("Pipes can accept multiple arguments")
64 unittest {
65 
66     int a, b, c;
67 
68     auto pipe = pipe((int newA, int newB, int newC) {
69         a = newA;
70         b = newB;
71         c = newC;
72     });
73     pipe(1, 2, 3);
74 
75     assert(a == 1);
76     assert(b == 2);
77     assert(c == 3);
78 
79 }
80 
81 /// Pipes provide a callback system where functions can be chained. The result of one callback can be passed
82 /// to another in a linear chain.
83 ///
84 /// Pipes make it possible to write callback-based code that shows the underlying sequence of events:
85 ///
86 /// ---
87 /// root.focusChild()
88 ///     .then(child => child.scrollIntoView())
89 ///     .then(a => node.flash(1.second))
90 /// ---
91 ///
92 /// In Fluid, they are most often used to operate on `TreeAction`. A tree action callback will fire when
93 /// the action finishes.
94 ///
95 /// This pattern resembles a [commandline pipelines](pipe), where a process "pipes" data into another.
96 /// It may be a bit similar to [JavaScript Promises][Promise], but unlike Promises, pipes may trigger
97 /// a callback multiple times, as opposed to just once.
98 ///
99 /// [pipeline]: https://en.wikipedia.org/wiki/Pipeline_(Unix)#Pipelines_in_command_line_interfaces
100 /// [Promise]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise
101 final class Pipe(Return, Args...) : Subscriber!Args, Publisher!Return {
102 
103     alias Output = ToParameter!Return;
104     alias Input = Args;
105     alias Delegate = Return delegate(Input) @safe;
106 
107     private {
108 
109         Delegate callback;
110         Subscriber!Output next;
111 
112     }
113 
114     /// Set up a pipe, use a callback to process data that comes through.
115     this(Delegate callback) {
116         this.callback = callback;
117     }
118 
119     /// Subscribe to the data sent by this publisher. Only one subscriber can be assigned to a pipe at once.
120     ///
121     /// For high-level API, use `then`.
122     ///
123     /// Params:
124     ///     subscriber = Subscriber to register.
125     override void subscribe(Subscriber!Output subscriber)
126     in (this.next is null, "Pipe already has a subscriber. Cannot subscribe (then).")
127     do {
128         this.next = subscriber;
129     }
130 
131     /// Push data down the pipe.
132     /// Params:
133     ///     input = Data to load into the pipe.
134     override void opCall(Input input) {
135 
136         static if (is(Return == void)) {
137             callback(input);
138             if (next) next();
139         }
140 
141         else {
142             auto output = callback(input);
143             if (next) next(output);
144         }
145 
146     }
147 
148 }
149 
150 /// A publisher sends emits events that other objects can listen to.
151 alias Publisher(Output : void) = Publisher!();
152 
153 /// ditto
154 interface Publisher(Outputs...)
155 if (!is(Output == void)) {
156 
157     // Bug workaround: manually unwrap Outputs
158     static if (Outputs.length == 1)
159         alias Output = Outputs[0];
160     else
161         alias Output = Outputs;
162 
163     /// Low-level API to directly subscribe to the data sent by this publisher.
164     ///
165     /// Calling this multiple times is undefined behavior.
166     ///
167     /// Params:
168     ///     subscriber = Subscriber to register.
169     void subscribe(Subscriber!Output subscriber);
170 
171     /// Connect a listener to the publisher.
172     /// Params:
173     ///     listener = A function to execute when the publisher sends data. The listener can return another publisher
174     ///         which can then be accessed by proxy from the return value.
175     /// Returns:
176     ///     A pipe that is loaded with the same data that is returned by the `listener`.
177     auto then(T)(T delegate(Output) @safe next) {
178 
179         alias Publishers = AllPublishers!T;
180 
181         // Return value is a publisher
182         static if (Publishers.length != 0) {
183             //   this: Input  => Output
184             //   next: Output => T : Pipe!(NextOutput, NextInput...)
185             // return: Output => NextOutput
186             auto result = new MultiPublisher!Publishers;
187             subscribe(
188 
189                 // When this publisher receives data
190                 pipe((Output output) {
191 
192                     // Pass it to the listener
193                     auto publisher = next(output);
194 
195                     // And connect the returned publisher to the multipublisher
196                     static foreach (Publisher; Publishers) {
197                         (cast(Publisher) publisher)
198                             .subscribe(cast(SubscriberOf!Publisher) result);
199                     }
200                 })
201             );
202             return result;
203 
204         }
205 
206         // Plain return value
207         else {
208             auto pipe = new Pipe!(T, Output)(next);
209             subscribe(pipe);
210             return pipe;
211         }
212     }
213 
214 }
215 
216 /// A subscriber is an object that receives data from a `Publisher`.
217 interface Subscriber(Ts...) {
218 
219     void opCall(Ts args);
220 
221 }
222 
223 /// A basic publisher (and subscriber) implementation that will pipe data to subscribers of the matching type.
224 alias MultiPublisher(IPipes...) = MultiPublisherImpl!(staticMap!(AllPublishers, IPipes));
225 
226 /// Setting up a publisher that separately produces two different types.
227 @("Setting up a publisher that separately produces two different types.")
228 unittest {
229 
230     auto multi = new MultiPublisher!(Publisher!int, Publisher!string);
231 
232     int resultInt;
233     string resultString;
234     multi.then((int a) => resultInt = a);
235     multi.then((string a) => resultString = a);
236 
237     multi(1);
238     assert(resultInt == 1);
239     assert(resultString == "");
240 
241     multi("Hello!");
242     assert(resultInt == 1);
243     assert(resultString == "Hello!");
244 
245 }
246 
247 @("MultiPublisher can be returned from then()")
248 unittest {
249 
250     import std.stdio;
251 
252     auto multi = new MultiPublisher!(Publisher!int, Publisher!string);
253     auto start = pipe(() => 1);
254     auto chain = start.then(a => multi);
255 
256     int myInt;
257     string myString;
258 
259     chain.then((int a) => myInt = a);
260     chain.then((string a) => myString = a);
261 
262     start();
263     multi(1);
264     assert(myInt == 1);
265     multi("Hi!");
266     assert(myString == "Hi!");
267 
268 }
269 
270 class MultiPublisherImpl(IPipes...) : staticMap!(PublisherSubscriberPair, IPipes)
271 if (IPipes.length != 0) {
272 
273     // Tuple isn't strictly necessary here, but it fixes LDC builds
274     private Tuple!(staticMap!(SubscriberOf, IPipes)) subscribers;
275 
276     static foreach (i, IPipe; IPipes) {
277 
278         alias then = Publisher!(PipeContent!IPipe).then;
279 
280         void subscribe(Subscriber!(PipeContent!IPipe) subscriber)
281         in (subscribers[i] is null, "A subscriber for " ~ PipeContent!IPipe.stringof ~ " was already registered.")
282         do {
283             subscribers[i] = subscriber;
284         }
285 
286         void opCall(PipeContent!IPipe content) {
287             if (subscribers[i]) {
288                 subscribers[i](content);
289             }
290         }
291 
292     }
293 
294     override string toString() const {
295 
296         import std.conv;
297         return text("MultiPublisher!", IPipes.stringof);
298 
299     }
300 
301 }
302 
303 private alias SubscriberOf(T) = Subscriber!(PipeContent!T);
304 
305 /// List all publishers implemented by the given type (including, if the given type is a publisher).
306 alias AllPublishers(T) = Filter!(isPublisher, InterfacesTuple!T, Filter!(isInterface, T));
307 
308 /// List all subscribers implemented by the given type (including, if the given type is a publisher).
309 alias AllSubscribers(T) = Filter!(isSubscriber, InterfacesTuple!T, Filter!(isInterface, T));
310 
311 /// Check if the given type is a subscriber.
312 enum isSubscriber(T) = is(T : Subscriber!Ts, Ts...);
313 
314 /// Check if the given type is a publisher.
315 enum isPublisher(T) = is(T : Publisher!Ts, Ts...);
316 
317 private enum isInterface(T) = is(T == interface);
318 
319 /// For an instance of either `Publisher` or `Subscriber`, get the type trasmitted by the interface. This function
320 /// only operates on the two interfaces directly, and will not work with subclasses.
321 template PipeContent(T) {
322 
323     // Publisher
324     static if (is(T == Publisher!Ts, Ts...)) {
325         alias PipeContent = Ts;
326     }
327 
328     // Subscriber
329     else static if (is(T == Subscriber!Ts, Ts...)) {
330         alias PipeContent = Ts;
331     }
332 
333     // Neither
334     else static assert(false, T.stringof ~ " is not a subscriber nor a publisher");
335 
336 }
337 
338 private template PublisherSubscriberPair(T) {
339 
340     // Publisher
341     static if (is(T == Publisher!Ts, Ts...)) {
342         alias PublisherSubscriberPair = AliasSeq!(Publisher!Ts, Subscriber!Ts);
343     }
344 
345     // Subscriber
346     else static if (is(T == Subscriber!Ts, Ts...)) {
347         alias PublisherSubscriberPair = AliasSeq!(Publisher!Ts, Subscriber!Ts);
348     }
349 
350     // Neither
351     else static assert(false, T.stringof ~ " is not a subscriber nor a publisher");
352 
353 }
354 
355 /// Converts `void` to `()` (an empty tuple), leaves remaining types unchanged.
356 template ToParameter(T) {
357     static if (is(T == void)) {
358         alias ToParameter = AliasSeq!();
359     }
360     else {
361         alias ToParameter = T;
362     }
363 }
364 
365 struct Event(T...) {
366 
367     import std.array;
368 
369     private Appender!(Subscriber!T[]) subscribers;
370 
371     size_t length() const {
372         return subscribers[].length;
373     }
374 
375     void clearSubscribers() {
376         subscribers.clear();
377     }
378 
379     void subscribe(Subscriber!T subscriber) {
380         this.subscribers ~= subscriber;
381     }
382 
383     void opOpAssign(string op : "~")(Subscriber!T subscriber) {
384         this.subscribers ~= subscriber;
385     }
386 
387     void opOpAssign(string op : "~")(Subscriber!T[] subscribers) {
388         this.subscribers ~= subscribers;
389     }
390 
391     void opCall(T arguments) {
392         foreach (subscriber; subscribers[]) {
393             subscriber(arguments);
394         }
395     }
396 
397     string toString() const {
398 
399         import std.conv;
400         return text("Event!", T.stringof, "(", length, " events)");
401 
402     }
403 
404 }
405 
406 /// Get the Publisher interfaces that can output a value that shares a common type with `Inputs`.
407 template PublisherType(Publisher, Inputs...) {
408 
409     alias Result = AliasSeq!();
410 
411     // Check each publisher
412     static foreach (P; AllPublishers!Publisher) {
413 
414         // See if its type can cast to the inputs
415         static if (is(Inputs : PipeContent!P) || is(PipeContent!P : Inputs)) {
416             Result = AliasSeq!(Result, P);
417         }
418 
419     }
420 
421     alias PublisherType = Result;
422 
423 }
424 
425 /// Connect to a publisher and assert the values it sends equal the one attached.
426 AssertPipe!(PipeContent!(PublisherType!(T, Inputs)[0])) thenAssertEquals(T, Inputs...)(T publisher, Inputs value,
427     string file = __FILE__, size_t lineNumber = __LINE__)
428 if (PublisherType!(T, Inputs).length != 0) {
429 
430     auto pipe = new typeof(return)(value, file, lineNumber);
431     publisher.subscribe(pipe);
432     return pipe;
433 
434 }
435 
436 class AssertPipe(Ts...) : Subscriber!Ts, Publisher!(), Publisher!Ts
437 if (Ts.length != 0) {
438 
439     public {
440 
441         /// Value this pipe expects to receive.
442         Tuple!Ts expected;
443         string file;
444         size_t lineNumber;
445 
446     }
447 
448     private {
449 
450         Event!() _eventEmpty;
451         Event!Ts _event;
452 
453     }
454 
455     this(Ts expected, string file = __FILE__, size_t lineNumber = __LINE__) {
456         this.expected = expected;
457         this.file = file;
458         this.lineNumber = lineNumber;
459     }
460 
461     override void subscribe(Subscriber!() subscriber) {
462         _eventEmpty ~= subscriber;
463     }
464 
465     override void subscribe(Subscriber!Ts subscriber) {
466         _event ~= subscriber;
467     }
468 
469     override void opCall(Ts received) {
470 
471         import std.conv;
472         import std.exception;
473         import core.exception;
474         import fluid.node;
475 
476         // Direct comparison for nodes to ensure safety on older compilers
477         static if (is(Ts == AliasSeq!Node)) {
478             const bothNull = expected[0] is null && received[0] is null;
479             enforce!AssertError(bothNull || expected[0].opEquals(received),
480                 text("Expected ", expected.expand, ", but received ", received),
481                 file,
482                 lineNumber);
483         }
484         else {
485             enforce!AssertError(expected == tuple(received),
486                 text("Expected ", expected.expand, ", but received ", received),
487                 file,
488                 lineNumber);
489         }
490 
491         _event(received);
492         _eventEmpty();
493 
494     }
495 
496 
497 }
498 
499 /// Combine multiple publishers to create one that waits until all of them are completed.
500 ///
501 /// ---
502 /// auto one = pipe({
503 ///     writeln("One finished");
504 /// });
505 /// auto two = pipe({
506 ///     writeln("Two finished");
507 /// });
508 /// join(one, two).then({
509 ///     writeln("Both done");
510 /// });
511 /// one();  // One finished
512 /// two();  // Two finished
513 /// // Both done
514 /// ---
515 JoinPublisher join(Publisher!()[] publishers...) {
516 
517     return new JoinPublisher(publishers);
518 
519 }
520 
521 @("JoinPublisher emits an event when all tasks finish")
522 unittest {
523 
524     int oneFinished;
525     int twoFinished;
526     int joinFinished;
527 
528     auto one = pipe({
529         oneFinished++;
530     });
531     auto two = pipe({
532         twoFinished++;
533     });
534     join(one, two).then({
535         joinFinished++;
536     });
537     one();
538     assert(oneFinished == 1);
539     assert(joinFinished == 0);
540     two();
541     assert(twoFinished == 1);
542     assert(joinFinished == 1);
543 
544 }
545 
546 /// Listens to a number of publishers, and as soon as all of them emit a result at least once, emits an event.
547 ///
548 /// Only emits events once; if a publisher emits multiple events, only the first one is used.
549 /// Does not publish any values on finish.
550 class JoinPublisher : Publisher!() {
551 
552     private {
553 
554         Event!() _subscribers;
555         bool[] _done;
556         size_t _tasksLeft;
557 
558     }
559 
560     this(Publisher!()[] publishers...) {
561 
562         this._done = new bool[publishers.length];
563         this._tasksLeft = publishers.length;
564 
565         foreach (i, publisher; publishers) {
566             add(i, publisher);
567         }
568 
569     }
570 
571     void subscribe(Subscriber!() subscriber) {
572         _subscribers ~= subscriber;
573     }
574 
575     /// Assign a publisher to listen to by index.
576     private void add(size_t i, Publisher!() publisher) {
577         publisher.then({
578             emit(i);
579         });
580     }
581 
582     /// Mark a task as finished using its index.
583     ///
584     /// Does nothing if the task has already finished. Counts the number of tasks that remains to be completed,
585     /// and if all have, emits an event.
586     private void emit(size_t i) {
587 
588         if (_done[i]) return;
589 
590         _done[i] = true;
591         _tasksLeft--;
592 
593         if (_tasksLeft == 0) {
594             _subscribers();
595         }
596 
597     }
598 
599 }