Skip to content

Commit 3d092a3

Browse files
committed
Добавление EventStream как хранителя событий
1 parent 2fe6046 commit 3d092a3

10 files changed

Lines changed: 389 additions & 277 deletions

File tree

heroes/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,21 @@
2222
<artifactId>reactor-core</artifactId>
2323
<version>3.7.12</version>
2424
</dependency>
25+
<dependency>
26+
<groupId>io.projectreactor.netty</groupId>
27+
<artifactId>reactor-netty-http</artifactId>
28+
<version>1.2.2</version>
29+
</dependency>
2530
<dependency>
2631
<groupId>com.fasterxml.jackson.core</groupId>
2732
<artifactId>jackson-databind</artifactId>
2833
<version>2.17.2</version>
2934
</dependency>
35+
<dependency>
36+
<groupId>com.fasterxml.jackson.datatype</groupId>
37+
<artifactId>jackson-datatype-jsr310</artifactId>
38+
<version>2.19.2</version>
39+
</dependency>
3040
<dependency>
3141
<groupId>org.slf4j</groupId>
3242
<artifactId>slf4j-api</artifactId>
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package ru.mifi.practice.voln.event;
2+
3+
import lombok.Builder;
4+
5+
import java.time.LocalDateTime;
6+
7+
@Builder(toBuilder = true)
8+
public record Event(long streamId, long offset, LocalDateTime createdAt, Data data, Type type, Object source, Object target) {
9+
10+
public Event(long id, long offset, Data data) {
11+
this(id, offset, LocalDateTime.now(), data, Type.PUBLIC, null, null);
12+
}
13+
14+
public enum Type {
15+
PRIVATE,
16+
PUBLIC
17+
}
18+
19+
public record Data(Object message) {
20+
}
21+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package ru.mifi.practice.voln.event;
2+
3+
import java.util.concurrent.atomic.AtomicLong;
4+
5+
public interface EventSequence {
6+
long next();
7+
8+
final class Default implements EventSequence {
9+
private final AtomicLong counter = new AtomicLong(0);
10+
11+
@Override
12+
public long next() {
13+
return counter.incrementAndGet();
14+
}
15+
}
16+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package ru.mifi.practice.voln.event;
2+
3+
import lombok.Builder;
4+
5+
import java.time.LocalDateTime;
6+
import java.util.List;
7+
import java.util.stream.Stream;
8+
9+
public interface EventStream {
10+
long id();
11+
12+
Event add(Event.Data data, Event.Type type, Object source, Object target);
13+
14+
long size(long offset);
15+
16+
Stream<Event> getEvents(long offset, Event.Type type);
17+
18+
default Stream<Event> getEvents(long offset) {
19+
return getEvents(offset, Event.Type.PUBLIC);
20+
}
21+
22+
@Builder(toBuilder = true)
23+
record Default(long id, List<Event> events) implements EventStream {
24+
@Override
25+
public synchronized Event add(Event.Data data, Event.Type type, Object source, Object target) {
26+
Event event = Event.builder()
27+
.streamId(id)
28+
.offset(events.size())
29+
.createdAt(LocalDateTime.now())
30+
.data(data)
31+
.type(type)
32+
.source(source)
33+
.target(target)
34+
.build();
35+
events.add(event);
36+
return event;
37+
}
38+
39+
@Override
40+
public long size(long offset) {
41+
return Math.max(0, (long) events.size() - offset);
42+
}
43+
44+
@Override
45+
public Stream<Event> getEvents(long offset, Event.Type type) {
46+
return events.stream()
47+
.filter(event -> event.offset() >= offset && event.type() == type);
48+
}
49+
}
50+
}

heroes/src/main/java/ru/mifi/practice/voln/polling/Event.java

Lines changed: 0 additions & 10 deletions
This file was deleted.
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
package ru.mifi.practice.voln.polling;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
6+
import io.netty.handler.codec.http.QueryStringDecoder;
7+
import lombok.extern.slf4j.Slf4j;
8+
import reactor.core.publisher.Mono;
9+
import reactor.netty.DisposableServer;
10+
import reactor.netty.http.server.HttpServer;
11+
import reactor.netty.http.server.HttpServerRequest;
12+
import reactor.netty.http.server.HttpServerResponse;
13+
import ru.mifi.practice.voln.event.Event;
14+
15+
import java.util.Map;
16+
import java.util.Objects;
17+
import java.util.stream.Collectors;
18+
19+
@Slf4j
20+
public final class EventHttpServer {
21+
22+
private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());
23+
private final int port;
24+
private final EventService eventService;
25+
private final TransportMode transportMode;
26+
private DisposableServer server;
27+
28+
public EventHttpServer(int port, EventService eventService) {
29+
this(port, eventService, TransportMode.LONG_POLLING);
30+
}
31+
32+
public EventHttpServer(int port, EventService eventService, TransportMode transportMode) {
33+
this.port = port;
34+
this.eventService = eventService;
35+
this.transportMode = transportMode;
36+
}
37+
38+
public void start() {
39+
server = HttpServer.create()
40+
.port(port)
41+
.route(routes -> routes
42+
.get("/events", this::handleGetEvents)
43+
.post("/events", this::handlePostEvent)
44+
.post("/streams", this::handleCreateStream)
45+
.get("/streams/{id}/events", this::handleGetStreamEvents)
46+
.post("/streams/{id}/events", this::handlePostStreamEvent))
47+
.bindNow();
48+
if (log.isInfoEnabled()) {
49+
log.info("Server started on port {}", server.port());
50+
}
51+
}
52+
53+
public void stop() {
54+
if (server != null) {
55+
server.disposeNow();
56+
}
57+
}
58+
59+
public int getPort() {
60+
return server.port();
61+
}
62+
63+
private Mono<Void> handleGetEvents(HttpServerRequest request, HttpServerResponse response) {
64+
Map<String, String> params = parseQueryParams(request.uri());
65+
long lastOffset = Long.parseLong(params.getOrDefault("last-offset", "0"));
66+
67+
if (transportMode == TransportMode.SSE) {
68+
return handleSseEvents(response, EventService.DEFAULT_STREAM_ID, lastOffset);
69+
} else {
70+
return handleLongPollingEvents(response, EventService.DEFAULT_STREAM_ID, lastOffset, params);
71+
}
72+
}
73+
74+
private Mono<Void> handleGetStreamEvents(HttpServerRequest request, HttpServerResponse response) {
75+
long streamId = Long.parseLong(Objects.requireNonNull(request.param("id")));
76+
Map<String, String> params = parseQueryParams(request.uri());
77+
long lastOffset = Long.parseLong(params.getOrDefault("last-offset", "0"));
78+
79+
if (transportMode == TransportMode.SSE) {
80+
return handleSseEvents(response, streamId, lastOffset);
81+
} else {
82+
return handleLongPollingEvents(response, streamId, lastOffset, params);
83+
}
84+
}
85+
86+
private Mono<Void> handleLongPollingEvents(HttpServerResponse response, long streamId, long lastOffset, Map<String, String> params) {
87+
long timeout = Long.parseLong(params.getOrDefault("timeout", "30"));
88+
return eventService.getEvents(streamId, lastOffset, timeout)
89+
.collectList()
90+
.flatMap(events -> {
91+
try {
92+
byte[] bytes = MAPPER.writeValueAsBytes(events);
93+
return response.header("Content-Type", "application/json")
94+
.sendByteArray(Mono.just(bytes))
95+
.then();
96+
} catch (JsonProcessingException e) {
97+
return response.status(500).send();
98+
}
99+
});
100+
}
101+
102+
private Mono<Void> handleSseEvents(HttpServerResponse response, long streamId, long lastOffset) {
103+
return response.header("Content-Type", "text/event-stream")
104+
.header("Cache-Control", "no-cache")
105+
.header("Connection", "keep-alive")
106+
.sendString(eventService.getEventStream(streamId, lastOffset)
107+
.map(event -> {
108+
try {
109+
return "data: " + MAPPER.writeValueAsString(event) + "\n\n";
110+
} catch (JsonProcessingException e) {
111+
throw new RuntimeException(e);
112+
}
113+
}))
114+
.then();
115+
}
116+
117+
@SuppressWarnings("PMD.UnusedFormalParameter")
118+
private Mono<Void> handleCreateStream(HttpServerRequest request, HttpServerResponse response) {
119+
return Mono.fromCallable(eventService::newEventStream)
120+
.flatMap(stream -> response.status(201)
121+
.header("Content-Type", "application/json")
122+
.sendString(Mono.just("{\"id\":" + stream.id() + "}"))
123+
.then());
124+
}
125+
126+
private Mono<Void> handlePostEvent(HttpServerRequest request, HttpServerResponse response) {
127+
return handlePostStreamEvent(request, response, EventService.DEFAULT_STREAM_ID);
128+
}
129+
130+
private Mono<Void> handlePostStreamEvent(HttpServerRequest request, HttpServerResponse response) {
131+
long streamId = Long.parseLong(Objects.requireNonNull(request.param("id")));
132+
return handlePostStreamEvent(request, response, streamId);
133+
}
134+
135+
private Mono<Void> handlePostStreamEvent(HttpServerRequest request, HttpServerResponse response, long streamId) {
136+
return request.receive()
137+
.aggregate()
138+
.asByteArray()
139+
.flatMap(bytes -> {
140+
try {
141+
Event.Data data = MAPPER.readValue(bytes, Event.Data.class);
142+
eventService.addEvent(streamId, data);
143+
return response.status(201).send();
144+
} catch (Exception e) {
145+
return response.status(400).send();
146+
}
147+
});
148+
}
149+
150+
private Map<String, String> parseQueryParams(String uri) {
151+
QueryStringDecoder decoder = new QueryStringDecoder(uri);
152+
return decoder.parameters().entrySet().stream()
153+
.collect(Collectors.toMap(
154+
Map.Entry::getKey,
155+
e -> e.getValue().isEmpty() ? "" : e.getValue().get(0),
156+
(v1, v2) -> v1
157+
));
158+
}
159+
160+
public enum TransportMode {
161+
LONG_POLLING,
162+
SSE
163+
}
164+
}

0 commit comments

Comments
 (0)