[incremental/pm] set health listener on commit and on reboot
This change allows Incremental Service to report health status directly
to the package manager service.
A health listener is created during the package installation session to
monitor incremental storage health. After commit, a new listener is
created and overwrites the old one.
The new listener will listen to incremental storage health and report
the status to package manager service, which will then send the status
to IncrementalStates, where the startability state and unstartable
reason might change, based on the health status code.
During reboot, for each incremental package that is not fully loaded,
the package manager service registers a health status listener to
continue monitoring the health status of that package.
Test: unit test
Test: manual
BUG: 170435166
Change-Id: I220f230c523cfaf2c96019f9478554665e6af486
diff --git a/services/incremental/BinderIncrementalService.cpp b/services/incremental/BinderIncrementalService.cpp
index 2f8825b..a31aac9 100644
--- a/services/incremental/BinderIncrementalService.cpp
+++ b/services/incremental/BinderIncrementalService.cpp
@@ -323,6 +323,22 @@
return ok();
}
+// Binder entry point: forwards a health-listener registration for |storageId| to the
+// service implementation and reports the implementation's success flag via |_aidl_return|.
+// The binder status itself is always ok().
+binder::Status BinderIncrementalService::registerStorageHealthListener(
+        int32_t storageId,
+        const ::android::os::incremental::StorageHealthCheckParams& healthCheckParams,
+        const ::android::sp<IStorageHealthListener>& healthListener, bool* _aidl_return) {
+    // The implementation takes its params by rvalue reference. Hand it a copy rather than
+    // const_cast-ing the caller's const object, so the caller's parcelable is never moved from.
+    *_aidl_return = mImpl.registerStorageHealthListener(storageId,
+                                                        StorageHealthCheckParams(healthCheckParams),
+                                                        healthListener);
+    return ok();
+}
+
+binder::Status BinderIncrementalService::unregisterStorageHealthListener(int32_t storageId) {  // Binder entry point: forwards the unregistration to the service implementation.
+ mImpl.unregisterStorageHealthListener(storageId);
+ return ok();  // the implementation call returns nothing, so ok() is always reported
+}
+
} // namespace android::os::incremental
jlong Incremental_IncrementalService_Start(JNIEnv* env) {
diff --git a/services/incremental/BinderIncrementalService.h b/services/incremental/BinderIncrementalService.h
index 0a89166..8afa0f7 100644
--- a/services/incremental/BinderIncrementalService.h
+++ b/services/incremental/BinderIncrementalService.h
@@ -89,6 +89,11 @@
progressListener,
bool* _aidl_return) final;
binder::Status unregisterLoadingProgressListener(int32_t storageId, bool* _aidl_return) final;
+ binder::Status registerStorageHealthListener(  // Forwards to the service implementation; its bool result is written to _aidl_return.
+ int32_t storageId,
+ const ::android::os::incremental::StorageHealthCheckParams& healthCheckParams,
+ const ::android::sp<IStorageHealthListener>& healthListener, bool* _aidl_return) final;
+ binder::Status unregisterStorageHealthListener(int32_t storageId) final;  // Forwards to the service implementation.
private:
android::incremental::IncrementalService mImpl;
diff --git a/services/incremental/IncrementalService.cpp b/services/incremental/IncrementalService.cpp
index 5f145f3..599ac93 100644
--- a/services/incremental/IncrementalService.cpp
+++ b/services/incremental/IncrementalService.cpp
@@ -1801,6 +1801,31 @@
return removeTimedJobs(*mProgressUpdateJobQueue, storage);
}
+// Installs |healthListener| together with |healthCheckParams| on the data loader stub that
+// belongs to |storage|. Returns false when the storage is unknown or has no data loader stub,
+// true once the stub has been updated.
+bool IncrementalService::registerStorageHealthListener(
+        StorageId storage, StorageHealthCheckParams&& healthCheckParams,
+        const StorageHealthListener& healthListener) {
+    // Look the stub up while holding mLock; the stub itself is updated after the lock is dropped.
+    const DataLoaderStubPtr stub = [this, storage]() -> DataLoaderStubPtr {
+        std::unique_lock guard(mLock);
+        const auto& mount = getIfsLocked(storage);
+        if (!mount) {
+            return {};
+        }
+        return mount->dataLoaderStub;
+    }();
+    if (!stub) {
+        return false;
+    }
+    stub->setHealthListener(std::move(healthCheckParams), &healthListener);
+    return true;
+}
+
+// Drops the health listener of |storage| by re-registering with an empty listener and
+// health-check params whose blockedTimeoutMs is -1.
+// NOTE(review): -1 presumably marks the params as invalid/disabled - confirm against the
+// health-check code that consumes them.
+void IncrementalService::unregisterStorageHealthListener(StorageId storage) {
+    const StorageHealthListener noListener;
+    StorageHealthCheckParams disabledParams;
+    disabledParams.blockedTimeoutMs = -1;
+    registerStorageHealthListener(storage, std::move(disabledParams), noListener);
+}
+
bool IncrementalService::perfLoggingEnabled() {
static const bool enabled = base::GetBoolProperty("incremental.perflogging", false);
return enabled;
@@ -2137,6 +2162,19 @@
binder::Status IncrementalService::DataLoaderStub::reportStreamHealth(MountId mountId,  // Stores the stream status reported for this stub's mount.
int newStatus) {
+ if (!isValid()) {  // reject reports while isValid() is false (-EINVAL)
+ return binder::Status::
+ fromServiceSpecificError(-EINVAL,
+ "reportStreamHealth came to invalid DataLoaderStub");
+ }
+ if (id() != mountId) {  // the report must name this stub's own mount id (-EPERM otherwise)
+ LOG(ERROR) << "Mount ID mismatch: expected " << id() << ", but got: " << mountId;
+ return binder::Status::fromServiceSpecificError(-EPERM, "Mount ID mismatch.");
+ }
+ {
+ std::lock_guard lock(mMutex);  // mStreamStatus is written while holding mMutex
+ mStreamStatus = newStatus;
+ }
return binder::Status::ok();
}
@@ -2153,6 +2191,33 @@
}
}
+// Refines a storage health status using the stream status reported by the data loader.
+//  - HEALTH_STATUS_OK is returned untouched.
+//  - A stream storage error turns any non-OK status into HEALTH_STATUS_UNHEALTHY_STORAGE.
+//  - An integrity/source/transport stream error turns HEALTH_STATUS_UNHEALTHY (and only that
+//    status) into HEALTH_STATUS_UNHEALTHY_TRANSPORT.
+//  - Every other combination returns |healthStatus| unchanged.
+static int adjustHealthStatus(int healthStatus, int streamStatus) {
+    if (healthStatus == IStorageHealthListener::HEALTH_STATUS_OK) {
+        // Everything is good; there is nothing to refine.
+        return healthStatus;
+    }
+    if (streamStatus == IDataLoaderStatusListener::STREAM_STORAGE_ERROR) {
+        // Storage is limited and the storage is not healthy.
+        return IStorageHealthListener::HEALTH_STATUS_UNHEALTHY_STORAGE;
+    }
+    const bool streamFailed =
+            streamStatus == IDataLoaderStatusListener::STREAM_INTEGRITY_ERROR ||
+            streamStatus == IDataLoaderStatusListener::STREAM_SOURCE_ERROR ||
+            streamStatus == IDataLoaderStatusListener::STREAM_TRANSPORT_ERROR;
+    // Pending/blocked reads caused by transport issues are not regarded as unhealthy, so only
+    // an already-unhealthy status is upgraded.
+    if (streamFailed && healthStatus == IStorageHealthListener::HEALTH_STATUS_UNHEALTHY) {
+        return IStorageHealthListener::HEALTH_STATUS_UNHEALTHY_TRANSPORT;
+    }
+    return healthStatus;
+}
+
void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {
LOG(DEBUG) << id() << ": updateHealthStatus" << (baseline ? " (baseline)" : "");
@@ -2232,6 +2297,8 @@
checkBackAfter = unhealthyMonitoring;
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
}
+ // Adjust health status based on stream status
+ healthStatusToReport = adjustHealthStatus(healthStatusToReport, mStreamStatus);
LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0
<< "secs";
mService.addTimedJob(*mService.mTimedQueue, id(), checkBackAfter,
@@ -2321,6 +2388,18 @@
mService.mLooper->wake();
}
+// Replaces the stored health-check params and the stored health listener while holding mMutex.
+// The params are always overwritten with |healthCheckParams|; a null |healthListener| pointer
+// clears the stored listener, otherwise the pointed-to listener is copied.
+void IncrementalService::DataLoaderStub::setHealthListener(
+        StorageHealthCheckParams&& healthCheckParams, const StorageHealthListener* healthListener) {
+    std::lock_guard guard(mMutex);
+    mHealthCheckParams = std::move(healthCheckParams);
+    if (healthListener != nullptr) {
+        mHealthListener = *healthListener;
+        return;
+    }
+    // No listener supplied: drop whichever one was stored before.
+    mHealthListener = {};
+}
+
void IncrementalService::DataLoaderStub::onDump(int fd) {
dprintf(fd, " dataLoader: {\n");
dprintf(fd, " currentStatus: %d\n", mCurrentStatus);
diff --git a/services/incremental/IncrementalService.h b/services/incremental/IncrementalService.h
index 504c02a..4c4b8bd 100644
--- a/services/incremental/IncrementalService.h
+++ b/services/incremental/IncrementalService.h
@@ -140,7 +140,10 @@
bool registerLoadingProgressListener(StorageId storage,
const StorageLoadingProgressListener& progressListener);
bool unregisterLoadingProgressListener(StorageId storage);
-
+ bool registerStorageHealthListener(StorageId storage,  // Returns false when the storage or its data loader stub is missing, true otherwise.
+ StorageHealthCheckParams&& healthCheckParams,
+ const StorageHealthListener& healthListener);
+ void unregisterStorageHealthListener(StorageId storage);  // Re-registers with an empty listener and blockedTimeoutMs = -1.
RawMetadata getMetadata(StorageId storage, std::string_view path) const;
RawMetadata getMetadata(StorageId storage, FileId node) const;
@@ -197,6 +200,8 @@
MountId id() const { return mId.load(std::memory_order_relaxed); }
const content::pm::DataLoaderParamsParcel& params() const { return mParams; }
+ void setHealthListener(StorageHealthCheckParams&& healthCheckParams,  // Overwrites the stored params; a null listener pointer clears the stored listener.
+ const StorageHealthListener* healthListener);
private:
binder::Status onStatusChanged(MountId mount, int newStatus) final;
@@ -251,6 +256,7 @@
BootClockTsUs kernelTsUs;
} mHealthBase = {TimePoint::max(), kMaxBootClockTsUs};
StorageHealthCheckParams mHealthCheckParams;
+ int mStreamStatus = content::pm::IDataLoaderStatusListener::STREAM_HEALTHY;
};
using DataLoaderStubPtr = sp<DataLoaderStub>;
diff --git a/services/incremental/test/IncrementalServiceTest.cpp b/services/incremental/test/IncrementalServiceTest.cpp
index aec9fa1..867312e 100644
--- a/services/incremental/test/IncrementalServiceTest.cpp
+++ b/services/incremental/test/IncrementalServiceTest.cpp
@@ -177,6 +177,18 @@
}
return binder::Status::ok();
}
+    // Test helper: reports STREAM_STORAGE_ERROR for mount |id| through the registered status
+    // listener. Does nothing when no listener is set; always returns an ok status.
+    binder::Status storageError(int32_t id) {
+        if (!mListener) {
+            return binder::Status::ok();
+        }
+        mListener->reportStreamHealth(id, IDataLoaderStatusListener::STREAM_STORAGE_ERROR);
+        return binder::Status::ok();
+    }
+ binder::Status transportError(int32_t id) {
+ if (mListener) {
+ mListener->reportStreamHealth(id, IDataLoaderStatusListener::STREAM_INTEGRITY_ERROR);
+ }
+ return binder::Status::ok();
+ }
int32_t setStorageParams(bool enableReadLogs) {
int32_t result = -1;
EXPECT_NE(mServiceConnector.get(), nullptr);
@@ -1221,4 +1233,83 @@
EXPECT_CALL(*listenerMock, onStorageLoadingProgressChanged(_, _)).Times(0);
mIncrementalService->registerLoadingProgressListener(storageId, listener);
}
+
+// Verifies that registering a new storage health listener replaces the one passed to
+// createStorage(), and that the status delivered to the new listener is adjusted by the
+// stream status most recently reported by the data loader.
+TEST_F(IncrementalServiceTest, testRegisterStorageHealthListenerSuccess) {
+    mIncFs->openMountSuccess();
+    sp<NiceMock<MockStorageHealthListener>> listener{new NiceMock<MockStorageHealthListener>};
+    sp<NiceMock<MockStorageHealthListener>> newListener{new NiceMock<MockStorageHealthListener>};
+    NiceMock<MockStorageHealthListener>* newListenerMock = newListener.get();
+
+    TemporaryDir tempDir;
+    int storageId = mIncrementalService->createStorage(tempDir.path, std::move(mDataLoaderParcel),
+                                                       IncrementalService::CreateOptions::CreateNew,
+                                                       {}, StorageHealthCheckParams{}, listener);
+    ASSERT_GE(storageId, 0);
+    StorageHealthCheckParams newParams;
+    newParams.blockedTimeoutMs = 10000;
+    newParams.unhealthyTimeoutMs = 20000;
+    newParams.unhealthyMonitoringMs = 30000;
+
+    using MS = std::chrono::milliseconds;
+    using MCS = std::chrono::microseconds;
+
+    // Read the timeouts before newParams is moved into the service below; a moved-from
+    // object must not be relied upon afterwards.
+    const auto blockedTimeout = MS(newParams.blockedTimeoutMs);
+    const auto unhealthyTimeout = MS(newParams.unhealthyTimeoutMs);
+
+    ASSERT_TRUE(mIncrementalService->registerStorageHealthListener(storageId, std::move(newParams),
+                                                                   newListener));
+
+    const uint64_t kFirstTimestampUs = 1000000000ll;
+    const uint64_t kBlockedTimestampUs =
+            kFirstTimestampUs - std::chrono::duration_cast<MCS>(blockedTimeout).count();
+    const uint64_t kUnhealthyTimestampUs =
+            kFirstTimestampUs - std::chrono::duration_cast<MCS>(unhealthyTimeout).count();
+
+    // test that old listener was not called
+    EXPECT_CALL(*listener.get(),
+                onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_READS_PENDING))
+            .Times(0);
+    EXPECT_CALL(*newListenerMock,
+                onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_READS_PENDING))
+            .Times(1);
+    EXPECT_CALL(*newListenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_BLOCKED))
+            .Times(1);
+    EXPECT_CALL(*newListenerMock,
+                onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_UNHEALTHY_STORAGE))
+            .Times(1);
+    EXPECT_CALL(*newListenerMock,
+                onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_UNHEALTHY_TRANSPORT))
+            .Times(1);
+    mIncFs->waitForPendingReadsSuccess(kFirstTimestampUs);
+    mLooper->mCallback(-1, -1, mLooper->mCallbackData);
+
+    ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_READS_PENDING, newListener->mStatus);
+    ASSERT_EQ(storageId, newListener->mStorageId);
+
+    auto timedCallback = mTimedQueue->mWhat;
+    mTimedQueue->clearJob(storageId);
+
+    // test when health status is blocked with transport error
+    mDataLoader->transportError(storageId);
+    mIncFs->waitForPendingReadsSuccess(kBlockedTimestampUs);
+    timedCallback();
+    ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_BLOCKED, newListener->mStatus);
+    timedCallback = mTimedQueue->mWhat;
+    mTimedQueue->clearJob(storageId);
+
+    // test when health status is blocked with storage error
+    mDataLoader->storageError(storageId);
+    mIncFs->waitForPendingReadsSuccess(kBlockedTimestampUs);
+    timedCallback();
+    ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_UNHEALTHY_STORAGE, newListener->mStatus);
+    timedCallback = mTimedQueue->mWhat;
+    mTimedQueue->clearJob(storageId);
+
+    // test when health status is unhealthy with transport error
+    mDataLoader->transportError(storageId);
+    mIncFs->waitForPendingReadsSuccess(kUnhealthyTimestampUs);
+    timedCallback();
+    ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_UNHEALTHY_TRANSPORT, newListener->mStatus);
+    mTimedQueue->clearJob(storageId);
+}
+
} // namespace android::os::incremental