11package rx .javafx .sources ;
22
33import javafx .application .Platform ;
4+ import javafx .beans .property .Property ;
5+ import javafx .beans .property .ReadOnlyStringWrapper ;
6+ import javafx .beans .property .SimpleObjectProperty ;
47import javafx .collections .FXCollections ;
58import javafx .collections .ObservableList ;
69import javafx .embed .swing .JFXPanel ;
@@ -76,4 +79,186 @@ public void testRxObservableListRemoves() {
7679 e .printStackTrace ();
7780 }
7881 }
82+
83+ @ Test
84+ public void testRxObservableListChanges () {
85+ new JFXPanel ();
86+
87+ ObservableList <String > sourceList = FXCollections .observableArrayList ();
88+ Observable <ListChange <String >> emissions = JavaFxObservable .fromObservableListChanges (sourceList );
89+
90+ CountDownLatch gate = new CountDownLatch (1 );
91+
92+ class FlagAndCount {
93+ final Flag flag ;
94+ final int count ;
95+ FlagAndCount (Flag flag , int count ) {
96+ this .flag = flag ;
97+ this .count = count ;
98+ }
99+
100+ }
101+ emissions .observeOn (Schedulers .io ())
102+ .observeOn (JavaFxScheduler .getInstance ())
103+ .take (5 )
104+ .groupBy (ListChange ::getFlag )
105+ .flatMap (grp -> grp .count ().map (ct -> new FlagAndCount (grp .getKey (),ct )))
106+ .subscribe (l -> {
107+ if (l .flag .equals (Flag .ADDED )) { assertTrue (l .count == 3 ); }
108+ if (l .flag .equals (Flag .REMOVED )) { assertTrue (l .count == 2 ); }
109+ },Throwable ::printStackTrace ,gate ::countDown );
110+
111+ Platform .runLater (() -> {
112+ sourceList .add ("Alpha" );
113+ sourceList .add ("Beta" );
114+ sourceList .remove ("Alpha" );
115+ sourceList .add ("Gamma" );
116+ sourceList .remove ("Gamma" );
117+ });
118+
119+ try {
120+ gate .await ();
121+ } catch (InterruptedException e ) {
122+ e .printStackTrace ();
123+ }
124+ }
125+
126+
127+ @ Test
128+ public void testRxObservableListDistinctChangeMappings () {
129+ new JFXPanel ();
130+
131+ ObservableList <String > sourceList = FXCollections .observableArrayList ();
132+ Observable <ListChange <Integer >> emissions = JavaFxObservable .fromObservableListDistinctMappings (sourceList , String ::length );
133+
134+ CountDownLatch gate = new CountDownLatch (1 );
135+
136+ class FlagAndCount {
137+ final Flag flag ;
138+ final int count ;
139+ FlagAndCount (Flag flag , int count ) {
140+ this .flag = flag ;
141+ this .count = count ;
142+ }
143+
144+ }
145+ emissions .observeOn (Schedulers .io ())
146+ .observeOn (JavaFxScheduler .getInstance ())
147+ .take (3 )
148+ .groupBy (ListChange ::getFlag )
149+ .flatMap (grp -> grp .count ().map (ct -> new FlagAndCount (grp .getKey (),ct )))
150+ .subscribe (l -> {
151+ if (l .flag .equals (Flag .ADDED )) { assertTrue (l .count == 2 ); }
152+ if (l .flag .equals (Flag .REMOVED )) { assertTrue (l .count == 1 ); }
153+ },Throwable ::printStackTrace ,gate ::countDown );
154+
155+ Platform .runLater (() -> {
156+ sourceList .add ("Alpha" );
157+ sourceList .add ("Beta" );
158+ sourceList .add ("Alpha" );
159+ sourceList .remove ("Alpha" );
160+ sourceList .add ("Gamma" );
161+ sourceList .remove ("Gamma" );
162+ sourceList .remove ("Alpha" );
163+ });
164+
165+ try {
166+ gate .await ();
167+ } catch (InterruptedException e ) {
168+ e .printStackTrace ();
169+ }
170+ }
171+
172+
173+ @ Test
174+ public void testRxObservableListDistinctChanges () {
175+ new JFXPanel ();
176+
177+ ObservableList <String > sourceList = FXCollections .observableArrayList ();
178+ Observable <ListChange <String >> emissions = JavaFxObservable .fromObservableListDistinctChanges (sourceList );
179+
180+ CountDownLatch gate = new CountDownLatch (1 );
181+
182+ class FlagAndCount {
183+ final Flag flag ;
184+ final int count ;
185+ FlagAndCount (Flag flag , int count ) {
186+ this .flag = flag ;
187+ this .count = count ;
188+ }
189+
190+ }
191+ emissions .observeOn (Schedulers .io ())
192+ .observeOn (JavaFxScheduler .getInstance ())
193+ .take (5 )
194+ .groupBy (ListChange ::getFlag )
195+ .flatMap (grp -> grp .count ().map (ct -> new FlagAndCount (grp .getKey (),ct )))
196+ .subscribe (l -> {
197+ if (l .flag .equals (Flag .ADDED )) { assertTrue (l .count == 3 ); }
198+ if (l .flag .equals (Flag .REMOVED )) { assertTrue (l .count == 2 ); }
199+ },Throwable ::printStackTrace ,gate ::countDown );
200+
201+ Platform .runLater (() -> {
202+ sourceList .add ("Alpha" );
203+ sourceList .add ("Beta" );
204+ sourceList .add ("Alpha" );
205+ sourceList .remove ("Alpha" );
206+ sourceList .add ("Gamma" );
207+ sourceList .remove ("Gamma" );
208+ sourceList .remove ("Alpha" );
209+ });
210+
211+ try {
212+ gate .await ();
213+ } catch (InterruptedException e ) {
214+ e .printStackTrace ();
215+ }
216+ }
217+
218+
219+ @ Test
220+ public void testRxObservableListUpdates () {
221+ new JFXPanel ();
222+
223+ class Person {
224+ Property <String > name ;
225+ Property <Integer > age = new SimpleObjectProperty <>();
226+
227+ Person (String name , Integer age ) {
228+ this .name = new ReadOnlyStringWrapper (name );
229+ this .age .setValue (age );
230+ }
231+ @ Override
232+ public String toString () {
233+ return name .getValue ();
234+ }
235+ }
236+
237+ Person person1 = new Person ("Tom Salma" ,23 );
238+ Person person2 = new Person ("Jacob Mores" , 31 );
239+ Person person3 = new Person ("Sally Reyes" , 32 );
240+
241+ ObservableList <Person > sourceList = FXCollections .observableArrayList (user -> new javafx .beans .Observable []{user .age } );
242+ Observable <Person > emissions = JavaFxObservable .fromObservableListUpdates (sourceList );
243+
244+ CountDownLatch gate = new CountDownLatch (1 );
245+
246+ emissions .observeOn (Schedulers .io ())
247+ .observeOn (JavaFxScheduler .getInstance ())
248+ .take (2 )
249+ .count ()
250+ .subscribe (ct -> assertTrue (ct == 2 ),Throwable ::printStackTrace ,gate ::countDown );
251+
252+ Platform .runLater (() -> {
253+ sourceList .addAll (person1 ,person2 ,person3 );
254+ person1 .age .setValue (24 );
255+ person2 .age .setValue (32 );
256+ });
257+
258+ try {
259+ gate .await ();
260+ } catch (InterruptedException e ) {
261+ e .printStackTrace ();
262+ }
263+ }
79264}
0 commit comments