Skip to content

Commit af9bed7

Browse files
fix bad test
1 parent fd6d7fb commit af9bed7

3 files changed

Lines changed: 21 additions & 97 deletions

File tree

include/livekit/room.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,6 @@ class LIVEKIT_API Room {
306306

307307
mutable std::mutex lock_;
308308
ConnectionState connection_state_ = ConnectionState::Disconnected;
309-
bool connect_in_progress_ = false;
310309
RoomDelegate* delegate_ = nullptr; // Not owned
311310
RoomInfoData room_info_;
312311
std::shared_ptr<FfiHandle> room_handle_;

src/room.cpp

Lines changed: 9 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616

1717
#include "livekit/room.h"
1818

19-
#include <chrono>
2019
#include <functional>
21-
#include <thread>
2220

2321
#include "data_track.pb.h"
2422
#include "ffi.pb.h"
@@ -111,7 +109,6 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO
111109
if (connection_state_ != ConnectionState::Disconnected) {
112110
throw std::runtime_error("already connected");
113111
}
114-
connect_in_progress_ = true;
115112
connection_state_ = ConnectionState::Reconnecting;
116113
}
117114

@@ -190,16 +187,11 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO
190187
connection_state_ = ConnectionState::Connected;
191188
}
192189

193-
{
194-
const std::scoped_lock<std::mutex> g(lock_);
195-
connect_in_progress_ = false;
196-
}
197190
return true;
198191
} catch (const std::exception& e) {
199192
int listener_to_remove = 0;
200193
{
201194
const std::scoped_lock<std::mutex> g(lock_);
202-
connect_in_progress_ = false;
203195
connection_state_ = ConnectionState::Disconnected;
204196
if (listener_id_ == listenerId) {
205197
listener_to_remove = listener_id_;
@@ -361,39 +353,28 @@ void Room::OnEvent(const FfiEvent& event) {
361353
// Take a snapshot of the delegate under lock, but do NOT call it under the
362354
// lock.
363355
RoomDelegate* delegate_snapshot = nullptr;
364-
bool connect_in_progress = false;
365356
{
366357
const std::scoped_lock<std::mutex> guard(lock_);
367358
delegate_snapshot = delegate_;
368-
connect_in_progress = connect_in_progress_;
369359
}
370360

371361
// First, handle RPC method invocations (not part of RoomEvent).
372362
if (event.message_case() == FfiEvent::kRpcMethodInvocation) {
373363
const auto& rpc = event.rpc_method_invocation();
374364

375365
LocalParticipant* lp = nullptr;
376-
bool local_participant_missing_during_connect = false;
377366
{
378367
const std::scoped_lock<std::mutex> guard(lock_);
379368
if (!local_participant_) {
380-
local_participant_missing_during_connect = connect_in_progress_;
381-
} else {
382-
auto local_handle = local_participant_->ffiHandleId();
383-
if (local_handle == INVALID_HANDLE ||
384-
rpc.local_participant_handle() != static_cast<std::uint64_t>(local_handle)) {
385-
// RPC is not targeted at this room's local participant; ignore.
386-
return;
387-
}
388-
lp = local_participant_.get();
369+
return;
389370
}
390-
}
391-
if (local_participant_missing_during_connect) {
392-
LK_LOG_WARN("dropping RPC invocation while Room::Connect is still publishing local participant state");
393-
return;
394-
}
395-
if (!lp) {
396-
return;
371+
auto local_handle = local_participant_->ffiHandleId();
372+
if (local_handle == INVALID_HANDLE ||
373+
rpc.local_participant_handle() != static_cast<std::uint64_t>(local_handle)) {
374+
// RPC is not targeted at this room's local participant; ignore.
375+
return;
376+
}
377+
lp = local_participant_.get();
397378
}
398379

399380
// Call outside the lock to avoid deadlocks / re-entrancy issues.
@@ -408,43 +389,13 @@ void Room::OnEvent(const FfiEvent& event) {
408389
const proto::RoomEvent& re = event.room_event();
409390

410391
// Check if this event is for our room handle
411-
bool missing_room_handle = false;
412-
bool missing_local_participant = false;
413-
ConnectionState connection_state = ConnectionState::Disconnected;
414392
{
415393
const std::scoped_lock<std::mutex> guard(lock_);
416-
connection_state = connection_state_;
417-
if (!room_handle_) {
418-
missing_room_handle = true;
419-
} else if (re.room_handle() != static_cast<std::uint64_t>(room_handle_->get())) {
394+
if (!room_handle_ || re.room_handle() != static_cast<std::uint64_t>(room_handle_->get())) {
420395
return;
421-
} else if (!local_participant_) {
422-
missing_local_participant = true;
423396
}
424397
}
425398

426-
if (missing_room_handle) {
427-
LK_LOG_WARN(
428-
"dropping room event {} before room handle is initialized; "
429-
"connect_in_progress={}, connection_state={}",
430-
static_cast<int>(re.message_case()), connect_in_progress, static_cast<int>(connection_state));
431-
return;
432-
}
433-
434-
if (missing_local_participant) {
435-
LK_LOG_ERROR(
436-
"room event {} matched the room handle before local participant state was initialized; "
437-
"room state is incomplete",
438-
static_cast<int>(re.message_case()));
439-
}
440-
441-
if (connect_in_progress && delegate_snapshot) {
442-
LK_LOG_WARN(
443-
"room event {} arrived while Room::Connect is still in progress; "
444-
"any delegate callback from this event will run before Connect() returns",
445-
static_cast<int>(re.message_case()));
446-
}
447-
448399
switch (re.message_case()) {
449400
case proto::RoomEvent::kParticipantConnected: {
450401
std::shared_ptr<RemoteParticipant> new_participant;
@@ -1189,7 +1140,6 @@ void Room::OnEvent(const FfiEvent& event) {
11891140
listener_id_ = 0;
11901141

11911142
// Reset connection state
1192-
connect_in_progress_ = false;
11931143
connection_state_ = ConnectionState::Disconnected;
11941144

11951145
// Move state out for cleanup outside lock

src/tests/integration/test_late_join_track_publication.cpp

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -271,14 +271,9 @@ class PublishedTrackGuard {
271271
std::vector<std::shared_ptr<LocalDataTrack>> data_tracks_;
272272
};
273273

274-
bool hasExpectedMediaCallbacks(const LateJoinPublicationState& state,
275-
const std::vector<ExpectedPublication>& expected_media) {
274+
bool hasExpectedMediaSubscriptions(const LateJoinPublicationState& state,
275+
const std::vector<ExpectedPublication>& expected_media) {
276276
for (const auto& expected : expected_media) {
277-
const auto published_it = state.published_media_tracks.find(expected.name);
278-
if (published_it == state.published_media_tracks.end() || published_it->second != expected.kind) {
279-
return false;
280-
}
281-
282277
const auto subscribed_it = state.subscribed_media_tracks.find(expected.name);
283278
if (subscribed_it == state.subscribed_media_tracks.end() || subscribed_it->second != expected.kind) {
284279
return false;
@@ -401,38 +396,28 @@ TEST_P(LateJoinTrackPublicationIntegrationTest, ConsumerReceivesAlreadyPublished
401396

402397
{
403398
std::unique_lock<std::mutex> lock(state.mutex);
399+
// Pre-existing media publications are delivered in the Connect snapshot, not as
400+
// TrackPublished room events. The late-joiner should still receive TrackSubscribed
401+
// callbacks once auto-subscribe attaches to those snapshot publications.
404402
const bool got_expected =
405-
state.cv.wait_for(lock, kWaitTimeout, [&]() { return hasExpectedMediaCallbacks(state, expected_media); });
406-
EXPECT_TRUE(got_expected) << "Timed out waiting for late-join audio publication/subscription events\n"
403+
state.cv.wait_for(lock, kWaitTimeout, [&]() { return hasExpectedMediaSubscriptions(state, expected_media); });
404+
EXPECT_TRUE(got_expected) << "Timed out waiting for late-join audio subscription events\n"
407405
<< "Published media events: " << describeMediaTracks(state.published_media_tracks) << "\n"
408406
<< "Subscribed media events: " << describeMediaTracks(state.subscribed_media_tracks);
409407
}
410408

411-
std::map<std::string, TrackKind> published_media_snapshot;
412409
std::map<std::string, TrackKind> subscribed_media_snapshot;
413-
std::map<std::string, int> published_media_counts;
414410
std::map<std::string, int> subscribed_media_counts;
415411
std::vector<std::string> invariant_failures;
416412
{
417413
std::lock_guard<std::mutex> lock(state.mutex);
418-
published_media_snapshot = state.published_media_tracks;
419414
subscribed_media_snapshot = state.subscribed_media_tracks;
420-
published_media_counts = state.published_media_counts;
421415
subscribed_media_counts = state.subscribed_media_counts;
422416
invariant_failures = state.invariant_failures;
423417
}
424418
EXPECT_TRUE(invariant_failures.empty()) << describeInvariantFailures(invariant_failures);
425419

426420
for (const auto& expected : expected_media) {
427-
const auto published_it = published_media_snapshot.find(expected.name);
428-
EXPECT_NE(published_it, published_media_snapshot.end())
429-
<< "Missing onTrackPublished event for " << expected.name
430-
<< "; received: " << describeMediaTracks(published_media_snapshot);
431-
if (published_it != published_media_snapshot.end()) {
432-
EXPECT_EQ(published_it->second, expected.kind) << "Published track kind mismatch for " << expected.name;
433-
}
434-
EXPECT_EQ(published_media_counts[expected.name], 1) << "Unexpected onTrackPublished count for " << expected.name;
435-
436421
const auto subscribed_it = subscribed_media_snapshot.find(expected.name);
437422
EXPECT_NE(subscribed_it, subscribed_media_snapshot.end())
438423
<< "Missing onTrackSubscribed event for " << expected.name
@@ -516,38 +501,28 @@ TEST_P(LateJoinTrackPublicationIntegrationTest, ConsumerReceivesAlreadyPublished
516501

517502
{
518503
std::unique_lock<std::mutex> lock(state.mutex);
504+
// Pre-existing media publications are delivered in the Connect snapshot, not as
505+
// TrackPublished room events. The late-joiner should still receive TrackSubscribed
506+
// callbacks once auto-subscribe attaches to those snapshot publications.
519507
const bool got_expected =
520-
state.cv.wait_for(lock, kWaitTimeout, [&]() { return hasExpectedMediaCallbacks(state, expected_media); });
521-
EXPECT_TRUE(got_expected) << "Timed out waiting for late-join video publication/subscription events\n"
508+
state.cv.wait_for(lock, kWaitTimeout, [&]() { return hasExpectedMediaSubscriptions(state, expected_media); });
509+
EXPECT_TRUE(got_expected) << "Timed out waiting for late-join video subscription events\n"
522510
<< "Published media events: " << describeMediaTracks(state.published_media_tracks) << "\n"
523511
<< "Subscribed media events: " << describeMediaTracks(state.subscribed_media_tracks);
524512
}
525513

526-
std::map<std::string, TrackKind> published_media_snapshot;
527514
std::map<std::string, TrackKind> subscribed_media_snapshot;
528-
std::map<std::string, int> published_media_counts;
529515
std::map<std::string, int> subscribed_media_counts;
530516
std::vector<std::string> invariant_failures;
531517
{
532518
std::lock_guard<std::mutex> lock(state.mutex);
533-
published_media_snapshot = state.published_media_tracks;
534519
subscribed_media_snapshot = state.subscribed_media_tracks;
535-
published_media_counts = state.published_media_counts;
536520
subscribed_media_counts = state.subscribed_media_counts;
537521
invariant_failures = state.invariant_failures;
538522
}
539523
EXPECT_TRUE(invariant_failures.empty()) << describeInvariantFailures(invariant_failures);
540524

541525
for (const auto& expected : expected_media) {
542-
const auto published_it = published_media_snapshot.find(expected.name);
543-
EXPECT_NE(published_it, published_media_snapshot.end())
544-
<< "Missing onTrackPublished event for " << expected.name
545-
<< "; received: " << describeMediaTracks(published_media_snapshot);
546-
if (published_it != published_media_snapshot.end()) {
547-
EXPECT_EQ(published_it->second, expected.kind) << "Published track kind mismatch for " << expected.name;
548-
}
549-
EXPECT_EQ(published_media_counts[expected.name], 1) << "Unexpected onTrackPublished count for " << expected.name;
550-
551526
const auto subscribed_it = subscribed_media_snapshot.find(expected.name);
552527
EXPECT_NE(subscribed_it, subscribed_media_snapshot.end())
553528
<< "Missing onTrackSubscribed event for " << expected.name

0 commit comments

Comments
 (0)