Skip to content

Commit b2cb801

Browse files
committed
Long Polling
1 parent 78ae71f commit b2cb801

6 files changed

Lines changed: 391 additions & 62 deletions

File tree

vol_/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@
4747
<artifactId>reactor-core</artifactId>
4848
<version>3.7.12</version>
4949
</dependency>
50+
<dependency>
51+
<groupId>com.fasterxml.jackson.core</groupId>
52+
<artifactId>jackson-databind</artifactId>
53+
<version>2.17.2</version>
54+
</dependency>
5055
<dependency>
5156
<groupId>io.projectreactor</groupId>
5257
<artifactId>reactor-test</artifactId>

vol_/src/main/java/ru/mifi/practice/voln/object/ObjectPool.java

Lines changed: 98 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,31 @@
11
package ru.mifi.practice.voln.object;
22

3-
import lombok.SneakyThrows;
43
import lombok.extern.slf4j.Slf4j;
54
import net.bytebuddy.ByteBuddy;
65
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
6+
import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
77
import net.bytebuddy.implementation.MethodDelegation;
8+
import net.bytebuddy.implementation.bind.annotation.AllArguments;
9+
import net.bytebuddy.implementation.bind.annotation.Origin;
10+
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
811

912
import java.io.Closeable;
10-
import java.lang.reflect.Field;
13+
import java.io.IOException;
14+
import java.lang.reflect.Constructor;
15+
import java.lang.reflect.InvocationTargetException;
16+
import java.lang.reflect.Method;
1117
import java.util.Optional;
1218
import java.util.concurrent.BlockingQueue;
1319
import java.util.concurrent.LinkedBlockingQueue;
1420
import java.util.concurrent.Semaphore;
1521
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
1623
import java.util.concurrent.atomic.AtomicInteger;
1724
import java.util.function.Consumer;
1825
import java.util.function.Predicate;
1926
import java.util.function.Supplier;
2027

2128
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
22-
import static net.bytebuddy.matcher.ElementMatchers.named;
23-
import static net.bytebuddy.matcher.ElementMatchers.not;
2429

2530
public interface ObjectPool<T extends Closeable> extends Closeable {
2631
Optional<T> getObject(long timeout, TimeUnit unit);
@@ -31,6 +36,10 @@ default Optional<T> getObject() {
3136

3237
void dispose(T object);
3338

39+
interface ProxyMarker {
40+
Object getTarget();
41+
}
42+
3443
@SuppressWarnings("PMD.CloseResource")
3544
@Slf4j
3645
final class Generic<T extends Closeable> implements ObjectPool<T> {
@@ -42,6 +51,7 @@ final class Generic<T extends Closeable> implements ObjectPool<T> {
4251
private final AtomicInteger createdCount;
4352
private final Semaphore semaphore;
4453
private final Class<T> type;
54+
private final Object lock = new Object();
4555

4656
public Generic(Supplier<T> creator,
4757
Consumer<T> refresh,
@@ -77,24 +87,32 @@ private void initializePool(int minSize) {
7787
}
7888
}
7989

80-
@SneakyThrows
8190
@Override
8291
public Optional<T> getObject(long timeout, TimeUnit unit) {
83-
if (!semaphore.tryAcquire(timeout, unit)) {
92+
try {
93+
if (!semaphore.tryAcquire(timeout, unit)) {
94+
return Optional.empty();
95+
}
96+
} catch (InterruptedException e) {
97+
Thread.currentThread().interrupt();
8498
return Optional.empty();
8599
}
86100

87101
try {
88102
T obj = pool.poll();
89103
if (obj == null) {
90-
synchronized (this) {
104+
synchronized (lock) {
91105
if (createdCount.get() < maxSize) {
92106
obj = creator.get();
93107
createdCount.incrementAndGet();
94108
}
95109
}
96110
if (obj == null) {
97-
obj = pool.poll(timeout, unit);
111+
try {
112+
obj = pool.poll(timeout, unit);
113+
} catch (InterruptedException e) {
114+
Thread.currentThread().interrupt();
115+
}
98116
}
99117
}
100118

@@ -110,51 +128,50 @@ public Optional<T> getObject(long timeout, TimeUnit unit) {
110128
}
111129

112130
return Optional.of(Wrapper.proxy(this, obj, type));
113-
} catch (Exception e) {
131+
} catch (RuntimeException e) {
114132
semaphore.release();
115133
throw e;
134+
} catch (Exception e) {
135+
semaphore.release();
136+
throw new RuntimeException(e);
116137
}
117138
}
118139

119-
@SuppressWarnings("PMD.AvoidAccessibilityAlteration")
120140
@Override
121141
public void dispose(T object) {
122142
if (object == null) {
123143
semaphore.release();
124144
return;
125145
}
126-
try {
127-
Field target = object.getClass().getDeclaredField("target");
128-
target.setAccessible(true);
129-
if (target.get(object).getClass().equals(type)) {
130-
object = (T) target.get(object);
131-
}
132-
} catch (Exception e) {
133-
if (log.isErrorEnabled()) {
134-
log.error("Error fetch field: {}", e.getMessage());
135-
}
146+
147+
T target = object;
148+
if (object instanceof ProxyMarker) {
149+
target = (T) ((ProxyMarker) object).getTarget();
136150
}
137151

138152
try {
139-
if (validator.test(object)) {
140-
refresh.accept(object);
141-
if (!pool.offer(object)) {
142-
destroyObject(object);
153+
if (validator.test(target)) {
154+
refresh.accept(target);
155+
if (!pool.offer(target)) {
156+
destroyObject(target);
143157
}
144158
} else {
145-
destroyObject(object);
159+
destroyObject(target);
146160
}
147161
} catch (Exception e) {
148-
destroyObject(object);
162+
destroyObject(target);
149163
} finally {
150164
semaphore.release();
151165
}
152166
}
153167

154-
@SneakyThrows
155168
private void destroyObject(T object) {
156169
try {
157170
object.close();
171+
} catch (IOException e) {
172+
if (log.isErrorEnabled()) {
173+
log.error("Error closing object: {}", e.getMessage());
174+
}
158175
} finally {
159176
createdCount.decrementAndGet();
160177
}
@@ -169,28 +186,67 @@ public void close() {
169186
createdCount.set(0);
170187
}
171188

172-
private record Wrapper<T extends Closeable>(T target, ObjectPool<T> pool) {
189+
private static final class Wrapper<T extends Closeable> {
190+
private final T target;
191+
private final ObjectPool<T> pool;
192+
private final AtomicBoolean closed = new AtomicBoolean(false);
193+
194+
private Wrapper(T target, ObjectPool<T> pool) {
195+
this.target = target;
196+
this.pool = pool;
197+
}
173198

174-
@SuppressWarnings("resource")
175-
@SneakyThrows
199+
@SuppressWarnings("unchecked")
176200
public static <T extends Closeable> T proxy(ObjectPool<T> pool, T target, Class<T> clazz) {
177-
return new ByteBuddy()
178-
.subclass(clazz)
179-
.method(isPublic().and(not(named("close"))))
180-
.intercept(MethodDelegation.to(target))
181-
.method(named("close"))
182-
.intercept(MethodDelegation.to(new Wrapper<>(target, pool)))
201+
Wrapper<T> wrapper = new Wrapper<>(target, pool);
202+
Class<? extends T> proxyClass = new ByteBuddy()
203+
.subclass(clazz, ConstructorStrategy.Default.NO_CONSTRUCTORS)
204+
.implement(ProxyMarker.class)
205+
.method(isPublic())
206+
.intercept(MethodDelegation.to(wrapper))
183207
.make()
184208
.load(clazz.getClassLoader() != null ? clazz.getClassLoader() : ClassLoader.getSystemClassLoader(),
185209
ClassLoadingStrategy.Default.INJECTION)
186-
.getLoaded()
187-
.getDeclaredConstructor()
188-
.newInstance();
210+
.getLoaded();
211+
212+
try {
213+
try {
214+
return proxyClass.getDeclaredConstructor().newInstance();
215+
} catch (NoSuchMethodException e) {
216+
Constructor<?> objCtx = Object.class.getDeclaredConstructor();
217+
Constructor<?> cons = sun.reflect.ReflectionFactory.getReflectionFactory()
218+
.newConstructorForSerialization(proxyClass, objCtx);
219+
return (T) cons.newInstance();
220+
}
221+
} catch (Exception e) {
222+
throw new RuntimeException("Failed to create proxy", e);
223+
}
189224
}
190225

191-
@SuppressWarnings("unused")
192-
public void close() {
193-
pool.dispose(target);
226+
@RuntimeType
227+
public Object intercept(@Origin Method method, @AllArguments Object[] args) throws Throwable {
228+
String name = method.getName();
229+
int params = method.getParameterCount();
230+
231+
if ("close".equals(name) && params == 0) {
232+
if (closed.compareAndSet(false, true)) {
233+
pool.dispose(target);
234+
}
235+
return null;
236+
}
237+
if ("getTarget".equals(name) && params == 0) {
238+
return target;
239+
}
240+
241+
if (closed.get()) {
242+
throw new IllegalStateException("Object pool proxy is closed");
243+
}
244+
245+
try {
246+
return method.invoke(target, args);
247+
} catch (InvocationTargetException e) {
248+
throw e.getCause();
249+
}
194250
}
195251
}
196252
}

vol_/src/main/java/ru/mifi/practice/voln/polling/Event.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
@Builder(toBuilder = true)
66
public record Event(long id, Data data) {
7-
public record Data() {
87

8+
public record Data(Object message) {
99
}
1010
}
Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package ru.mifi.practice.voln.polling;
22

3-
import reactor.core.publisher.Flux;
4-
import reactor.core.publisher.Sinks;
5-
63
import java.time.Duration;
74
import java.util.concurrent.ConcurrentNavigableMap;
85
import java.util.concurrent.ConcurrentSkipListMap;
96
import java.util.concurrent.atomic.AtomicLong;
7+
import reactor.core.publisher.Flux;
8+
import reactor.core.publisher.Mono;
9+
import reactor.core.publisher.Sinks;
1010

1111
public interface EventService {
1212

@@ -19,6 +19,7 @@ final class Default implements EventService {
1919
private final Sinks.Many<Event> eventSink = Sinks.many().multicast().onBackpressureBuffer(1000);
2020
private final AtomicLong eventId = new AtomicLong(1);
2121
private final long bufferSize;
22+
private final AtomicLong currentSize = new AtomicLong(0);
2223

2324
public Default(long bufferSize) {
2425
this.bufferSize = bufferSize;
@@ -27,32 +28,27 @@ public Default(long bufferSize) {
2728
@Override
2829
public synchronized void addEvent(Event.Data data) {
2930
long id = eventId.getAndIncrement();
30-
var event = new Event(id, data);
31+
Event event = new Event(id, data);
3132
eventBuffer.put(event.id(), event);
32-
if (eventBuffer.size() > bufferSize) {
33+
if (currentSize.incrementAndGet() > bufferSize) {
3334
eventBuffer.pollFirstEntry();
35+
currentSize.decrementAndGet();
3436
}
3537
eventSink.tryEmitNext(event);
3638
}
3739

3840
@Override
3941
public Flux<Event> getEvents(Long lastOffset, long timeoutSeconds) {
40-
return Flux.defer(() -> {
41-
Flux<Event> historicalEvents = getHistoricalEvents(lastOffset);
42-
Flux<Event> newEvents = eventSink.asFlux()
43-
.filter(event -> event.id() > (lastOffset != null ? lastOffset : 0))
44-
.timeout(Duration.ofSeconds(timeoutSeconds))
45-
.onErrorResume(e -> Flux.empty());
46-
47-
return Flux.concat(historicalEvents, newEvents);
48-
});
42+
long offset = lastOffset != null ? lastOffset : 0L;
43+
return getHistoricalEvents(offset)
44+
.switchIfEmpty(Mono.defer(() -> eventSink.asFlux()
45+
.filter(event -> event.id() > offset)
46+
.next()
47+
.timeout(Duration.ofSeconds(timeoutSeconds), Mono.empty())));
4948
}
5049

51-
private Flux<Event> getHistoricalEvents(Long lastOffset) {
52-
return Flux.fromIterable(() -> {
53-
Long startKey = lastOffset != null ? lastOffset + 1 : 1L;
54-
return eventBuffer.tailMap(startKey).values().iterator();
55-
});
50+
private Flux<Event> getHistoricalEvents(long lastOffset) {
51+
return Flux.fromIterable(eventBuffer.tailMap(lastOffset + 1).values());
5652
}
5753
}
5854
}

0 commit comments

Comments
 (0)