1 /// Mechanism for asynchronously passing data from one place to another. 2 module fluid.future.pipe; 3 4 import std.meta; 5 import std.traits; 6 import std.typecons; 7 8 @safe: 9 10 /// Set up a pipe from a delegate. 11 /// Params: 12 /// dg = Function to transform the process data the pipe outputs. 13 /// Returns: 14 /// A pipe that processes data using the function. 15 auto pipe(Ret, Args...)(Ret delegate(Args) @safe dg) { 16 17 return new Pipe!(Ret, Args)(dg); 18 19 } 20 21 /// Pass plain data between pipes. 22 @("`then() can accept plain data") 23 unittest { 24 25 import std.conv; 26 27 string result; 28 29 auto myPipe = pipe(() => 1); 30 myPipe 31 .then(number => number + 2) 32 .then(number => number.to!string) 33 .then(text => text ~ ".0") 34 .then(text => result = text); 35 36 myPipe(); 37 assert(result == "3.0"); 38 39 40 } 41 42 /// `then` will resolve pipes it returns. 43 @("`then` will resolve pipes it returns") 44 unittest { 45 46 auto pipe1 = pipe({ }); 47 auto pipe2 = pipe((int number) => 1 + number); 48 49 int result; 50 51 pipe1 52 .then(() => pipe2) 53 .then(value => result = value); 54 55 pipe1(); 56 assert(result == 0); 57 pipe2(10); 58 assert(result == 11); 59 60 } 61 62 /// Pipes can accept multiple arguments. 63 @("Pipes can accept multiple arguments") 64 unittest { 65 66 int a, b, c; 67 68 auto pipe = pipe((int newA, int newB, int newC) { 69 a = newA; 70 b = newB; 71 c = newC; 72 }); 73 pipe(1, 2, 3); 74 75 assert(a == 1); 76 assert(b == 2); 77 assert(c == 3); 78 79 } 80 81 /// Pipes provide a callback system where functions can be chained. The result of one callback can be passed 82 /// to another in a linear chain. 83 /// 84 /// Pipes make it possible to write callback-based code that shows the underlying sequence of events: 85 /// 86 /// --- 87 /// root.focusChild() 88 /// .then(child => child.scrollIntoView()) 89 /// .then(a => node.flash(1.second)) 90 /// --- 91 /// 92 /// In Fluid, they are most often used to operate on `TreeAction`. A tree action callback will fire when 93 /// the action finishes. 94 /// 95 /// This pattern resembles a [commandline pipelines](pipe), where a process "pipes" data into another. 96 /// It may be a bit similar to [JavaScript Promises][Promise], but unlike Promises, pipes may trigger 97 /// a callback multiple times, as opposed to just once. 98 /// 99 /// [pipeline]: https://en.wikipedia.org/wiki/Pipeline_(Unix)#Pipelines_in_command_line_interfaces 100 /// [Promise]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise 101 final class Pipe(Return, Args...) : Subscriber!Args, Publisher!Return { 102 103 alias Output = ToParameter!Return; 104 alias Input = Args; 105 alias Delegate = Return delegate(Input) @safe; 106 107 private { 108 109 Delegate callback; 110 Subscriber!Output next; 111 112 } 113 114 /// Set up a pipe, use a callback to process data that comes through. 115 this(Delegate callback) { 116 this.callback = callback; 117 } 118 119 /// Subscribe to the data sent by this publisher. Only one subscriber can be assigned to a pipe at once. 120 /// 121 /// For high-level API, use `then`. 122 /// 123 /// Params: 124 /// subscriber = Subscriber to register. 125 override void subscribe(Subscriber!Output subscriber) 126 in (this.next is null, "Pipe already has a subscriber. Cannot subscribe (then).") 127 do { 128 this.next = subscriber; 129 } 130 131 /// Push data down the pipe. 132 /// Params: 133 /// input = Data to load into the pipe. 134 override void opCall(Input input) { 135 136 static if (is(Return == void)) { 137 callback(input); 138 if (next) next(); 139 } 140 141 else { 142 auto output = callback(input); 143 if (next) next(output); 144 } 145 146 } 147 148 } 149 150 /// A publisher sends emits events that other objects can listen to. 151 alias Publisher(Output : void) = Publisher!(); 152 153 /// ditto 154 interface Publisher(Outputs...) 155 if (!is(Output == void)) { 156 157 // Bug workaround: manually unwrap Outputs 158 static if (Outputs.length == 1) 159 alias Output = Outputs[0]; 160 else 161 alias Output = Outputs; 162 163 /// Low-level API to directly subscribe to the data sent by this publisher. 164 /// 165 /// Calling this multiple times is undefined behavior. 166 /// 167 /// Params: 168 /// subscriber = Subscriber to register. 169 void subscribe(Subscriber!Output subscriber); 170 171 /// Connect a listener to the publisher. 172 /// Params: 173 /// listener = A function to execute when the publisher sends data. The listener can return another publisher 174 /// which can then be accessed by proxy from the return value. 175 /// Returns: 176 /// A pipe that is loaded with the same data that is returned by the `listener`. 177 auto then(T)(T delegate(Output) @safe next) { 178 179 alias Publishers = AllPublishers!T; 180 181 // Return value is a publisher 182 static if (Publishers.length != 0) { 183 // this: Input => Output 184 // next: Output => T : Pipe!(NextOutput, NextInput...) 185 // return: Output => NextOutput 186 auto result = new MultiPublisher!Publishers; 187 subscribe( 188 189 // When this publisher receives data 190 pipe((Output output) { 191 192 // Pass it to the listener 193 auto publisher = next(output); 194 195 // And connect the returned publisher to the multipublisher 196 static foreach (Publisher; Publishers) { 197 (cast(Publisher) publisher) 198 .subscribe(cast(SubscriberOf!Publisher) result); 199 } 200 }) 201 ); 202 return result; 203 204 } 205 206 // Plain return value 207 else { 208 auto pipe = new Pipe!(T, Output)(next); 209 subscribe(pipe); 210 return pipe; 211 } 212 } 213 214 } 215 216 /// A subscriber is an object that receives data from a `Publisher`. 217 interface Subscriber(Ts...) { 218 219 void opCall(Ts args); 220 221 } 222 223 /// A basic publisher (and subscriber) implementation that will pipe data to subscribers of the matching type. 224 alias MultiPublisher(IPipes...) = MultiPublisherImpl!(staticMap!(AllPublishers, IPipes)); 225 226 /// Setting up a publisher that separately produces two different types. 227 @("Setting up a publisher that separately produces two different types.") 228 unittest { 229 230 auto multi = new MultiPublisher!(Publisher!int, Publisher!string); 231 232 int resultInt; 233 string resultString; 234 multi.then((int a) => resultInt = a); 235 multi.then((string a) => resultString = a); 236 237 multi(1); 238 assert(resultInt == 1); 239 assert(resultString == ""); 240 241 multi("Hello!"); 242 assert(resultInt == 1); 243 assert(resultString == "Hello!"); 244 245 } 246 247 @("MultiPublisher can be returned from then()") 248 unittest { 249 250 import std.stdio; 251 252 auto multi = new MultiPublisher!(Publisher!int, Publisher!string); 253 auto start = pipe(() => 1); 254 auto chain = start.then(a => multi); 255 256 int myInt; 257 string myString; 258 259 chain.then((int a) => myInt = a); 260 chain.then((string a) => myString = a); 261 262 start(); 263 multi(1); 264 assert(myInt == 1); 265 multi("Hi!"); 266 assert(myString == "Hi!"); 267 268 } 269 270 class MultiPublisherImpl(IPipes...) : staticMap!(PublisherSubscriberPair, IPipes) 271 if (IPipes.length != 0) { 272 273 // Tuple isn't strictly necessary here, but it fixes LDC builds 274 private Tuple!(staticMap!(SubscriberOf, IPipes)) subscribers; 275 276 static foreach (i, IPipe; IPipes) { 277 278 alias then = Publisher!(PipeContent!IPipe).then; 279 280 void subscribe(Subscriber!(PipeContent!IPipe) subscriber) 281 in (subscribers[i] is null, "A subscriber for " ~ PipeContent!IPipe.stringof ~ " was already registered.") 282 do { 283 subscribers[i] = subscriber; 284 } 285 286 void opCall(PipeContent!IPipe content) { 287 if (subscribers[i]) { 288 subscribers[i](content); 289 } 290 } 291 292 } 293 294 override string toString() const { 295 296 import std.conv; 297 return text("MultiPublisher!", IPipes.stringof); 298 299 } 300 301 } 302 303 private alias SubscriberOf(T) = Subscriber!(PipeContent!T); 304 305 /// List all publishers implemented by the given type (including, if the given type is a publisher). 306 alias AllPublishers(T) = Filter!(isPublisher, InterfacesTuple!T, Filter!(isInterface, T)); 307 308 /// List all subscribers implemented by the given type (including, if the given type is a publisher). 309 alias AllSubscribers(T) = Filter!(isSubscriber, InterfacesTuple!T, Filter!(isInterface, T)); 310 311 /// Check if the given type is a subscriber. 312 enum isSubscriber(T) = is(T : Subscriber!Ts, Ts...); 313 314 /// Check if the given type is a publisher. 315 enum isPublisher(T) = is(T : Publisher!Ts, Ts...); 316 317 private enum isInterface(T) = is(T == interface); 318 319 /// For an instance of either `Publisher` or `Subscriber`, get the type trasmitted by the interface. This function 320 /// only operates on the two interfaces directly, and will not work with subclasses. 321 template PipeContent(T) { 322 323 // Publisher 324 static if (is(T == Publisher!Ts, Ts...)) { 325 alias PipeContent = Ts; 326 } 327 328 // Subscriber 329 else static if (is(T == Subscriber!Ts, Ts...)) { 330 alias PipeContent = Ts; 331 } 332 333 // Neither 334 else static assert(false, T.stringof ~ " is not a subscriber nor a publisher"); 335 336 } 337 338 private template PublisherSubscriberPair(T) { 339 340 // Publisher 341 static if (is(T == Publisher!Ts, Ts...)) { 342 alias PublisherSubscriberPair = AliasSeq!(Publisher!Ts, Subscriber!Ts); 343 } 344 345 // Subscriber 346 else static if (is(T == Subscriber!Ts, Ts...)) { 347 alias PublisherSubscriberPair = AliasSeq!(Publisher!Ts, Subscriber!Ts); 348 } 349 350 // Neither 351 else static assert(false, T.stringof ~ " is not a subscriber nor a publisher"); 352 353 } 354 355 /// Converts `void` to `()` (an empty tuple), leaves remaining types unchanged. 356 template ToParameter(T) { 357 static if (is(T == void)) { 358 alias ToParameter = AliasSeq!(); 359 } 360 else { 361 alias ToParameter = T; 362 } 363 } 364 365 struct Event(T...) { 366 367 import std.array; 368 369 private Appender!(Subscriber!T[]) subscribers; 370 371 size_t length() const { 372 return subscribers[].length; 373 } 374 375 void clearSubscribers() { 376 subscribers.clear(); 377 } 378 379 void subscribe(Subscriber!T subscriber) { 380 this.subscribers ~= subscriber; 381 } 382 383 void opOpAssign(string op : "~")(Subscriber!T subscriber) { 384 this.subscribers ~= subscriber; 385 } 386 387 void opOpAssign(string op : "~")(Subscriber!T[] subscribers) { 388 this.subscribers ~= subscribers; 389 } 390 391 void opCall(T arguments) { 392 foreach (subscriber; subscribers[]) { 393 subscriber(arguments); 394 } 395 } 396 397 string toString() const { 398 399 import std.conv; 400 return text("Event!", T.stringof, "(", length, " events)"); 401 402 } 403 404 } 405 406 /// Get the Publisher interfaces that can output a value that shares a common type with `Inputs`. 407 template PublisherType(Publisher, Inputs...) { 408 409 alias Result = AliasSeq!(); 410 411 // Check each publisher 412 static foreach (P; AllPublishers!Publisher) { 413 414 // See if its type can cast to the inputs 415 static if (is(Inputs : PipeContent!P) || is(PipeContent!P : Inputs)) { 416 Result = AliasSeq!(Result, P); 417 } 418 419 } 420 421 alias PublisherType = Result; 422 423 } 424 425 /// Connect to a publisher and assert the values it sends equal the one attached. 426 AssertPipe!(PipeContent!(PublisherType!(T, Inputs)[0])) thenAssertEquals(T, Inputs...)(T publisher, Inputs value, 427 string file = __FILE__, size_t lineNumber = __LINE__) 428 if (PublisherType!(T, Inputs).length != 0) { 429 430 auto pipe = new typeof(return)(value, file, lineNumber); 431 publisher.subscribe(pipe); 432 return pipe; 433 434 } 435 436 class AssertPipe(Ts...) : Subscriber!Ts, Publisher!(), Publisher!Ts 437 if (Ts.length != 0) { 438 439 public { 440 441 /// Value this pipe expects to receive. 442 Tuple!Ts expected; 443 string file; 444 size_t lineNumber; 445 446 } 447 448 private { 449 450 Event!() _eventEmpty; 451 Event!Ts _event; 452 453 } 454 455 this(Ts expected, string file = __FILE__, size_t lineNumber = __LINE__) { 456 this.expected = expected; 457 this.file = file; 458 this.lineNumber = lineNumber; 459 } 460 461 override void subscribe(Subscriber!() subscriber) { 462 _eventEmpty ~= subscriber; 463 } 464 465 override void subscribe(Subscriber!Ts subscriber) { 466 _event ~= subscriber; 467 } 468 469 override void opCall(Ts received) { 470 471 import std.conv; 472 import std.exception; 473 import core.exception; 474 import fluid.node; 475 476 // Direct comparison for nodes to ensure safety on older compilers 477 static if (is(Ts == AliasSeq!Node)) { 478 const bothNull = expected[0] is null && received[0] is null; 479 enforce!AssertError(bothNull || expected[0].opEquals(received), 480 text("Expected ", expected.expand, ", but received ", received), 481 file, 482 lineNumber); 483 } 484 else { 485 enforce!AssertError(expected == tuple(received), 486 text("Expected ", expected.expand, ", but received ", received), 487 file, 488 lineNumber); 489 } 490 491 _event(received); 492 _eventEmpty(); 493 494 } 495 496 497 } 498 499 /// Combine multiple publishers to create one that waits until all of them are completed. 500 /// 501 /// --- 502 /// auto one = pipe({ 503 /// writeln("One finished"); 504 /// }); 505 /// auto two = pipe({ 506 /// writeln("Two finished"); 507 /// }); 508 /// join(one, two).then({ 509 /// writeln("Both done"); 510 /// }); 511 /// one(); // One finished 512 /// two(); // Two finished 513 /// // Both done 514 /// --- 515 JoinPublisher join(Publisher!()[] publishers...) { 516 517 return new JoinPublisher(publishers); 518 519 } 520 521 @("JoinPublisher emits an event when all tasks finish") 522 unittest { 523 524 int oneFinished; 525 int twoFinished; 526 int joinFinished; 527 528 auto one = pipe({ 529 oneFinished++; 530 }); 531 auto two = pipe({ 532 twoFinished++; 533 }); 534 join(one, two).then({ 535 joinFinished++; 536 }); 537 one(); 538 assert(oneFinished == 1); 539 assert(joinFinished == 0); 540 two(); 541 assert(twoFinished == 1); 542 assert(joinFinished == 1); 543 544 } 545 546 /// Listens to a number of publishers, and as soon as all of them emit a result at least once, emits an event. 547 /// 548 /// Only emits events once; if a publisher emits multiple events, only the first one is used. 549 /// Does not publish any values on finish. 550 class JoinPublisher : Publisher!() { 551 552 private { 553 554 Event!() _subscribers; 555 bool[] _done; 556 size_t _tasksLeft; 557 558 } 559 560 this(Publisher!()[] publishers...) { 561 562 this._done = new bool[publishers.length]; 563 this._tasksLeft = publishers.length; 564 565 foreach (i, publisher; publishers) { 566 add(i, publisher); 567 } 568 569 } 570 571 void subscribe(Subscriber!() subscriber) { 572 _subscribers ~= subscriber; 573 } 574 575 /// Assign a publisher to listen to by index. 576 private void add(size_t i, Publisher!() publisher) { 577 publisher.then({ 578 emit(i); 579 }); 580 } 581 582 /// Mark a task as finished using its index. 583 /// 584 /// Does nothing if the task has already finished. Counts the number of tasks that remains to be completed, 585 /// and if all have, emits an event. 586 private void emit(size_t i) { 587 588 if (_done[i]) return; 589 590 _done[i] = true; 591 _tasksLeft--; 592 593 if (_tasksLeft == 0) { 594 _subscribers(); 595 } 596 597 } 598 599 }