4343import java .util .Collection ;
4444import java .util .Collections ;
4545import java .util .HashMap ;
46+ import java .util .HashSet ;
4647import java .util .List ;
4748import java .util .Map ;
4849import java .util .Set ;
@@ -65,6 +66,7 @@ private enum Settings
6566 dataSourceContainerPath (false ),
6667 dataSourceSchema (true ),
6768 dataSourceQuery (true ),
69+ dataSourceSubjectColumn (true ),
6870 dataSourceColumns (true ),
6971 dataSourceColumnMapping (false ),
7072 dataSourceAdditionalFilters (false ),
@@ -120,6 +122,11 @@ private void processBatch(List<String> subjects, Logger log)
120122 additionalFilters .forEach (subjectFilter ::addCondition );
121123 }
122124
125+ if (destinationTable .getColumn (FieldKey .fromString (_settings .get (Settings .targetSubjectColumn .name ()))) == null )
126+ {
127+ throw new IllegalStateException ("Unknown column on table " + destinationTable .getName () + ": " + _settings .get (Settings .targetSubjectColumn .name ()));
128+ }
129+
123130 Collection <Map <String , Object >> existingRows = new TableSelector (destinationTable , keyFields , subjectFilter , null ).getMapCollection ();
124131 if (!existingRows .isEmpty ())
125132 {
@@ -132,11 +139,16 @@ private void processBatch(List<String> subjects, Logger log)
132139 }
133140
134141 // Query data and import
135- List <Map <String , Object >> toImport = getRowsToImport (log );
142+ List <Map <String , Object >> toImport = getRowsToImport (subjects , log );
136143 if (!toImport .isEmpty ())
137144 {
138145 log .info ("inserting " + toImport .size () + " rows" );
139- qus .insertRows (_containerUser .getUser (), _containerUser .getContainer (), toImport , new BatchValidationException (), null , null );
146+ BatchValidationException bve = new BatchValidationException ();
147+ qus .insertRows (_containerUser .getUser (), _containerUser .getContainer (), toImport , bve , null , null );
148+ if (bve .hasErrors ())
149+ {
150+ throw bve ;
151+ }
140152 }
141153 else
142154 {
@@ -158,7 +170,7 @@ private List<CompareType.CompareClause> parseAdditionalFilters(String rawVal)
158170 }
159171
160172 SimpleFilter filter = new SimpleFilter ();
161- String [] filters = rawVal .split (", " );
173+ String [] filters = rawVal .split ("; " );
162174 for (String queryParam : filters )
163175 {
164176 filter .addUrlFilters (new ActionURL ().setRawQuery (queryParam ), null );
@@ -234,7 +246,7 @@ private Map<String, String> parseColumnDefaultMap(String rawVal)
234246 return colMap ;
235247 }
236248
237- private List <Map <String , Object >> getRowsToImport (Logger log )
249+ private List <Map <String , Object >> getRowsToImport (List < String > subjects , Logger log )
238250 {
239251 if (_settings .get (Settings .dataSourceColumns .name ()) == null )
240252 {
@@ -250,6 +262,7 @@ private List<Map<String, Object>> getRowsToImport(Logger log)
250262 DataIntegrationService .RemoteConnection rc = getRemoteDataSource (_settings .get (Settings .dataRemoteSource .name ()), log );
251263 SelectRowsCommand sr = new SelectRowsCommand (_settings .get (Settings .dataSourceSchema .name ()), _settings .get (Settings .dataSourceQuery .name ()));
252264 sr .setColumns (sourceColumns );
265+ sr .addFilter (_settings .get (Settings .dataSourceSubjectColumn .name ()), StringUtils .join (subjects , ";" ), Filter .Operator .IN );
253266 if (_settings .get (Settings .dataSourceAdditionalFilters .name ()) != null )
254267 {
255268 List <CompareType .CompareClause > additionalFilters = parseAdditionalFilters (_settings .get (Settings .dataSourceAdditionalFilters .name ()));
@@ -269,7 +282,13 @@ else if (f.getParamVals().length == 1)
269282 value = StringUtils .join (f .getParamVals (), ";" );
270283 }
271284
272- sr .addFilter (new Filter (f .getFieldKey ().toString (), value , Filter .Operator .valueOf (f .getCompareType ().getFilterValueText ())));
285+ Filter .Operator o = Filter .Operator .getOperatorFromUrlKey (f .getCompareType ().getPreferredUrlKey ());
286+ if (o == null )
287+ {
288+ throw new IllegalStateException ("Unknown operator: " + f .getCompareType ().getPreferredUrlKey () + ", raw filter: " + f .getCompareType ().name ());
289+ }
290+
291+ sr .addFilter (new Filter (f .getFieldKey ().toString (), value , o ));
273292 }
274293 }
275294
@@ -326,15 +345,19 @@ else if (f.getParamVals().length == 1)
326345 }
327346 }
328347
348+ if (sourceTable .getColumn (_settings .get (Settings .dataSourceSubjectColumn .name ())) == null )
349+ {
350+ throw new IllegalStateException ("Table is missing column: " + _settings .get (Settings .dataSourceSubjectColumn .name ()));
351+ }
329352
330- final SimpleFilter filter = new SimpleFilter ();
353+ final SimpleFilter filter = new SimpleFilter (_settings . get ( Settings . dataSourceSubjectColumn . name ()), subjects , CompareType . IN );
331354 if (_settings .get (Settings .dataSourceAdditionalFilters .name ()) != null )
332355 {
333356 List <CompareType .CompareClause > additionalFilters = parseAdditionalFilters (_settings .get (Settings .dataSourceAdditionalFilters .name ()));
334357 additionalFilters .forEach (filter ::addCondition );
335358 }
336359
337- TableSelector ts = new TableSelector (sourceTable , PageFlowUtil . set ( _settings . get ( Settings . subjectSourceColumn . name ()) ), filter , null );
360+ TableSelector ts = new TableSelector (sourceTable , new HashSet <>( sourceColumns ), filter , null );
338361
339362 return doNameMapping (new ArrayList <>(ts .getMapCollection ()), sourceToDestColumMap , columnToDefaultMap );
340363 }
@@ -400,7 +423,7 @@ public List<ValidationError> preFlightCheck(Container c)
400423 List <ValidationError > errors = new ArrayList <>();
401424 for (String setting : getRequiredSettings ())
402425 {
403- if (_settings .get (setting ) == null )
426+ if (_settings .get (setting ) == null || StringUtils . isEmpty ( _settings . get ( setting )) )
404427 {
405428 errors .add (new SimpleValidationError ("Missing required setting: " + setting ));
406429 }
0 commit comments