1 import rx.algorithm.groupby : groupBy; 2 import rx.algorithm.map : map; 3 import rx.algorithm.fold : fold; 4 import rx.subject : SubjectObject, CounterObserver; 5 6 auto subject = new SubjectObject!int; 7 auto counted = subject.groupBy!(n => n % 10).map!(o => o.fold!((a, b) => a + 1)(0)).merge(); 8 9 auto counter = new CounterObserver!int; 10 11 auto disposable = counted.subscribe(counter); 12 13 subject.put(0); 14 subject.put(0); 15 assert(counter.putCount == 0); 16 subject.completed(); 17 assert(counter.putCount == 1); 18 assert(counter.lastValue == 2);