Skip to content

Commit a2c68e8

Browse files
Andrei Nadyktovlordgarrish
authored andcommitted
IGNITE-22530 Add CdcRegexManager
1 parent 853aeda commit a2c68e8

1 file changed

Lines changed: 223 additions & 0 deletions

File tree

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.cdc;
19+
20+
import java.io.IOException;
21+
import java.nio.file.Files;
22+
import java.nio.file.Path;
23+
import java.util.List;
24+
import java.util.Optional;
25+
import java.util.Set;
26+
import java.util.regex.Pattern;
27+
import java.util.regex.PatternSyntaxException;
28+
import java.util.stream.Collectors;
29+
30+
import org.apache.ignite.IgniteException;
31+
import org.apache.ignite.IgniteLogger;
32+
import org.apache.ignite.internal.util.typedef.internal.CU;
33+
34+
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
35+
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
36+
37+
/**
38+
* TODO: Add JavaDoc
39+
*/
40+
public class CdcRegexManager implements CdcRegexMatcher {
41+
42+
/** File with saved names of caches added by cache masks. */
43+
private static final String SAVED_CACHES_FILE = "caches";
44+
45+
/** Temporary file with saved names of caches added by cache masks. */
46+
private static final String SAVED_CACHES_TMP_FILE = "caches_tmp";
47+
48+
/** CDC directory path. */
49+
private final Path cdcDir;
50+
51+
/** Include regex patterns for cache names. */
52+
private Set<Pattern> includeFilters;
53+
54+
/** Exclude regex patterns for cache names. */
55+
private Set<Pattern> excludeFilters;
56+
57+
/** Logger. */
58+
private IgniteLogger log;
59+
60+
public CdcRegexManager(Path cdcDir, IgniteLogger log) {
61+
this.cdcDir = cdcDir;
62+
this.log = log;
63+
}
64+
65+
/** {@inheritDoc} */
66+
@Override public boolean match(String cacheName) {
67+
return matchAndSave(cacheName);
68+
}
69+
70+
/**
71+
* Get actual list of names of caches added by regex templates from cache list file.
72+
* Caches that added to replication through regex templates during the work of CDC application,
73+
* are saved to file so they can be restored after application restart.
74+
*
75+
* @return Caches names list.
76+
*/
77+
public List<String> getSavedCaches() {
78+
try {
79+
return loadCaches().stream()
80+
.filter(this::matchesFilters)
81+
.collect(Collectors.toList());
82+
}
83+
catch (IOException e) {
84+
throw new IgniteException(e);
85+
}
86+
}
87+
88+
/**
89+
* Finds match between cache name and user's regex templates.
90+
* If match is found, saves cache name to file.
91+
*
92+
* @param cacheName Cache name.
93+
* @return True if cache name matches user's regexp patterns.
94+
*/
95+
private boolean matchAndSave(String cacheName) {
96+
if (matchesFilters(cacheName)) {
97+
try {
98+
List<String> caches = loadCaches();
99+
100+
caches.add(cacheName);
101+
102+
save(caches);
103+
}
104+
catch (IOException e) {
105+
throw new IgniteException(e);
106+
}
107+
108+
if (log.isInfoEnabled())
109+
log.info("Cache has been added to replication [cacheName=" + cacheName + "]");
110+
111+
return true;
112+
}
113+
return false;
114+
}
115+
116+
/**
117+
* Matches cache name with compiled regex patterns.
118+
*
119+
* @param cacheName Cache name.
120+
* @return True if cache name matches include patterns and doesn't match exclude patterns.
121+
*/
122+
private boolean matchesFilters(String cacheName) {
123+
boolean matchesInclude = includeFilters.stream()
124+
.anyMatch(pattern -> pattern.matcher(cacheName).matches());
125+
126+
boolean notMatchesExclude = excludeFilters.stream()
127+
.noneMatch(pattern -> pattern.matcher(cacheName).matches());
128+
129+
return matchesInclude && notMatchesExclude;
130+
}
131+
132+
/**
133+
* Compiles regex patterns from user templates.
134+
*
135+
* @param includeTemplates Include regex templates.
136+
* @param excludeTemplates Exclude regex templates.
137+
* @throws PatternSyntaxException If the template's syntax is invalid
138+
*/
139+
public void compileRegexp(Set<String> includeTemplates, Set<String> excludeTemplates) {
140+
includeFilters = includeTemplates.stream()
141+
.map(Pattern::compile)
142+
.collect(Collectors.toSet());
143+
144+
excludeFilters = excludeTemplates.stream()
145+
.map(Pattern::compile)
146+
.collect(Collectors.toSet());
147+
}
148+
149+
/**
150+
* Loads saved CDC caches from file. If file not found, creates a new one containing empty list.
151+
*
152+
* @return List of saved caches names.
153+
*/
154+
private List<String> loadCaches() throws IOException {
155+
if (cdcDir == null) {
156+
throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
157+
}
158+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
159+
160+
if (Files.notExists(savedCachesPath)) {
161+
Files.createFile(savedCachesPath);
162+
163+
if (log.isInfoEnabled())
164+
log.info("Cache list created: " + savedCachesPath);
165+
}
166+
167+
return Files.readAllLines(savedCachesPath);
168+
}
169+
170+
/**
171+
* Writes caches list to file.
172+
*
173+
* @param caches Caches list.
174+
*/
175+
private void save(List<String> caches) throws IOException {
176+
if (cdcDir == null) {
177+
throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
178+
}
179+
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
180+
Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE);
181+
182+
StringBuilder cacheList = new StringBuilder();
183+
184+
for (String cache : caches) {
185+
cacheList.append(cache);
186+
187+
cacheList.append('\n');
188+
}
189+
190+
Files.write(tmpSavedCachesPath, cacheList.toString().getBytes());
191+
192+
Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING);
193+
}
194+
195+
/**
196+
* Removes cache added by regexp from cache list if such cache is present in file to prevent disk space overflow.
197+
*
198+
* @param cacheId Cache id.
199+
*/
200+
public void deleteRegexpCacheIfPresent(Integer cacheId) {
201+
try {
202+
List<String> caches = loadCaches();
203+
204+
Optional<String> cacheName = caches.stream()
205+
.filter(name -> CU.cacheId(name) == cacheId)
206+
.findAny();
207+
208+
if (cacheName.isPresent()) {
209+
String name = cacheName.get();
210+
211+
caches.remove(name);
212+
213+
save(caches);
214+
215+
if (log.isInfoEnabled())
216+
log.info("Cache has been removed from replication [cacheName=" + name + ']');
217+
}
218+
}
219+
catch (IOException e) {
220+
throw new IgniteException(e);
221+
}
222+
}
223+
}

0 commit comments

Comments
 (0)