[DataStore] Introduce observer interface and provide implementation

Bug: 325144964
Test: follow up UT
Change-Id: I53bfff3c1e2dbd2f0a2304eb558c2734f34472ec
diff --git a/packages/SettingsLib/DataStore/Android.bp b/packages/SettingsLib/DataStore/Android.bp
new file mode 100644
index 0000000..c5957c6
--- /dev/null
+++ b/packages/SettingsLib/DataStore/Android.bp
@@ -0,0 +1,15 @@
+package {
+    default_applicable_licenses: ["frameworks_base_license"],
+}
+
+android_library {
+    name: "SettingsLibDataStore",
+    defaults: [
+        "SettingsLintDefaults",
+    ],
+    srcs: ["src/**/*"],
+    static_libs: [
+        "androidx.annotation_annotation",
+        "androidx.collection_collection-ktx",
+    ],
+}
diff --git a/packages/SettingsLib/DataStore/AndroidManifest.xml b/packages/SettingsLib/DataStore/AndroidManifest.xml
new file mode 100644
index 0000000..fb44627
--- /dev/null
+++ b/packages/SettingsLib/DataStore/AndroidManifest.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="utf-8"?>
+<manifest xmlns:android="http://schemas.android.com/apk/res/android"
+    package="com.android.settingslib.datastore">
+
+    <uses-sdk android:minSdkVersion="21" />
+</manifest>
diff --git a/packages/SettingsLib/DataStore/src/com/android/settingslib/datastore/KeyedObserver.kt b/packages/SettingsLib/DataStore/src/com/android/settingslib/datastore/KeyedObserver.kt
new file mode 100644
index 0000000..3ed4d46
--- /dev/null
+++ b/packages/SettingsLib/DataStore/src/com/android/settingslib/datastore/KeyedObserver.kt
@@ -0,0 +1,183 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.settingslib.datastore
+
+import androidx.annotation.AnyThread
+import androidx.annotation.GuardedBy
+import androidx.collection.MutableScatterMap
+import java.util.WeakHashMap
+import java.util.concurrent.Executor
+
+/**
+ * Callback to be informed of changes in a [KeyedObservable] object.
+ *
+ * The observer is only weakly referenced, so the caller must keep a strong reference to it.
+ */
+fun interface KeyedObserver<in K> {
+    /**
+     * Called by [KeyedObservable] in the event of changes.
+     *
+     * This callback runs on the [Executor] that was supplied when the observer was added.
+     *
+     * @param key key that has been changed
+     * @param reason the reason for the change, one of the [ChangeReason] constants
+     * @see KeyedObservable.addObserver
+     */
+    fun onKeyChanged(key: K, @ChangeReason reason: Int)
+}
+
+/**
+ * A key-value observable object whose changes can be observed with a [KeyedObserver].
+ *
+ * Notes:
+ * - The order in which observers will be notified is unspecified.
+ * - The observer is weakly referenced to avoid memory leaks, so the call site must keep a strong
+ *   reference to the observer.
+ * - It is possible that the callback is triggered even if there is no real data change. For
+ *   example, when data restore/clear happens, it might be too complex to check if data has really
+ *   changed, thus all the registered observers are notified directly.
+ */
+@AnyThread
+interface KeyedObservable<K> {
+    /**
+     * Adds an observer for any key.
+     *
+     * The observer will be notified whenever a change happens. The [KeyedObserver.onKeyChanged]
+     * callback is invoked with the specific key that was modified. However, a `null` key is passed
+     * when a batch of keys changes simultaneously (e.g. data is cleared or a restore happens).
+     *
+     * @param observer observer to be notified
+     * @param executor executor to run the callback
+     */
+    fun addObserver(observer: KeyedObserver<K?>, executor: Executor)
+
+    /**
+     * Adds an observer on the given key.
+     *
+     * The observer will be notified only when the given key is changed.
+     *
+     * @param key key to observe
+     * @param observer observer to be notified
+     * @param executor executor to run the callback
+     */
+    fun addObserver(key: K, observer: KeyedObserver<K>, executor: Executor)
+
+    /** Removes an observer that was added for any key. */
+    fun removeObserver(observer: KeyedObserver<K?>)
+
+    /** Removes an observer that was added on the given key. */
+    fun removeObserver(key: K, observer: KeyedObserver<K>)
+
+    /**
+     * Notifies all observers that a change occurred.
+     *
+     * Both the any-key observers and the observers registered on specific keys are notified.
+     *
+     * @param reason reason of the change
+     */
+    fun notifyChange(@ChangeReason reason: Int)
+
+    /**
+     * Notifies observers that a change occurred on the given key.
+     *
+     * The any-key observers and the observers registered on [key] are notified.
+     *
+     * @param key key of the change
+     * @param reason reason of the change
+     */
+    fun notifyChange(key: K, @ChangeReason reason: Int)
+}
+
+/** A thread safe implementation of [KeyedObservable]; observers are kept in [WeakHashMap]s. */
+class KeyedDataObservable<K> : KeyedObservable<K> {
+    // Instead of @GuardedBy("this"), each map is guarded by itself, because the holder of this
+    // KeyedDataObservable object may also synchronize on the object from outside.
+    @GuardedBy("itself") private val observers = WeakHashMap<KeyedObserver<K?>, Executor>()
+
+    @GuardedBy("itself")
+    private val keyedObservers = MutableScatterMap<K, WeakHashMap<KeyedObserver<K>, Executor>>()
+
+    override fun addObserver(observer: KeyedObserver<K?>, executor: Executor) {
+        // Look up before writing, so a rejected add never overwrites the existing registration.
+        val oldExecutor =
+            synchronized(observers) { observers[observer] ?: observers.put(observer, executor) }
+        if (oldExecutor != null && oldExecutor != executor) {
+            throw IllegalStateException("Add $observer twice, old=$oldExecutor, new=$executor")
+        }
+    }
+
+    override fun addObserver(key: K, observer: KeyedObserver<K>, executor: Executor) {
+        // Look up in the per-key bucket before writing, so a rejected add changes nothing.
+        val oldExecutor =
+            synchronized(keyedObservers) {
+                val bucket = keyedObservers.getOrPut(key) { WeakHashMap() }
+                bucket[observer] ?: bucket.put(observer, executor)
+            }
+        if (oldExecutor != null && oldExecutor != executor) {
+            throw IllegalStateException("Add $observer twice, old=$oldExecutor, new=$executor")
+        }
+    }
+
+    override fun removeObserver(observer: KeyedObserver<K?>) {
+        synchronized(observers) { observers.remove(observer) }
+    }
+
+    override fun removeObserver(key: K, observer: KeyedObserver<K>) {
+        synchronized(keyedObservers) {
+            val bucket = keyedObservers[key] ?: return
+            bucket.remove(observer)
+            // Drop the bucket once it is empty, even if this call itself removed nothing.
+            if (bucket.isEmpty()) keyedObservers.remove(key)
+        }
+    }
+
+    override fun notifyChange(@ChangeReason reason: Int) {
+        // Copy under the locks to avoid ConcurrentModificationException; dispatch outside them.
+        val anyKeyEntries = synchronized(observers) { observers.entries.toTypedArray() }
+        val keyedEntries = synchronized(keyedObservers) { keyedObservers.copy() }
+        for ((observer, executor) in anyKeyEntries) {
+            executor.execute { observer.onKeyChanged(null, reason) }
+        }
+        for ((key, entries) in keyedEntries) {
+            for ((observer, executor) in entries) {
+                executor.execute { observer.onKeyChanged(key, reason) }
+            }
+        }
+    }
+
+    /** Copies each per-key bucket into (key, entries) pairs for iteration outside the lock. */
+    private fun MutableScatterMap<K, WeakHashMap<KeyedObserver<K>, Executor>>.copy():
+        List<Pair<K, Array<Map.Entry<KeyedObserver<K>, Executor>>>> {
+        val result = ArrayList<Pair<K, Array<Map.Entry<KeyedObserver<K>, Executor>>>>(size)
+        forEach { key, value -> result.add(Pair(key, value.entries.toTypedArray())) }
+        return result
+    }
+
+    override fun notifyChange(key: K, @ChangeReason reason: Int) {
+        // Copy under the locks to avoid ConcurrentModificationException; dispatch outside them.
+        val anyKeyEntries = synchronized(observers) { observers.entries.toTypedArray() }
+        val keyedEntries =
+            synchronized(keyedObservers) { keyedObservers[key]?.entries?.toTypedArray() }
+                ?: arrayOf()
+        for ((observer, executor) in anyKeyEntries) {
+            executor.execute { observer.onKeyChanged(key, reason) }
+        }
+        for ((observer, executor) in keyedEntries) {
+            executor.execute { observer.onKeyChanged(key, reason) }
+        }
+    }
+}
diff --git a/packages/SettingsLib/DataStore/src/com/android/settingslib/datastore/Observer.kt b/packages/SettingsLib/DataStore/src/com/android/settingslib/datastore/Observer.kt
new file mode 100644
index 0000000..6d0ca669
--- /dev/null
+++ b/packages/SettingsLib/DataStore/src/com/android/settingslib/datastore/Observer.kt
@@ -0,0 +1,121 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.settingslib.datastore
+
+import androidx.annotation.AnyThread
+import androidx.annotation.GuardedBy
+import androidx.annotation.IntDef
+import java.util.WeakHashMap
+import java.util.concurrent.Executor
+
+/** The reason for a change, passed as an `Int` to the observer callbacks. */
+@IntDef(
+    ChangeReason.UNKNOWN,
+    ChangeReason.UPDATE,
+    ChangeReason.DELETE,
+    ChangeReason.RESTORE,
+    ChangeReason.SYNC_ACROSS_PROFILES,
+)
+@Retention(AnnotationRetention.SOURCE)
+annotation class ChangeReason {
+    companion object {
+        /** The reason for the change is unknown. */
+        const val UNKNOWN = 0
+        /** Data is updated. */
+        const val UPDATE = 1
+        /** Data is deleted. */
+        const val DELETE = 2
+        /** Data is restored by the backup/restore framework. */
+        const val RESTORE = 3
+        /** Data is synced from another profile (e.g. personal profile to work profile). */
+        const val SYNC_ACROSS_PROFILES = 4
+    }
+}
+
+/**
+ * Callback to be informed of changes in an [Observable] object.
+ *
+ * The observer is only weakly referenced, so the caller must keep a strong reference to it.
+ */
+fun interface Observer {
+    /**
+     * Called by [Observable] in the event of changes.
+     *
+     * This callback runs on the [Executor] that was supplied when the observer was added.
+     *
+     * @param reason the reason for the change, one of the [ChangeReason] constants
+     * @see [Observable.addObserver] for the notes on ordering, weak references and spurious calls.
+     */
+    fun onChanged(@ChangeReason reason: Int)
+}
+
+/** An observable object whose changes can be observed with an [Observer]. */
+@AnyThread
+interface Observable {
+    /**
+     * Adds an observer.
+     *
+     * Notes:
+     * - The order in which observers will be notified is unspecified.
+     * - The observer is weakly referenced to avoid memory leaks, so the call site must keep a
+     *   strong reference to the observer.
+     * - It is possible that the callback is triggered even if there is no real data change. For
+     *   example, when data restore/clear happens, it might be too complex to check if data has
+     *   really changed, thus all the registered observers are notified directly.
+     *
+     * @param observer observer to be notified
+     * @param executor executor to run the [Observer.onChanged] callback
+     */
+    fun addObserver(observer: Observer, executor: Executor)
+
+    /** Removes the given observer. */
+    fun removeObserver(observer: Observer)
+
+    /**
+     * Notifies observers that a change occurred.
+     *
+     * @param reason reason of the change
+     */
+    fun notifyChange(@ChangeReason reason: Int)
+}
+
+/** A thread safe implementation of [Observable]; observers are kept in a [WeakHashMap]. */
+class DataObservable : Observable {
+    // Instead of @GuardedBy("this"), the map is guarded by itself, because the holder of this
+    // DataObservable object may also synchronize on the object from outside.
+    @GuardedBy("itself") private val observers = WeakHashMap<Observer, Executor>()
+
+    /** Registers [observer]; a conflicting executor throws and leaves the old binding intact. */
+    override fun addObserver(observer: Observer, executor: Executor) {
+        // Look up before writing, so a rejected add never overwrites the existing registration.
+        val oldExecutor =
+            synchronized(observers) { observers[observer] ?: observers.put(observer, executor) }
+        if (oldExecutor != null && oldExecutor != executor) {
+            throw IllegalStateException("Add $observer twice, old=$oldExecutor, new=$executor")
+        }
+    }
+
+    override fun removeObserver(observer: Observer) {
+        synchronized(observers) { observers.remove(observer) }
+    }
+
+    override fun notifyChange(@ChangeReason reason: Int) {
+        // Copy under the lock to avoid ConcurrentModificationException; dispatch outside it.
+        val snapshot = synchronized(observers) { observers.entries.toTypedArray() }
+        for ((observer, executor) in snapshot) executor.execute { observer.onChanged(reason) }
+    }
+}