Skip to content

Commit 9fd805b

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 Refactor code
1 parent 7d4d500 commit 9fd805b

3 files changed

Lines changed: 27 additions & 14 deletions

File tree

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,6 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
7070
/** */
7171
public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to destination cluster";
7272

73-
/** */
74-
public static final String DFLT_REGEXP = "";
75-
7673
/** Handle only primary entry flag. */
7774
private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
7875

@@ -83,10 +80,10 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
8380
private CdcRegexManager regexManager;
8481

8582
/** Include regex template for cache names. */
86-
private String includeTemplate = DFLT_REGEXP;
83+
private String includeTemplate;
8784

8885
/** Exclude regex template for cache names. */
89-
private String excludeTemplate = DFLT_REGEXP;
86+
private String excludeTemplate;
9087

9188
/** Cache IDs. */
9289
protected Set<Integer> cachesIds;
@@ -122,7 +119,7 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
122119
@Override public void start(MetricRegistry reg, List<String> cacheNames) {
123120
A.notEmpty(caches, "caches");
124121

125-
regexManager = new CdcRegexManager();
122+
regexManager = new CdcRegexManager(log);
126123

127124
cachesIds = caches.stream()
128125
.mapToInt(CU::cacheId)

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcRegexManager.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.regex.PatternSyntaxException;
2222

2323
import org.apache.ignite.IgniteException;
24+
import org.apache.ignite.IgniteLogger;
2425

2526
/**
2627
* Contains logic to process user's regexp patterns for CDC.
@@ -33,13 +34,31 @@ public class CdcRegexManager {
3334
/** Exclude regex pattern for cache names. */
3435
private Pattern excludeFilter;
3536

37+
/** Logger. */
38+
private IgniteLogger log;
39+
40+
/**
41+
*
42+
* @param log Logger.
43+
*/
44+
public CdcRegexManager(IgniteLogger log) {
45+
this.log = log;
46+
}
47+
3648
/**
3749
* Matches cache name with compiled regex patterns.
3850
*
3951
* @param cacheName Cache name.
4052
* @return True if cache name matches include pattern and doesn't match exclude pattern.
4153
*/
4254
public boolean matchesFilters(String cacheName) {
55+
if (includeFilter.matcher(cacheName).matches() && excludeFilter.matcher(cacheName).matches()) {
56+
log.warning("Cache name matches both include and exclude regexp templates. Will ignore to prevent " +
57+
"ambiguity [cacheName=" + cacheName + ", includeTemplate=" + includeFilter + ", excludeTemplate=" +
58+
excludeFilter + "]");
59+
60+
return false;
61+
}
4362
return includeFilter.matcher(cacheName).matches() && !excludeFilter.matcher(cacheName).matches();
4463
}
4564

@@ -52,9 +71,9 @@ public boolean matchesFilters(String cacheName) {
5271
*/
5372
public void compileRegexp(String includeTemplate, String excludeTemplate) {
5473
try {
55-
includeFilter = Pattern.compile(includeTemplate);
74+
includeFilter = includeTemplate != null ? Pattern.compile(includeTemplate) : Pattern.compile("");
5675

57-
excludeFilter = Pattern.compile(excludeTemplate);
76+
excludeFilter = excludeTemplate != null ? Pattern.compile(excludeTemplate) : Pattern.compile("");
5877
}
5978
catch (PatternSyntaxException e) {
6079
throw new IgniteException("Invalid cache regexp template", e);

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,6 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumerEx {
115115
/** Count of metadata markers sent description. */
116116
public static final String MARKERS_SENT_CNT_DESC = "Count of metadata markers sent to Kafka";
117117

118-
/** */
119-
public static final String DFLT_REGEXP = "";
120-
121118
/** Default value for the flag that indicates whether entries only from primary nodes should be handled. */
122119
public static final boolean DFLT_IS_ONLY_PRIMARY = false;
123120

@@ -156,10 +153,10 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumerEx {
156153
private CdcRegexManager regexManager;
157154

158155
/** Include regex template for cache names. */
159-
private String includeTemplate = DFLT_REGEXP;
156+
private String includeTemplate;
160157

161158
/** Exclude regex template for cache names. */
162-
private String excludeTemplate = DFLT_REGEXP;
159+
private String excludeTemplate;
163160

164161
/** Max batch size. */
165162
private int maxBatchSz = DFLT_MAX_BATCH_SIZE;
@@ -346,7 +343,7 @@ private <T> void sendOneBatch(
346343
kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
347344
kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
348345

349-
regexManager = new CdcRegexManager();
346+
regexManager = new CdcRegexManager(log);
350347

351348
cachesIds = caches.stream()
352349
.map(CU::cacheId)

0 commit comments

Comments
 (0)