import std.algorithm;
import rx;

// Example: subscribe to `source` through `takeUntil(stopper)` and record
// what the subscriber receives before and after `stopper` emits.
auto source = new SubjectObject!int;
auto stopper = new SubjectObject!int;

// Collects every value handed to the subscriber callback.
int[] received;
auto limited = source.takeUntil(stopper);
auto subscription = limited.doSubscribe!((n) { received ~= n; });

// Values pushed before the stopper emits.
foreach (value; 0 .. 3)
    source.put(value);

stopper.put(0);

// Values pushed after the stopper has emitted.
foreach (value; 3 .. 5)
    source.put(value);

// Only the values pushed before the stopper emitted are expected.
assert(equal(received, [0, 1, 2]));