Add current device consumption to InputConsumerNoResampling

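When a motion event that cannot be batched arrives, consume only the
pending batch of that event's device instead of the batches of all
devices. consumeBatchedInputEvents gains an overload that takes an
optional device id for this purpose.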

Bug: 329776327
Flag: EXEMPT refactor
Test: TEST=libinput_tests; m $TEST && $ANDROID_HOST_OUT/nativetest64/$TEST/$TEST --gtest_filter="InputConsumerTest*"
Change-Id: Ibfcb28f019543a88b3ffada137199b1c3933d542
diff --git a/libs/input/InputConsumerNoResampling.cpp b/libs/input/InputConsumerNoResampling.cpp
index de55828..f33afe7 100644
--- a/libs/input/InputConsumerNoResampling.cpp
+++ b/libs/input/InputConsumerNoResampling.cpp
@@ -227,7 +227,7 @@
 
 InputConsumerNoResampling::~InputConsumerNoResampling() {
     ensureCalledOnLooperThread(__func__);
-    consumeBatchedInputEvents(std::nullopt);
+    consumeBatchedInputEvents(/*requestedFrameTime=*/std::nullopt);
     while (!mOutboundQueue.empty()) {
         processOutboundEvents();
         // This is our last chance to ack the events. If we don't ack them here, we will get an ANR,
@@ -253,8 +253,7 @@
 
     int handledEvents = 0;
     if (events & ALOOPER_EVENT_INPUT) {
-        std::vector<InputMessage> messages = readAllMessages();
-        handleMessages(std::move(messages));
+        handleMessages(readAllMessages());
         handledEvents |= ALOOPER_EVENT_INPUT;
     }
 
@@ -362,10 +361,8 @@
                 // add it to batch
                 mBatches[deviceId].emplace(msg);
             } else {
-                // consume all pending batches for this event immediately
-                // TODO(b/329776327): figure out if this could be smarter by limiting the
-                // consumption only to the current device.
-                consumeBatchedInputEvents(std::nullopt);
+                // consume all pending batches for this device immediately
+                consumeBatchedInputEvents(deviceId, /*requestedFrameTime=*/std::nullopt);
                 handleMessage(msg);
             }
         } else {
@@ -483,13 +480,13 @@
 }
 
 std::pair<std::unique_ptr<MotionEvent>, std::optional<uint32_t>>
-InputConsumerNoResampling::createBatchedMotionEvent(const nsecs_t frameTime,
+InputConsumerNoResampling::createBatchedMotionEvent(const nsecs_t requestedFrameTime,
                                                     std::queue<InputMessage>& messages) {
     std::unique_ptr<MotionEvent> motionEvent;
     std::optional<uint32_t> firstSeqForBatch;
     const nanoseconds resampleLatency =
             (mResampler != nullptr) ? mResampler->getResampleLatency() : nanoseconds{0};
-    const nanoseconds adjustedFrameTime = nanoseconds{frameTime} - resampleLatency;
+    const nanoseconds adjustedFrameTime = nanoseconds{requestedFrameTime} - resampleLatency;
 
     while (!messages.empty() &&
            (messages.front().body.motion.eventTime <= adjustedFrameTime.count())) {
@@ -511,36 +508,52 @@
         if (!messages.empty()) {
             futureSample = &messages.front();
         }
-        mResampler->resampleMotionEvent(nanoseconds{frameTime}, *motionEvent, futureSample);
+        mResampler->resampleMotionEvent(nanoseconds{requestedFrameTime}, *motionEvent,
+                                        futureSample);
     }
     return std::make_pair(std::move(motionEvent), firstSeqForBatch);
 }
 
 bool InputConsumerNoResampling::consumeBatchedInputEvents(
-        std::optional<nsecs_t> requestedFrameTime) {
+        std::optional<DeviceId> deviceId, std::optional<nsecs_t> requestedFrameTime) {
     ensureCalledOnLooperThread(__func__);
     // When batching is not enabled, we want to consume all events. That's equivalent to having an
-    // infinite frameTime.
-    const nsecs_t frameTime = requestedFrameTime.value_or(std::numeric_limits<nsecs_t>::max());
+    // infinite requestedFrameTime.
+    requestedFrameTime = requestedFrameTime.value_or(std::numeric_limits<nsecs_t>::max());
     bool producedEvents = false;
-    for (auto& [_, messages] : mBatches) {
-        auto [motion, firstSeqForBatch] = createBatchedMotionEvent(frameTime, messages);
+
+    for (auto deviceIdIter = (deviceId.has_value()) ? (mBatches.find(*deviceId))
+                                                    : (mBatches.begin());
+         deviceIdIter != mBatches.cend(); ++deviceIdIter) {
+        std::queue<InputMessage>& messages = deviceIdIter->second;
+        auto [motion, firstSeqForBatch] = createBatchedMotionEvent(*requestedFrameTime, messages);
         if (motion != nullptr) {
             LOG_ALWAYS_FATAL_IF(!firstSeqForBatch.has_value());
             mCallbacks.onMotionEvent(std::move(motion), *firstSeqForBatch);
             producedEvents = true;
         } else {
-            // This is OK, it just means that the frameTime is too old (all events that we have
-            // pending are in the future of the frametime). Maybe print a
-            // warning? If there are multiple devices active though, this might be normal and can
-            // just be ignored, unless none of them resulted in any consumption (in that case, this
-            // function would already return "false" so we could just leave it up to the caller).
+            // This is OK, it just means that the requestedFrameTime is too old (all events that we
+            // have pending are in the future of the requestedFrameTime). Maybe print a warning? If
+            // there are multiple devices active though, this might be normal and can just be
+            // ignored, unless none of them resulted in any consumption (in that case, this function
+            // would already return "false" so we could just leave it up to the caller).
+        }
+
+        if (deviceId.has_value()) {
+            // We already consumed events for this device. Break here to prevent iterating over the
+            // other devices.
+            break;
         }
     }
     std::erase_if(mBatches, [](const auto& pair) { return pair.second.empty(); });
     return producedEvents;
 }
 
+bool InputConsumerNoResampling::consumeBatchedInputEvents(
+        std::optional<nsecs_t> requestedFrameTime) {
+    return consumeBatchedInputEvents(/*deviceId=*/std::nullopt, requestedFrameTime);
+}
+
 void InputConsumerNoResampling::ensureCalledOnLooperThread(const char* func) const {
     sp<Looper> callingThreadLooper = Looper::getForThread();
     if (callingThreadLooper != mLooper->getLooper()) {
diff --git a/libs/input/tests/InputConsumer_test.cpp b/libs/input/tests/InputConsumer_test.cpp
index 55be453..dec78aa 100644
--- a/libs/input/tests/InputConsumer_test.cpp
+++ b/libs/input/tests/InputConsumer_test.cpp
@@ -20,9 +20,11 @@
 #include <optional>
 #include <utility>
 
+#include <TestEventMatchers.h>
 #include <TestInputChannel.h>
 #include <TestLooper.h>
 #include <android-base/logging.h>
+#include <gmock/gmock.h>
 #include <gtest/gtest.h>
 #include <input/BlockingQueue.h>
 #include <input/InputEventBuilders.h>
@@ -34,6 +36,10 @@
 
 using std::chrono::nanoseconds;
 
+using ::testing::AllOf;
+using ::testing::Matcher;
+using ::testing::Not;
+
 } // namespace
 
 class InputConsumerTest : public testing::Test, public InputConsumerCallbacks {
@@ -47,7 +53,17 @@
                                                             std::make_unique<LegacyResampler>());
     }
 
-    void assertOnBatchedInputEventPendingWasCalled();
+    void assertOnBatchedInputEventPendingWasCalled() {
+        ASSERT_GT(mOnBatchedInputEventPendingInvocationCount, 0UL)
+                << "onBatchedInputEventPending has not been called.";
+        --mOnBatchedInputEventPendingInvocationCount;
+    }
+
+    void assertReceivedMotionEvent(const Matcher<MotionEvent>& matcher) {
+        std::unique_ptr<MotionEvent> motionEvent = mMotionEvents.pop();
+        ASSERT_NE(motionEvent, nullptr);
+        EXPECT_THAT(*motionEvent, matcher);
+    }
 
     std::shared_ptr<TestInputChannel> mClientTestChannel;
     std::shared_ptr<TestLooper> mTestLooper;
@@ -96,12 +112,6 @@
     };
 };
 
-void InputConsumerTest::assertOnBatchedInputEventPendingWasCalled() {
-    ASSERT_GT(mOnBatchedInputEventPendingInvocationCount, 0UL)
-            << "onBatchedInputEventPending has not been called.";
-    --mOnBatchedInputEventPendingInvocationCount;
-}
-
 TEST_F(InputConsumerTest, MessageStreamBatchedInMotionEvent) {
     mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/0}
                                                .eventTime(nanoseconds{0ms}.count())
@@ -122,7 +132,7 @@
 
     assertOnBatchedInputEventPendingWasCalled();
 
-    mConsumer->consumeBatchedInputEvents(std::nullopt);
+    mConsumer->consumeBatchedInputEvents(/*frameTime=*/std::nullopt);
 
     std::unique_ptr<MotionEvent> downMotionEvent = mMotionEvents.pop();
     ASSERT_NE(downMotionEvent, nullptr);
@@ -182,4 +192,50 @@
     mClientTestChannel->assertFinishMessage(/*seq=*/2, true);
     mClientTestChannel->assertFinishMessage(/*seq=*/3, true);
 }
+
+TEST_F(InputConsumerTest, BatchedEventsMultiDeviceConsumption) {
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/0}
+                                               .deviceId(0)
+                                               .action(AMOTION_EVENT_ACTION_DOWN)
+                                               .build());
+
+    mTestLooper->invokeCallback(mClientTestChannel->getFd(), ALOOPER_EVENT_INPUT);
+    assertReceivedMotionEvent(AllOf(WithDeviceId(0), WithMotionAction(AMOTION_EVENT_ACTION_DOWN)));
+
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/1}
+                                               .deviceId(0)
+                                               .action(AMOTION_EVENT_ACTION_MOVE)
+                                               .build());
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/2}
+                                               .deviceId(0)
+                                               .action(AMOTION_EVENT_ACTION_MOVE)
+                                               .build());
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/3}
+                                               .deviceId(0)
+                                               .action(AMOTION_EVENT_ACTION_MOVE)
+                                               .build());
+
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/4}
+                                               .deviceId(1)
+                                               .action(AMOTION_EVENT_ACTION_DOWN)
+                                               .build());
+
+    mTestLooper->invokeCallback(mClientTestChannel->getFd(), ALOOPER_EVENT_INPUT);
+    assertReceivedMotionEvent(AllOf(WithDeviceId(1), WithMotionAction(AMOTION_EVENT_ACTION_DOWN)));
+
+    mClientTestChannel->enqueueMessage(InputMessageBuilder{InputMessage::Type::MOTION, /*seq=*/5}
+                                               .deviceId(0)
+                                               .action(AMOTION_EVENT_ACTION_UP)
+                                               .build());
+
+    mTestLooper->invokeCallback(mClientTestChannel->getFd(), ALOOPER_EVENT_INPUT);
+    assertReceivedMotionEvent(AllOf(WithDeviceId(0), WithMotionAction(AMOTION_EVENT_ACTION_MOVE),
+                                    Not(MotionEventIsResampled())));
+
+    mClientTestChannel->assertFinishMessage(/*seq=*/0, /*handled=*/true);
+    mClientTestChannel->assertFinishMessage(/*seq=*/4, /*handled=*/true);
+    mClientTestChannel->assertFinishMessage(/*seq=*/1, /*handled=*/true);
+    mClientTestChannel->assertFinishMessage(/*seq=*/2, /*handled=*/true);
+    mClientTestChannel->assertFinishMessage(/*seq=*/3, /*handled=*/true);
+}
 } // namespace android
diff --git a/libs/input/tests/InputPublisherAndConsumerNoResampling_test.cpp b/libs/input/tests/InputPublisherAndConsumerNoResampling_test.cpp
index 467c3b4..1210f71 100644
--- a/libs/input/tests/InputPublisherAndConsumerNoResampling_test.cpp
+++ b/libs/input/tests/InputPublisherAndConsumerNoResampling_test.cpp
@@ -364,7 +364,7 @@
         if (!mConsumer->probablyHasInput()) {
             ADD_FAILURE() << "should deterministically have input because there is a batch";
         }
-        mConsumer->consumeBatchedInputEvents(std::nullopt);
+        mConsumer->consumeBatchedInputEvents(/*frameTime=*/std::nullopt);
     };
     void onFocusEvent(std::unique_ptr<FocusEvent> event, uint32_t seq) override {
         mFocusEvents.push(std::move(event));
diff --git a/libs/input/tests/TestEventMatchers.h b/libs/input/tests/TestEventMatchers.h
new file mode 100644
index 0000000..dd2e40c
--- /dev/null
+++ b/libs/input/tests/TestEventMatchers.h
@@ -0,0 +1,126 @@
+/**
+ * Copyright 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <ostream>
+
+#include <input/Input.h>
+
+namespace android {
+
+/**
+ * This file contains a copy of Matchers from .../inputflinger/tests/TestEventMatchers.h. Ideally,
+ * implementations must not be duplicated.
+ * TODO(b/365606513): Find a way to share TestEventMatchers.h between inputflinger and libinput.
+ */
+
+/** Matches an InputEvent whose device id equals the expected device id. */
+class WithDeviceIdMatcher {
+public:
+    using is_gtest_matcher = void;
+    explicit WithDeviceIdMatcher(DeviceId deviceId) : mDeviceId(deviceId) {}
+
+    bool MatchAndExplain(const InputEvent& event, std::ostream*) const {
+        return mDeviceId == event.getDeviceId();
+    }
+
+    void DescribeTo(std::ostream* os) const { *os << "with device id " << mDeviceId; }
+
+    void DescribeNegationTo(std::ostream* os) const { *os << "wrong device id"; }
+
+private:
+    const DeviceId mDeviceId;
+};
+
+/** Convenience factory for WithDeviceIdMatcher. */
+inline WithDeviceIdMatcher WithDeviceId(int32_t deviceId) {
+    return WithDeviceIdMatcher(deviceId);
+}
+
+/**
+ * Matches a MotionEvent by its action. When the event's action is ACTION_CANCEL, the event must
+ * additionally carry AMOTION_EVENT_FLAG_CANCELED in order to match.
+ */
+class WithMotionActionMatcher {
+public:
+    using is_gtest_matcher = void;
+    explicit WithMotionActionMatcher(int32_t action) : mAction(action) {}
+
+    bool MatchAndExplain(const MotionEvent& event, std::ostream*) const {
+        bool matches = mAction == event.getAction();
+        if (event.getAction() == AMOTION_EVENT_ACTION_CANCEL) {
+            matches &= (event.getFlags() & AMOTION_EVENT_FLAG_CANCELED) != 0;
+        }
+        return matches;
+    }
+
+    void DescribeTo(std::ostream* os) const {
+        *os << "with motion action " << MotionEvent::actionToString(mAction);
+        if (mAction == AMOTION_EVENT_ACTION_CANCEL) {
+            *os << " and FLAG_CANCELED";
+        }
+    }
+
+    void DescribeNegationTo(std::ostream* os) const { *os << "wrong action"; }
+
+private:
+    const int32_t mAction;
+};
+
+/** Convenience factory for WithMotionActionMatcher. */
+inline WithMotionActionMatcher WithMotionAction(int32_t action) {
+    return WithMotionActionMatcher(action);
+}
+
+/**
+ * Matches a MotionEvent whose most recent sample is marked as resampled for every pointer.
+ * An event that reports no pointers or no samples never matches.
+ */
+class MotionEventIsResampledMatcher {
+public:
+    using is_gtest_matcher = void;
+
+    bool MatchAndExplain(const MotionEvent& motionEvent, std::ostream*) const {
+        const size_t numSamples = motionEvent.getHistorySize() + 1;
+        const size_t numPointers = motionEvent.getPointerCount();
+        if (numPointers == 0 || numSamples == 0) {
+            return false;
+        }
+        // Pointer coords are stored sample-major, numPointers entries per sample, so the latest
+        // sample starts at (numSamples - 1) * numPointers. Starting at numSamples * numPointers
+        // would read past the end of the stored samples.
+        const size_t latestSampleOffset = (numSamples - 1) * numPointers;
+        for (size_t i = 0; i < numPointers; ++i) {
+            const PointerCoords& pointerCoords =
+                    motionEvent.getSamplePointerCoords()[latestSampleOffset + i];
+            if (!pointerCoords.isResampled) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    void DescribeTo(std::ostream* os) const { *os << "MotionEvent is resampled."; }
+
+    void DescribeNegationTo(std::ostream* os) const { *os << "MotionEvent is not resampled."; }
+};
+
+/** Convenience factory for MotionEventIsResampledMatcher. */
+inline MotionEventIsResampledMatcher MotionEventIsResampled() {
+    return MotionEventIsResampledMatcher();
+}
+} // namespace android