1 import rx.subject : SubjectObject;
2 import std.array : appender;
3
4 auto sub = new SubjectObject!int;
5 auto buf = appender!(int[]);
6
7 auto d = sub.buffered(2).doSubscribe(buf);
8
9 sub.put(0);
10 sub.put(1);
11 assert(buf.data.length == 2);
12 assert(buf.data[0] == 0);
13 assert(buf.data[1] == 1);
14 sub.put(2);
15 assert(buf.data.length == 2);
16 sub.completed();
17 assert(buf.data.length == 3);
18 assert(buf.data[2] == 2);
1 import rx.subject : SubjectObject;
2 import std.array : appender;
3 import std.parallelism : taskPool, task;
4
5 auto sub = new SubjectObject!int;
6 auto buf = appender!(int[]);
7 auto d = sub.buffered(100).doSubscribe(buf);
8
9 import std.range : iota;
10
11 auto t1 = task({ .put(sub, iota(100)); });
12 auto t2 = task({ .put(sub, iota(100)); });
13 auto t3 = task({ .put(sub, iota(100)); });
14 taskPool.put(t1);
15 taskPool.put(t2);
16 taskPool.put(t3);
17
18 t1.workForce;
19 t2.workForce;
20 t3.workForce;
21
22 sub.completed();
23
24 assert(buf.data.length == 300);