@@ -294,6 +294,10 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v
294294 mJni ->initializeForCurrentThread ();
295295 runCmdLooper ();
296296 });
297+ mTimerThread = std::thread ([this ]() {
298+ mJni ->initializeForCurrentThread ();
299+ runTimers ();
300+ });
297301
298302 const auto mountedRootNames = adoptMountedInstances ();
299303 mountExistingImages (mountedRootNames);
@@ -306,7 +310,13 @@ IncrementalService::~IncrementalService() {
306310 }
307311 mJobCondition .notify_all ();
308312 mJobProcessor .join ();
313+ mTimerCondition .notify_all ();
314+ mTimerThread .join ();
309315 mCmdLooperThread .join ();
316+ mTimedJobs .clear ();
317+ // Ensure that mounts are destroyed while the service is still valid.
318+ mBindsByPath .clear ();
319+ mMounts .clear ();
310320}
311321
312322static const char * toString (IncrementalService::BindKind kind) {
@@ -1700,6 +1710,55 @@ void IncrementalService::onAppOpChanged(const std::string& packageName) {
17001710 }
17011711}
17021712
1713+ void IncrementalService::addTimedJob (MountId id, TimePoint when, Job what) {
1714+ if (id == kInvalidStorageId ) {
1715+ return ;
1716+ }
1717+ {
1718+ std::unique_lock lock (mTimerMutex );
1719+ mTimedJobs .insert (TimedJob{id, when, std::move (what)});
1720+ }
1721+ mTimerCondition .notify_all ();
1722+ }
1723+
1724+ void IncrementalService::removeTimedJobs (MountId id) {
1725+ if (id == kInvalidStorageId ) {
1726+ return ;
1727+ }
1728+ {
1729+ std::unique_lock lock (mTimerMutex );
1730+ std::erase_if (mTimedJobs , [id](auto && item) { return item.id == id; });
1731+ }
1732+ }
1733+
1734+ void IncrementalService::runTimers () {
1735+ static constexpr TimePoint kInfinityTs {Clock::duration::max ()};
1736+ TimePoint nextTaskTs = kInfinityTs ;
1737+ for (;;) {
1738+ std::unique_lock lock (mTimerMutex );
1739+ mTimerCondition .wait_until (lock, nextTaskTs, [this ]() {
1740+ auto now = Clock::now ();
1741+ return !mRunning || (!mTimedJobs .empty () && mTimedJobs .begin ()->when <= now);
1742+ });
1743+ if (!mRunning ) {
1744+ return ;
1745+ }
1746+
1747+ auto now = Clock::now ();
1748+ auto it = mTimedJobs .begin ();
1749+ // Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
1750+ for (; it != mTimedJobs .end () && it->when <= now; it = mTimedJobs .begin ()) {
1751+ auto job = it->what ;
1752+ mTimedJobs .erase (it);
1753+
1754+ lock.unlock ();
1755+ job ();
1756+ lock.lock ();
1757+ }
1758+ nextTaskTs = it != mTimedJobs .end () ? it->when : kInfinityTs ;
1759+ }
1760+ }
1761+
17031762IncrementalService::DataLoaderStub::DataLoaderStub (IncrementalService& service, MountId id,
17041763 DataLoaderParamsParcel&& params,
17051764 FileSystemControlParcel&& control,
@@ -1713,10 +1772,17 @@ IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service,
17131772 mControl(std::move(control)),
17141773 mStatusListener(statusListener ? *statusListener : DataLoaderStatusListener()),
17151774 mHealthListener(healthListener ? *healthListener : StorageHealthListener()),
1716- mHealthPath(std::move(healthPath)) {
1717- // TODO(b/153874006): enable external health listener.
1718- mHealthListener = {};
1719- healthStatusOk ();
1775+ mHealthPath(std::move(healthPath)),
1776+ mHealthCheckParams(std::move(healthCheckParams)) {
1777+ if (mHealthListener ) {
1778+ if (!isHealthParamsValid ()) {
1779+ mHealthListener = {};
1780+ }
1781+ } else {
1782+ // Disable advanced health check statuses.
1783+ mHealthCheckParams .blockedTimeoutMs = -1 ;
1784+ }
1785+ updateHealthStatus ();
17201786}
17211787
17221788IncrementalService::DataLoaderStub::~DataLoaderStub () {
@@ -1726,21 +1792,29 @@ IncrementalService::DataLoaderStub::~DataLoaderStub() {
17261792}
17271793
17281794void IncrementalService::DataLoaderStub::cleanupResources () {
1729- requestDestroy ();
1730-
17311795 auto now = Clock::now ();
1732- std::unique_lock lock (mMutex );
1796+ {
1797+ std::unique_lock lock (mMutex );
1798+ mHealthPath .clear ();
1799+ unregisterFromPendingReads ();
1800+ resetHealthControl ();
1801+ mService .removeTimedJobs (mId );
1802+ }
17331803
1734- unregisterFromPendingReads ();
1804+ requestDestroy ();
17351805
1736- mParams = {};
1737- mControl = {};
1738- mStatusCondition .wait_until (lock, now + 60s, [this ] {
1739- return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED;
1740- });
1741- mStatusListener = {};
1742- mHealthListener = {};
1743- mId = kInvalidStorageId ;
1806+ {
1807+ std::unique_lock lock (mMutex );
1808+ mParams = {};
1809+ mControl = {};
1810+ mHealthControl = {};
1811+ mHealthListener = {};
1812+ mStatusCondition .wait_until (lock, now + 60s, [this ] {
1813+ return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED;
1814+ });
1815+ mStatusListener = {};
1816+ mId = kInvalidStorageId ;
1817+ }
17441818}
17451819
17461820sp<content::pm::IDataLoader> IncrementalService::DataLoaderStub::getDataLoader () {
@@ -1838,7 +1912,7 @@ bool IncrementalService::DataLoaderStub::fsmStep() {
18381912 targetStatus = mTargetStatus ;
18391913 }
18401914
1841- LOG (DEBUG) << " fsmStep: " << mId << " : " << currentStatus << " -> " << targetStatus;
1915+ LOG (DEBUG) << " fsmStep: " << id () << " : " << currentStatus << " -> " << targetStatus;
18421916
18431917 if (currentStatus == targetStatus) {
18441918 return true ;
@@ -1920,42 +1994,167 @@ binder::Status IncrementalService::DataLoaderStub::onStatusChanged(MountId mount
19201994 return binder::Status::ok ();
19211995}
19221996
1923- void IncrementalService::DataLoaderStub::healthStatusOk () {
1924- LOG (DEBUG) << " healthStatusOk: " << mId ;
1925- std::unique_lock lock (mMutex );
1926- registerForPendingReads ();
1997+ bool IncrementalService::DataLoaderStub::isHealthParamsValid () const {
1998+ return mHealthCheckParams .blockedTimeoutMs > 0 &&
1999+ mHealthCheckParams .blockedTimeoutMs < mHealthCheckParams .unhealthyTimeoutMs ;
2000+ }
2001+
2002+ void IncrementalService::DataLoaderStub::onHealthStatus (StorageHealthListener healthListener,
2003+ int healthStatus) {
2004+ LOG (DEBUG) << id () << " : healthStatus: " << healthStatus;
2005+ if (healthListener) {
2006+ healthListener->onHealthStatus (id (), healthStatus);
2007+ }
2008+ }
2009+
2010+ void IncrementalService::DataLoaderStub::updateHealthStatus (bool baseline) {
2011+ LOG (DEBUG) << id () << " : updateHealthStatus" << (baseline ? " (baseline)" : " " );
2012+
2013+ int healthStatusToReport = -1 ;
2014+ StorageHealthListener healthListener;
2015+
2016+ {
2017+ std::unique_lock lock (mMutex );
2018+ unregisterFromPendingReads ();
2019+
2020+ healthListener = mHealthListener ;
2021+
2022+ // Healthcheck depends on timestamp of the oldest pending read.
2023+ // To get it, we need to re-open a pendingReads FD to get a full list of reads.
2024+ // Additionally we need to re-register for epoll with fresh FDs in case there are no reads.
2025+ const auto now = Clock::now ();
2026+ const auto kernelTsUs = getOldestPendingReadTs ();
2027+ if (baseline) {
2028+ // Updating baseline only on looper/epoll callback, i.e. on new set of pending reads.
2029+ mHealthBase = {now, kernelTsUs};
2030+ }
2031+
2032+ if (kernelTsUs == kMaxBootClockTsUs || mHealthBase .userTs > now ||
2033+ mHealthBase .kernelTsUs > kernelTsUs) {
2034+ LOG (DEBUG) << id () << " : No pending reads or invalid base, report Ok and wait." ;
2035+ registerForPendingReads ();
2036+ healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK;
2037+ lock.unlock ();
2038+ onHealthStatus (healthListener, healthStatusToReport);
2039+ return ;
2040+ }
2041+
2042+ resetHealthControl ();
2043+
2044+ // Always make sure the data loader is started.
2045+ setTargetStatusLocked (IDataLoaderStatusListener::DATA_LOADER_STARTED);
2046+
2047+ // Skip any further processing if health check params are invalid.
2048+ if (!isHealthParamsValid ()) {
2049+ LOG (DEBUG) << id ()
2050+ << " : Skip any further processing if health check params are invalid." ;
2051+ healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
2052+ lock.unlock ();
2053+ onHealthStatus (healthListener, healthStatusToReport);
2054+ // Triggering data loader start. This is a one-time action.
2055+ fsmStep ();
2056+ return ;
2057+ }
2058+
2059+ const auto blockedTimeout = std::chrono::milliseconds (mHealthCheckParams .blockedTimeoutMs );
2060+ const auto unhealthyTimeout =
2061+ std::chrono::milliseconds (mHealthCheckParams .unhealthyTimeoutMs );
2062+ const auto unhealthyMonitoring =
2063+ std::max (1000ms,
2064+ std::chrono::milliseconds (mHealthCheckParams .unhealthyMonitoringMs ));
2065+
2066+ const auto kernelDeltaUs = kernelTsUs - mHealthBase .kernelTsUs ;
2067+ const auto userTs = mHealthBase .userTs + std::chrono::microseconds (kernelDeltaUs);
2068+ const auto delta = now - userTs;
2069+
2070+ TimePoint whenToCheckBack;
2071+ if (delta < blockedTimeout) {
2072+ LOG (DEBUG) << id () << " : Report reads pending and wait for blocked status." ;
2073+ whenToCheckBack = userTs + blockedTimeout;
2074+ healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
2075+ } else if (delta < unhealthyTimeout) {
2076+ LOG (DEBUG) << id () << " : Report blocked and wait for unhealthy." ;
2077+ whenToCheckBack = userTs + unhealthyTimeout;
2078+ healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED;
2079+ } else {
2080+ LOG (DEBUG) << id () << " : Report unhealthy and continue monitoring." ;
2081+ whenToCheckBack = now + unhealthyMonitoring;
2082+ healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
2083+ }
2084+ LOG (DEBUG) << id () << " : updateHealthStatus in "
2085+ << double (std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack -
2086+ now)
2087+ .count ()) /
2088+ 1000.0
2089+ << " secs" ;
2090+ mService .addTimedJob (id (), whenToCheckBack, [this ]() { updateHealthStatus (); });
2091+ }
2092+
2093+ if (healthStatusToReport != -1 ) {
2094+ onHealthStatus (healthListener, healthStatusToReport);
2095+ }
2096+
2097+ fsmStep ();
19272098}
19282099
1929- void IncrementalService::DataLoaderStub::healthStatusReadsPending () {
1930- LOG (DEBUG) << " healthStatusReadsPending: " << mId ;
1931- requestStart ();
2100+ const incfs::UniqueControl& IncrementalService::DataLoaderStub::initializeHealthControl () {
2101+ if (mHealthPath .empty ()) {
2102+ resetHealthControl ();
2103+ return mHealthControl ;
2104+ }
2105+ if (mHealthControl .pendingReads () < 0 ) {
2106+ mHealthControl = mService .mIncFs ->openMount (mHealthPath );
2107+ }
2108+ if (mHealthControl .pendingReads () < 0 ) {
2109+ LOG (ERROR) << " Failed to open health control for: " << id () << " , path: " << mHealthPath
2110+ << " (" << mHealthControl .cmd () << " :" << mHealthControl .pendingReads () << " :"
2111+ << mHealthControl .logs () << " )" ;
2112+ }
2113+ return mHealthControl ;
2114+ }
19322115
1933- std::unique_lock lock ( mMutex );
1934- unregisterFromPendingReads () ;
2116+ void IncrementalService::DataLoaderStub::resetHealthControl () {
2117+ mHealthControl = {} ;
19352118}
19362119
1937- void IncrementalService::DataLoaderStub::healthStatusBlocked () {}
2120+ BootClockTsUs IncrementalService::DataLoaderStub::getOldestPendingReadTs () {
2121+ auto result = kMaxBootClockTsUs ;
2122+
2123+ const auto & control = initializeHealthControl ();
2124+ if (control.pendingReads () < 0 ) {
2125+ return result;
2126+ }
2127+
2128+ std::vector<incfs::ReadInfo> pendingReads;
2129+ if (mService .mIncFs ->waitForPendingReads (control, 0ms, &pendingReads) !=
2130+ android::incfs::WaitResult::HaveData ||
2131+ pendingReads.empty ()) {
2132+ return result;
2133+ }
2134+
2135+ LOG (DEBUG) << id () << " : pendingReads: " << control.pendingReads () << " , "
2136+ << pendingReads.size () << " : " << pendingReads.front ().bootClockTsUs ;
19382137
1939- void IncrementalService::DataLoaderStub::healthStatusUnhealthy () {}
2138+ for (auto && pendingRead : pendingReads) {
2139+ result = std::min (result, pendingRead.bootClockTsUs );
2140+ }
2141+ return result;
2142+ }
19402143
19412144void IncrementalService::DataLoaderStub::registerForPendingReads () {
1942- auto pendingReadsFd = mHealthControl .pendingReads ();
2145+ const auto pendingReadsFd = mHealthControl .pendingReads ();
19432146 if (pendingReadsFd < 0 ) {
1944- mHealthControl = mService .mIncFs ->openMount (mHealthPath );
1945- pendingReadsFd = mHealthControl .pendingReads ();
1946- if (pendingReadsFd < 0 ) {
1947- LOG (ERROR) << " Failed to open health control for: " << mId << " , path: " << mHealthPath
1948- << " (" << mHealthControl .cmd () << " :" << mHealthControl .pendingReads () << " :"
1949- << mHealthControl .logs () << " )" ;
1950- return ;
1951- }
2147+ return ;
19522148 }
19532149
2150+ LOG (DEBUG) << id () << " : addFd(pendingReadsFd): " << pendingReadsFd;
2151+
19542152 mService .mLooper ->addFd (
19552153 pendingReadsFd, android::Looper::POLL_CALLBACK, android::Looper::EVENT_INPUT,
19562154 [](int , int , void * data) -> int {
19572155 auto && self = (DataLoaderStub*)data;
1958- return self->onPendingReads ();
2156+ self->updateHealthStatus (/* baseline=*/ true );
2157+ return 0 ;
19592158 },
19602159 this );
19612160 mService .mLooper ->wake ();
@@ -1967,19 +2166,10 @@ void IncrementalService::DataLoaderStub::unregisterFromPendingReads() {
19672166 return ;
19682167 }
19692168
2169+ LOG (DEBUG) << id () << " : removeFd(pendingReadsFd): " << pendingReadsFd;
2170+
19702171 mService .mLooper ->removeFd (pendingReadsFd);
19712172 mService .mLooper ->wake ();
1972-
1973- mHealthControl = {};
1974- }
1975-
1976- int IncrementalService::DataLoaderStub::onPendingReads () {
1977- if (!mService .mRunning .load (std::memory_order_relaxed)) {
1978- return 0 ;
1979- }
1980-
1981- healthStatusReadsPending ();
1982- return 0 ;
19832173}
19842174
19852175void IncrementalService::DataLoaderStub::onDump (int fd) {
0 commit comments