Skip to content

Commit b6e803f

Browse files
author
Andy Ford
authored
Merge pull request #18 from ECFMP/async-ops-lag
fix: euroscope freezes when api is unavailable
2 parents 8f45f94 + 46bdbc2 commit b6e803f

11 files changed

Lines changed: 194 additions & 30 deletions

src/CMakeLists.txt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ set(src_plugin
3636
plugin/InternalSdk.cpp
3737
plugin/SdkFactory.cpp ../include/ECFMP/SdkEvents.h plugin/InternalSdkEvents.h)
3838

39+
set(src_thread
40+
thread/ThreadPool.cpp
41+
thread/ThreadPool.h)
42+
3943
set(src_time time/Clock.cpp time/Clock.h)
4044

4145
set(ALL_FILES
@@ -49,6 +53,7 @@ set(ALL_FILES
4953
${src_log}
5054
${src_pch}
5155
${src_plugin}
56+
${src_thread}
5257
${src_time})
5358

5459
add_library(${PROJECT_NAME} STATIC ${ALL_FILES})
@@ -59,17 +64,17 @@ set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 20)
5964
target_include_directories(${PROJECT_NAME} PRIVATE
6065
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/nlohmann;"
6166
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/euroscope;"
62-
)
67+
)
6368

6469
target_include_directories(${PROJECT_NAME} PUBLIC
6570
"${CMAKE_CURRENT_SOURCE_DIR};"
6671
"${CMAKE_CURRENT_SOURCE_DIR}/../include;"
67-
)
72+
)
6873

6974
# Treat Euroscope as a system include directory to suppress warning, because they have lots
7075
target_include_directories(${PROJECT_NAME} SYSTEM PRIVATE
7176
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/euroscope;"
72-
)
77+
)
7378

7479
target_compile_options(${PROJECT_NAME} PRIVATE
7580
$<$<CONFIG:Debug>:
@@ -89,4 +94,4 @@ target_compile_options(${PROJECT_NAME} PRIVATE
8994
/WX;
9095
-Wno-unused-parameter; # Lots of interfaces don't use everything
9196
-Wno-missing-field-initializers; # Windows has loads of this sadly
92-
)
97+
)
Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22
#include "ECFMP/eventbus/EventListener.h"
33
#include "EventDispatcher.h"
4+
#include "thread/ThreadPool.h"
45
#include <cassert>
56
#include <future>
67

@@ -10,9 +11,13 @@ namespace ECFMP::EventBus {
1011
class AsynchronousEventDispatcher : public EventDispatcher<EventType>
1112
{
1213
public:
13-
explicit AsynchronousEventDispatcher(std::shared_ptr<EventListener<EventType>> listener) : listener(listener)
14+
explicit AsynchronousEventDispatcher(
15+
std::shared_ptr<EventListener<EventType>> listener, std::shared_ptr<Thread::ThreadPool> threadPool
16+
)
17+
: listener(listener), threadPool(threadPool)
1418
{
1519
assert(listener != nullptr && "Listener cannot be null");
20+
assert(threadPool != nullptr && "Thread pool cannot be null");
1621
}
1722

1823
/**
@@ -22,17 +27,19 @@ namespace ECFMP::EventBus {
2227
{
2328
// Copy the listener to ensure it is not destroyed before the event is dispatched
2429
auto listenerCopy = listener;
25-
static_cast<void>(std::async(
26-
std::launch::async,
27-
[listenerCopy](const auto& event) {
28-
listenerCopy->OnEvent(event);
29-
},
30-
event
31-
));
30+
auto eventCopy = event;
31+
32+
// Dispatch the event asynchronously using the thread pool
33+
threadPool->Schedule([listenerCopy, eventCopy]() {
34+
listenerCopy->OnEvent(eventCopy);
35+
});
3236
};
3337

3438
private:
3539
// The event listener for this dispatcher
3640
std::shared_ptr<EventListener<EventType>> listener;
41+
42+
// The thread pool to use for dispatching events
43+
std::shared_ptr<Thread::ThreadPool> threadPool;
3744
};
3845
}// namespace ECFMP::EventBus

src/eventbus/EventDispatcherFactory.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,20 @@
55
#include "PendingEuroscopeEvents.h"
66
#include "SubscriptionFlags.h"
77
#include "SynchronousEventDispatcher.h"
8-
#include <assert.h>
8+
#include "thread/ThreadPool.h"
99

1010
namespace ECFMP::EventBus {
1111
class EventDispatcherFactory
1212
{
1313
public:
14-
explicit EventDispatcherFactory(const std::shared_ptr<PendingEuroscopeEvents>& pendingEuroscopeEvents)
15-
: pendingEuroscopeEvents(pendingEuroscopeEvents)
14+
EventDispatcherFactory(
15+
const std::shared_ptr<PendingEuroscopeEvents>& pendingEuroscopeEvents,
16+
const std::shared_ptr<Thread::ThreadPool>& threadPool
17+
)
18+
: pendingEuroscopeEvents(pendingEuroscopeEvents), threadPool(threadPool)
1619
{
1720
assert(pendingEuroscopeEvents != nullptr && "pendingEuroscopeEvents cannot be null");
21+
assert(threadPool != nullptr && "threadPool cannot be null");
1822
}
1923
virtual ~EventDispatcherFactory() = default;
2024

@@ -26,7 +30,7 @@ namespace ECFMP::EventBus {
2630
case EventDispatchMode::Sync:
2731
return std::make_shared<SynchronousEventDispatcher<EventType>>(listener);
2832
case EventDispatchMode::Async:
29-
return std::make_shared<AsynchronousEventDispatcher<EventType>>(listener);
33+
return std::make_shared<AsynchronousEventDispatcher<EventType>>(listener, threadPool);
3034
case EventDispatchMode::Euroscope:
3135
return std::make_shared<EuroscopeEventDispatcher<EventType>>(listener, pendingEuroscopeEvents);
3236
default:
@@ -37,5 +41,8 @@ namespace ECFMP::EventBus {
3741
private:
3842
// A place that stores events that should be dispatched on the Euroscope thread.
3943
std::shared_ptr<PendingEuroscopeEvents> pendingEuroscopeEvents;
44+
45+
// The thread pool to use for dispatching events
46+
std::shared_ptr<Thread::ThreadPool> threadPool;
4047
};
4148
}// namespace ECFMP::EventBus

src/eventbus/InternalEventBusFactory.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
#include "eventbus/InternalEventBusFactory.h"
22
#include "eventbus/PendingEuroscopeEvents.h"
33
#include "plugin/InternalSdkEvents.h"
4+
#include "thread/ThreadPool.h"
45

56
namespace ECFMP::EventBus {
67
[[nodiscard]] auto MakeEventBus() -> std::shared_ptr<InternalEventBus>
78
{
9+
// Thread pool will get stopped when the eventbus is destroyed
10+
auto threadPool = std::make_shared<Thread::ThreadPool>();
811
auto pendingEuroscopeEvents = std::make_shared<PendingEuroscopeEvents>();
9-
auto eventDispatcherFactory = std::make_shared<EventDispatcherFactory>(pendingEuroscopeEvents);
12+
auto eventDispatcherFactory = std::make_shared<EventDispatcherFactory>(pendingEuroscopeEvents, threadPool);
1013

1114
auto eventBus = std::make_shared<InternalEventBus>(eventDispatcherFactory);
1215
eventBus->SubscribeSync<Plugin::EuroscopeTimerTickEvent>(pendingEuroscopeEvents);

src/pch.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22
#include <Windows.h>
3+
#include <algorithm>
34
#include <cassert>
45
#include <chrono>
56
#include <functional>

src/plugin/InternalSdk.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ namespace ECFMP::Plugin {
1919

2020
void InternalSdk::Destroy()
2121
{
22+
// Resetting the event bus naturally means we wait for threads to finish.
2223
eventBus.reset();
2324
}
2425

src/thread/ThreadPool.cpp

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#include "ThreadPool.h"
2+
#include <condition_variable>
3+
#include <mutex>
4+
#include <queue>
5+
#include <thread>
6+
7+
namespace ECFMP::Thread {
8+
struct ThreadPool::Impl {
9+
std::vector<std::thread> threads;
10+
11+
std::mutex mutex;
12+
13+
std::condition_variable condition;
14+
15+
std::queue<std::function<void()>> tasks;
16+
17+
bool running = true;
18+
};
19+
20+
ThreadPool::ThreadPool() : impl(std::make_unique<ThreadPool::Impl>())
21+
{
22+
// Create 2 threads in the pool
23+
for (int i = 0; i < 2; i++) {
24+
impl->threads.emplace_back([this]() {
25+
while (true) {
26+
std::function<void()> task;
27+
28+
// Run this block in a lock
29+
{
30+
std::unique_lock<std::mutex> lock(impl->mutex);
31+
32+
// Wait for a task to be available, or for the pool to be stopped
33+
impl->condition.wait(lock, [this]() {
34+
return !impl->running || !impl->tasks.empty();
35+
});
36+
37+
// If the pool is stopped, exit
38+
if (!impl->running) {
39+
return;
40+
}
41+
42+
// If the pool doesn't have a task, continue
43+
if (impl->tasks.empty()) {
44+
continue;
45+
}
46+
47+
// Grab the next task
48+
task = std::move(impl->tasks.front());
49+
impl->tasks.pop();
50+
}
51+
52+
// Run the task
53+
task();
54+
}
55+
});
56+
}
57+
}
58+
59+
ThreadPool::~ThreadPool()
60+
{
61+
// Join all threads
62+
{
63+
std::unique_lock<std::mutex> lock(impl->mutex);
64+
impl->running = false;
65+
}
66+
67+
impl->condition.notify_all();
68+
for (auto& thread: impl->threads) {
69+
thread.join();
70+
}
71+
}
72+
73+
void ThreadPool::Schedule(const std::function<void()>& function)
74+
{
75+
// Run this block in a lock, add the task to the queue
76+
{
77+
std::unique_lock<std::mutex> lock(impl->mutex);
78+
impl->tasks.emplace(function);
79+
}
80+
81+
// Notify a thread that a task is available
82+
impl->condition.notify_one();
83+
}
84+
}// namespace ECFMP::Thread

src/thread/ThreadPool.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#pragma once
2+
3+
namespace ECFMP::Thread {
4+
class ThreadPool
5+
{
6+
public:
7+
ThreadPool();
8+
~ThreadPool();
9+
10+
void Schedule(const std::function<void()>& function);
11+
12+
private:
13+
struct Impl;
14+
std::unique_ptr<Impl> impl;
15+
};
16+
}// namespace ECFMP::Thread

test/CMakeLists.txt

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ set(test__eventbus
3232

3333
set(test__flightinformationregion
3434
flightinformationregion/ConcreteFlightInformationRegionTest.cpp
35-
)
35+
)
3636

3737
set(test__flowmeasure
3838
flowmeasure/ConcreteAirportFilterTest.cpp
@@ -46,7 +46,7 @@ set(test__flowmeasure
4646

4747
set(test__log
4848
log/LogDecoratorTest.cpp
49-
)
49+
)
5050

5151
set(test__mock
5252
mock/MockEuroscopeAircraft.h
@@ -57,12 +57,15 @@ set(test__mock
5757
set(test__pch
5858
pch/pch.cpp
5959
pch/pch.h
60-
)
60+
)
6161

6262
set(test__plugin
6363
plugin/SdkFactoryTest.cpp
6464
plugin/InternalSdkTest.cpp)
6565

66+
set(test__thread
67+
thread/ThreadPoolTest.cpp)
68+
6669
set(test__other
6770
main.cpp
6871
pch/pch.cpp pch/pch.h
@@ -80,8 +83,9 @@ add_executable(${PROJECT_NAME}
8083
${test__log}
8184
${test__mock}
8285
${test__plugin}
86+
${test__thread}
8387
${test__other}
84-
)
88+
)
8589
add_test(NAME ${PROJECT_NAME} COMMAND ${PROJECT_NAME})
8690
target_precompile_headers(${PROJECT_NAME} PRIVATE "pch/pch.h")
8791

@@ -90,7 +94,7 @@ add_dependencies(${PROJECT_NAME}
9094
ecfmp_sdk
9195
gtest
9296
gmock
93-
)
97+
)
9498

9599
#### INCLUDES
96100
target_include_directories(${PROJECT_NAME} PUBLIC
@@ -99,12 +103,12 @@ target_include_directories(${PROJECT_NAME} PUBLIC
99103
"${CMAKE_CURRENT_SOURCE_DIR}/../include;"
100104
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/googletest/googlemock/include;"
101105
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/nlohmann;"
102-
)
106+
)
103107

104108
# Treat Euroscope as a system include directory to suppress warning, because they have lots
105109
target_include_directories(${PROJECT_NAME} SYSTEM PRIVATE
106110
"${CMAKE_CURRENT_SOURCE_DIR}/../third_party/euroscope;"
107-
)
111+
)
108112

109113
#### LINKS
110114
target_link_directories(
@@ -117,7 +121,7 @@ target_link_libraries(${PROJECT_NAME} PRIVATE
117121
gmock
118122
ecfmp_sdk
119123
"EuroScopePlugInDll;"
120-
)
124+
)
121125

122126
set_target_properties(${PROJECT_NAME} PROPERTIES COMPILE_FLAGS " -m32" LINK_FLAGS "-m32" JSON_MultipleHeaders "ON ")
123127

@@ -142,7 +146,7 @@ target_compile_options(${PROJECT_NAME} PRIVATE
142146
-Wno-unused-parameter; # Lots of interfaces don't use everything
143147
-Wno-missing-field-initializers; # Windows has loads of this sadly
144148
/EHa;
145-
)
149+
)
146150

147151
#### LINK OPTIONS
148152
target_link_options(${PROJECT_NAME} PRIVATE
@@ -159,10 +163,10 @@ target_link_options(${PROJECT_NAME} PRIVATE
159163
>
160164
/NODEFAULTLIB:LIBCMT;
161165
/SUBSYSTEM:CONSOLE;
162-
)
166+
)
163167

164168
# Post-build copy the EuroScope binary
165169
add_custom_command(TARGET ${PROJECT_NAME} POST_BUILD
166170
COMMAND ${CMAKE_COMMAND} -E copy "${CMAKE_CURRENT_SOURCE_DIR}/../lib/EuroScopePlugInDll.dll" "${PROJECT_BINARY_DIR}/EuroScopePlugInDll.dll"
167171
COMMENT "Copied EuroScope shared library to ${PROJECT_BINARY_DIR}/EuroScopePlugInDll.dll"
168-
)
172+
)

0 commit comments

Comments
 (0)