[BOT.6] Make BpfCoordinator to support data limit

The BPF tethering coordinator listens to the forwarding rule
changes for updating data limit which is applied in the following
conditions.
- After adding the first rule on a given upstream, add data limit.
- After removing the last rule on a given upstream, clear data limit.
- The service applies a new data limit on current upstream.

The reason for relying on rule changes is because the Tethering and
IpServer objects have multi-internal state machines. It is hard to
synchronize all of their states.

Note that the data limit cleanup for stopping or switching upstream
relies on offload rules are all removed as well.

Bug: 150736748
Test: manual
Original-Change: https://android-review.googlesource.com/1302436
Merged-In: I829d36339973f9473fe6b616c48aa288f18d1c46
Change-Id: I829d36339973f9473fe6b616c48aa288f18d1c46
diff --git a/Tethering/src/android/net/ip/IpServer.java b/Tethering/src/android/net/ip/IpServer.java
index 830f6a0..1671dda 100644
--- a/Tethering/src/android/net/ip/IpServer.java
+++ b/Tethering/src/android/net/ip/IpServer.java
@@ -77,7 +77,6 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.Random;
@@ -272,9 +271,6 @@
         }
     }
 
-    private final LinkedHashMap<Inet6Address, Ipv6ForwardingRule> mIpv6ForwardingRules =
-            new LinkedHashMap<>();
-
     private final IpNeighborMonitor mIpNeighborMonitor;
 
     private LinkAddress mIpv4Address;
@@ -843,43 +839,29 @@
         // TODO: Perhaps remove this protection check.
         if (!mUsingBpfOffload) return;
 
-        try {
-            mNetd.tetherOffloadRuleAdd(rule.toTetherOffloadRuleParcel());
-            mIpv6ForwardingRules.put(rule.address, rule);
-        } catch (RemoteException | ServiceSpecificException e) {
-            mLog.e("Could not add IPv6 downstream rule: ", e);
-        }
+        mBpfCoordinator.tetherOffloadRuleAdd(this, rule);
     }
 
-    private void removeIpv6ForwardingRule(Ipv6ForwardingRule rule, boolean removeFromMap) {
-        // Theoretically, we don't need this check because IP neighbor monitor doesn't start if BPF
-        // offload is disabled. Add this check just in case.
+    private void removeIpv6ForwardingRule(Ipv6ForwardingRule rule) {
         // TODO: Perhaps remove this protection check.
+        // See the related comment in #addIpv6ForwardingRule.
         if (!mUsingBpfOffload) return;
 
-        try {
-            mNetd.tetherOffloadRuleRemove(rule.toTetherOffloadRuleParcel());
-            if (removeFromMap) {
-                mIpv6ForwardingRules.remove(rule.address);
-            }
-        } catch (RemoteException | ServiceSpecificException e) {
-            mLog.e("Could not remove IPv6 downstream rule: ", e);
-        }
+        mBpfCoordinator.tetherOffloadRuleRemove(this, rule);
     }
 
     private void clearIpv6ForwardingRules() {
-        for (Ipv6ForwardingRule rule : mIpv6ForwardingRules.values()) {
-            removeIpv6ForwardingRule(rule, false /*removeFromMap*/);
-        }
-        mIpv6ForwardingRules.clear();
+        if (!mUsingBpfOffload) return;
+
+        mBpfCoordinator.tetherOffloadRuleClear(this);
     }
 
-    // Convenience method to replace a rule with the same rule on a new upstream interface.
-    // Allows replacing the rules in one iteration pass without ConcurrentModificationExceptions.
-    // Relies on the fact that rules are in a map indexed by IP address.
-    private void updateIpv6ForwardingRule(Ipv6ForwardingRule rule, int newIfindex) {
-        addIpv6ForwardingRule(rule.onNewUpstream(newIfindex));
-        removeIpv6ForwardingRule(rule, false /*removeFromMap*/);
+    private void updateIpv6ForwardingRule(int newIfindex) {
+        // TODO: Perhaps remove this protection check.
+        // See the related comment in #addIpv6ForwardingRule.
+        if (!mUsingBpfOffload) return;
+
+        mBpfCoordinator.tetherOffloadRuleUpdate(this, newIfindex);
     }
 
     // Handles all updates to IPv6 forwarding rules. These can currently change only if the upstream
@@ -895,9 +877,7 @@
         // If the upstream interface has changed, remove all rules and re-add them with the new
         // upstream interface.
         if (prevUpstreamIfindex != upstreamIfindex) {
-            for (Ipv6ForwardingRule rule : mIpv6ForwardingRules.values()) {
-                updateIpv6ForwardingRule(rule, upstreamIfindex);
-            }
+            updateIpv6ForwardingRule(upstreamIfindex);
         }
 
         // If we're here to process a NeighborEvent, do so now.
@@ -917,7 +897,7 @@
         if (e.isValid()) {
             addIpv6ForwardingRule(rule);
         } else {
-            removeIpv6ForwardingRule(rule, true /*removeFromMap*/);
+            removeIpv6ForwardingRule(rule);
         }
     }
 
diff --git a/Tethering/src/com/android/networkstack/tethering/BpfCoordinator.java b/Tethering/src/com/android/networkstack/tethering/BpfCoordinator.java
index 089b12a..fc27b6a 100644
--- a/Tethering/src/com/android/networkstack/tethering/BpfCoordinator.java
+++ b/Tethering/src/com/android/networkstack/tethering/BpfCoordinator.java
@@ -32,6 +32,7 @@
 import android.net.NetworkStats.Entry;
 import android.net.TetherOffloadRuleParcel;
 import android.net.TetherStatsParcel;
+import android.net.ip.IpServer;
 import android.net.netstats.provider.NetworkStatsProvider;
 import android.net.util.SharedLog;
 import android.net.util.TetheringUtils.ForwardedStats;
@@ -48,11 +49,17 @@
 import com.android.internal.annotations.VisibleForTesting;
 
 import java.net.Inet6Address;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Objects;
 
 /**
  *  This coordinator is responsible for providing BPF offload relevant functionality.
  *  - Get tethering stats.
+ *  - Set data limit.
  *  - Set global alert.
+ *  - Add/remove forwarding rules.
  *
  * @hide
  */
@@ -77,7 +84,14 @@
     private final Dependencies mDeps;
     @Nullable
     private final BpfTetherStatsProvider mStatsProvider;
-    private boolean mStarted = false;
+
+    // Tracks whether BPF tethering is started or not. This is set by tethering before it
+    // starts the first IpServer and is cleared by tethering shortly before the last IpServer
+    // is stopped. Note that rule updates (especially deletions, but sometimes additions as
+    // well) may arrive when this is false. If they do, they must be communicated to netd.
+    // Changes in data limits may also arrive when this is false, and if they do, they must
+    // also be communicated to netd.
+    private boolean mPollingStarted = false;
 
     // Tracking remaining alert quota. Unlike limit quota is subject to interface, the alert
     // quota is interface independent and global for tether offload.
@@ -86,13 +100,40 @@
     // Maps upstream interface index to offloaded traffic statistics.
     // Always contains the latest total bytes/packets, since each upstream was started, received
     // from the BPF maps for each interface.
-    private SparseArray<ForwardedStats> mStats = new SparseArray<>();
+    private final SparseArray<ForwardedStats> mStats = new SparseArray<>();
+
+    // Maps upstream interface names to interface quotas.
+    // Always contains the latest value received from the framework for each interface, regardless
+    // of whether offload is currently running (or is even supported) on that interface. Only
+    // includes interfaces that have a quota set. Note that this map is used for storing the quota
+    // which is set from the service. Because the service uses the interface name to present the
+    // interface, this map uses the interface name to be the mapping index.
+    private final HashMap<String, Long> mInterfaceQuotas = new HashMap<>();
 
     // Maps upstream interface index to interface names.
     // Store all interface name since boot. Used for lookup what interface name it is from the
     // tether stats got from netd because netd reports interface index to present an interface.
     // TODO: Remove the unused interface name.
-    private SparseArray<String> mInterfaceNames = new SparseArray<>();
+    private final SparseArray<String> mInterfaceNames = new SparseArray<>();
+
+    // Map of downstream rule maps. Each of these maps represents the IPv6 forwarding rules for a
+    // given downstream. Each map:
+    // - Is owned by the IpServer that is responsible for that downstream.
+    // - Must only be modified by that IpServer.
+    // - Is created when the IpServer adds its first rule, and deleted when the IpServer deletes
+    //   its last rule (or clears its rules).
+    // TODO: Perhaps seal the map and rule operations which communicates with netd into a class.
+    // TODO: Does this need to be a LinkedHashMap or can it just be a HashMap? Also, could it be
+    // a ConcurrentHashMap, in order to avoid the copies in tetherOffloadRuleClear
+    // and tetherOffloadRuleUpdate?
+    // TODO: Perhaps use one-dimensional map and access specific downstream rules via downstream
+    // index. For doing that, IpServer must guarantee that it always has a valid IPv6 downstream
+    // interface index while calling function to clear all rules. IpServer may be calling clear
+    // rules function without a valid IPv6 downstream interface index even if it may have one
+    // before. IpServer would need to call getInterfaceParams() in the constructor instead of when
+    // startIpv6() is called, and make mInterfaceParams final.
+    private final HashMap<IpServer, LinkedHashMap<Inet6Address, Ipv6ForwardingRule>>
+            mIpv6ForwardingRules = new LinkedHashMap<>();
 
     // Runnable that used by scheduling next polling of stats.
     private final Runnable mScheduledPollingTask = () -> {
@@ -101,14 +142,15 @@
     };
 
     @VisibleForTesting
-    static class Dependencies {
+    public static class Dependencies {
         int getPerformPollInterval() {
             // TODO: Consider make this configurable.
             return DEFAULT_PERFORM_POLL_INTERVAL_MS;
         }
     }
 
-    BpfCoordinator(@NonNull Handler handler, @NonNull INetd netd,
+    @VisibleForTesting
+    public BpfCoordinator(@NonNull Handler handler, @NonNull INetd netd,
             @NonNull NetworkStatsManager nsm, @NonNull SharedLog log, @NonNull Dependencies deps) {
         mHandler = handler;
         mNetd = netd;
@@ -132,31 +174,153 @@
      * TODO: Perhaps check BPF support before starting.
      * TODO: Start the stats polling only if there is any client on the downstream.
      */
-    public void start() {
-        if (mStarted) return;
+    public void startPolling() {
+        if (mPollingStarted) return;
 
-        mStarted = true;
+        mPollingStarted = true;
         maybeSchedulePollingStats();
 
-        mLog.i("BPF tethering coordinator started");
+        mLog.i("Polling started");
     }
 
     /**
-     * Stop BPF tethering offload stats polling and cleanup upstream parameters.
+     * Stop BPF tethering offload stats polling.
+     * The data limit cleanup and the tether stats maps cleanup are not implemented here.
+     * These cleanups rely on all IpServers calling #tetherOffloadRuleRemove. After the
+     * last rule is removed from the upstream, #tetherOffloadRuleRemove does the cleanup
+     * functionality.
      * Note that this can be only called on handler thread.
      */
-    public void stop() {
-        if (!mStarted) return;
+    public void stopPolling() {
+        if (!mPollingStarted) return;
 
         // Stop scheduled polling tasks and poll the latest stats from BPF maps.
         if (mHandler.hasCallbacks(mScheduledPollingTask)) {
             mHandler.removeCallbacks(mScheduledPollingTask);
         }
         updateForwardedStatsFromNetd();
+        mPollingStarted = false;
 
-        mStarted = false;
+        mLog.i("Polling stopped");
+    }
 
-        mLog.i("BPF tethering coordinator stopped");
+    /**
+     * Add forwarding rule. After adding the first rule on a given upstream, must add the data
+     * limit on the given upstream.
+     * Note that this can be only called on handler thread.
+     */
+    public void tetherOffloadRuleAdd(
+            @NonNull final IpServer ipServer, @NonNull final Ipv6ForwardingRule rule) {
+        try {
+            // TODO: Perhaps avoid to add a duplicate rule.
+            mNetd.tetherOffloadRuleAdd(rule.toTetherOffloadRuleParcel());
+        } catch (RemoteException | ServiceSpecificException e) {
+            mLog.e("Could not add IPv6 forwarding rule: ", e);
+            return;
+        }
+
+        if (!mIpv6ForwardingRules.containsKey(ipServer)) {
+            mIpv6ForwardingRules.put(ipServer, new LinkedHashMap<Inet6Address,
+                    Ipv6ForwardingRule>());
+        }
+        LinkedHashMap<Inet6Address, Ipv6ForwardingRule> rules = mIpv6ForwardingRules.get(ipServer);
+
+        // Setup the data limit on the given upstream if the first rule is added.
+        final int upstreamIfindex = rule.upstreamIfindex;
+        if (!isAnyRuleOnUpstream(upstreamIfindex)) {
+            // If failed to set a data limit, probably should not use this upstream, because
+            // the upstream may not want to blow through the data limit that was told to apply.
+            // TODO: Perhaps stop the coordinator.
+            boolean success = updateDataLimit(upstreamIfindex);
+            if (!success) {
+                final String iface = mInterfaceNames.get(upstreamIfindex);
+                mLog.e("Setting data limit for " + iface + " failed.");
+            }
+        }
+
+        // Must update the adding rule after calling #isAnyRuleOnUpstream because it needs to
+        // check if it is about adding a first rule for a given upstream.
+        rules.put(rule.address, rule);
+    }
+
+    /**
+     * Remove forwarding rule. After removing the last rule on a given upstream, must clear
+     * data limit, update the last tether stats and remove the tether stats in the BPF maps.
+     * Note that this can be only called on handler thread.
+     */
+    public void tetherOffloadRuleRemove(
+            @NonNull final IpServer ipServer, @NonNull final Ipv6ForwardingRule rule) {
+        try {
+            // TODO: Perhaps avoid to remove a non-existent rule.
+            mNetd.tetherOffloadRuleRemove(rule.toTetherOffloadRuleParcel());
+        } catch (RemoteException | ServiceSpecificException e) {
+            mLog.e("Could not remove IPv6 forwarding rule: ", e);
+            return;
+        }
+
+        LinkedHashMap<Inet6Address, Ipv6ForwardingRule> rules = mIpv6ForwardingRules.get(ipServer);
+        if (rules == null) return;
+
+        // Must remove rules before calling #isAnyRuleOnUpstream because it needs to check if
+        // the last rule is removed for a given upstream. If no rule is removed, return early.
+        // Avoid unnecessary work on a non-existent rule which may have never been added or
+        // removed already.
+        if (rules.remove(rule.address) == null) return;
+
+        // Remove the downstream entry if it has no more rule.
+        if (rules.isEmpty()) {
+            mIpv6ForwardingRules.remove(ipServer);
+        }
+
+        // Do cleanup functionality if there is no more rule on the given upstream.
+        final int upstreamIfindex = rule.upstreamIfindex;
+        if (!isAnyRuleOnUpstream(upstreamIfindex)) {
+            try {
+                final TetherStatsParcel stats =
+                        mNetd.tetherOffloadGetAndClearStats(upstreamIfindex);
+                // Update the last stats delta and delete the local cache for a given upstream.
+                updateQuotaAndStatsFromSnapshot(new TetherStatsParcel[] {stats});
+                mStats.remove(upstreamIfindex);
+            } catch (RemoteException | ServiceSpecificException e) {
+                Log.wtf(TAG, "Exception when cleanup tether stats for upstream index "
+                        + upstreamIfindex + ": ", e);
+            }
+        }
+    }
+
+    /**
+     * Clear all forwarding rules for a given downstream.
+     * Note that this can be only called on handler thread.
+     */
+    public void tetherOffloadRuleClear(@NonNull final IpServer ipServer) {
+        final LinkedHashMap<Inet6Address, Ipv6ForwardingRule> rules = mIpv6ForwardingRules.get(
+                ipServer);
+        if (rules == null) return;
+
+        // Need to build a rule list because the rule map may be changed in the iteration.
+        for (final Ipv6ForwardingRule rule : new ArrayList<Ipv6ForwardingRule>(rules.values())) {
+            tetherOffloadRuleRemove(ipServer, rule);
+        }
+    }
+
+    /**
+     * Update existing forwarding rules to new upstream for a given downstream.
+     * Note that this can be only called on handler thread.
+     */
+    public void tetherOffloadRuleUpdate(@NonNull final IpServer ipServer, int newUpstreamIfindex) {
+        final LinkedHashMap<Inet6Address, Ipv6ForwardingRule> rules = mIpv6ForwardingRules.get(
+                ipServer);
+        if (rules == null) return;
+
+        // Need to build a rule list because the rule map may be changed in the iteration.
+        for (final Ipv6ForwardingRule rule : new ArrayList<Ipv6ForwardingRule>(rules.values())) {
+            // Remove the old rule before adding the new one because the map uses the same key for
+            // both rules. Reversing the processing order causes that the new rule is removed as
+            // unexpected.
+            // TODO: Add new rule first to reduce the latency which has no rule.
+            tetherOffloadRuleRemove(ipServer, rule);
+            tetherOffloadRuleAdd(ipServer, rule.onNewUpstream(newUpstreamIfindex));
+        }
     }
 
     /**
@@ -184,12 +348,17 @@
     public static class Ipv6ForwardingRule {
         public final int upstreamIfindex;
         public final int downstreamIfindex;
+
+        @NonNull
         public final Inet6Address address;
+        @NonNull
         public final MacAddress srcMac;
+        @NonNull
         public final MacAddress dstMac;
 
-        public Ipv6ForwardingRule(int upstreamIfindex, int downstreamIfIndex, Inet6Address address,
-                MacAddress srcMac, MacAddress dstMac) {
+        public Ipv6ForwardingRule(int upstreamIfindex, int downstreamIfIndex,
+                @NonNull Inet6Address address, @NonNull MacAddress srcMac,
+                @NonNull MacAddress dstMac) {
             this.upstreamIfindex = upstreamIfindex;
             this.downstreamIfindex = downstreamIfIndex;
             this.address = address;
@@ -198,6 +367,7 @@
         }
 
         /** Return a new rule object which updates with new upstream index. */
+        @NonNull
         public Ipv6ForwardingRule onNewUpstream(int newUpstreamIfindex) {
             return new Ipv6ForwardingRule(newUpstreamIfindex, downstreamIfindex, address, srcMac,
                     dstMac);
@@ -207,6 +377,7 @@
          * Don't manipulate TetherOffloadRuleParcel directly because implementing onNewUpstream()
          * would be error-prone due to generated stable AIDL classes not having a copy constructor.
          */
+        @NonNull
         public TetherOffloadRuleParcel toTetherOffloadRuleParcel() {
             final TetherOffloadRuleParcel parcel = new TetherOffloadRuleParcel();
             parcel.inputInterfaceIndex = upstreamIfindex;
@@ -217,6 +388,24 @@
             parcel.dstL2Address = dstMac.toByteArray();
             return parcel;
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Ipv6ForwardingRule)) return false;
+            Ipv6ForwardingRule that = (Ipv6ForwardingRule) o;
+            return this.upstreamIfindex == that.upstreamIfindex
+                    && this.downstreamIfindex == that.downstreamIfindex
+                    && Objects.equals(this.address, that.address)
+                    && Objects.equals(this.srcMac, that.srcMac)
+                    && Objects.equals(this.dstMac, that.dstMac);
+        }
+
+        @Override
+        public int hashCode() {
+            // TODO: if this is ever used in production code, don't pass ifindices
+            // to Objects.hash() to avoid autoboxing overhead.
+            return Objects.hash(upstreamIfindex, downstreamIfindex, address, srcMac, dstMac);
+        }
     }
 
     /**
@@ -245,7 +434,22 @@
 
         @Override
         public void onSetLimit(@NonNull String iface, long quotaBytes) {
-            // no-op
+            if (quotaBytes < QUOTA_UNLIMITED) {
+                throw new IllegalArgumentException("invalid quota value " + quotaBytes);
+            }
+
+            mHandler.post(() -> {
+                final Long curIfaceQuota = mInterfaceQuotas.get(iface);
+
+                if (null == curIfaceQuota && QUOTA_UNLIMITED == quotaBytes) return;
+
+                if (quotaBytes == QUOTA_UNLIMITED) {
+                    mInterfaceQuotas.remove(iface);
+                } else {
+                    mInterfaceQuotas.put(iface, quotaBytes);
+                }
+                maybeUpdateDataLimit(iface);
+            });
         }
 
         @VisibleForTesting
@@ -270,9 +474,79 @@
         }
     }
 
+    private int getInterfaceIndexFromRules(@NonNull String ifName) {
+        for (LinkedHashMap<Inet6Address, Ipv6ForwardingRule> rules : mIpv6ForwardingRules
+                .values()) {
+            for (Ipv6ForwardingRule rule : rules.values()) {
+                final int upstreamIfindex = rule.upstreamIfindex;
+                if (TextUtils.equals(ifName, mInterfaceNames.get(upstreamIfindex))) {
+                    return upstreamIfindex;
+                }
+            }
+        }
+        return 0;
+    }
+
+    private long getQuotaBytes(@NonNull String iface) {
+        final Long limit = mInterfaceQuotas.get(iface);
+        final long quotaBytes = (limit != null) ? limit : QUOTA_UNLIMITED;
+
+        return quotaBytes;
+    }
+
+    private boolean sendDataLimitToNetd(int ifIndex, long quotaBytes) {
+        if (ifIndex == 0) {
+            Log.wtf(TAG, "Invalid interface index.");
+            return false;
+        }
+
+        try {
+            mNetd.tetherOffloadSetInterfaceQuota(ifIndex, quotaBytes);
+        } catch (RemoteException | ServiceSpecificException e) {
+            mLog.e("Exception when updating quota " + quotaBytes + ": ", e);
+            return false;
+        }
+
+        return true;
+    }
+
+    // Handle the data limit update from the service which is the stats provider registered for.
+    private void maybeUpdateDataLimit(@NonNull String iface) {
+        // Set data limit only on a given upstream which has at least one rule. If we can't get
+        // an interface index for a given interface name, it means either there is no rule for
+        // a given upstream or the interface name is not an upstream which is monitored by the
+        // coordinator.
+        final int ifIndex = getInterfaceIndexFromRules(iface);
+        if (ifIndex == 0) return;
+
+        final long quotaBytes = getQuotaBytes(iface);
+        sendDataLimitToNetd(ifIndex, quotaBytes);
+    }
+
+    // Handle the data limit update while adding forwarding rules.
+    private boolean updateDataLimit(int ifIndex) {
+        final String iface = mInterfaceNames.get(ifIndex);
+        if (iface == null) {
+            mLog.e("Fail to get the interface name for index " + ifIndex);
+            return false;
+        }
+        final long quotaBytes = getQuotaBytes(iface);
+        return sendDataLimitToNetd(ifIndex, quotaBytes);
+    }
+
+    private boolean isAnyRuleOnUpstream(int upstreamIfindex) {
+        for (LinkedHashMap<Inet6Address, Ipv6ForwardingRule> rules : mIpv6ForwardingRules
+                .values()) {
+            for (Ipv6ForwardingRule rule : rules.values()) {
+                if (upstreamIfindex == rule.upstreamIfindex) return true;
+            }
+        }
+        return false;
+    }
+
     @NonNull
     private NetworkStats buildNetworkStats(@NonNull StatsType type, int ifIndex,
-            @NonNull ForwardedStats diff) {
+            @NonNull final ForwardedStats diff) {
         NetworkStats stats = new NetworkStats(0L, 0);
         final String iface = mInterfaceNames.get(ifIndex);
         if (iface == null) {
@@ -302,17 +576,8 @@
         }
     }
 
-    private void updateForwardedStatsFromNetd() {
-        final TetherStatsParcel[] tetherStatsList;
-        try {
-            // The reported tether stats are total data usage for all currently-active upstream
-            // interfaces since tethering start.
-            tetherStatsList = mNetd.tetherOffloadGetStats();
-        } catch (RemoteException | ServiceSpecificException e) {
-            mLog.e("Problem fetching tethering stats: ", e);
-            return;
-        }
-
+    private void updateQuotaAndStatsFromSnapshot(
+            @NonNull final TetherStatsParcel[] tetherStatsList) {
         long usedAlertQuota = 0;
         for (TetherStatsParcel tetherStats : tetherStatsList) {
             final Integer ifIndex = tetherStats.ifIndex;
@@ -332,7 +597,7 @@
                             buildNetworkStats(StatsType.STATS_PER_IFACE, ifIndex, diff),
                             buildNetworkStats(StatsType.STATS_PER_UID, ifIndex, diff));
                 } catch (ArrayIndexOutOfBoundsException e) {
-                    Log.wtf("Fail to update the accumulated stats delta for interface index "
+                    Log.wtf(TAG, "Fail to update the accumulated stats delta for interface index "
                             + ifIndex + " : ", e);
                 }
             }
@@ -344,10 +609,24 @@
             updateAlertQuota(newQuota);
         }
 
+        // TODO: Count the used limit quota for notifying data limit reached.
+    }
+
+    private void updateForwardedStatsFromNetd() {
+        final TetherStatsParcel[] tetherStatsList;
+        try {
+            // The reported tether stats are total data usage for all currently-active upstream
+            // interfaces since tethering start.
+            tetherStatsList = mNetd.tetherOffloadGetStats();
+        } catch (RemoteException | ServiceSpecificException e) {
+            mLog.e("Problem fetching tethering stats: ", e);
+            return;
+        }
+        updateQuotaAndStatsFromSnapshot(tetherStatsList);
     }
 
     private void maybeSchedulePollingStats() {
-        if (!mStarted) return;
+        if (!mPollingStarted) return;
 
         if (mHandler.hasCallbacks(mScheduledPollingTask)) {
             mHandler.removeCallbacks(mScheduledPollingTask);
diff --git a/Tethering/src/com/android/networkstack/tethering/Tethering.java b/Tethering/src/com/android/networkstack/tethering/Tethering.java
index cfe9fea..df67458 100644
--- a/Tethering/src/com/android/networkstack/tethering/Tethering.java
+++ b/Tethering/src/com/android/networkstack/tethering/Tethering.java
@@ -1709,7 +1709,7 @@
                 }
 
                 // TODO: Check the upstream interface if it is managed by BPF offload.
-                mBpfCoordinator.start();
+                mBpfCoordinator.startPolling();
             }
 
             @Override
@@ -1722,7 +1722,7 @@
                     mTetherUpstream = null;
                     reportUpstreamChanged(null);
                 }
-                mBpfCoordinator.stop();
+                mBpfCoordinator.stopPolling();
             }
 
             private boolean updateUpstreamWanted() {
diff --git a/Tethering/tests/unit/src/android/net/ip/IpServerTest.java b/Tethering/tests/unit/src/android/net/ip/IpServerTest.java
index 433aacf..c3bc915 100644
--- a/Tethering/tests/unit/src/android/net/ip/IpServerTest.java
+++ b/Tethering/tests/unit/src/android/net/ip/IpServerTest.java
@@ -54,12 +54,14 @@
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import android.app.usage.NetworkStatsManager;
 import android.net.INetd;
 import android.net.InetAddresses;
 import android.net.InterfaceConfigurationParcel;
@@ -69,6 +71,7 @@
 import android.net.MacAddress;
 import android.net.RouteInfo;
 import android.net.TetherOffloadRuleParcel;
+import android.net.TetherStatsParcel;
 import android.net.dhcp.DhcpServingParamsParcel;
 import android.net.dhcp.IDhcpEventCallbacks;
 import android.net.dhcp.IDhcpServer;
@@ -80,14 +83,17 @@
 import android.net.util.InterfaceSet;
 import android.net.util.PrefixUtils;
 import android.net.util.SharedLog;
+import android.os.Handler;
 import android.os.RemoteException;
 import android.os.test.TestLooper;
 import android.text.TextUtils;
 
+import androidx.annotation.NonNull;
 import androidx.test.filters.SmallTest;
 import androidx.test.runner.AndroidJUnit4;
 
 import com.android.networkstack.tethering.BpfCoordinator;
+import com.android.networkstack.tethering.BpfCoordinator.Ipv6ForwardingRule;
 import com.android.networkstack.tethering.PrivateAddressCoordinator;
 
 import org.junit.Before;
@@ -101,6 +107,7 @@
 import org.mockito.MockitoAnnotations;
 
 import java.net.Inet4Address;
+import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.List;
@@ -127,7 +134,6 @@
     private final IpPrefix mBluetoothPrefix = new IpPrefix("192.168.44.0/24");
 
     @Mock private INetd mNetd;
-    @Mock private BpfCoordinator mBpfCoordinator;
     @Mock private IpServer.Callback mCallback;
     @Mock private SharedLog mSharedLog;
     @Mock private IDhcpServer mDhcpServer;
@@ -135,6 +141,7 @@
     @Mock private IpNeighborMonitor mIpNeighborMonitor;
     @Mock private IpServer.Dependencies mDependencies;
     @Mock private PrivateAddressCoordinator mAddressCoordinator;
+    @Mock private NetworkStatsManager mStatsManager;
 
     @Captor private ArgumentCaptor<DhcpServingParamsParcel> mDhcpParamsCaptor;
 
@@ -144,6 +151,7 @@
     private IpServer mIpServer;
     private InterfaceConfigurationParcel mInterfaceConfiguration;
     private NeighborEventConsumer mNeighborEventConsumer;
+    private BpfCoordinator mBpfCoordinator;
 
     private void initStateMachine(int interfaceType) throws Exception {
         initStateMachine(interfaceType, false /* usingLegacyDhcp */, DEFAULT_USING_BPF_OFFLOAD);
@@ -217,6 +225,10 @@
         MockitoAnnotations.initMocks(this);
         when(mSharedLog.forSubComponent(anyString())).thenReturn(mSharedLog);
         when(mAddressCoordinator.requestDownstreamAddress(any())).thenReturn(mTestAddress);
+
+        BpfCoordinator bc = new BpfCoordinator(new Handler(mLooper.getLooper()), mNetd,
+                mStatsManager, mSharedLog, new BpfCoordinator.Dependencies());
+        mBpfCoordinator = spy(bc);
     }
 
     @Test
@@ -621,6 +633,10 @@
      * (actual: "android.net.TetherOffloadRuleParcel@8c827b0" or some such), but at least it does
      * work.
      *
+     * TODO: consider making the error message more readable by adding a method that catching the
+     * AssertionFailedError and throwing a new assertion with more details. See
+     * NetworkMonitorTest#verifyNetworkTested.
+     *
      * See ConnectivityServiceTest#assertRoutesAdded for an alternative approach which solves the
      * TooManyActualInvocations problem described above by forcing the caller of the custom assert
      * method to specify all expected invocations in one call. This is useful when the stable
@@ -660,6 +676,27 @@
         return argThat(new TetherOffloadRuleParcelMatcher(upstreamIfindex, dst, dstMac));
     }
 
+    private static Ipv6ForwardingRule makeForwardingRule(
+            int upstreamIfindex, @NonNull InetAddress dst, @NonNull MacAddress dstMac) {
+        return new Ipv6ForwardingRule(upstreamIfindex, TEST_IFACE_PARAMS.index,
+                (Inet6Address) dst, TEST_IFACE_PARAMS.macAddr, dstMac);
+    }
+
+    private TetherStatsParcel buildEmptyTetherStatsParcel(int ifIndex) {
+        TetherStatsParcel parcel = new TetherStatsParcel();
+        parcel.ifIndex = ifIndex;
+        return parcel;
+    }
+
+    private void resetNetdAndBpfCoordinator() throws Exception {
+        reset(mNetd, mBpfCoordinator);
+        when(mNetd.tetherOffloadGetStats()).thenReturn(new TetherStatsParcel[0]);
+        when(mNetd.tetherOffloadGetAndClearStats(UPSTREAM_IFINDEX))
+                .thenReturn(buildEmptyTetherStatsParcel(UPSTREAM_IFINDEX));
+        when(mNetd.tetherOffloadGetAndClearStats(UPSTREAM_IFINDEX2))
+                .thenReturn(buildEmptyTetherStatsParcel(UPSTREAM_IFINDEX2));
+    }
+
     @Test
     public void addRemoveipv6ForwardingRules() throws Exception {
         initTetheredStateMachine(TETHERING_WIFI, UPSTREAM_IFACE, false /* usingLegacyDhcp */,
@@ -677,75 +714,100 @@
         final MacAddress macA = MacAddress.fromString("00:00:00:00:00:0a");
         final MacAddress macB = MacAddress.fromString("11:22:33:00:00:0b");
 
-        reset(mNetd);
+        resetNetdAndBpfCoordinator();
+        verifyNoMoreInteractions(mBpfCoordinator, mNetd);
+
+        // TODO: Perhaps verify the interaction of tetherOffloadSetInterfaceQuota and
+        // tetherOffloadGetAndClearStats in netd while the rules are changed.
 
         // Events on other interfaces are ignored.
         recvNewNeigh(notMyIfindex, neighA, NUD_REACHABLE, macA);
-        verifyNoMoreInteractions(mNetd);
+        verifyNoMoreInteractions(mBpfCoordinator, mNetd);
 
         // Events on this interface are received and sent to netd.
         recvNewNeigh(myIfindex, neighA, NUD_REACHABLE, macA);
+        verify(mBpfCoordinator).tetherOffloadRuleAdd(
+                mIpServer, makeForwardingRule(UPSTREAM_IFINDEX, neighA, macA));
         verify(mNetd).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX, neighA, macA));
-        reset(mNetd);
+        resetNetdAndBpfCoordinator();
 
         recvNewNeigh(myIfindex, neighB, NUD_REACHABLE, macB);
+        verify(mBpfCoordinator).tetherOffloadRuleAdd(
+                mIpServer, makeForwardingRule(UPSTREAM_IFINDEX, neighB, macB));
         verify(mNetd).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX, neighB, macB));
-        reset(mNetd);
+        resetNetdAndBpfCoordinator();
 
         // Link-local and multicast neighbors are ignored.
         recvNewNeigh(myIfindex, neighLL, NUD_REACHABLE, macA);
-        verifyNoMoreInteractions(mNetd);
+        verifyNoMoreInteractions(mBpfCoordinator, mNetd);
         recvNewNeigh(myIfindex, neighMC, NUD_REACHABLE, macA);
-        verifyNoMoreInteractions(mNetd);
+        verifyNoMoreInteractions(mBpfCoordinator, mNetd);
 
         // A neighbor that is no longer valid causes the rule to be removed.
         // NUD_FAILED events do not have a MAC address.
         recvNewNeigh(myIfindex, neighA, NUD_FAILED, null);
+        verify(mBpfCoordinator).tetherOffloadRuleRemove(
+                mIpServer, makeForwardingRule(UPSTREAM_IFINDEX, neighA, macNull));
         verify(mNetd).tetherOffloadRuleRemove(matches(UPSTREAM_IFINDEX, neighA, macNull));
-        reset(mNetd);
+        resetNetdAndBpfCoordinator();
 
         // A neighbor that is deleted causes the rule to be removed.
         recvDelNeigh(myIfindex, neighB, NUD_STALE, macB);
+        verify(mBpfCoordinator).tetherOffloadRuleRemove(
+                mIpServer,  makeForwardingRule(UPSTREAM_IFINDEX, neighB, macNull));
         verify(mNetd).tetherOffloadRuleRemove(matches(UPSTREAM_IFINDEX, neighB, macNull));
-        reset(mNetd);
+        resetNetdAndBpfCoordinator();
 
-        // Upstream changes result in deleting and re-adding the rules.
+        // Upstream changes result in updating the rules.
         recvNewNeigh(myIfindex, neighA, NUD_REACHABLE, macA);
         recvNewNeigh(myIfindex, neighB, NUD_REACHABLE, macB);
-        reset(mNetd);
+        resetNetdAndBpfCoordinator();
 
         InOrder inOrder = inOrder(mNetd);
         LinkProperties lp = new LinkProperties();
         lp.setInterfaceName(UPSTREAM_IFACE2);
         dispatchTetherConnectionChanged(UPSTREAM_IFACE2, lp, -1);
-        inOrder.verify(mNetd).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX2, neighA, macA));
+        verify(mBpfCoordinator).tetherOffloadRuleUpdate(mIpServer, UPSTREAM_IFINDEX2);
         inOrder.verify(mNetd).tetherOffloadRuleRemove(matches(UPSTREAM_IFINDEX, neighA, macA));
-        inOrder.verify(mNetd).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX2, neighB, macB));
+        inOrder.verify(mNetd).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX2, neighA, macA));
         inOrder.verify(mNetd).tetherOffloadRuleRemove(matches(UPSTREAM_IFINDEX, neighB, macB));
-        reset(mNetd);
+        inOrder.verify(mNetd).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX2, neighB, macB));
+        resetNetdAndBpfCoordinator();
 
         // When the upstream is lost, rules are removed.
         dispatchTetherConnectionChanged(null, null, 0);
+        // Clear function is called two times by:
+        // - processMessage CMD_TETHER_CONNECTION_CHANGED for the upstream is lost.
+        // - processMessage CMD_IPV6_TETHER_UPDATE for the IPv6 upstream is lost.
+        // See dispatchTetherConnectionChanged.
+        verify(mBpfCoordinator, times(2)).tetherOffloadRuleClear(mIpServer);
         verify(mNetd).tetherOffloadRuleRemove(matches(UPSTREAM_IFINDEX2, neighA, macA));
         verify(mNetd).tetherOffloadRuleRemove(matches(UPSTREAM_IFINDEX2, neighB, macB));
-        reset(mNetd);
+        resetNetdAndBpfCoordinator();
 
         // If the upstream is IPv4-only, no rules are added.
         dispatchTetherConnectionChanged(UPSTREAM_IFACE);
-        reset(mNetd);
+        resetNetdAndBpfCoordinator();
         recvNewNeigh(myIfindex, neighA, NUD_REACHABLE, macA);
-        verifyNoMoreInteractions(mNetd);
+        // Clear function is called by #updateIpv6ForwardingRules for the IPv6 upstream is lost.
+        verify(mBpfCoordinator).tetherOffloadRuleClear(mIpServer);
+        verifyNoMoreInteractions(mBpfCoordinator, mNetd);
 
         // Rules can be added again once upstream IPv6 connectivity is available.
         lp.setInterfaceName(UPSTREAM_IFACE);
         dispatchTetherConnectionChanged(UPSTREAM_IFACE, lp, -1);
         recvNewNeigh(myIfindex, neighB, NUD_REACHABLE, macB);
+        verify(mBpfCoordinator).tetherOffloadRuleAdd(
+                mIpServer, makeForwardingRule(UPSTREAM_IFINDEX, neighB, macB));
         verify(mNetd).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX, neighB, macB));
+        verify(mBpfCoordinator, never()).tetherOffloadRuleAdd(
+                mIpServer, makeForwardingRule(UPSTREAM_IFINDEX, neighA, macA));
         verify(mNetd, never()).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX, neighA, macA));
 
         // If upstream IPv6 connectivity is lost, rules are removed.
-        reset(mNetd);
+        resetNetdAndBpfCoordinator();
         dispatchTetherConnectionChanged(UPSTREAM_IFACE, null, 0);
+        verify(mBpfCoordinator).tetherOffloadRuleClear(mIpServer);
         verify(mNetd).tetherOffloadRuleRemove(matches(UPSTREAM_IFINDEX, neighB, macB));
 
         // When the interface goes down, rules are removed.
@@ -753,15 +815,20 @@
         dispatchTetherConnectionChanged(UPSTREAM_IFACE, lp, -1);
         recvNewNeigh(myIfindex, neighA, NUD_REACHABLE, macA);
         recvNewNeigh(myIfindex, neighB, NUD_REACHABLE, macB);
+        verify(mBpfCoordinator).tetherOffloadRuleAdd(
+                mIpServer, makeForwardingRule(UPSTREAM_IFINDEX, neighA, macA));
         verify(mNetd).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX, neighA, macA));
+        verify(mBpfCoordinator).tetherOffloadRuleAdd(
+                mIpServer, makeForwardingRule(UPSTREAM_IFINDEX, neighB, macB));
         verify(mNetd).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX, neighB, macB));
-        reset(mNetd);
+        resetNetdAndBpfCoordinator();
 
         mIpServer.stop();
         mLooper.dispatchAll();
+        verify(mBpfCoordinator).tetherOffloadRuleClear(mIpServer);
         verify(mNetd).tetherOffloadRuleRemove(matches(UPSTREAM_IFINDEX, neighA, macA));
         verify(mNetd).tetherOffloadRuleRemove(matches(UPSTREAM_IFINDEX, neighB, macB));
-        reset(mNetd);
+        resetNetdAndBpfCoordinator();
     }
 
     @Test
@@ -771,35 +838,46 @@
         final MacAddress macA = MacAddress.fromString("00:00:00:00:00:0a");
         final MacAddress macNull = MacAddress.fromString("00:00:00:00:00:00");
 
-        reset(mNetd);
-
         // Expect that rules can be only added/removed when the BPF offload config is enabled.
-        // Note that the usingBpfOffload false case is not a realistic test case. Because IP
+        // Note that the BPF offload disabled case is not a realistic test case. Because IP
         // neighbor monitor doesn't start if BPF offload is disabled, there should have no
         // neighbor event listening. This is used for testing the protection check just in case.
-        // TODO: Perhaps remove this test once we don't need this check anymore.
-        for (boolean usingBpfOffload : new boolean[]{true, false}) {
-            initTetheredStateMachine(TETHERING_WIFI, UPSTREAM_IFACE, false /* usingLegacyDhcp */,
-                    usingBpfOffload);
+        // TODO: Perhaps remove the BPF offload disabled case test once this check isn't needed
+        // anymore.
 
-            // A neighbor is added.
-            recvNewNeigh(myIfindex, neigh, NUD_REACHABLE, macA);
-            if (usingBpfOffload) {
-                verify(mNetd).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX, neigh, macA));
-            } else {
-                verify(mNetd, never()).tetherOffloadRuleAdd(any());
-            }
-            reset(mNetd);
+        // [1] Enable BPF offload.
+        // A neighbor that is added or deleted causes the rule to be added or removed.
+        initTetheredStateMachine(TETHERING_WIFI, UPSTREAM_IFACE, false /* usingLegacyDhcp */,
+                true /* usingBpfOffload */);
+        resetNetdAndBpfCoordinator();
 
-            // A neighbor is deleted.
-            recvDelNeigh(myIfindex, neigh, NUD_STALE, macA);
-            if (usingBpfOffload) {
-                verify(mNetd).tetherOffloadRuleRemove(matches(UPSTREAM_IFINDEX, neigh, macNull));
-            } else {
-                verify(mNetd, never()).tetherOffloadRuleRemove(any());
-            }
-            reset(mNetd);
-        }
+        recvNewNeigh(myIfindex, neigh, NUD_REACHABLE, macA);
+        verify(mBpfCoordinator).tetherOffloadRuleAdd(
+                mIpServer, makeForwardingRule(UPSTREAM_IFINDEX, neigh, macA));
+        verify(mNetd).tetherOffloadRuleAdd(matches(UPSTREAM_IFINDEX, neigh, macA));
+        resetNetdAndBpfCoordinator();
+
+        recvDelNeigh(myIfindex, neigh, NUD_STALE, macA);
+        verify(mBpfCoordinator).tetherOffloadRuleRemove(
+                mIpServer, makeForwardingRule(UPSTREAM_IFINDEX, neigh, macNull));
+        verify(mNetd).tetherOffloadRuleRemove(matches(UPSTREAM_IFINDEX, neigh, macNull));
+        resetNetdAndBpfCoordinator();
+
+        // [2] Disable BPF offload.
+        // A neighbor that is added or deleted doesn’t cause the rule to be added or removed.
+        initTetheredStateMachine(TETHERING_WIFI, UPSTREAM_IFACE, false /* usingLegacyDhcp */,
+                false /* usingBpfOffload */);
+        resetNetdAndBpfCoordinator();
+
+        recvNewNeigh(myIfindex, neigh, NUD_REACHABLE, macA);
+        verify(mBpfCoordinator, never()).tetherOffloadRuleAdd(any(), any());
+        verify(mNetd, never()).tetherOffloadRuleAdd(any());
+        resetNetdAndBpfCoordinator();
+
+        recvDelNeigh(myIfindex, neigh, NUD_STALE, macA);
+        verify(mBpfCoordinator, never()).tetherOffloadRuleRemove(any(), any());
+        verify(mNetd, never()).tetherOffloadRuleRemove(any());
+        resetNetdAndBpfCoordinator();
     }
 
     @Test
diff --git a/Tethering/tests/unit/src/com/android/networkstack/tethering/BpfCoordinatorTest.java b/Tethering/tests/unit/src/com/android/networkstack/tethering/BpfCoordinatorTest.java
index 3e19ddf..e2d7aab 100644
--- a/Tethering/tests/unit/src/com/android/networkstack/tethering/BpfCoordinatorTest.java
+++ b/Tethering/tests/unit/src/com/android/networkstack/tethering/BpfCoordinatorTest.java
@@ -141,7 +141,7 @@
         setupFunctioningNetdInterface();
 
         final BpfCoordinator coordinator = makeBpfCoordinator();
-        coordinator.start();
+        coordinator.startPolling();
 
         final String wlanIface = "wlan0";
         final Integer wlanIfIndex = 100;
@@ -197,7 +197,7 @@
         // [3] Stop coordinator.
         // Shutdown the coordinator and clear the invocation history, especially the
         // tetherOffloadGetStats() calls.
-        coordinator.stop();
+        coordinator.stopPolling();
         clearInvocations(mNetd);
 
         // Verify the polling update thread stopped.
@@ -211,7 +211,7 @@
         setupFunctioningNetdInterface();
 
         final BpfCoordinator coordinator = makeBpfCoordinator();
-        coordinator.start();
+        coordinator.startPolling();
 
         final String mobileIface = "rmnet_data0";
         final Integer mobileIfIndex = 100;