// Example: basic use of `zip` on two subjects.
//
// The subscriber indexes each emitted element as `t[0]` / `t[1]` and appends
// their product to `buf`, so `buf` records exactly which pairs have been
// emitted so far. Wrapped in its own `unittest` block so that the local names
// (`s0`, `s1`, `disposable`, ...) do not collide with other examples.
unittest
{
    import rx;

    auto s0 = new SubjectObject!int;
    auto s1 = new SubjectObject!int;

    auto zipped = zip(s0, s1);

    int[] buf;
    auto disposable = zipped.doSubscribe!(t => buf ~= (t[0] * t[1]));
    // Release the subscription however this scope is left.
    scope (exit)
        disposable.dispose();

    // Only s0 has produced values so far, so no pair is expected yet.
    // (The leading `.` looks `put` up at module scope.)
    .put(s0, [0, 1, 2, 3]);
    assert(buf.length == 0);

    // Each value pushed into s1 is expected to pair with the next pending
    // value from s0, in order: (0, 0), (1, 1), (2, 2), (3, 3).
    .put(s1, 0);
    assert(buf == [0]);
    .put(s1, 1);
    assert(buf == [0, 1]);
    .put(s1, 2);
    assert(buf == [0, 1, 4]);
    .put(s1, 3);
    assert(buf == [0, 1, 4, 9]);
}
// Example: completion handling when zipping three subjects.
//
// The asserts below document the expected contract: the observer sees one
// tuple once all three sources have produced a value, and sees `completed`
// only after the last of the three sources has completed. Wrapped in its own
// `unittest` block so that the local names do not collide with other examples.
unittest
{
    import rx;
    import std.typecons;

    auto s0 = new SubjectObject!int;
    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!int;

    // NOTE(review): CounterObserver is not defined in this snippet; it is
    // presumably a test helper exposing putCount / lastValue / completedCount
    // -- confirm where it is declared.
    auto observer = new CounterObserver!(Tuple!(int, int, int));
    auto disposable = zip(s0, s1, s2).doSubscribe(observer);
    // Release the subscription however this scope is left.
    scope (exit)
        disposable.dispose();

    // One value from each source yields exactly one combined tuple.
    .put(s0, 100);
    .put(s1, 10);
    .put(s2, 1);
    assert(observer.putCount == 1);
    assert(observer.lastValue == tuple(100, 10, 1));

    // Completing the first two sources must not complete the zipped stream...
    s0.completed();
    assert(observer.completedCount == 0);
    s1.completed();
    assert(observer.completedCount == 0);
    // ...completion is expected exactly once, after the final source completes.
    s2.completed();
    assert(observer.completedCount == 1);
}
// Example: `zip` with a selector function.
//
// Here the template argument `(a, b) => a + b` is applied to each pair, so
// the subscriber receives the sum directly instead of indexing a tuple.
// Wrapped in its own `unittest` block so that the local names do not collide
// with other examples.
unittest
{
    import rx;

    auto s0 = new SubjectObject!int;
    auto s1 = new SubjectObject!int;

    int[] buf;
    auto disposable = zip!((a, b) => a + b)(s0, s1).doSubscribe!(n => buf ~= n);
    // Release the subscription however this scope is left.
    scope (exit)
        disposable.dispose();

    // Both s0 values arrive before any s1 value; they are expected to be
    // paired in arrival order: 100 with 10, then 200 with 20.
    .put(s0, 100);
    .put(s0, 200);
    .put(s1, 10);
    .put(s1, 20);

    assert(buf == [110, 220]);
}