Skip to content

relay: isolate relay state on dedicated executor to support multiple I/O threads#362

Open
afrind wants to merge 2 commits into
mainfrom
pr362
Open

relay: isolate relay state on dedicated executor to support multiple I/O threads#362
afrind wants to merge 2 commits into
mainfrom
pr362

Conversation

@afrind

@afrind afrind commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

@akash-a-n akash-a-n left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

@akash-a-n reviewed 25 files and all commit messages, and made 2 comments.
Reviewable status: 25 of 26 files reviewed, 1 unresolved discussion (waiting on afrind, gmarzot, michalhosna, mondain, Oxyd, peterchave, suhasHere, and TimEvens).


src/MoqxRelay.cpp line 872 at r4 (raw file):

    }
  }
  if (session.get() == upstreamSession.get()) {

is self fetch not a concern anymore? Claude just flagged this.

@michalhosna michalhosna left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michalhosna made 1 comment.
Reviewable status: 25 of 26 files reviewed, 2 unresolved discussions (waiting on afrind, gmarzot, mondain, Oxyd, peterchave, suhasHere, and TimEvens).


src/MoqxRelay.h line 129 at r4 (raw file):

  // cross-executor filters. relayExec must outlive this relay.
  // If not set (default), all operations run on the calling thread.
  void setRelayExec(folly::Executor* relayExec) { relayExec_ = relayExec; }

It seem unsafe to call setRelayExec after the relay starts to do anything, so why is it not in constructor? Or some kind of safe-check that it's called only once.

Looking at #363 its used there in resetRelay in the test fixture, but I am not sure it's correct. When alling this on relay where ownedRelayExec_ != null, the ownedRelayExec_ stays dangling, that's probably fine?

Is this whole method with sharing executor tests only? Should we gate it more?

Reviewing this one without #363 seems to not make much sense. I'll redo my review there.

@afrind afrind left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@afrind made 8 comments.
Reviewable status: 25 of 26 files reviewed, 8 unresolved discussions (waiting on akash-a-n, gmarzot, michalhosna, mondain, Oxyd, peterchave, suhasHere, and TimEvens).


src/MoqxRelay.h line 129 at r4 (raw file):

Previously, michalhosna (Michal Hošna) wrote…

It seem unsafe to call setRelayExec after the relay starts to do anything, so why is it not in constructor? Or some kind of safe-check that it's called only once.

Looking at #363 its used there in resetRelay in the test fixture, but I am not sure it's correct. When alling this on relay where ownedRelayExec_ != null, the ownedRelayExec_ stays dangling, that's probably fine?

Is this whole method with sharing executor tests only? Should we gate it more?

Reviewing this one without #363 seems to not make much sense. I'll redo my review there.

👍


src/MoqxRelay.cpp line 872 at r4 (raw file):

Previously, akash-a-n wrote…

is self fetch not a concern anymore? Claude just flagged this.

I asked in moq slack, folks seem to think self-fetch should be allowed.


src/MoqxRelay.cpp line 403 at r4 (raw file):

  if (relayExec_) {
    forwarder->setCallback(

It's not obvious why this needs CrossExec and I had to run through it to remember. Most of the forwarder callbacks are invoked directly from the relay thread, making it not necessary. However, when the subscriber unsubscribes or does a requestUpdate, etc, that needs to x-exec.

Add a comment


src/MoqxRelay.cpp line 518 at r4 (raw file):

      forward,
      pinned,
      relayExec_ ? session->getExecutor() : nullptr

This would be clearer with a /*subscriberExec=*/


src/MoqxRelay.cpp line 527 at r4 (raw file):

  // single-thread (relayExec_ == nullptr) this is the subscriber's exec.
  co_withExecutor(
      relayExec_ ? static_cast<folly::Executor*>(relayExec_) : session->getExecutor(),

Same. /subscriberExec=/


src/MoqxRelay.cpp line 1105 at r4 (raw file):

    auto exec = relayExec();
    co_withExecutor(
        folly::getKeepAliveToken(exec),

We seem inconsistent about using a raw exec vs keepAliveToken


src/MoqxRelayContext.h line 118 at r4 (raw file):

  // own executor, isolating relay state from I/O threads.  Null when disabled.
  // Each relay owns its MoQFollyExecutorImpl; the pool just keeps threads alive.
  std::shared_ptr<folly::IOThreadPoolExecutor> relayThreadPool_;

Can/should this be unique_ptr?


src/MoqxRelayContext.cpp line 39 at r4 (raw file):

    size_t i = 0;
    for (const auto& [name, svc] : services) {
      auto relay = std::make_shared<MoqxRelay>(svc.cache, relayID);

I think passing the exec to the ctor seems very reasonable.

Adds a use_relay_thread boolean config option (default: true) that
controls whether relay state is isolated on a dedicated executor thread.
Disabling it is intended for baseline performance comparison only.

Also removes the hard error that rejected threads > 1, replacing it
with a targeted check: threads > 1 requires use_relay_thread=true.
This unlocks the config validation only — threads > 1 with
use_relay_thread=true will race on shared relay state until the
following commit wires up the dedicated relay executor.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants