Add async BufferedFile and StreamingPacketFile impls

Bug: 245971639

Change-Id: Ief1719262c2cb68819f6feb955e97793f3638ec0
diff --git a/staticlibs/Android.bp b/staticlibs/Android.bp
index 904e8c6..b1c653d 100644
--- a/staticlibs/Android.bp
+++ b/staticlibs/Android.bp
@@ -328,6 +328,31 @@
     lint: { strict_updatability_linting: true },
 }
 
+java_library {
+    name: "net-utils-device-common-wear",
+    srcs: [
+        "device/com/android/net/module/util/wear/*.java",
+    ],
+    sdk_version: "module_current",
+    min_sdk_version: "29",
+    visibility: [
+        "//frameworks/libs/net/common/tests:__subpackages__",
+        "//frameworks/libs/net/common/testutils:__subpackages__",
+        "//packages/modules/Connectivity:__subpackages__",
+    ],
+    libs: [
+        "framework-annotations-lib",
+    ],
+    static_libs: [
+        "net-utils-device-common-async",
+    ],
+    apex_available: [
+        "com.android.tethering",
+        "//apex_available:platform",
+    ],
+    lint: { strict_updatability_linting: true },
+}
+
 // Limited set of utilities for use by service-connectivity-mdns-standalone-build-test, to make sure
 // the mDNS code can build with only system APIs.
 // The mDNS code is platform code so it should use framework-annotations-lib, contrary to apps that
diff --git a/staticlibs/device/com/android/net/module/util/async/BufferedFile.java b/staticlibs/device/com/android/net/module/util/async/BufferedFile.java
new file mode 100644
index 0000000..bb5736b
--- /dev/null
+++ b/staticlibs/device/com/android/net/module/util/async/BufferedFile.java
@@ -0,0 +1,292 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.net.module.util.async;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Buffers inbound and outbound file data within given strict limits.
+ *
+ * Automatically manages all readability and writeability events in EventManager:
+ *   - When read buffer has more space - asks EventManager to notify on more data
+ *   - When write buffer has more space - asks the user to provide more data
+ *   - When underlying file cannot accept more data - registers EventManager callback
+ *
+ * @hide
+ */
+public final class BufferedFile implements AsyncFile.Listener {
+    /**
+     * Receives notifications when new data or output space is available.
+     * @hide
+     */
+    public interface Listener {
+        /** Invoked after the underlying file has been closed. */
+        void onBufferedFileClosed();
+
+        /** Invoked when there's new data in the inbound buffer. */
+        void onBufferedFileInboundData(int readByteCount);
+
+        /** Notifies on data being flushed from output buffer. */
+        void onBufferedFileOutboundSpace();
+
+        /** Notifies on unrecoverable error in file access. */
+        void onBufferedFileIoError(String message);
+    }
+
+    private final Listener mListener;
+    private final EventManager mEventManager;
+    private AsyncFile mFile;
+
+    private final CircularByteBuffer mInboundBuffer;
+    private final AtomicLong mTotalBytesRead = new AtomicLong();
+    private boolean mIsReadingShutdown;
+
+    private final CircularByteBuffer mOutboundBuffer;
+    private final AtomicLong mTotalBytesWritten = new AtomicLong();
+
+    /** Creates BufferedFile based on the given file descriptor. */
+    public static BufferedFile create(
+            EventManager eventManager,
+            FileHandle fileHandle,
+            Listener listener,
+            int inboundBufferSize,
+            int outboundBufferSize) throws IOException {
+        if (fileHandle == null) {
+            throw new NullPointerException();
+        }
+        BufferedFile file = new BufferedFile(
+            eventManager, listener, inboundBufferSize, outboundBufferSize);
+        file.mFile = eventManager.registerFile(fileHandle, file);
+        return file;
+    }
+
+    private BufferedFile(
+            EventManager eventManager,
+            Listener listener,
+            int inboundBufferSize,
+            int outboundBufferSize) {
+        if (eventManager == null || listener == null) {
+            throw new NullPointerException();
+        }
+        mEventManager = eventManager;
+        mListener = listener;
+
+        mInboundBuffer = new CircularByteBuffer(inboundBufferSize);
+        mOutboundBuffer = new CircularByteBuffer(outboundBufferSize);
+    }
+
+    /** Requests this file to be closed. */
+    public void close() {
+        mFile.close();
+    }
+
+    @Override
+    public void onClosed(AsyncFile file) {
+        mListener.onBufferedFileClosed();
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // READ PATH
+    ///////////////////////////////////////////////////////////////////////////
+
+    /** Returns buffer that is automatically filled with inbound data. */
+    public ReadableByteBuffer getInboundBuffer() {
+        return mInboundBuffer;
+    }
+
+    public int getInboundBufferFreeSizeForTest() {
+        return mInboundBuffer.freeSize();
+    }
+
+    /** Permanently disables reading of this file, and clears all buffered data. */
+    public void shutdownReading() {
+        mIsReadingShutdown = true;
+        mInboundBuffer.clear();
+        mFile.enableReadEvents(false);
+    }
+
+    /** Returns true after shutdownReading() has been called. */
+    public boolean isReadingShutdown() {
+        return mIsReadingShutdown;
+    }
+
+    /** Starts or resumes async read operations on this file. */
+    public void continueReading() {
+        if (!mIsReadingShutdown && mInboundBuffer.freeSize() > 0) {
+            mFile.enableReadEvents(true);
+        }
+    }
+
+    @Override
+    public void onReadReady(AsyncFile file) {
+        if (mIsReadingShutdown) {
+            return;
+        }
+
+        int readByteCount;
+        try {
+            readByteCount = bufferInputData();
+        } catch (IOException e) {
+            mListener.onBufferedFileIoError("IOException while reading: " + e.toString());
+            return;
+        }
+
+        if (readByteCount > 0) {
+            mListener.onBufferedFileInboundData(readByteCount);
+        }
+
+        continueReading();
+    }
+
+    private int bufferInputData() throws IOException {
+        int totalReadCount = 0;
+        while (true) {
+            final int maxReadCount = mInboundBuffer.getDirectWriteSize();
+            if (maxReadCount == 0) {
+                mFile.enableReadEvents(false);
+                break;
+            }
+
+            final int bufferOffset = mInboundBuffer.getDirectWritePos();
+            final byte[] buffer = mInboundBuffer.getDirectWriteBuffer();
+
+            final int readCount = mFile.read(buffer, bufferOffset, maxReadCount);
+            if (readCount <= 0) {
+                break;
+            }
+
+            mInboundBuffer.accountForDirectWrite(readCount);
+            totalReadCount += readCount;
+        }
+
+        mTotalBytesRead.addAndGet(totalReadCount);
+        return totalReadCount;
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // WRITE PATH
+    ///////////////////////////////////////////////////////////////////////////
+
+    /** Returns the number of bytes currently buffered for output. */
+    public int getOutboundBufferSize() {
+        return mOutboundBuffer.size();
+    }
+
+    /** Returns the number of bytes currently available for buffering for output. */
+    public int getOutboundBufferFreeSize() {
+        return mOutboundBuffer.freeSize();
+    }
+
+    /**
+     * Queues the given data for output.
+     * Throws runtime exception if there is not enough space.
+     */
+    public boolean enqueueOutboundData(byte[] data, int pos, int len) {
+        return enqueueOutboundData(data, pos, len, null, 0, 0);
+    }
+
+    /**
+     * Queues data1, then data2 for output.
+     * Throws runtime exception if there is not enough space.
+     */
+    public boolean enqueueOutboundData(
+            byte[] data1, int pos1, int len1,
+            byte[] buffer2, int pos2, int len2) {
+        Assertions.throwsIfOutOfBounds(data1, pos1, len1);
+        Assertions.throwsIfOutOfBounds(buffer2, pos2, len2);
+
+        final int totalLen = len1 + len2;
+
+        if (totalLen > mOutboundBuffer.freeSize()) {
+            flushOutboundBuffer();
+
+            if (totalLen > mOutboundBuffer.freeSize()) {
+                return false;
+            }
+        }
+
+        mOutboundBuffer.writeBytes(data1, pos1, len1);
+
+        if (buffer2 != null) {
+            mOutboundBuffer.writeBytes(buffer2, pos2, len2);
+        }
+
+        flushOutboundBuffer();
+
+        return true;
+    }
+
+    private void flushOutboundBuffer() {
+        try {
+            while (mOutboundBuffer.getDirectReadSize() > 0) {
+                final int maxReadSize = mOutboundBuffer.getDirectReadSize();
+                final int writeCount = mFile.write(
+                    mOutboundBuffer.getDirectReadBuffer(),
+                    mOutboundBuffer.getDirectReadPos(),
+                    maxReadSize);
+
+                if (writeCount == 0) {
+                    mFile.enableWriteEvents(true);
+                    break;
+                }
+
+                if (writeCount > maxReadSize) {
+                    throw new IllegalArgumentException(
+                        "Write count " + writeCount + " above max " + maxReadSize);
+                }
+
+                mOutboundBuffer.accountForDirectRead(writeCount);
+            }
+        } catch (IOException e) {
+            scheduleOnIoError("IOException while writing: " + e.toString());
+        }
+    }
+
+    private void scheduleOnIoError(String message) {
+        mEventManager.execute(() -> {
+            mListener.onBufferedFileIoError(message);
+        });
+    }
+
+    @Override
+    public void onWriteReady(AsyncFile file) {
+        mFile.enableWriteEvents(false);
+        flushOutboundBuffer();
+        mListener.onBufferedFileOutboundSpace();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("file={");
+        sb.append(mFile);
+        sb.append("}");
+        if (mIsReadingShutdown) {
+            sb.append(", readingShutdown");
+        }
+        sb.append("}, inboundBuffer={");
+        sb.append(mInboundBuffer);
+        sb.append("}, outboundBuffer={");
+        sb.append(mOutboundBuffer);
+        sb.append("}, totalBytesRead=");
+        sb.append(mTotalBytesRead);
+        sb.append(", totalBytesWritten=");
+        sb.append(mTotalBytesWritten);
+        return sb.toString();
+    }
+}
diff --git a/staticlibs/device/com/android/net/module/util/wear/NetPacketHelpers.java b/staticlibs/device/com/android/net/module/util/wear/NetPacketHelpers.java
new file mode 100644
index 0000000..341c44b
--- /dev/null
+++ b/staticlibs/device/com/android/net/module/util/wear/NetPacketHelpers.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.net.module.util.wear;
+
+import com.android.net.module.util.async.ReadableByteBuffer;
+
+/**
+ * Implements utilities for decoding parts of TCP/UDP/IP headers.
+ *
+ * @hide
+ */
+final class NetPacketHelpers {
+    static void encodeNetworkUnsignedInt16(int value, byte[] dst, final int dstPos) {
+        dst[dstPos] = (byte) ((value >> 8) & 0xFF);
+        dst[dstPos + 1] = (byte) (value & 0xFF);
+    }
+
+    static int decodeNetworkUnsignedInt16(byte[] data, final int pos) {
+        return ((data[pos] & 0xFF) << 8) | (data[pos + 1] & 0xFF);
+    }
+
+    static int decodeNetworkUnsignedInt16(ReadableByteBuffer data, final int pos) {
+        return ((data.peek(pos) & 0xFF) << 8) | (data.peek(pos + 1) & 0xFF);
+    }
+
+    private NetPacketHelpers() {}
+}
diff --git a/staticlibs/device/com/android/net/module/util/wear/PacketFile.java b/staticlibs/device/com/android/net/module/util/wear/PacketFile.java
new file mode 100644
index 0000000..7f5ed78
--- /dev/null
+++ b/staticlibs/device/com/android/net/module/util/wear/PacketFile.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.net.module.util.wear;
+
+/**
+ * Defines bidirectional file where all transmissions are made as complete packets.
+ *
+ * Automatically manages all readability and writeability events in EventManager:
+ *   - When read buffer has more space - asks EventManager to notify on more data
+ *   - When write buffer has more space - asks the user to provide more data
+ *   - When underlying file cannot accept more data - registers EventManager callback
+ *
+ * @hide
+ */
+public interface PacketFile {
+    /** @hide */
+    public enum ErrorCode {
+        UNEXPECTED_ERROR,
+        IO_ERROR,
+        INBOUND_PACKET_TOO_LARGE,
+        OUTBOUND_PACKET_TOO_LARGE,
+    }
+
+    /**
+     * Receives notifications when new data or output space is available.
+     *
+     * @hide
+     */
+    public interface Listener {
+        /**
+         * Handles the initial part of the stream, which on some systems provides lower-level
+         * configuration data.
+         *
+         * Returns the number of bytes consumed, or zero if the preamble has been fully read.
+         */
+        int onPreambleData(byte[] data, int pos, int len);
+
+        /** Handles one extracted packet. */
+        void onInboundPacket(byte[] data, int pos, int len);
+
+        /** Notifies on new data being added to the buffer. */
+        void onInboundBuffered(int newByteCount, int totalBufferedSize);
+
+        /** Notifies on data being flushed from output buffer. */
+        void onOutboundPacketSpace();
+
+        /** Notifies on unrecoverable error in the packet processing. */
+        void onPacketFileError(ErrorCode error, String message);
+    }
+
+    /** Requests this file to be closed. */
+    void close();
+
+    /** Permanently disables reading of this file, and clears all buffered data. */
+    void shutdownReading();
+
+    /** Starts or resumes async read operations on this file. */
+    void continueReading();
+
+    /** Returns the number of bytes currently buffered as input. */
+    int getInboundBufferSize();
+
+    /** Returns the number of bytes currently available for buffering for output. */
+    int getOutboundFreeSize();
+
+    /**
+     * Queues the given data for output.
+     * Throws runtime exception if there is not enough space.
+     */
+    boolean enqueueOutboundPacket(byte[] data, int pos, int len);
+}
diff --git a/staticlibs/device/com/android/net/module/util/wear/StreamingPacketFile.java b/staticlibs/device/com/android/net/module/util/wear/StreamingPacketFile.java
new file mode 100644
index 0000000..52dbee4
--- /dev/null
+++ b/staticlibs/device/com/android/net/module/util/wear/StreamingPacketFile.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.net.module.util.wear;
+
+import com.android.net.module.util.async.BufferedFile;
+import com.android.net.module.util.async.EventManager;
+import com.android.net.module.util.async.FileHandle;
+import com.android.net.module.util.async.Assertions;
+import com.android.net.module.util.async.ReadableByteBuffer;
+
+import java.io.IOException;
+
+/**
+ * Implements PacketFile based on a streaming file descriptor.
+ *
+ * Packets are delineated using network-order 2-byte length indicators.
+ *
+ * @hide
+ */
+public final class StreamingPacketFile implements PacketFile, BufferedFile.Listener {
+    private static final int HEADER_SIZE = 2;
+
+    private final EventManager mEventManager;
+    private final Listener mListener;
+    private final BufferedFile mFile;
+    private final int mMaxPacketSize;
+    private final ReadableByteBuffer mInboundBuffer;
+    private boolean mIsInPreamble = true;
+
+    private final byte[] mTempPacketReadBuffer;
+    private final byte[] mTempHeaderWriteBuffer;
+
+    public StreamingPacketFile(
+            EventManager eventManager,
+            FileHandle fileHandle,
+            Listener listener,
+            int maxPacketSize,
+            int maxBufferedInboundPackets,
+            int maxBufferedOutboundPackets) throws IOException {
+        if (eventManager == null || fileHandle == null || listener == null) {
+            throw new NullPointerException();
+        }
+
+        mEventManager = eventManager;
+        mListener = listener;
+        mMaxPacketSize = maxPacketSize;
+
+        final int maxTotalLength = HEADER_SIZE + maxPacketSize;
+
+        mFile = BufferedFile.create(eventManager, fileHandle, this,
+            maxTotalLength * maxBufferedInboundPackets,
+            maxTotalLength * maxBufferedOutboundPackets);
+        mInboundBuffer = mFile.getInboundBuffer();
+
+        mTempPacketReadBuffer = new byte[maxTotalLength];
+        mTempHeaderWriteBuffer = new byte[HEADER_SIZE];
+    }
+
+    @Override
+    public void close() {
+        mFile.close();
+    }
+
+    public BufferedFile getUnderlyingFileForTest() {
+        return mFile;
+    }
+
+    @Override
+    public void shutdownReading() {
+        mFile.shutdownReading();
+    }
+
+    @Override
+    public void continueReading() {
+        mFile.continueReading();
+    }
+
+    @Override
+    public int getInboundBufferSize() {
+        return mInboundBuffer.size();
+    }
+
+    @Override
+    public void onBufferedFileClosed() {
+    }
+
+    @Override
+    public void onBufferedFileInboundData(int readByteCount) {
+        if (mFile.isReadingShutdown()) {
+            return;
+        }
+
+        if (readByteCount > 0) {
+            mListener.onInboundBuffered(readByteCount, mInboundBuffer.size());
+        }
+
+        if (extractOnePacket() && !mFile.isReadingShutdown()) {
+            // There could be more packets already buffered, continue parsing next
+            // packet even before another read event comes
+            mEventManager.execute(() -> {
+                onBufferedFileInboundData(0);
+            });
+        } else {
+            continueReading();
+        }
+    }
+
+    private boolean extractOnePacket() {
+        while (mIsInPreamble) {
+            final int directReadSize = Math.min(
+                mInboundBuffer.getDirectReadSize(), mTempPacketReadBuffer.length);
+            if (directReadSize == 0) {
+                return false;
+            }
+
+            // Copy for safety, so higher-level callback cannot modify the data.
+            System.arraycopy(mInboundBuffer.getDirectReadBuffer(),
+                mInboundBuffer.getDirectReadPos(), mTempPacketReadBuffer, 0, directReadSize);
+
+            final int preambleConsumedBytes = mListener.onPreambleData(
+                mTempPacketReadBuffer, 0, directReadSize);
+            if (mFile.isReadingShutdown()) {
+                return false;  // The callback has called shutdownReading().
+            }
+
+            if (preambleConsumedBytes == 0) {
+                mIsInPreamble = false;
+                break;
+            }
+
+            mInboundBuffer.accountForDirectRead(preambleConsumedBytes);
+        }
+
+        final int bufferedSize = mInboundBuffer.size();
+        if (bufferedSize < HEADER_SIZE) {
+            return false;
+        }
+
+        final int dataLength = NetPacketHelpers.decodeNetworkUnsignedInt16(mInboundBuffer, 0);
+        if (dataLength > mMaxPacketSize) {
+            mListener.onPacketFileError(
+                PacketFile.ErrorCode.INBOUND_PACKET_TOO_LARGE,
+                "Inbound packet length: " + dataLength);
+            return false;
+        }
+
+        final int totalLength = HEADER_SIZE + dataLength;
+        if (bufferedSize < totalLength) {
+            return false;
+        }
+
+        mInboundBuffer.readBytes(mTempPacketReadBuffer, 0, totalLength);
+
+        mListener.onInboundPacket(mTempPacketReadBuffer, HEADER_SIZE, dataLength);
+        return true;
+    }
+
+    @Override
+    public int getOutboundFreeSize() {
+        final int freeSize = mFile.getOutboundBufferFreeSize();
+        return (freeSize > HEADER_SIZE ? freeSize - HEADER_SIZE : 0);
+    }
+
+    @Override
+    public boolean enqueueOutboundPacket(byte[] buffer, int pos, int len) {
+        Assertions.throwsIfOutOfBounds(buffer, pos, len);
+
+        if (len == 0) {
+            return true;
+        }
+
+        if (len > mMaxPacketSize) {
+            mListener.onPacketFileError(
+                PacketFile.ErrorCode.OUTBOUND_PACKET_TOO_LARGE,
+                "Outbound packet length: " + len);
+            return false;
+        }
+
+        NetPacketHelpers.encodeNetworkUnsignedInt16(len, mTempHeaderWriteBuffer, 0);
+
+        mFile.enqueueOutboundData(
+            mTempHeaderWriteBuffer, 0, mTempHeaderWriteBuffer.length,
+            buffer, pos, len);
+        return true;
+    }
+
+    @Override
+    public void onBufferedFileOutboundSpace() {
+        mListener.onOutboundPacketSpace();
+    }
+
+    @Override
+    public void onBufferedFileIoError(String message) {
+        mListener.onPacketFileError(PacketFile.ErrorCode.IO_ERROR, message);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("maxPacket=");
+        sb.append(mMaxPacketSize);
+        sb.append(", file={");
+        sb.append(mFile);
+        sb.append("}");
+        return sb.toString();
+    }
+}
diff --git a/staticlibs/tests/unit/Android.bp b/staticlibs/tests/unit/Android.bp
index 6e223bd..40371e6 100644
--- a/staticlibs/tests/unit/Android.bp
+++ b/staticlibs/tests/unit/Android.bp
@@ -21,6 +21,7 @@
         "net-utils-device-common-async",
         "net-utils-device-common-bpf",
         "net-utils-device-common-ip",
+        "net-utils-device-common-wear",
     ],
     libs: [
         "android.test.runner",
diff --git a/staticlibs/tests/unit/src/com/android/net/module/util/async/BufferedFileTest.java b/staticlibs/tests/unit/src/com/android/net/module/util/async/BufferedFileTest.java
new file mode 100644
index 0000000..11a74f2
--- /dev/null
+++ b/staticlibs/tests/unit/src/com/android/net/module/util/async/BufferedFileTest.java
@@ -0,0 +1,376 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.net.module.util.async;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.ignoreStubs;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import android.os.ParcelFileDescriptor;
+
+import androidx.test.filters.SmallTest;
+import androidx.test.runner.AndroidJUnit4;
+
+import com.android.testutils.async.ReadableDataAnswer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+@RunWith(AndroidJUnit4.class)
+@SmallTest
+public class BufferedFileTest {
+    @Mock EventManager mockEventManager;
+    @Mock BufferedFile.Listener mockFileListener;
+    @Mock AsyncFile mockAsyncFile;
+    @Mock ParcelFileDescriptor mockParcelFileDescriptor;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        verifyNoMoreInteractions(ignoreStubs(mockFileListener, mockAsyncFile, mockEventManager));
+    }
+
+    @Test
+    public void onClosed() throws Exception {
+        final int inboundBufferSize = 1024;
+        final int outboundBufferSize = 768;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        file.onClosed(mockAsyncFile);
+
+        verify(mockFileListener).onBufferedFileClosed();
+    }
+
+    @Test
+    public void continueReadingAndClose() throws Exception {
+        final int inboundBufferSize = 1024;
+        final int outboundBufferSize = 768;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        assertEquals(inboundBufferSize, file.getInboundBufferFreeSizeForTest());
+        assertEquals(outboundBufferSize, file.getOutboundBufferFreeSize());
+
+        file.continueReading();
+        verify(mockAsyncFile).enableReadEvents(true);
+
+        file.close();
+        verify(mockAsyncFile).close();
+    }
+
+    @Test
+    public void enqueueOutboundData() throws Exception {
+        final int inboundBufferSize = 10;
+        final int outboundBufferSize = 250;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data1 = new byte[101];
+        final byte[] data2 = new byte[102];
+        data1[0] = (byte) 1;
+        data2[0] = (byte) 2;
+
+        assertEquals(0, file.getOutboundBufferSize());
+
+        final int totalLen = data1.length + data2.length;
+
+        when(mockAsyncFile.write(any(), anyInt(), anyInt())).thenReturn(0);
+        assertTrue(file.enqueueOutboundData(data1, 0, data1.length, null, 0, 0));
+        verify(mockAsyncFile).enableWriteEvents(true);
+
+        assertEquals(data1.length, file.getOutboundBufferSize());
+
+        checkAndResetMocks();
+
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+        when(mockAsyncFile.write(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture())).thenReturn(totalLen);
+
+        assertTrue(file.enqueueOutboundData(data2, 0, data2.length, null, 0, 0));
+
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(0, file.getOutboundBufferSize());
+
+        assertEquals(0, posCaptor.getValue().intValue());
+        assertEquals(totalLen, lenCaptor.getValue().intValue());
+        assertEquals(data1[0], arrayCaptor.getValue()[0]);
+        assertEquals(data2[0], arrayCaptor.getValue()[data1.length]);
+    }
+
+    @Test
+    public void enqueueOutboundData_combined() throws Exception {
+        final int inboundBufferSize = 10;
+        final int outboundBufferSize = 250;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data1 = new byte[101];
+        final byte[] data2 = new byte[102];
+        data1[0] = (byte) 1;
+        data2[0] = (byte) 2;
+
+        assertEquals(0, file.getOutboundBufferSize());
+
+        final int totalLen = data1.length + data2.length;
+
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+        when(mockAsyncFile.write(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture())).thenReturn(totalLen);
+
+        assertTrue(file.enqueueOutboundData(data1, 0, data1.length, data2, 0, data2.length));
+
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(0, file.getOutboundBufferSize());
+
+        assertEquals(0, posCaptor.getValue().intValue());
+        assertEquals(totalLen, lenCaptor.getValue().intValue());
+        assertEquals(data1[0], arrayCaptor.getValue()[0]);
+        assertEquals(data2[0], arrayCaptor.getValue()[data1.length]);
+    }
+
+    @Test
+    public void enableWriteEvents() throws Exception {
+        final int inboundBufferSize = 10;
+        final int outboundBufferSize = 250;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data1 = new byte[101];
+        final byte[] data2 = new byte[102];
+        final byte[] data3 = new byte[103];
+        data1[0] = (byte) 1;
+        data2[0] = (byte) 2;
+        data3[0] = (byte) 3;
+
+        assertEquals(0, file.getOutboundBufferSize());
+
+        // Write first 2 buffers, but fail to flush them, causing async write request.
+        final int data1And2Len = data1.length + data2.length;
+        when(mockAsyncFile.write(any(), eq(0), eq(data1And2Len))).thenReturn(0);
+        assertTrue(file.enqueueOutboundData(data1, 0, data1.length, data2, 0, data2.length));
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(data1And2Len, file.getOutboundBufferSize());
+        verify(mockAsyncFile).enableWriteEvents(true);
+
+        // Try to write 3rd buffers, which won't fit, then fail to flush.
+        when(mockAsyncFile.write(any(), eq(0), eq(data1And2Len))).thenReturn(0);
+        assertFalse(file.enqueueOutboundData(data3, 0, data3.length, null, 0, 0));
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(data1And2Len, file.getOutboundBufferSize());
+        verify(mockAsyncFile, times(2)).enableWriteEvents(true);
+
+        checkAndResetMocks();
+
+        // Simulate writeability event, and successfully flush.
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+        when(mockAsyncFile.write(arrayCaptor.capture(),
+                posCaptor.capture(), lenCaptor.capture())).thenReturn(data1And2Len);
+        file.onWriteReady(mockAsyncFile);
+        verify(mockAsyncFile).enableWriteEvents(false);
+        verify(mockFileListener).onBufferedFileOutboundSpace();
+        assertEquals(0, file.getOutboundBufferSize());
+
+        assertEquals(0, posCaptor.getValue().intValue());
+        assertEquals(data1And2Len, lenCaptor.getValue().intValue());
+        assertEquals(data1[0], arrayCaptor.getValue()[0]);
+        assertEquals(data2[0], arrayCaptor.getValue()[data1.length]);
+
+        checkAndResetMocks();
+
+        // Now write, but fail to flush the third buffer.
+        when(mockAsyncFile.write(arrayCaptor.capture(),
+                posCaptor.capture(), lenCaptor.capture())).thenReturn(0);
+        assertTrue(file.enqueueOutboundData(data3, 0, data3.length, null, 0, 0));
+        verify(mockAsyncFile).enableWriteEvents(true);
+        assertEquals(data3.length, file.getOutboundBufferSize());
+
+        assertEquals(data1And2Len, posCaptor.getValue().intValue());
+        assertEquals(outboundBufferSize - data1And2Len, lenCaptor.getValue().intValue());
+        assertEquals(data3[0], arrayCaptor.getValue()[data1And2Len]);
+    }
+
+    @Test
+    public void read() throws Exception {
+        final int inboundBufferSize = 250;
+        final int outboundBufferSize = 10;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data1 = new byte[101];
+        final byte[] data2 = new byte[102];
+        data1[0] = (byte) 1;
+        data2[0] = (byte) 2;
+
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data1, data2);
+        final ReadableByteBuffer inboundBuffer = file.getInboundBuffer();
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        file.onReadReady(mockAsyncFile);
+        verify(mockAsyncFile).enableReadEvents(true);
+        verify(mockFileListener).onBufferedFileInboundData(eq(data1.length + data2.length));
+
+        assertEquals(0, file.getOutboundBufferSize());
+        assertEquals(data1.length + data2.length, inboundBuffer.size());
+        assertEquals((byte) 1, inboundBuffer.peek(0));
+        assertEquals((byte) 2, inboundBuffer.peek(data1.length));
+    }
+
+    @Test
+    public void enableReadEvents() throws Exception {
+        final int inboundBufferSize = 250;
+        final int outboundBufferSize = 10;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data1 = new byte[101];
+        final byte[] data2 = new byte[102];
+        final byte[] data3 = new byte[103];
+        data1[0] = (byte) 1;
+        data2[0] = (byte) 2;
+        data3[0] = (byte) 3;
+
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data1, data2, data3);
+        final ReadableByteBuffer inboundBuffer = file.getInboundBuffer();
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        file.onReadReady(mockAsyncFile);
+        verify(mockAsyncFile).enableReadEvents(false);
+        verify(mockFileListener).onBufferedFileInboundData(eq(inboundBufferSize));
+
+        assertEquals(0, file.getOutboundBufferSize());
+        assertEquals(inboundBufferSize, inboundBuffer.size());
+        assertEquals((byte) 1, inboundBuffer.peek(0));
+        assertEquals((byte) 2, inboundBuffer.peek(data1.length));
+        assertEquals((byte) 3, inboundBuffer.peek(data1.length + data2.length));
+
+        checkAndResetMocks();
+
+        // Cannot enable read events since the buffer is full.
+        file.continueReading();
+
+        checkAndResetMocks();
+
+        final byte[] tmp = new byte[inboundBufferSize];
+        inboundBuffer.readBytes(tmp, 0, data1.length);
+        assertEquals(inboundBufferSize - data1.length, inboundBuffer.size());
+
+        file.continueReading();
+
+        inboundBuffer.readBytes(tmp, 0, data2.length);
+        assertEquals(inboundBufferSize - data1.length - data2.length, inboundBuffer.size());
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        file.onReadReady(mockAsyncFile);
+        verify(mockAsyncFile, times(2)).enableReadEvents(true);
+        verify(mockFileListener).onBufferedFileInboundData(
+            eq(data1.length + data2.length + data3.length - inboundBufferSize));
+
+        assertEquals(data3.length, inboundBuffer.size());
+        assertEquals((byte) 3, inboundBuffer.peek(0));
+    }
+
+    @Test
+    public void shutdownReading() throws Exception {
+        final int inboundBufferSize = 250;
+        final int outboundBufferSize = 10;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data = new byte[100];
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data);
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+
+        file.shutdownReading();
+        file.onReadReady(mockAsyncFile);
+
+        verify(mockAsyncFile).enableReadEvents(false);
+
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(data.length, dataAnswer.getRemainingSize());
+    }
+
+    @Test
+    public void shutdownReading_inCallback() throws Exception {
+        final int inboundBufferSize = 250;
+        final int outboundBufferSize = 10;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data = new byte[100];
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data);
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) {
+                file.shutdownReading();
+                return null;
+            }}).when(mockFileListener).onBufferedFileInboundData(anyInt());
+
+        file.onReadReady(mockAsyncFile);
+
+        verify(mockAsyncFile).enableReadEvents(false);
+
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(0, dataAnswer.getRemainingSize());
+    }
+
+    private void checkAndResetMocks() {
+        verifyNoMoreInteractions(ignoreStubs(mockFileListener, mockAsyncFile, mockEventManager,
+            mockParcelFileDescriptor));
+        reset(mockFileListener, mockAsyncFile, mockEventManager);
+    }
+
+    private BufferedFile createFile(
+            int inboundBufferSize, int outboundBufferSize) throws Exception {
+        when(mockEventManager.registerFile(any(), any())).thenReturn(mockAsyncFile);
+        return BufferedFile.create(
+            mockEventManager,
+            FileHandle.fromFileDescriptor(mockParcelFileDescriptor),
+            mockFileListener,
+            inboundBufferSize,
+            outboundBufferSize);
+    }
+}
diff --git a/staticlibs/tests/unit/src/com/android/net/module/util/wear/NetPacketHelpersTest.java b/staticlibs/tests/unit/src/com/android/net/module/util/wear/NetPacketHelpersTest.java
new file mode 100644
index 0000000..23e7b15
--- /dev/null
+++ b/staticlibs/tests/unit/src/com/android/net/module/util/wear/NetPacketHelpersTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.net.module.util.wear;
+
+import static org.junit.Assert.assertEquals;
+
+import androidx.test.filters.SmallTest;
+import androidx.test.runner.AndroidJUnit4;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.android.net.module.util.async.CircularByteBuffer;
+
+@RunWith(AndroidJUnit4.class)
+@SmallTest
+public class NetPacketHelpersTest {
+    @Test
+    public void decodeNetworkUnsignedInt16() {
+        final byte[] data = new byte[4];
+        data[0] = (byte) 0xFF;
+        data[1] = (byte) 1;
+        data[2] = (byte) 2;
+        data[3] = (byte) 0xFF;
+
+        assertEquals(0x0102, NetPacketHelpers.decodeNetworkUnsignedInt16(data, 1));
+
+        CircularByteBuffer buffer = new CircularByteBuffer(100);
+        buffer.writeBytes(data, 0, data.length);
+
+        assertEquals(0x0102, NetPacketHelpers.decodeNetworkUnsignedInt16(buffer, 1));
+    }
+
+    @Test
+    public void encodeNetworkUnsignedInt16() {
+        final byte[] data = new byte[4];
+        data[0] = (byte) 0xFF;
+        data[3] = (byte) 0xFF;
+        NetPacketHelpers.encodeNetworkUnsignedInt16(0x0102, data, 1);
+
+        assertEquals((byte) 0xFF, data[0]);
+        assertEquals((byte) 1, data[1]);
+        assertEquals((byte) 2, data[2]);
+        assertEquals((byte) 0xFF, data[3]);
+    }
+}
diff --git a/staticlibs/tests/unit/src/com/android/net/module/util/wear/StreamingPacketFileTest.java b/staticlibs/tests/unit/src/com/android/net/module/util/wear/StreamingPacketFileTest.java
new file mode 100644
index 0000000..1fcca70
--- /dev/null
+++ b/staticlibs/tests/unit/src/com/android/net/module/util/wear/StreamingPacketFileTest.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.net.module.util.wear;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.ignoreStubs;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import android.os.ParcelFileDescriptor;
+
+import androidx.test.filters.SmallTest;
+import androidx.test.runner.AndroidJUnit4;
+
+import com.android.net.module.util.async.AsyncFile;
+import com.android.net.module.util.async.BufferedFile;
+import com.android.net.module.util.async.EventManager;
+import com.android.net.module.util.async.FileHandle;
+import com.android.net.module.util.async.ReadableByteBuffer;
+import com.android.testutils.async.ReadableDataAnswer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+@RunWith(AndroidJUnit4.class)
+@SmallTest
+public class StreamingPacketFileTest {
+    private static final int MAX_PACKET_SIZE = 100;
+
+    @Mock EventManager mockEventManager;
+    @Mock PacketFile.Listener mockFileListener;
+    @Mock AsyncFile mockAsyncFile;
+    @Mock ParcelFileDescriptor mockParcelFileDescriptor;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        verifyNoMoreInteractions(ignoreStubs(mockFileListener, mockAsyncFile, mockEventManager));
+    }
+
+    @Test
+    public void continueReadingAndClose() throws Exception {
+        final int maxBufferedInboundPackets = 3;
+        final int maxBufferedOutboundPackets = 5;
+
+        final StreamingPacketFile file =
+            createFile(maxBufferedInboundPackets, maxBufferedOutboundPackets);
+        final BufferedFile bufferedFile = file.getUnderlyingFileForTest();
+
+        assertEquals(maxBufferedInboundPackets * (MAX_PACKET_SIZE + 2),
+            bufferedFile.getInboundBufferFreeSizeForTest());
+        assertEquals(maxBufferedOutboundPackets * (MAX_PACKET_SIZE + 2),
+            bufferedFile.getOutboundBufferFreeSize());
+        assertEquals(bufferedFile.getOutboundBufferFreeSize() - 2,
+            file.getOutboundFreeSize());
+
+        file.continueReading();
+        verify(mockAsyncFile).enableReadEvents(true);
+
+        file.close();
+        verify(mockAsyncFile).close();
+    }
+
+    @Test
+    public void enqueueOutboundPacket() throws Exception {
+        final int maxBufferedInboundPackets = 10;
+        final int maxBufferedOutboundPackets = 20;
+
+        final StreamingPacketFile file =
+            createFile(maxBufferedInboundPackets, maxBufferedOutboundPackets);
+        final BufferedFile bufferedFile = file.getUnderlyingFileForTest();
+
+        final byte[] packet1 = new byte[11];
+        final byte[] packet2 = new byte[12];
+        packet1[0] = (byte) 1;
+        packet2[0] = (byte) 2;
+
+        assertEquals(0, bufferedFile.getOutboundBufferSize());
+
+        when(mockAsyncFile.write(any(), anyInt(), anyInt())).thenReturn(0);
+        assertTrue(file.enqueueOutboundPacket(packet1, 0, packet1.length));
+        verify(mockAsyncFile).enableWriteEvents(true);
+
+        assertEquals(packet1.length + 2, bufferedFile.getOutboundBufferSize());
+
+        checkAndResetMocks();
+
+        final int totalLen = packet1.length + packet2.length + 4;
+
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+        when(mockAsyncFile.write(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture())).thenReturn(totalLen);
+
+        assertTrue(file.enqueueOutboundPacket(packet2, 0, packet2.length));
+
+        assertEquals(0, bufferedFile.getInboundBuffer().size());
+        assertEquals(0, bufferedFile.getOutboundBufferSize());
+
+        assertEquals(0, posCaptor.getValue().intValue());
+        assertEquals(totalLen, lenCaptor.getValue().intValue());
+
+        final byte[] capturedData = arrayCaptor.getValue();
+        assertEquals(packet1.length, NetPacketHelpers.decodeNetworkUnsignedInt16(capturedData, 0));
+        assertEquals(packet2.length,
+            NetPacketHelpers.decodeNetworkUnsignedInt16(capturedData, packet1.length + 2));
+        assertEquals(packet1[0], capturedData[2]);
+        assertEquals(packet2[0], capturedData[packet1.length + 4]);
+    }
+
+    @Test
+    public void onInboundPacket() throws Exception {
+        final int maxBufferedInboundPackets = 10;
+        final int maxBufferedOutboundPackets = 20;
+
+        final StreamingPacketFile file =
+            createFile(maxBufferedInboundPackets, maxBufferedOutboundPackets);
+        final BufferedFile bufferedFile = file.getUnderlyingFileForTest();
+        final ReadableByteBuffer inboundBuffer = bufferedFile.getInboundBuffer();
+
+        final int len1 = 11;
+        final int len2 = 12;
+        final byte[] data = new byte[len1 + len2 + 4];
+        NetPacketHelpers.encodeNetworkUnsignedInt16(len1, data, 0);
+        NetPacketHelpers.encodeNetworkUnsignedInt16(len2, data, 11 + 2);
+        data[2] = (byte) 1;
+        data[len1 + 4] = (byte) 2;
+
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data);
+
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        when(mockFileListener.onPreambleData(any(), eq(0), eq(data.length))).thenReturn(0);
+        bufferedFile.onReadReady(mockAsyncFile);
+        verify(mockAsyncFile).enableReadEvents(true);
+        verify(mockFileListener).onInboundBuffered(data.length, data.length);
+        verify(mockFileListener).onInboundPacket(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture());
+        verify(mockEventManager).execute(any());
+
+        byte[] capturedData = arrayCaptor.getValue();
+        assertEquals(2, posCaptor.getValue().intValue());
+        assertEquals(len1, lenCaptor.getValue().intValue());
+        assertEquals((byte) 1, capturedData[2]);
+
+        checkAndResetMocks();
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        file.onBufferedFileInboundData(0);
+        verify(mockFileListener).onInboundPacket(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture());
+        verify(mockEventManager).execute(any());
+
+        capturedData = arrayCaptor.getValue();
+        assertEquals(2, posCaptor.getValue().intValue());
+        assertEquals(len2, lenCaptor.getValue().intValue());
+        assertEquals((byte) 2, capturedData[2]);
+
+        assertEquals(0, bufferedFile.getOutboundBufferSize());
+        assertEquals(0, inboundBuffer.size());
+    }
+
+    @Test
+    public void onReadReady_preambleData() throws Exception {
+        final int maxBufferedInboundPackets = 10;
+        final int maxBufferedOutboundPackets = 20;
+
+        final StreamingPacketFile file =
+            createFile(maxBufferedInboundPackets, maxBufferedOutboundPackets);
+        final BufferedFile bufferedFile = file.getUnderlyingFileForTest();
+        final ReadableByteBuffer inboundBuffer = bufferedFile.getInboundBuffer();
+
+        final int preambleLen = 23;
+        final int len1 = 11;
+        final byte[] data = new byte[preambleLen + 2 + len1];
+        NetPacketHelpers.encodeNetworkUnsignedInt16(len1, data, preambleLen);
+        data[preambleLen + 2] = (byte) 1;
+
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data);
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        when(mockFileListener.onPreambleData(any(), eq(0), eq(data.length))).thenReturn(5);
+        when(mockFileListener.onPreambleData(
+            any(), eq(0), eq(data.length - 5))).thenReturn(preambleLen - 5);
+        when(mockFileListener.onPreambleData(
+            any(), eq(0), eq(data.length - preambleLen))).thenReturn(0);
+
+        bufferedFile.onReadReady(mockAsyncFile);
+
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        verify(mockFileListener).onInboundBuffered(data.length, data.length);
+        verify(mockFileListener).onInboundPacket(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture());
+        verify(mockEventManager).execute(any());
+        verify(mockAsyncFile).enableReadEvents(true);
+
+        final byte[] capturedData = arrayCaptor.getValue();
+        assertEquals(2, posCaptor.getValue().intValue());
+        assertEquals(len1, lenCaptor.getValue().intValue());
+        assertEquals((byte) 1, capturedData[2]);
+
+        assertEquals(0, bufferedFile.getOutboundBufferSize());
+        assertEquals(0, inboundBuffer.size());
+    }
+
+    @Test
+    public void shutdownReading() throws Exception {
+        final int maxBufferedInboundPackets = 10;
+        final int maxBufferedOutboundPackets = 20;
+
+        final StreamingPacketFile file =
+            createFile(maxBufferedInboundPackets, maxBufferedOutboundPackets);
+        final BufferedFile bufferedFile = file.getUnderlyingFileForTest();
+
+        final byte[] data = new byte[100];
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data);
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) {
+                file.shutdownReading();
+                return Integer.valueOf(-1);
+            }}).when(mockFileListener).onPreambleData(any(), anyInt(), anyInt());
+
+        bufferedFile.onReadReady(mockAsyncFile);
+
+        verify(mockFileListener).onInboundBuffered(data.length, data.length);
+        verify(mockAsyncFile).enableReadEvents(false);
+
+        assertEquals(0, bufferedFile.getInboundBuffer().size());
+    }
+
+    private void checkAndResetMocks() {
+        verifyNoMoreInteractions(ignoreStubs(mockFileListener, mockAsyncFile, mockEventManager,
+            mockParcelFileDescriptor));
+        reset(mockFileListener, mockAsyncFile, mockEventManager);
+    }
+
+    private StreamingPacketFile createFile(
+            int maxBufferedInboundPackets, int maxBufferedOutboundPackets) throws Exception {
+        when(mockEventManager.registerFile(any(), any())).thenReturn(mockAsyncFile);
+        return new StreamingPacketFile(
+            mockEventManager,
+            FileHandle.fromFileDescriptor(mockParcelFileDescriptor),
+            mockFileListener,
+            MAX_PACKET_SIZE,
+            maxBufferedInboundPackets,
+            maxBufferedOutboundPackets);
+    }
+}
diff --git a/staticlibs/testutils/Android.bp b/staticlibs/testutils/Android.bp
index bcf89b3..3382156 100644
--- a/staticlibs/testutils/Android.bp
+++ b/staticlibs/testutils/Android.bp
@@ -38,6 +38,7 @@
         "net-utils-device-common",
         "net-utils-device-common-async",
         "net-utils-device-common-netlink",
+        "net-utils-device-common-wear",
         "modules-utils-build_system",
     ],
     lint: { strict_updatability_linting: true },
diff --git a/staticlibs/testutils/devicetests/com/android/testutils/async/FakeOsAccess.java b/staticlibs/testutils/devicetests/com/android/testutils/async/FakeOsAccess.java
index 1b8e26b..48b57d7 100644
--- a/staticlibs/testutils/devicetests/com/android/testutils/async/FakeOsAccess.java
+++ b/staticlibs/testutils/devicetests/com/android/testutils/async/FakeOsAccess.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.android.testutils;
+package com.android.testutils.async;
 
 import android.os.ParcelFileDescriptor;
 import android.system.StructPollfd;
diff --git a/staticlibs/testutils/devicetests/com/android/testutils/async/RateLimiter.java b/staticlibs/testutils/devicetests/com/android/testutils/async/RateLimiter.java
index 137873d..d5cca0a 100644
--- a/staticlibs/testutils/devicetests/com/android/testutils/async/RateLimiter.java
+++ b/staticlibs/testutils/devicetests/com/android/testutils/async/RateLimiter.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.android.testutils;
+package com.android.testutils.async;
 
 import com.android.net.module.util.async.OsAccess;
 
diff --git a/staticlibs/testutils/devicetests/com/android/testutils/async/ReadableDataAnswer.java b/staticlibs/testutils/devicetests/com/android/testutils/async/ReadableDataAnswer.java
new file mode 100644
index 0000000..4bf5527
--- /dev/null
+++ b/staticlibs/testutils/devicetests/com/android/testutils/async/ReadableDataAnswer.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.testutils.async;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+
+public class ReadableDataAnswer implements Answer {
+    private final ArrayList<byte[]> mBuffers = new ArrayList<>();
+    private int mBufferPos;
+
+    public ReadableDataAnswer(byte[] ... buffers) {
+        for (byte[] buffer : buffers) {
+            addBuffer(buffer);
+        }
+    }
+
+    public void addBuffer(byte[] buffer) {
+        if (buffer.length != 0) {
+            mBuffers.add(buffer);
+        }
+    }
+
+    public int getRemainingSize() {
+        int totalSize = 0;
+        for (byte[] buffer : mBuffers) {
+            totalSize += buffer.length;
+        }
+        return totalSize - mBufferPos;
+    }
+
+    private void cleanupBuffers() {
+        if (!mBuffers.isEmpty() && mBufferPos == mBuffers.get(0).length) {
+            mBuffers.remove(0);
+            mBufferPos = 0;
+        }
+    }
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+        cleanupBuffers();
+
+        if (mBuffers.isEmpty()) {
+            return Integer.valueOf(0);
+        }
+
+        byte[] src = mBuffers.get(0);
+
+        byte[] dst = invocation.<byte[]>getArgument(0);
+        int dstPos = invocation.<Integer>getArgument(1);
+        int dstLen = invocation.<Integer>getArgument(2);
+
+        int copyLen = Math.min(dstLen, src.length - mBufferPos);
+        System.arraycopy(src, mBufferPos, dst, dstPos, copyLen);
+        mBufferPos += copyLen;
+
+        cleanupBuffers();
+        return Integer.valueOf(copyLen);
+    }
+}