Skip to content

Commit d59bdf3

Browse files
authored
[1.7.x] Support paralleledInBatch in promotion tool (#1209)
* Support paralleledInBatch in promotion tool * Move batchsize to config and fix commented issues
1 parent 46982a6 commit d59bdf3

4 files changed

Lines changed: 185 additions & 4 deletions

File tree

addons/promote/common/src/main/java/org/commonjava/indy/promote/conf/PromoteConfig.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class PromoteConfig
3939

4040
private static final String AUTOLOCK_HOSTED_REPOS = "autolock.hosted.repos";
4141

42+
public static final int DEFAULT_PARALLELED_BATCH_SIZE = 100;
43+
4244
public static final long DEFAULT_LOCK_TIMEOUT_SECONDS = 30;
4345

4446
public static final boolean DEFAULT_AUTOLOCK = true;
@@ -53,6 +55,8 @@ public class PromoteConfig
5355

5456
private Long lockTimeoutSeconds;
5557

58+
private Integer paralleledBatchSize;
59+
5660
public PromoteConfig()
5761
{
5862
}
@@ -131,4 +135,19 @@ public InputStream getDefaultConfig()
131135
.getResourceAsStream( "default-promote.conf" );
132136
}
133137

138+
@ConfigName( "paralleled.batch.size" )
139+
public void setParalleledBatchSize( Integer paralleledBatchSize )
140+
{
141+
this.paralleledBatchSize = paralleledBatchSize;
142+
}
143+
144+
public int getParalleledBatchSize()
145+
{
146+
int ret = paralleledBatchSize == null ? DEFAULT_PARALLELED_BATCH_SIZE : paralleledBatchSize;
147+
if ( ret <= 0 )
148+
{
149+
ret = DEFAULT_PARALLELED_BATCH_SIZE;
150+
}
151+
return ret;
152+
}
134153
}

addons/promote/common/src/main/java/org/commonjava/indy/promote/validate/PromotionValidationTools.java

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.commonjava.indy.data.ArtifactStoreQuery;
2828
import org.commonjava.indy.model.core.ArtifactStore;
2929
import org.commonjava.indy.model.core.StoreKey;
30+
import org.commonjava.indy.promote.conf.PromoteConfig;
3031
import org.commonjava.indy.promote.validate.model.ValidationRequest;
3132
import org.commonjava.indy.util.LocationUtils;
3233
import org.commonjava.atlas.maven.graph.rel.ProjectRelationship;
@@ -112,6 +113,9 @@ public class PromotionValidationTools
112113
@Inject
113114
private ContentDigester contentDigester;
114115

116+
@Inject
117+
private PromoteConfig promoteConfig;
118+
115119
@Inject
116120
@WeftManaged
117121
@ExecutorConfig( named = "promote-validation-rules-executor", threads = 8 )
@@ -124,7 +128,8 @@ protected PromotionValidationTools()
124128
public PromotionValidationTools( final ContentManager manager, final StoreDataManager storeDataManager,
125129
final MavenPomReader pomReader, final MavenMetadataReader metadataReader,
126130
final MavenModelProcessor modelProcessor, final TypeMapper typeMapper,
127-
final TransferManager transferManager, final ContentDigester contentDigester )
131+
final TransferManager transferManager, final ContentDigester contentDigester,
132+
final Executor ruleParallelExecutor, final PromoteConfig config )
128133
{
129134
contentManager = manager;
130135
this.storeDataManager = storeDataManager;
@@ -134,6 +139,8 @@ public PromotionValidationTools( final ContentManager manager, final StoreDataMa
134139
this.typeMapper = typeMapper;
135140
this.transferManager = transferManager;
136141
this.contentDigester = contentDigester;
142+
this.ruleParallelExecutor = ruleParallelExecutor;
143+
this.promoteConfig = config;
137144
}
138145

139146
public StoreKey[] getValidationStoreKeys( final ValidationRequest request, final boolean includeSource )
@@ -561,6 +568,72 @@ public <K, V> void paralleledEach( Map<K, V> map, Closure closure )
561568
runParallelAndWait( entries, closure, logger );
562569
}
563570

571+
public <T> void paralleledInBatch( Collection<T> collection, Closure closure )
572+
{
573+
int batchSize = promoteConfig.getParalleledBatchSize();
574+
logger.trace( "Exe parallel on collection {} with closure {} in batch {}", collection, closure, batchSize );
575+
Collection<Collection<T>> batches = batch( collection, batchSize );
576+
runParallelInBatchAndWait( batches, closure, logger );
577+
}
578+
579+
public <T> void paralleledInBatch( T[] array, Closure closure )
580+
{
581+
int batchSize = promoteConfig.getParalleledBatchSize();
582+
logger.trace( "Exe parallel on array {} with closure {} in batch {}", array, closure, batchSize );
583+
Collection<Collection<T>> batches = batch( Arrays.asList( array ), batchSize );
584+
runParallelInBatchAndWait( batches, closure, logger );
585+
}
586+
587+
public <K, V> void paralleledInBatch( Map<K, V> map, Closure closure )
588+
{
589+
int batchSize = promoteConfig.getParalleledBatchSize();
590+
Set<Map.Entry<K, V>> entries = map.entrySet();
591+
logger.trace( "Exe parallel on map {} with closure {} in batch {}", entries, closure, batchSize );
592+
Collection<Collection<Map.Entry<K, V>>> batches = batch( entries, batchSize );
593+
runParallelInBatchAndWait( batches, closure, logger );
594+
}
595+
596+
private <T> void runParallelInBatchAndWait( Collection<Collection<T>> batches, Closure closure, Logger logger )
597+
{
598+
final CountDownLatch latch = new CountDownLatch( batches.size() );
599+
batches.forEach( batch -> ruleParallelExecutor.execute( () -> {
600+
try
601+
{
602+
logger.trace( "The paralleled exe on batch {}", batch );
603+
batch.forEach( e -> closure.call( e ) );
604+
}
605+
finally
606+
{
607+
latch.countDown();
608+
}
609+
} ) );
610+
611+
waitForCompletion( latch );
612+
}
613+
614+
private <T> Collection<Collection<T>> batch( Collection<T> collection, int batchSize )
615+
{
616+
Collection<Collection<T>> batches = new ArrayList<>();
617+
Collection<T> batch = new ArrayList<>( batchSize );
618+
int count = 0;
619+
for ( T t : collection )
620+
{
621+
( (ArrayList<T>) batch ).add( t );
622+
count++;
623+
if ( count >= batchSize )
624+
{
625+
( (ArrayList<Collection<T>>) batches ).add( batch );
626+
batch = new ArrayList<>( batchSize );
627+
count = 0;
628+
}
629+
}
630+
if ( batch != null && !batch.isEmpty() )
631+
{
632+
( (ArrayList<Collection<T>>) batches ).add( batch ); // first batch
633+
}
634+
return batches;
635+
}
636+
564637
private <T> void runParallelAndWait( Collection<T> runCollection, Closure closure, Logger logger )
565638
{
566639
Set<T> todo = new HashSet<>( runCollection );
@@ -577,6 +650,11 @@ private <T> void runParallelAndWait( Collection<T> runCollection, Closure closur
577650
}
578651
} ) );
579652

653+
waitForCompletion( latch );
654+
}
655+
656+
private void waitForCompletion( CountDownLatch latch )
657+
{
580658
try
581659
{
582660
// true if the count reached zero and false if timeout

addons/promote/common/src/test/java/org/commonjava/indy/promote/data/PromotionManagerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,15 +172,15 @@ public void setup()
172172
WeftExecutorService validateService =
173173
new PoolWeftExecutorService( "test-validate-executor", (ThreadPoolExecutor) Executors.newCachedThreadPool(), 2, 10f, false,null, null );
174174
MavenModelProcessor modelProcessor = new MavenModelProcessor();
175+
176+
PromoteConfig config = new PromoteConfig();
175177
validator = new PromotionValidator( validationsManager,
176178
new PromotionValidationTools( contentManager, storeManager,
177179
galleyParts.getPomReader(),
178180
galleyParts.getMavenMetadataReader(),
179181
modelProcessor, galleyParts.getTypeMapper(),
180182
galleyParts.getTransferManager(),
181-
contentDigester ), storeManager, downloadManager, validateService, null );
182-
183-
PromoteConfig config = new PromoteConfig();
183+
contentDigester, null, config ), storeManager, downloadManager, validateService, null );
184184

185185
WeftExecutorService svc =
186186
new PoolWeftExecutorService( "test-executor", (ThreadPoolExecutor) Executors.newCachedThreadPool(), 2, 10f, false,null, null );
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package org.commonjava.indy.promote.validate;
2+
3+
import groovy.lang.Closure;
4+
import org.commonjava.indy.promote.conf.PromoteConfig;
5+
import org.junit.Test;
6+
7+
import java.util.ArrayList;
8+
import java.util.Collections;
9+
import java.util.List;
10+
import java.util.concurrent.Executor;
11+
import java.util.concurrent.Executors;
12+
13+
import static junit.framework.TestCase.assertTrue;
14+
import static org.hamcrest.CoreMatchers.equalTo;
15+
import static org.junit.Assert.assertThat;
16+
17+
public class PromotionValidationToolsTest
18+
{
19+
final String[] array = { "this", "is", "a", "err_weird", "test", "err_for", "paralleled", "err_in", "batch" };
20+
21+
@Test
22+
public void testParalleledInBatch()
23+
{
24+
PromoteConfig config = new PromoteConfig();
25+
Executor executor = Executors.newCachedThreadPool();
26+
PromotionValidationTools tools =
27+
new PromotionValidationTools( null, null, null, null, null, null, null, null, executor,
28+
config );
29+
30+
List<String> errors = Collections.synchronizedList( new ArrayList<>() );
31+
Closure closure = new Closure<String>( null )
32+
{
33+
@Override
34+
public String call( Object arg )
35+
{
36+
if ( ( (String) arg ).startsWith( "err_" ) )
37+
{
38+
errors.add( (String) arg );
39+
}
40+
return null;
41+
}
42+
};
43+
44+
tools.paralleledInBatch( array, closure );
45+
verifyIt( errors );
46+
}
47+
48+
@Test
49+
public void testParalleledInBatch_smallSize()
50+
{
51+
PromoteConfig config = new PromoteConfig();
52+
config.setParalleledBatchSize( 2 );
53+
54+
Executor executor = Executors.newCachedThreadPool();
55+
PromotionValidationTools tools =
56+
new PromotionValidationTools( null, null, null, null, null, null, null, null, executor,
57+
config );
58+
59+
List<String> errors = Collections.synchronizedList( new ArrayList<>() );
60+
Closure closure = new Closure<String>( null )
61+
{
62+
@Override
63+
public String call( Object arg )
64+
{
65+
if ( ( (String) arg ).startsWith( "err_" ) )
66+
{
67+
errors.add( (String) arg );
68+
}
69+
return null;
70+
}
71+
};
72+
73+
tools.paralleledInBatch( array, closure );
74+
verifyIt( errors );
75+
}
76+
77+
private void verifyIt( List<String> errors )
78+
{
79+
assertThat( errors.size(), equalTo( 3 ) );
80+
assertTrue( errors.contains( "err_weird" ) );
81+
assertTrue( errors.contains( "err_for" ) );
82+
assertTrue( errors.contains( "err_in" ) );
83+
}
84+
}

0 commit comments

Comments
 (0)