Add current device consumption to InputConsumerNoResampling

Added current device consumption to InputConsumerNoResampling in
consumeBatchedInputEvents' body

Bug: 329776327
Flag: EXEMPT refactor
Test: TEST=libinput_tests; m $TEST && $ANDROID_HOST_OUT/nativetest64/$TEST/$TEST --gtest_filter="InputConsumerTest*"
Change-Id: Ibfcb28f019543a88b3ffada137199b1c3933d542
diff --git a/include/input/InputConsumerNoResampling.h b/include/input/InputConsumerNoResampling.h
index 358a191..10c2aa0 100644
--- a/include/input/InputConsumerNoResampling.h
+++ b/include/input/InputConsumerNoResampling.h
@@ -16,6 +16,11 @@
 
 #pragma once
 
+#include <map>
+#include <memory>
+#include <optional>
+
+#include <input/Input.h>
 #include <input/InputTransport.h>
 #include <input/LooperInterface.h>
 #include <input/Resampler.h>
@@ -36,7 +41,7 @@
     /**
      * When you receive this callback, you must (eventually) call "consumeBatchedInputEvents".
      * If you don't want batching, then call "consumeBatchedInputEvents" immediately with
-     * std::nullopt frameTime to receive the pending motion event(s).
+     * std::nullopt requestedFrameTime to receive the pending motion event(s).
      * @param pendingBatchSource the source of the pending batch.
      */
     virtual void onBatchedInputEventPending(int32_t pendingBatchSource) = 0;
@@ -96,15 +101,17 @@
     void finishInputEvent(uint32_t seq, bool handled);
     void reportTimeline(int32_t inputEventId, nsecs_t gpuCompletedTime, nsecs_t presentTime);
     /**
-     * If you want to consume all events immediately (disable batching), the you still must call
-     * this. For frameTime, use a std::nullopt.
-     * @param frameTime the time up to which consume the events. When there's double (or triple)
-     * buffering, you may want to not consume all events currently available, because you could be
-     * still working on an older frame, but there could already have been events that arrived that
-     * are more recent.
+     * If you want to consume all events immediately (disable batching), then you still must call
+     * this. For requestedFrameTime, use a std::nullopt. It is not guaranteed that the consumption
+     * will occur at requestedFrameTime. The resampling strategy may modify it.
+     * @param requestedFrameTime the time up to which consume the events. When there's double (or
+     * triple) buffering, you may want to not consume all events currently available, because you
+     * could be still working on an older frame, but there could already have been events that
+     * arrived that are more recent.
      * @return whether any events were actually consumed
      */
-    bool consumeBatchedInputEvents(std::optional<nsecs_t> frameTime);
+    bool consumeBatchedInputEvents(std::optional<nsecs_t> requestedFrameTime);
+
     /**
      * Returns true when there is *likely* a pending batch or a pending event in the channel.
      *
@@ -200,20 +207,33 @@
     /**
      * Batched InputMessages, per deviceId.
      * For each device, we are storing a queue of batched messages. These will all be collapsed into
-     * a single MotionEvent (up to a specific frameTime) when the consumer calls
+     * a single MotionEvent (up to a specific requestedFrameTime) when the consumer calls
      * `consumeBatchedInputEvents`.
      */
     std::map<DeviceId, std::queue<InputMessage>> mBatches;
     /**
      * Creates a MotionEvent by consuming samples from the provided queue. If one message has
-     * eventTime > frameTime, all subsequent messages in the queue will be skipped. It is assumed
-     * that messages are queued in chronological order. In other words, only events that occurred
-     * prior to the requested frameTime will be consumed.
-     * @param frameTime the time up to which to consume events
+     * eventTime > adjustedFrameTime, all subsequent messages in the queue will be skipped. It is
+     * assumed that messages are queued in chronological order. In other words, only events that
+     * occurred prior to the adjustedFrameTime will be consumed.
+     * @param requestedFrameTime the time up to which to consume events.
      * @param messages the queue of messages to consume from
      */
     std::pair<std::unique_ptr<MotionEvent>, std::optional<uint32_t>> createBatchedMotionEvent(
-            const nsecs_t frameTime, std::queue<InputMessage>& messages);
+            const nsecs_t requestedFrameTime, std::queue<InputMessage>& messages);
+
+    /**
+     * Consumes the batched input events, optionally allowing the caller to specify a device id
+     * and/or requestedFrameTime threshold. It is not guaranteed that consumption will occur at
+     * requestedFrameTime.
+     * @param deviceId The device id from which to consume events. If std::nullopt, consumes events
+     * from any device id.
+     * @param requestedFrameTime The time up to which consume the events. If std::nullopt, consumes
+     * input events with any timestamp.
+     * @return Whether or not any events were consumed.
+     */
+    bool consumeBatchedInputEvents(std::optional<DeviceId> deviceId,
+                                   std::optional<nsecs_t> requestedFrameTime);
     /**
      * A map from a single sequence number to several sequence numbers. This is needed because of
      * batching. When batching is enabled, a single MotionEvent will contain several samples. Each
diff --git a/libs/input/InputConsumerNoResampling.cpp b/libs/input/InputConsumerNoResampling.cpp
index de55828..f33afe7 100644
--- a/libs/input/InputConsumerNoResampling.cpp
+++ b/libs/input/InputConsumerNoResampling.cpp
@@ -227,7 +227,7 @@
 
 InputConsumerNoResampling::~InputConsumerNoResampling() {
     ensureCalledOnLooperThread(__func__);
-    consumeBatchedInputEvents(std::nullopt);
+    consumeBatchedInputEvents(/*requestedFrameTime=*/std::nullopt);
     while (!mOutboundQueue.empty()) {
         processOutboundEvents();
         // This is our last chance to ack the events. If we don't ack them here, we will get an ANR,
@@ -253,8 +253,7 @@
 
     int handledEvents = 0;
     if (events & ALOOPER_EVENT_INPUT) {
-        std::vector<InputMessage> messages = readAllMessages();
-        handleMessages(std::move(messages));
+        handleMessages(readAllMessages());
         handledEvents |= ALOOPER_EVENT_INPUT;
     }
 
@@ -362,10 +361,8 @@
                 // add it to batch
                 mBatches[deviceId].emplace(msg);
             } else {
-                // consume all pending batches for this event immediately
-                // TODO(b/329776327): figure out if this could be smarter by limiting the
-                // consumption only to the current device.
-                consumeBatchedInputEvents(std::nullopt);
+                // consume all pending batches for this device immediately
+                consumeBatchedInputEvents(deviceId, /*requestedFrameTime=*/std::nullopt);
                 handleMessage(msg);
             }
         } else {
@@ -483,13 +480,13 @@
 }
 
 std::pair<std::unique_ptr<MotionEvent>, std::optional<uint32_t>>
-InputConsumerNoResampling::createBatchedMotionEvent(const nsecs_t frameTime,
+InputConsumerNoResampling::createBatchedMotionEvent(const nsecs_t requestedFrameTime,
                                                     std::queue<InputMessage>& messages) {
     std::unique_ptr<MotionEvent> motionEvent;
     std::optional<uint32_t> firstSeqForBatch;
     const nanoseconds resampleLatency =
             (mResampler != nullptr) ? mResampler->getResampleLatency() : nanoseconds{0};
-    const nanoseconds adjustedFrameTime = nanoseconds{frameTime} - resampleLatency;
+    const nanoseconds adjustedFrameTime = nanoseconds{requestedFrameTime} - resampleLatency;
 
     while (!messages.empty() &&
            (messages.front().body.motion.eventTime <= adjustedFrameTime.count())) {
@@ -511,36 +508,52 @@
         if (!messages.empty()) {
             futureSample = &messages.front();
         }
-        mResampler->resampleMotionEvent(nanoseconds{frameTime}, *motionEvent, futureSample);
+        mResampler->resampleMotionEvent(nanoseconds{requestedFrameTime}, *motionEvent,
+                                        futureSample);
     }
     return std::make_pair(std::move(motionEvent), firstSeqForBatch);
 }
 
 bool InputConsumerNoResampling::consumeBatchedInputEvents(
-        std::optional<nsecs_t> requestedFrameTime) {
+        std::optional<DeviceId> deviceId, std::optional<nsecs_t> requestedFrameTime) {
     ensureCalledOnLooperThread(__func__);
     // When batching is not enabled, we want to consume all events. That's equivalent to having an
-    // infinite frameTime.
-    const nsecs_t frameTime = requestedFrameTime.value_or(std::numeric_limits<nsecs_t>::max());
+    // infinite requestedFrameTime.
+    requestedFrameTime = requestedFrameTime.value_or(std::numeric_limits<nsecs_t>::max());
     bool producedEvents = false;
-    for (auto& [_, messages] : mBatches) {
-        auto [motion, firstSeqForBatch] = createBatchedMotionEvent(frameTime, messages);
+
+    for (auto deviceIdIter = (deviceId.has_value()) ? (mBatches.find(*deviceId))
+                                                    : (mBatches.begin());
+         deviceIdIter != mBatches.cend(); ++deviceIdIter) {
+        std::queue<InputMessage>& messages = deviceIdIter->second;
+        auto [motion, firstSeqForBatch] = createBatchedMotionEvent(*requestedFrameTime, messages);
         if (motion != nullptr) {
             LOG_ALWAYS_FATAL_IF(!firstSeqForBatch.has_value());
             mCallbacks.onMotionEvent(std::move(motion), *firstSeqForBatch);
             producedEvents = true;
         } else {
-            // This is OK, it just means that the frameTime is too old (all events that we have
-            // pending are in the future of the frametime). Maybe print a
-            // warning? If there are multiple devices active though, this might be normal and can
-            // just be ignored, unless none of them resulted in any consumption (in that case, this
-            // function would already return "false" so we could just leave it up to the caller).
+            // This is OK, it just means that the requestedFrameTime is too old (all events that we
+            // have pending are in the future of the requestedFrameTime). Maybe print a warning? If
+            // there are multiple devices active though, this might be normal and can just be
+            // ignored, unless none of them resulted in any consumption (in that case, this function
+            // would already return "false" so we could just leave it up to the caller).
+        }
+
+        if (deviceId.has_value()) {
+            // We already consumed events for this device. Break here to prevent iterating over the
+            // other devices.
+            break;
         }
     }
     std::erase_if(mBatches, [](const auto& pair) { return pair.second.empty(); });
     return producedEvents;
 }
 
+bool InputConsumerNoResampling::consumeBatchedInputEvents(
+        std::optional<nsecs_t> requestedFrameTime) {
+    return consumeBatchedInputEvents(/*deviceId=*/std::nullopt, requestedFrameTime);
+}
+
 void InputConsumerNoResampling::ensureCalledOnLooperThread(const char* func) const {
     sp<Looper> callingThreadLooper = Looper::getForThread();
     if (callingThreadLooper != mLooper->getLooper()) {
diff --git a/libs/input/tests/InputConsumer_test.cpp b/libs/input/tests/InputConsumer_test.cpp
index 55be453..dec78aa 100644
--- a/libs/input/tests/InputConsumer_test.cpp
+++ b/libs/input/tests/InputConsumer_test.cpp
@@ -20,9 +20,11 @@
 #include <optional>
 #include <utility>
 
+#include <TestEventMatchers.h>
 #include <TestInputChannel.h>
 #include <TestLooper.h>
 #include <android-base/logging.h>
+#include <gmock/gmock.h>
 #include <gtest/gtest.h>
 #include <input/BlockingQueue.h>
 #include <input/InputEventBuilders.h>
@@ -34,6 +36,10 @@
 
 using std::chrono::nanoseconds;
 
+using ::testing::AllOf;
+using ::testing::Matcher;
+using ::testing::Not;
+
 } // namespace
 
 class InputConsumerTest : public testing::Test, public InputConsumerCallbacks {
@@ -47,7 +53,17 @@
                                                             std::make_unique<LegacyResampler>());
     }
 
-    void assertOnBatchedInputEventPendingWasCalled();
+    void assertOnBatchedInputEventPendingWasCalled() {
+        ASSERT_GT(mOnBatchedInputEventPendingInvocationCount, 0UL)
+                << "onBatchedInputEventPending has not been called.";
+        --mOnBatchedInputEventPendingInvocationCount;
+    }
+
+    void assertReceivedMotionEvent(const Matcher<MotionEvent>& matcher) {
+        std::unique_ptr<MotionEvent> motionEvent = mMotionEvents.pop();
+        ASSERT_NE(motionEvent, nullptr);
+        EXPECT_THAT(*motionEvent, matcher);
+    }
 
     std::shared_ptr<TestInputChannel> mClientTestChannel;
     std::shared_ptr<TestLooper> mTestLooper;
@@ -96,12 +112,6 @@
     };
 };
 
-void InputConsumerTest::assertOnBatchedInputEventPendingWasCalled() {
-    ASSERT_GT(mOnBatchedInputEventPendingInvocationCount, 0UL)
-            << "onBatchedInputEventPending has not been called.";
-    --mOnBatchedInputEventPendingInvocationCount;
-}
-
 TEST_F(InputConsumerTest, MessageStreamBatchedInMotionEvent) {
     mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/0}
                                                .eventTime(nanoseconds{0ms}.count())
@@ -122,7 +132,7 @@
 
     assertOnBatchedInputEventPendingWasCalled();
 
-    mConsumer->consumeBatchedInputEvents(std::nullopt);
+    mConsumer->consumeBatchedInputEvents(/*frameTime=*/std::nullopt);
 
     std::unique_ptr<MotionEvent> downMotionEvent = mMotionEvents.pop();
     ASSERT_NE(downMotionEvent, nullptr);
@@ -182,4 +192,50 @@
     mClientTestChannel->assertFinishMessage(/*seq=*/2, true);
     mClientTestChannel->assertFinishMessage(/*seq=*/3, true);
 }
+
+TEST_F(InputConsumerTest, BatchedEventsMultiDeviceConsumption) {
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/0}
+                                               .deviceId(0)
+                                               .action(AMOTION_EVENT_ACTION_DOWN)
+                                               .build());
+
+    mTestLooper->invokeCallback(mClientTestChannel->getFd(), ALOOPER_EVENT_INPUT);
+    assertReceivedMotionEvent(AllOf(WithDeviceId(0), WithMotionAction(AMOTION_EVENT_ACTION_DOWN)));
+
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/1}
+                                               .deviceId(0)
+                                               .action(AMOTION_EVENT_ACTION_MOVE)
+                                               .build());
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/2}
+                                               .deviceId(0)
+                                               .action(AMOTION_EVENT_ACTION_MOVE)
+                                               .build());
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/3}
+                                               .deviceId(0)
+                                               .action(AMOTION_EVENT_ACTION_MOVE)
+                                               .build());
+
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/4}
+                                               .deviceId(1)
+                                               .action(AMOTION_EVENT_ACTION_DOWN)
+                                               .build());
+
+    mTestLooper->invokeCallback(mClientTestChannel->getFd(), ALOOPER_EVENT_INPUT);
+    assertReceivedMotionEvent(AllOf(WithDeviceId(1), WithMotionAction(AMOTION_EVENT_ACTION_DOWN)));
+
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/5}
+                                               .deviceId(0)
+                                               .action(AMOTION_EVENT_ACTION_UP)
+                                               .build());
+
+    mTestLooper->invokeCallback(mClientTestChannel->getFd(), ALOOPER_EVENT_INPUT);
+    assertReceivedMotionEvent(AllOf(WithDeviceId(0), WithMotionAction(AMOTION_EVENT_ACTION_MOVE),
+                                    Not(MotionEventIsResampled())));
+
+    mClientTestChannel->assertFinishMessage(/*seq=*/0, /*handled=*/true);
+    mClientTestChannel->assertFinishMessage(/*seq=*/4, /*handled=*/true);
+    mClientTestChannel->assertFinishMessage(/*seq=*/1, /*handled=*/true);
+    mClientTestChannel->assertFinishMessage(/*seq=*/2, /*handled=*/true);
+    mClientTestChannel->assertFinishMessage(/*seq=*/3, /*handled=*/true);
+}
 } // namespace android
diff --git a/libs/input/tests/InputPublisherAndConsumerNoResampling_test.cpp b/libs/input/tests/InputPublisherAndConsumerNoResampling_test.cpp
index 467c3b4..1210f71 100644
--- a/libs/input/tests/InputPublisherAndConsumerNoResampling_test.cpp
+++ b/libs/input/tests/InputPublisherAndConsumerNoResampling_test.cpp
@@ -364,7 +364,7 @@
         if (!mConsumer->probablyHasInput()) {
             ADD_FAILURE() << "should deterministically have input because there is a batch";
         }
-        mConsumer->consumeBatchedInputEvents(std::nullopt);
+        mConsumer->consumeBatchedInputEvents(/*frameTime=*/std::nullopt);
     };
     void onFocusEvent(std::unique_ptr<FocusEvent> event, uint32_t seq) override {
         mFocusEvents.push(std::move(event));
diff --git a/libs/input/tests/TestEventMatchers.h b/libs/input/tests/TestEventMatchers.h
new file mode 100644
index 0000000..dd2e40c
--- /dev/null
+++ b/libs/input/tests/TestEventMatchers.h
@@ -0,0 +1,110 @@
+/**
+ * Copyright 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <ostream>
+
+#include <input/Input.h>
+
+namespace android {
+
+/**
+ * This file contains a copy of Matchers from .../inputflinger/tests/TestEventMatchers.h. Ideally,
+ * implementations must not be duplicated.
+ * TODO(b/365606513): Find a way to share TestEventMatchers.h between inputflinger and libinput.
+ */
+
+class WithDeviceIdMatcher {
+public:
+    using is_gtest_matcher = void;
+    explicit WithDeviceIdMatcher(DeviceId deviceId) : mDeviceId(deviceId) {}
+
+    bool MatchAndExplain(const InputEvent& event, std::ostream*) const {
+        return mDeviceId == event.getDeviceId();
+    }
+
+    void DescribeTo(std::ostream* os) const { *os << "with device id " << mDeviceId; }
+
+    void DescribeNegationTo(std::ostream* os) const { *os << "wrong device id"; }
+
+private:
+    const DeviceId mDeviceId;
+};
+
+inline WithDeviceIdMatcher WithDeviceId(int32_t deviceId) {
+    return WithDeviceIdMatcher(deviceId);
+}
+
+class WithMotionActionMatcher {
+public:
+    using is_gtest_matcher = void;
+    explicit WithMotionActionMatcher(int32_t action) : mAction(action) {}
+
+    bool MatchAndExplain(const MotionEvent& event, std::ostream*) const {
+        bool matches = mAction == event.getAction();
+        if (event.getAction() == AMOTION_EVENT_ACTION_CANCEL) {
+            matches &= (event.getFlags() & AMOTION_EVENT_FLAG_CANCELED) != 0;
+        }
+        return matches;
+    }
+
+    void DescribeTo(std::ostream* os) const {
+        *os << "with motion action " << MotionEvent::actionToString(mAction);
+        if (mAction == AMOTION_EVENT_ACTION_CANCEL) {
+            *os << " and FLAG_CANCELED";
+        }
+    }
+
+    void DescribeNegationTo(std::ostream* os) const { *os << "wrong action"; }
+
+private:
+    const int32_t mAction;
+};
+
+inline WithMotionActionMatcher WithMotionAction(int32_t action) {
+    return WithMotionActionMatcher(action);
+}
+
+class MotionEventIsResampledMatcher {
+public:
+    using is_gtest_matcher = void;
+
+    bool MatchAndExplain(const MotionEvent& motionEvent, std::ostream*) const {
+        const size_t numSamples = motionEvent.getHistorySize() + 1;
+        const size_t numPointers = motionEvent.getPointerCount();
+        if (numPointers <= 0 || numSamples <= 0) {
+            return false;
+        }
+        for (size_t i = 0; i < numPointers; ++i) {
+            const PointerCoords& pointerCoords =
+                    motionEvent.getSamplePointerCoords()[numSamples * numPointers + i];
+            if (!pointerCoords.isResampled) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    void DescribeTo(std::ostream* os) const { *os << "MotionEvent is resampled."; }
+
+    void DescribeNegationTo(std::ostream* os) const { *os << "MotionEvent is not resampled."; }
+};
+
+inline MotionEventIsResampledMatcher MotionEventIsResampled() {
+    return MotionEventIsResampledMatcher();
+}
+} // namespace android