Skip to content

Commit 44de7ee

Browse files
committed
Put ETL inside transaction
1 parent 29b56af commit 44de7ee

1 file changed

Lines changed: 95 additions & 85 deletions

File tree

SivStudies/src/org/labkey/sivstudies/etl/SubjectScopedSelect.java

Lines changed: 95 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.labkey.api.data.CompareType;
1010
import org.labkey.api.data.Container;
1111
import org.labkey.api.data.ContainerManager;
12+
import org.labkey.api.data.DbScope;
1213
import org.labkey.api.data.SimpleFilter;
1314
import org.labkey.api.data.TableInfo;
1415
import org.labkey.api.data.TableSelector;
@@ -100,7 +101,7 @@ public boolean isRequired()
100101
}
101102
}
102103

103-
final int BATCH_SIZE = 100;
104+
final int BATCH_SIZE = 250;
104105

105106
private MODE getMode()
106107
{
@@ -136,127 +137,136 @@ private void checkCancelled(PipelineJob job)
136137
private void processBatch(List<String> subjects, Logger log, PipelineJob job)
137138
{
138139
log.info("processing batch with " + subjects.size() + " subjects");
139-
TableInfo destinationTable = getDataDestinationTable();
140+
try (DbScope.Transaction t = DbScope.getLabKeyScope().ensureTransaction())
141+
{
142+
TableInfo destinationTable = getDataDestinationTable();
140143

141-
QueryUpdateService qus = destinationTable.getUpdateService();
142-
qus.setBulkLoad(true);
144+
QueryUpdateService qus = destinationTable.getUpdateService();
145+
qus.setBulkLoad(true);
143146

144-
try
145-
{
146-
if (getMode() == MODE.TRUNCATE)
147+
try
147148
{
148-
// Find / Delete existing values:
149-
Set<ColumnInfo> keyFields = destinationTable.getColumns().stream().filter(ColumnInfo::isKeyField).collect(Collectors.toSet());
150-
final SimpleFilter subjectFilter = new SimpleFilter(FieldKey.fromString(_settings.get(Settings.targetSubjectColumn.name())), subjects, CompareType.IN);
151-
if (_settings.get(Settings.targetAdditionalFilters.name()) != null)
149+
if (getMode() == MODE.TRUNCATE)
152150
{
153-
List<CompareType.AbstractCompareClause> additionalFilters = parseAdditionalFilters(_settings.get(Settings.targetAdditionalFilters.name()));
154-
additionalFilters.forEach(subjectFilter::addCondition);
155-
}
151+
// Find / Delete existing values:
152+
Set<ColumnInfo> keyFields = destinationTable.getColumns().stream().filter(ColumnInfo::isKeyField).collect(Collectors.toSet());
153+
final SimpleFilter subjectFilter = new SimpleFilter(FieldKey.fromString(_settings.get(Settings.targetSubjectColumn.name())), subjects, CompareType.IN);
154+
if (_settings.get(Settings.targetAdditionalFilters.name()) != null)
155+
{
156+
List<CompareType.AbstractCompareClause> additionalFilters = parseAdditionalFilters(_settings.get(Settings.targetAdditionalFilters.name()));
157+
additionalFilters.forEach(subjectFilter::addCondition);
158+
}
156159

157-
if (destinationTable.getColumn(FieldKey.fromString(_settings.get(Settings.targetSubjectColumn.name()))) == null)
158-
{
159-
throw new IllegalStateException("Unknown column on table " + destinationTable.getName() + ": " + _settings.get(Settings.targetSubjectColumn.name()));
160-
}
160+
if (destinationTable.getColumn(FieldKey.fromString(_settings.get(Settings.targetSubjectColumn.name()))) == null)
161+
{
162+
throw new IllegalStateException("Unknown column on table " + destinationTable.getName() + ": " + _settings.get(Settings.targetSubjectColumn.name()));
163+
}
161164

162-
List<Map<String, Object>> existingRows = new ArrayList<>(new TableSelector(destinationTable, keyFields, subjectFilter, null).getMapCollection());
163-
if (!existingRows.isEmpty())
164-
{
165-
List<List<Map<String, Object>>> batches = Lists.partition(existingRows, 5000);
166-
log.info("deleting " + existingRows.size() + " rows in " + batches.size() + " batches");
167-
int i = 0;
168-
for (List<Map<String, Object>> batch : batches)
165+
List<Map<String, Object>> existingRows = new ArrayList<>(new TableSelector(destinationTable, keyFields, subjectFilter, null).getMapCollection());
166+
if (!existingRows.isEmpty())
169167
{
170-
i++;
171-
log.info("batch " + i);
172-
checkCancelled(job);
168+
List<List<Map<String, Object>>> batches = Lists.partition(existingRows, 5000);
169+
log.info("deleting " + existingRows.size() + " rows in " + batches.size() + " batches");
170+
int i = 0;
171+
for (List<Map<String, Object>> batch : batches)
172+
{
173+
i++;
174+
log.info("batch " + i);
175+
checkCancelled(job);
173176

174-
qus.deleteRows(_containerUser.getUser(), _containerUser.getContainer(), batch, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null);
177+
qus.deleteRows(_containerUser.getUser(), _containerUser.getContainer(), batch, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null);
178+
t.commitAndKeepConnection();
179+
}
180+
}
181+
else
182+
{
183+
log.info("No rows to delete for this subject batch");
175184
}
176185
}
177186
else
178187
{
179-
log.info("No rows to delete for this subject batch");
188+
log.info("Using " + getMode().name() + " mode, source records will not be deleted");
180189
}
181-
}
182-
else
183-
{
184-
log.info("Using " + getMode().name() + " mode, source records will not be deleted");
185-
}
186190

187-
// Query data and import
188-
List<Map<String, Object>> toImportOrUpdate = getRowsToImport(subjects, log);
189-
if (!toImportOrUpdate.isEmpty())
190-
{
191-
if (getMode() == MODE.TRUNCATE)
191+
// Query data and import
192+
List<Map<String, Object>> toImportOrUpdate = getRowsToImport(subjects, log);
193+
if (!toImportOrUpdate.isEmpty())
192194
{
193-
List<List<Map<String, Object>>> batches = Lists.partition(toImportOrUpdate, 5000);
194-
log.info("inserting " + toImportOrUpdate.size() + " rows in " + batches.size() + " batches");
195-
196-
int i = 0;
197-
for (List<Map<String, Object>> batch : batches)
195+
if (getMode() == MODE.TRUNCATE)
198196
{
199-
i++;
200-
log.info("batch " + i);
201-
checkCancelled(job);
197+
List<List<Map<String, Object>>> batches = Lists.partition(toImportOrUpdate, 5000);
198+
log.info("inserting " + toImportOrUpdate.size() + " rows in " + batches.size() + " batches");
202199

203-
BatchValidationException bve = new BatchValidationException();
204-
qus.insertRows(_containerUser.getUser(), _containerUser.getContainer(), batch, bve, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null);
205-
if (bve.hasErrors())
200+
int i = 0;
201+
for (List<Map<String, Object>> batch : batches)
206202
{
207-
throw bve;
203+
i++;
204+
log.info("batch " + i);
205+
checkCancelled(job);
206+
207+
BatchValidationException bve = new BatchValidationException();
208+
qus.insertRows(_containerUser.getUser(), _containerUser.getContainer(), batch, bve, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null);
209+
if (bve.hasErrors())
210+
{
211+
throw bve;
212+
}
213+
t.commitAndKeepConnection();
208214
}
209215
}
210-
}
211-
else if (getMode() == MODE.UPDATE_ONLY)
212-
{
213-
List<List<Map<String, Object>>> batches = Lists.partition(toImportOrUpdate, 5000);
214-
log.info("updating " + toImportOrUpdate.size() + " rows in " + batches.size() + " batches");
215-
216-
int i = 0;
217-
for (List<Map<String, Object>> batch : batches)
216+
else if (getMode() == MODE.UPDATE_ONLY)
218217
{
218+
List<List<Map<String, Object>>> batches = Lists.partition(toImportOrUpdate, 5000);
219+
log.info("updating " + toImportOrUpdate.size() + " rows in " + batches.size() + " batches");
219220

220-
i++;
221-
log.info("batch " + i);
222-
checkCancelled(job);
221+
int i = 0;
222+
for (List<Map<String, Object>> batch : batches)
223+
{
223224

224-
BatchValidationException bve = new BatchValidationException();
225+
i++;
226+
log.info("batch " + i);
227+
checkCancelled(job);
225228

226-
Collection<String> keyFields = destinationTable.getPkColumnNames();
227-
List<Map<String, Object>> keys = batch.stream().map(x -> {
228-
Map<String, Object> map = new HashMap<>();
229-
for (String keyField : keyFields)
230-
{
231-
if (x.get(keyField) != null)
229+
BatchValidationException bve = new BatchValidationException();
230+
231+
Collection<String> keyFields = destinationTable.getPkColumnNames();
232+
List<Map<String, Object>> keys = batch.stream().map(x -> {
233+
Map<String, Object> map = new HashMap<>();
234+
for (String keyField : keyFields)
232235
{
233-
map.put(keyField, x.get(keyField));
236+
if (x.get(keyField) != null)
237+
{
238+
map.put(keyField, x.get(keyField));
239+
}
234240
}
235-
}
236241

237-
return map;
238-
}).toList();
242+
return map;
243+
}).toList();
239244

240-
qus.updateRows(_containerUser.getUser(), _containerUser.getContainer(), batch, keys, bve, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null);
241-
if (bve.hasErrors())
242-
{
243-
throw bve;
245+
qus.updateRows(_containerUser.getUser(), _containerUser.getContainer(), batch, keys, bve, new HashMap<>(Map.of(DetailedAuditLogDataIterator.AuditConfigs.AuditBehavior, NONE, QueryUpdateService.ConfigParameters.BulkLoad, true)), null);
246+
if (bve.hasErrors())
247+
{
248+
throw bve;
249+
}
250+
t.commitAndKeepConnection();
244251
}
245252
}
253+
else
254+
{
255+
throw new IllegalStateException("Unknown mode: " + getMode());
256+
}
246257
}
247258
else
248259
{
249-
throw new IllegalStateException("Unknown mode: " + getMode());
260+
log.info("No rows to import/update for this subject batch");
250261
}
251262
}
252-
else
263+
catch (SQLException | InvalidKeyException | BatchValidationException | QueryUpdateServiceException |
264+
DuplicateKeyException e)
253265
{
254-
log.info("No rows to import/update for this subject batch");
266+
throw new IllegalStateException("Error Importing/Updating Rows", e);
255267
}
256-
}
257-
catch (SQLException | InvalidKeyException | BatchValidationException | QueryUpdateServiceException | DuplicateKeyException e)
258-
{
259-
throw new IllegalStateException("Error Importing/Updating Rows", e);
268+
269+
t.commit();
260270
}
261271
}
262272

0 commit comments

Comments
 (0)