Add async BufferedFile and StreamingPacketFile impls

Bug: 245971639

Change-Id: Ief1719262c2cb68819f6feb955e97793f3638ec0
diff --git a/staticlibs/tests/unit/Android.bp b/staticlibs/tests/unit/Android.bp
index 6e223bd..40371e6 100644
--- a/staticlibs/tests/unit/Android.bp
+++ b/staticlibs/tests/unit/Android.bp
@@ -21,6 +21,7 @@
         "net-utils-device-common-async",
         "net-utils-device-common-bpf",
         "net-utils-device-common-ip",
+        "net-utils-device-common-wear",
     ],
     libs: [
         "android.test.runner",
diff --git a/staticlibs/tests/unit/src/com/android/net/module/util/async/BufferedFileTest.java b/staticlibs/tests/unit/src/com/android/net/module/util/async/BufferedFileTest.java
new file mode 100644
index 0000000..11a74f2
--- /dev/null
+++ b/staticlibs/tests/unit/src/com/android/net/module/util/async/BufferedFileTest.java
@@ -0,0 +1,376 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.net.module.util.async;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.ignoreStubs;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import android.os.ParcelFileDescriptor;
+
+import androidx.test.filters.SmallTest;
+import androidx.test.runner.AndroidJUnit4;
+
+import com.android.testutils.async.ReadableDataAnswer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+@RunWith(AndroidJUnit4.class)
+@SmallTest
+public class BufferedFileTest {
+    @Mock EventManager mockEventManager;
+    @Mock BufferedFile.Listener mockFileListener;
+    @Mock AsyncFile mockAsyncFile;
+    @Mock ParcelFileDescriptor mockParcelFileDescriptor;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        verifyNoMoreInteractions(ignoreStubs(mockFileListener, mockAsyncFile, mockEventManager));
+    }
+
+    @Test
+    public void onClosed() throws Exception {
+        final int inboundBufferSize = 1024;
+        final int outboundBufferSize = 768;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        file.onClosed(mockAsyncFile);
+
+        verify(mockFileListener).onBufferedFileClosed();
+    }
+
+    @Test
+    public void continueReadingAndClose() throws Exception {
+        final int inboundBufferSize = 1024;
+        final int outboundBufferSize = 768;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        assertEquals(inboundBufferSize, file.getInboundBufferFreeSizeForTest());
+        assertEquals(outboundBufferSize, file.getOutboundBufferFreeSize());
+
+        file.continueReading();
+        verify(mockAsyncFile).enableReadEvents(true);
+
+        file.close();
+        verify(mockAsyncFile).close();
+    }
+
+    @Test
+    public void enqueueOutboundData() throws Exception {
+        final int inboundBufferSize = 10;
+        final int outboundBufferSize = 250;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data1 = new byte[101];
+        final byte[] data2 = new byte[102];
+        data1[0] = (byte) 1;
+        data2[0] = (byte) 2;
+
+        assertEquals(0, file.getOutboundBufferSize());
+
+        final int totalLen = data1.length + data2.length;
+
+        when(mockAsyncFile.write(any(), anyInt(), anyInt())).thenReturn(0);
+        assertTrue(file.enqueueOutboundData(data1, 0, data1.length, null, 0, 0));
+        verify(mockAsyncFile).enableWriteEvents(true);
+
+        assertEquals(data1.length, file.getOutboundBufferSize());
+
+        checkAndResetMocks();
+
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+        when(mockAsyncFile.write(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture())).thenReturn(totalLen);
+
+        assertTrue(file.enqueueOutboundData(data2, 0, data2.length, null, 0, 0));
+
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(0, file.getOutboundBufferSize());
+
+        assertEquals(0, posCaptor.getValue().intValue());
+        assertEquals(totalLen, lenCaptor.getValue().intValue());
+        assertEquals(data1[0], arrayCaptor.getValue()[0]);
+        assertEquals(data2[0], arrayCaptor.getValue()[data1.length]);
+    }
+
+    @Test
+    public void enqueueOutboundData_combined() throws Exception {
+        final int inboundBufferSize = 10;
+        final int outboundBufferSize = 250;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data1 = new byte[101];
+        final byte[] data2 = new byte[102];
+        data1[0] = (byte) 1;
+        data2[0] = (byte) 2;
+
+        assertEquals(0, file.getOutboundBufferSize());
+
+        final int totalLen = data1.length + data2.length;
+
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+        when(mockAsyncFile.write(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture())).thenReturn(totalLen);
+
+        assertTrue(file.enqueueOutboundData(data1, 0, data1.length, data2, 0, data2.length));
+
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(0, file.getOutboundBufferSize());
+
+        assertEquals(0, posCaptor.getValue().intValue());
+        assertEquals(totalLen, lenCaptor.getValue().intValue());
+        assertEquals(data1[0], arrayCaptor.getValue()[0]);
+        assertEquals(data2[0], arrayCaptor.getValue()[data1.length]);
+    }
+
+    @Test
+    public void enableWriteEvents() throws Exception {
+        final int inboundBufferSize = 10;
+        final int outboundBufferSize = 250;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data1 = new byte[101];
+        final byte[] data2 = new byte[102];
+        final byte[] data3 = new byte[103];
+        data1[0] = (byte) 1;
+        data2[0] = (byte) 2;
+        data3[0] = (byte) 3;
+
+        assertEquals(0, file.getOutboundBufferSize());
+
+        // Write first 2 buffers, but fail to flush them, causing async write request.
+        final int data1And2Len = data1.length + data2.length;
+        when(mockAsyncFile.write(any(), eq(0), eq(data1And2Len))).thenReturn(0);
+        assertTrue(file.enqueueOutboundData(data1, 0, data1.length, data2, 0, data2.length));
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(data1And2Len, file.getOutboundBufferSize());
+        verify(mockAsyncFile).enableWriteEvents(true);
+
+        // Try to write 3rd buffers, which won't fit, then fail to flush.
+        when(mockAsyncFile.write(any(), eq(0), eq(data1And2Len))).thenReturn(0);
+        assertFalse(file.enqueueOutboundData(data3, 0, data3.length, null, 0, 0));
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(data1And2Len, file.getOutboundBufferSize());
+        verify(mockAsyncFile, times(2)).enableWriteEvents(true);
+
+        checkAndResetMocks();
+
+        // Simulate writeability event, and successfully flush.
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+        when(mockAsyncFile.write(arrayCaptor.capture(),
+                posCaptor.capture(), lenCaptor.capture())).thenReturn(data1And2Len);
+        file.onWriteReady(mockAsyncFile);
+        verify(mockAsyncFile).enableWriteEvents(false);
+        verify(mockFileListener).onBufferedFileOutboundSpace();
+        assertEquals(0, file.getOutboundBufferSize());
+
+        assertEquals(0, posCaptor.getValue().intValue());
+        assertEquals(data1And2Len, lenCaptor.getValue().intValue());
+        assertEquals(data1[0], arrayCaptor.getValue()[0]);
+        assertEquals(data2[0], arrayCaptor.getValue()[data1.length]);
+
+        checkAndResetMocks();
+
+        // Now write, but fail to flush the third buffer.
+        when(mockAsyncFile.write(arrayCaptor.capture(),
+                posCaptor.capture(), lenCaptor.capture())).thenReturn(0);
+        assertTrue(file.enqueueOutboundData(data3, 0, data3.length, null, 0, 0));
+        verify(mockAsyncFile).enableWriteEvents(true);
+        assertEquals(data3.length, file.getOutboundBufferSize());
+
+        assertEquals(data1And2Len, posCaptor.getValue().intValue());
+        assertEquals(outboundBufferSize - data1And2Len, lenCaptor.getValue().intValue());
+        assertEquals(data3[0], arrayCaptor.getValue()[data1And2Len]);
+    }
+
+    @Test
+    public void read() throws Exception {
+        final int inboundBufferSize = 250;
+        final int outboundBufferSize = 10;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data1 = new byte[101];
+        final byte[] data2 = new byte[102];
+        data1[0] = (byte) 1;
+        data2[0] = (byte) 2;
+
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data1, data2);
+        final ReadableByteBuffer inboundBuffer = file.getInboundBuffer();
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        file.onReadReady(mockAsyncFile);
+        verify(mockAsyncFile).enableReadEvents(true);
+        verify(mockFileListener).onBufferedFileInboundData(eq(data1.length + data2.length));
+
+        assertEquals(0, file.getOutboundBufferSize());
+        assertEquals(data1.length + data2.length, inboundBuffer.size());
+        assertEquals((byte) 1, inboundBuffer.peek(0));
+        assertEquals((byte) 2, inboundBuffer.peek(data1.length));
+    }
+
+    @Test
+    public void enableReadEvents() throws Exception {
+        final int inboundBufferSize = 250;
+        final int outboundBufferSize = 10;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data1 = new byte[101];
+        final byte[] data2 = new byte[102];
+        final byte[] data3 = new byte[103];
+        data1[0] = (byte) 1;
+        data2[0] = (byte) 2;
+        data3[0] = (byte) 3;
+
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data1, data2, data3);
+        final ReadableByteBuffer inboundBuffer = file.getInboundBuffer();
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        file.onReadReady(mockAsyncFile);
+        verify(mockAsyncFile).enableReadEvents(false);
+        verify(mockFileListener).onBufferedFileInboundData(eq(inboundBufferSize));
+
+        assertEquals(0, file.getOutboundBufferSize());
+        assertEquals(inboundBufferSize, inboundBuffer.size());
+        assertEquals((byte) 1, inboundBuffer.peek(0));
+        assertEquals((byte) 2, inboundBuffer.peek(data1.length));
+        assertEquals((byte) 3, inboundBuffer.peek(data1.length + data2.length));
+
+        checkAndResetMocks();
+
+        // Cannot enable read events since the buffer is full.
+        file.continueReading();
+
+        checkAndResetMocks();
+
+        final byte[] tmp = new byte[inboundBufferSize];
+        inboundBuffer.readBytes(tmp, 0, data1.length);
+        assertEquals(inboundBufferSize - data1.length, inboundBuffer.size());
+
+        file.continueReading();
+
+        inboundBuffer.readBytes(tmp, 0, data2.length);
+        assertEquals(inboundBufferSize - data1.length - data2.length, inboundBuffer.size());
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        file.onReadReady(mockAsyncFile);
+        verify(mockAsyncFile, times(2)).enableReadEvents(true);
+        verify(mockFileListener).onBufferedFileInboundData(
+            eq(data1.length + data2.length + data3.length - inboundBufferSize));
+
+        assertEquals(data3.length, inboundBuffer.size());
+        assertEquals((byte) 3, inboundBuffer.peek(0));
+    }
+
+    @Test
+    public void shutdownReading() throws Exception {
+        final int inboundBufferSize = 250;
+        final int outboundBufferSize = 10;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data = new byte[100];
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data);
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+
+        file.shutdownReading();
+        file.onReadReady(mockAsyncFile);
+
+        verify(mockAsyncFile).enableReadEvents(false);
+
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(data.length, dataAnswer.getRemainingSize());
+    }
+
+    @Test
+    public void shutdownReading_inCallback() throws Exception {
+        final int inboundBufferSize = 250;
+        final int outboundBufferSize = 10;
+
+        final BufferedFile file = createFile(inboundBufferSize, outboundBufferSize);
+
+        final byte[] data = new byte[100];
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data);
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) {
+                file.shutdownReading();
+                return null;
+            }}).when(mockFileListener).onBufferedFileInboundData(anyInt());
+
+        file.onReadReady(mockAsyncFile);
+
+        verify(mockAsyncFile).enableReadEvents(false);
+
+        assertEquals(0, file.getInboundBuffer().size());
+        assertEquals(0, dataAnswer.getRemainingSize());
+    }
+
+    private void checkAndResetMocks() {
+        verifyNoMoreInteractions(ignoreStubs(mockFileListener, mockAsyncFile, mockEventManager,
+            mockParcelFileDescriptor));
+        reset(mockFileListener, mockAsyncFile, mockEventManager);
+    }
+
+    private BufferedFile createFile(
+            int inboundBufferSize, int outboundBufferSize) throws Exception {
+        when(mockEventManager.registerFile(any(), any())).thenReturn(mockAsyncFile);
+        return BufferedFile.create(
+            mockEventManager,
+            FileHandle.fromFileDescriptor(mockParcelFileDescriptor),
+            mockFileListener,
+            inboundBufferSize,
+            outboundBufferSize);
+    }
+}
diff --git a/staticlibs/tests/unit/src/com/android/net/module/util/wear/NetPacketHelpersTest.java b/staticlibs/tests/unit/src/com/android/net/module/util/wear/NetPacketHelpersTest.java
new file mode 100644
index 0000000..23e7b15
--- /dev/null
+++ b/staticlibs/tests/unit/src/com/android/net/module/util/wear/NetPacketHelpersTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.net.module.util.wear;
+
+import static org.junit.Assert.assertEquals;
+
+import androidx.test.filters.SmallTest;
+import androidx.test.runner.AndroidJUnit4;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.android.net.module.util.async.CircularByteBuffer;
+
+@RunWith(AndroidJUnit4.class)
+@SmallTest
+public class NetPacketHelpersTest {
+    @Test
+    public void decodeNetworkUnsignedInt16() {
+        final byte[] data = new byte[4];
+        data[0] = (byte) 0xFF;
+        data[1] = (byte) 1;
+        data[2] = (byte) 2;
+        data[3] = (byte) 0xFF;
+
+        assertEquals(0x0102, NetPacketHelpers.decodeNetworkUnsignedInt16(data, 1));
+
+        CircularByteBuffer buffer = new CircularByteBuffer(100);
+        buffer.writeBytes(data, 0, data.length);
+
+        assertEquals(0x0102, NetPacketHelpers.decodeNetworkUnsignedInt16(buffer, 1));
+    }
+
+    @Test
+    public void encodeNetworkUnsignedInt16() {
+        final byte[] data = new byte[4];
+        data[0] = (byte) 0xFF;
+        data[3] = (byte) 0xFF;
+        NetPacketHelpers.encodeNetworkUnsignedInt16(0x0102, data, 1);
+
+        assertEquals((byte) 0xFF, data[0]);
+        assertEquals((byte) 1, data[1]);
+        assertEquals((byte) 2, data[2]);
+        assertEquals((byte) 0xFF, data[3]);
+    }
+}
diff --git a/staticlibs/tests/unit/src/com/android/net/module/util/wear/StreamingPacketFileTest.java b/staticlibs/tests/unit/src/com/android/net/module/util/wear/StreamingPacketFileTest.java
new file mode 100644
index 0000000..1fcca70
--- /dev/null
+++ b/staticlibs/tests/unit/src/com/android/net/module/util/wear/StreamingPacketFileTest.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.net.module.util.wear;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.ignoreStubs;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import android.os.ParcelFileDescriptor;
+
+import androidx.test.filters.SmallTest;
+import androidx.test.runner.AndroidJUnit4;
+
+import com.android.net.module.util.async.AsyncFile;
+import com.android.net.module.util.async.BufferedFile;
+import com.android.net.module.util.async.EventManager;
+import com.android.net.module.util.async.FileHandle;
+import com.android.net.module.util.async.ReadableByteBuffer;
+import com.android.testutils.async.ReadableDataAnswer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+@RunWith(AndroidJUnit4.class)
+@SmallTest
+public class StreamingPacketFileTest {
+    private static final int MAX_PACKET_SIZE = 100;
+
+    @Mock EventManager mockEventManager;
+    @Mock PacketFile.Listener mockFileListener;
+    @Mock AsyncFile mockAsyncFile;
+    @Mock ParcelFileDescriptor mockParcelFileDescriptor;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        verifyNoMoreInteractions(ignoreStubs(mockFileListener, mockAsyncFile, mockEventManager));
+    }
+
+    @Test
+    public void continueReadingAndClose() throws Exception {
+        final int maxBufferedInboundPackets = 3;
+        final int maxBufferedOutboundPackets = 5;
+
+        final StreamingPacketFile file =
+            createFile(maxBufferedInboundPackets, maxBufferedOutboundPackets);
+        final BufferedFile bufferedFile = file.getUnderlyingFileForTest();
+
+        assertEquals(maxBufferedInboundPackets * (MAX_PACKET_SIZE + 2),
+            bufferedFile.getInboundBufferFreeSizeForTest());
+        assertEquals(maxBufferedOutboundPackets * (MAX_PACKET_SIZE + 2),
+            bufferedFile.getOutboundBufferFreeSize());
+        assertEquals(bufferedFile.getOutboundBufferFreeSize() - 2,
+            file.getOutboundFreeSize());
+
+        file.continueReading();
+        verify(mockAsyncFile).enableReadEvents(true);
+
+        file.close();
+        verify(mockAsyncFile).close();
+    }
+
+    @Test
+    public void enqueueOutboundPacket() throws Exception {
+        final int maxBufferedInboundPackets = 10;
+        final int maxBufferedOutboundPackets = 20;
+
+        final StreamingPacketFile file =
+            createFile(maxBufferedInboundPackets, maxBufferedOutboundPackets);
+        final BufferedFile bufferedFile = file.getUnderlyingFileForTest();
+
+        final byte[] packet1 = new byte[11];
+        final byte[] packet2 = new byte[12];
+        packet1[0] = (byte) 1;
+        packet2[0] = (byte) 2;
+
+        assertEquals(0, bufferedFile.getOutboundBufferSize());
+
+        when(mockAsyncFile.write(any(), anyInt(), anyInt())).thenReturn(0);
+        assertTrue(file.enqueueOutboundPacket(packet1, 0, packet1.length));
+        verify(mockAsyncFile).enableWriteEvents(true);
+
+        assertEquals(packet1.length + 2, bufferedFile.getOutboundBufferSize());
+
+        checkAndResetMocks();
+
+        final int totalLen = packet1.length + packet2.length + 4;
+
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+        when(mockAsyncFile.write(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture())).thenReturn(totalLen);
+
+        assertTrue(file.enqueueOutboundPacket(packet2, 0, packet2.length));
+
+        assertEquals(0, bufferedFile.getInboundBuffer().size());
+        assertEquals(0, bufferedFile.getOutboundBufferSize());
+
+        assertEquals(0, posCaptor.getValue().intValue());
+        assertEquals(totalLen, lenCaptor.getValue().intValue());
+
+        final byte[] capturedData = arrayCaptor.getValue();
+        assertEquals(packet1.length, NetPacketHelpers.decodeNetworkUnsignedInt16(capturedData, 0));
+        assertEquals(packet2.length,
+            NetPacketHelpers.decodeNetworkUnsignedInt16(capturedData, packet1.length + 2));
+        assertEquals(packet1[0], capturedData[2]);
+        assertEquals(packet2[0], capturedData[packet1.length + 4]);
+    }
+
+    @Test
+    public void onInboundPacket() throws Exception {
+        final int maxBufferedInboundPackets = 10;
+        final int maxBufferedOutboundPackets = 20;
+
+        final StreamingPacketFile file =
+            createFile(maxBufferedInboundPackets, maxBufferedOutboundPackets);
+        final BufferedFile bufferedFile = file.getUnderlyingFileForTest();
+        final ReadableByteBuffer inboundBuffer = bufferedFile.getInboundBuffer();
+
+        final int len1 = 11;
+        final int len2 = 12;
+        final byte[] data = new byte[len1 + len2 + 4];
+        NetPacketHelpers.encodeNetworkUnsignedInt16(len1, data, 0);
+        NetPacketHelpers.encodeNetworkUnsignedInt16(len2, data, 11 + 2);
+        data[2] = (byte) 1;
+        data[len1 + 4] = (byte) 2;
+
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data);
+
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        when(mockFileListener.onPreambleData(any(), eq(0), eq(data.length))).thenReturn(0);
+        bufferedFile.onReadReady(mockAsyncFile);
+        verify(mockAsyncFile).enableReadEvents(true);
+        verify(mockFileListener).onInboundBuffered(data.length, data.length);
+        verify(mockFileListener).onInboundPacket(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture());
+        verify(mockEventManager).execute(any());
+
+        byte[] capturedData = arrayCaptor.getValue();
+        assertEquals(2, posCaptor.getValue().intValue());
+        assertEquals(len1, lenCaptor.getValue().intValue());
+        assertEquals((byte) 1, capturedData[2]);
+
+        checkAndResetMocks();
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        file.onBufferedFileInboundData(0);
+        verify(mockFileListener).onInboundPacket(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture());
+        verify(mockEventManager).execute(any());
+
+        capturedData = arrayCaptor.getValue();
+        assertEquals(2, posCaptor.getValue().intValue());
+        assertEquals(len2, lenCaptor.getValue().intValue());
+        assertEquals((byte) 2, capturedData[2]);
+
+        assertEquals(0, bufferedFile.getOutboundBufferSize());
+        assertEquals(0, inboundBuffer.size());
+    }
+
+    @Test
+    public void onReadReady_preambleData() throws Exception {
+        final int maxBufferedInboundPackets = 10;
+        final int maxBufferedOutboundPackets = 20;
+
+        final StreamingPacketFile file =
+            createFile(maxBufferedInboundPackets, maxBufferedOutboundPackets);
+        final BufferedFile bufferedFile = file.getUnderlyingFileForTest();
+        final ReadableByteBuffer inboundBuffer = bufferedFile.getInboundBuffer();
+
+        final int preambleLen = 23;
+        final int len1 = 11;
+        final byte[] data = new byte[preambleLen + 2 + len1];
+        NetPacketHelpers.encodeNetworkUnsignedInt16(len1, data, preambleLen);
+        data[preambleLen + 2] = (byte) 1;
+
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data);
+
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+        when(mockFileListener.onPreambleData(any(), eq(0), eq(data.length))).thenReturn(5);
+        when(mockFileListener.onPreambleData(
+            any(), eq(0), eq(data.length - 5))).thenReturn(preambleLen - 5);
+        when(mockFileListener.onPreambleData(
+            any(), eq(0), eq(data.length - preambleLen))).thenReturn(0);
+
+        bufferedFile.onReadReady(mockAsyncFile);
+
+        final ArgumentCaptor<byte[]> arrayCaptor = ArgumentCaptor.forClass(byte[].class);
+        final ArgumentCaptor<Integer> posCaptor = ArgumentCaptor.forClass(Integer.class);
+        final ArgumentCaptor<Integer> lenCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        verify(mockFileListener).onInboundBuffered(data.length, data.length);
+        verify(mockFileListener).onInboundPacket(
+            arrayCaptor.capture(), posCaptor.capture(), lenCaptor.capture());
+        verify(mockEventManager).execute(any());
+        verify(mockAsyncFile).enableReadEvents(true);
+
+        final byte[] capturedData = arrayCaptor.getValue();
+        assertEquals(2, posCaptor.getValue().intValue());
+        assertEquals(len1, lenCaptor.getValue().intValue());
+        assertEquals((byte) 1, capturedData[2]);
+
+        assertEquals(0, bufferedFile.getOutboundBufferSize());
+        assertEquals(0, inboundBuffer.size());
+    }
+
+    @Test
+    public void shutdownReading() throws Exception {
+        final int maxBufferedInboundPackets = 10;
+        final int maxBufferedOutboundPackets = 20;
+
+        final StreamingPacketFile file =
+            createFile(maxBufferedInboundPackets, maxBufferedOutboundPackets);
+        final BufferedFile bufferedFile = file.getUnderlyingFileForTest();
+
+        final byte[] data = new byte[100];
+        final ReadableDataAnswer dataAnswer = new ReadableDataAnswer(data);
+        when(mockAsyncFile.read(any(), anyInt(), anyInt())).thenAnswer(dataAnswer);
+
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock invocation) {
+                file.shutdownReading();
+                return Integer.valueOf(-1);
+            }}).when(mockFileListener).onPreambleData(any(), anyInt(), anyInt());
+
+        bufferedFile.onReadReady(mockAsyncFile);
+
+        verify(mockFileListener).onInboundBuffered(data.length, data.length);
+        verify(mockAsyncFile).enableReadEvents(false);
+
+        assertEquals(0, bufferedFile.getInboundBuffer().size());
+    }
+
+    private void checkAndResetMocks() {
+        verifyNoMoreInteractions(ignoreStubs(mockFileListener, mockAsyncFile, mockEventManager,
+            mockParcelFileDescriptor));
+        reset(mockFileListener, mockAsyncFile, mockEventManager);
+    }
+
+    private StreamingPacketFile createFile(
+            int maxBufferedInboundPackets, int maxBufferedOutboundPackets) throws Exception {
+        when(mockEventManager.registerFile(any(), any())).thenReturn(mockAsyncFile);
+        return new StreamingPacketFile(
+            mockEventManager,
+            FileHandle.fromFileDescriptor(mockParcelFileDescriptor),
+            mockFileListener,
+            MAX_PACKET_SIZE,
+            maxBufferedInboundPackets,
+            maxBufferedOutboundPackets);
+    }
+}