Support dimensions_in_condition in metric producers.

Test: added e2e tests for count and duration metrics sliced by fields in the condition, with and without condition links.
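
For context: slices are now keyed by a MetricDimensionKey that pairs the dimensions_in_what key with the dimensions_in_condition key. The indented sketch below is a minimal illustration of that pairing only, not the real class. The accessor names (getDimensionKeyInWhat, getDimensionKeyInCondition, hasDimensionKeyInCondition) mirror how this change uses them; the string-backed HashableDimensionKey stand-in and the sample values in main() are assumptions added purely to keep the example self-contained and compilable. The actual classes wrap DimensionsValue protos.

    // Illustrative only: simplified stand-ins for the statsd key classes.
    #include <cstdio>
    #include <string>
    #include <utility>

    struct HashableDimensionKey {
        std::string key;  // stand-in for a serialized DimensionsValue
        bool empty() const { return key.empty(); }
    };

    class MetricDimensionKey {
    public:
        MetricDimensionKey(HashableDimensionKey inWhat, HashableDimensionKey inCondition)
            : mInWhat(std::move(inWhat)), mInCondition(std::move(inCondition)) {}

        const HashableDimensionKey& getDimensionKeyInWhat() const { return mInWhat; }
        const HashableDimensionKey& getDimensionKeyInCondition() const { return mInCondition; }
        // Slices with no condition dimension omit dimensions_in_condition from the report.
        bool hasDimensionKeyInCondition() const { return !mInCondition.empty(); }

    private:
        HashableDimensionKey mInWhat;
        HashableDimensionKey mInCondition;
    };

    int main() {
        MetricDimensionKey key({"uid=1000"}, {"wakelock_state=ON"});
        std::printf("what=%s condition=%s has_condition=%d\n",
                    key.getDimensionKeyInWhat().key.c_str(),
                    key.getDimensionKeyInCondition().key.c_str(),
                    key.hasDimensionKeyInCondition());
        return 0;
    }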

Change-Id: Ie34deba68e6780abdde458be3f0ce5284e76a1a2
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index 0455f6a..ae4df3e 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -21,6 +21,7 @@
 #include "guardrail/StatsdStats.h"
 #include "stats_util.h"
 #include "stats_log_util.h"
+#include "dimension.h"
 
 #include <limits.h>
 #include <stdlib.h>
@@ -71,13 +72,15 @@
     }
 
     // TODO: use UidMap if uid->pkg_name is required
-    mDimensions = metric.dimensions_in_what();
+    mDimensionsInWhat = metric.dimensions_in_what();
+    mDimensionsInCondition = metric.dimensions_in_condition();
 
     if (metric.links().size() > 0) {
         mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(),
                                metric.links().end());
-        mConditionSliced = true;
     }
+    mConditionSliced = (metric.links().size() > 0) ||
+        (mDimensionsInCondition.has_field() && mDimensionsInCondition.child_size() > 0);
 
     VLOG("metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -99,7 +102,10 @@
     auto count_metrics = report->mutable_count_metrics();
     for (const auto& counter : mPastBuckets) {
         CountMetricData* metricData = count_metrics->add_data();
-        *metricData->mutable_dimensions_in_what() = counter.first.getDimensionsValue();
+        *metricData->mutable_dimensions_in_what() =
+            counter.first.getDimensionKeyInWhat().getDimensionsValue();
+        *metricData->mutable_dimensions_in_condition() =
+            counter.first.getDimensionKeyInCondition().getDimensionsValue();
         for (const auto& bucket : counter.second) {
             CountBucketInfo* bucketInfo = metricData->add_bucket_info();
             bucketInfo->set_start_bucket_nanos(bucket.mBucketStartNs);
@@ -123,17 +129,26 @@
     VLOG("metric %lld dump report now...",(long long)mMetricId);
 
     for (const auto& counter : mPastBuckets) {
-        const HashableDimensionKey& hashableKey = counter.first;
-        VLOG("  dimension key %s", hashableKey.c_str());
+        const MetricDimensionKey& dimensionKey = counter.first;
+        VLOG("  dimension key %s", dimensionKey.c_str());
 
         long long wrapperToken =
                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
 
         // First fill dimension.
-        long long dimensionToken = protoOutput->start(
+        long long dimensionInWhatToken = protoOutput->start(
                 FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
-        writeDimensionsValueProtoToStream(hashableKey.getDimensionsValue(), protoOutput);
-        protoOutput->end(dimensionToken);
+        writeDimensionsValueProtoToStream(
+            dimensionKey.getDimensionKeyInWhat().getDimensionsValue(), protoOutput);
+        protoOutput->end(dimensionInWhatToken);
+
+        if (dimensionKey.hasDimensionKeyInCondition()) {
+            long long dimensionInConditionToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
+            writeDimensionsValueProtoToStream(
+                dimensionKey.getDimensionKeyInCondition().getDimensionsValue(), protoOutput);
+            protoOutput->end(dimensionInConditionToken);
+        }
 
         // Then fill bucket_info (CountBucketInfo).
         for (const auto& bucket : counter.second) {
@@ -166,7 +181,7 @@
     mCondition = conditionMet;
 }
 
-bool CountMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
+bool CountMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
     if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) {
         return false;
     }
@@ -187,7 +202,7 @@
 }
 
 void CountMetricProducer::onMatchedLogEventInternalLocked(
-        const size_t matcherIndex, const HashableDimensionKey& eventKey,
+        const size_t matcherIndex, const MetricDimensionKey& eventKey,
         const ConditionKey& conditionKey, bool condition,
         const LogEvent& event) {
     uint64_t eventTimeNs = event.GetTimestampNs();
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h
index 061b7a3..8659d47 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.h
+++ b/cmds/statsd/src/metrics/CountMetricProducer.h
@@ -50,7 +50,7 @@
 
 protected:
     void onMatchedLogEventInternalLocked(
-            const size_t matcherIndex, const HashableDimensionKey& eventKey,
+            const size_t matcherIndex, const MetricDimensionKey& eventKey,
             const ConditionKey& conditionKey, bool condition,
             const LogEvent& event) override;
 
@@ -74,14 +74,14 @@
     void flushIfNeededLocked(const uint64_t& newEventTime);
 
     // TODO: Add a lock to mPastBuckets.
-    std::unordered_map<HashableDimensionKey, std::vector<CountBucket>> mPastBuckets;
+    std::unordered_map<MetricDimensionKey, std::vector<CountBucket>> mPastBuckets;
 
     // The current bucket.
     std::shared_ptr<DimToValMap> mCurrentSlicedCounter = std::make_shared<DimToValMap>();
 
     static const size_t kBucketSize = sizeof(CountBucket{});
 
-    bool hitGuardRailLocked(const HashableDimensionKey& newKey);
+    bool hitGuardRailLocked(const MetricDimensionKey& newKey);
 
     FRIEND_TEST(CountMetricProducerTest, TestNonDimensionalEvents);
     FRIEND_TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition);
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index 000874c..efbdae1 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -21,6 +21,7 @@
 #include "guardrail/StatsdStats.h"
 #include "stats_util.h"
 #include "stats_log_util.h"
+#include "dimension.h"
 
 #include <limits.h>
 #include <stdlib.h>
@@ -81,13 +82,15 @@
     }
 
     // TODO: use UidMap if uid->pkg_name is required
-    mDimensions = metric.dimensions_in_what();
+    mDimensionsInWhat = metric.dimensions_in_what();
+    mDimensionsInCondition = metric.dimensions_in_condition();
 
     if (metric.links().size() > 0) {
         mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(),
                                metric.links().end());
-        mConditionSliced = true;
     }
+    mConditionSliced = (metric.links().size() > 0) ||
+        (mDimensionsInCondition.has_field() && mDimensionsInCondition.child_size() > 0);
 
     VLOG("metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -113,15 +116,17 @@
 }
 
 unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
-        const HashableDimensionKey& eventKey) const {
+        const MetricDimensionKey& eventKey) const {
     switch (mAggregationType) {
         case DurationMetric_AggregationType_SUM:
             return make_unique<OringDurationTracker>(
-                    mConfigKey, mMetricId, eventKey, mWizard, mConditionTrackerIndex, mNested,
+                    mConfigKey, mMetricId, eventKey, mWizard, mConditionTrackerIndex,
+                    mDimensionsInCondition, mNested,
                     mCurrentBucketStartTimeNs, mBucketSizeNs, mConditionSliced, mAnomalyTrackers);
         case DurationMetric_AggregationType_MAX_SPARSE:
             return make_unique<MaxDurationTracker>(
-                    mConfigKey, mMetricId, eventKey, mWizard, mConditionTrackerIndex, mNested,
+                    mConfigKey, mMetricId, eventKey, mWizard, mConditionTrackerIndex,
+                    mDimensionsInCondition, mNested,
                     mCurrentBucketStartTimeNs, mBucketSizeNs, mConditionSliced, mAnomalyTrackers);
     }
 }
@@ -129,10 +134,34 @@
 void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
     VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
     flushIfNeededLocked(eventTime);
+
     // Now for each of the on-going event, check if the condition has changed for them.
-    for (auto& pair : mCurrentSlicedDuration) {
+    for (auto& pair : mCurrentSlicedDurationTrackerMap) {
         pair.second->onSlicedConditionMayChange(eventTime);
     }
+
+
+    std::unordered_set<HashableDimensionKey> conditionDimensionsKeySet;
+    ConditionState conditionState = mWizard->getMetConditionDimension(
+        mConditionTrackerIndex, mDimensionsInCondition, &conditionDimensionsKeySet);
+
+    bool condition = (conditionState == ConditionState::kTrue);
+    for (auto& pair : mCurrentSlicedDurationTrackerMap) {
+        conditionDimensionsKeySet.erase(pair.first.getDimensionKeyInCondition());
+    }
+    // Create clones of the existing trackers for the newly-seen condition dimension keys.
+    // Collect the clones first so the map is never mutated while we iterate over it.
+    std::unordered_set<MetricDimensionKey> newKeys;
+    std::vector<std::pair<MetricDimensionKey, std::unique_ptr<DurationTracker>>> newTrackers;
+    for (const auto& conditionDimensionsKey : conditionDimensionsKeySet) {
+        for (auto& pair : mCurrentSlicedDurationTrackerMap) {
+            MetricDimensionKey newKey(pair.first.getDimensionKeyInWhat(), conditionDimensionsKey);
+            if (newKeys.insert(newKey).second) {
+                newTrackers.emplace_back(newKey, pair.second->clone(eventTime));
+            }
+        }
+    }
+    for (auto& newTracker : newTrackers) {
+        auto& tracker = mCurrentSlicedDurationTrackerMap[newTracker.first];
+        tracker = std::move(newTracker.second);
+        tracker->setEventKey(newTracker.first);
+        tracker->onSlicedConditionMayChange(eventTime);
+    }
 }
 
 void DurationMetricProducer::onConditionChangedLocked(const bool conditionMet,
@@ -142,7 +171,7 @@
     flushIfNeededLocked(eventTime);
     // TODO: need to populate the condition change time from the event which triggers the condition
     // change, instead of using current time.
-    for (auto& pair : mCurrentSlicedDuration) {
+    for (auto& pair : mCurrentSlicedDurationTrackerMap) {
         pair.second->onConditionChanged(conditionMet, eventTime);
     }
 }
@@ -155,7 +184,10 @@
     auto duration_metrics = report->mutable_duration_metrics();
     for (const auto& pair : mPastBuckets) {
         DurationMetricData* metricData = duration_metrics->add_data();
-        *metricData->mutable_dimensions_in_what() = pair.first.getDimensionsValue();
+        *metricData->mutable_dimensions_in_what() =
+            pair.first.getDimensionKeyInWhat().getDimensionsValue();
+        *metricData->mutable_dimensions_in_condition() =
+            pair.first.getDimensionKeyInCondition().getDimensionsValue();
         for (const auto& bucket : pair.second) {
             auto bucketInfo = metricData->add_bucket_info();
             bucketInfo->set_start_bucket_nanos(bucket.mBucketStartNs);
@@ -179,8 +211,8 @@
     VLOG("metric %lld dump report now...", (long long)mMetricId);
 
     for (const auto& pair : mPastBuckets) {
-        const HashableDimensionKey& hashableKey = pair.first;
-        VLOG("  dimension key %s", hashableKey.c_str());
+        const MetricDimensionKey& dimensionKey = pair.first;
+        VLOG("  dimension key %s", dimensionKey.c_str());
 
         long long wrapperToken =
                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
@@ -188,9 +220,18 @@
         // First fill dimension.
         long long dimensionToken = protoOutput->start(
                 FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
-        writeDimensionsValueProtoToStream(hashableKey.getDimensionsValue(), protoOutput);
+        writeDimensionsValueProtoToStream(
+            dimensionKey.getDimensionKeyInWhat().getDimensionsValue(), protoOutput);
         protoOutput->end(dimensionToken);
 
+        if (dimensionKey.hasDimensionKeyInCondition()) {
+            long long dimensionInConditionToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
+            writeDimensionsValueProtoToStream(
+                dimensionKey.getDimensionKeyInCondition().getDimensionsValue(), protoOutput);
+            protoOutput->end(dimensionInConditionToken);
+        }
+
         // Then fill bucket_info (DurationBucketInfo).
         for (const auto& bucket : pair.second) {
             long long bucketInfoToken = protoOutput->start(
@@ -219,10 +260,11 @@
         return;
     }
     VLOG("flushing...........");
-    for (auto it = mCurrentSlicedDuration.begin(); it != mCurrentSlicedDuration.end();) {
+    for (auto it = mCurrentSlicedDurationTrackerMap.begin();
+            it != mCurrentSlicedDurationTrackerMap.end();) {
         if (it->second->flushIfNeeded(eventTime, &mPastBuckets)) {
             VLOG("erase bucket for key %s", it->first.c_str());
-            it = mCurrentSlicedDuration.erase(it);
+            it = mCurrentSlicedDurationTrackerMap.erase(it);
         } else {
             ++it;
         }
@@ -234,28 +276,28 @@
 }
 
 void DurationMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
-    if (mCurrentSlicedDuration.size() == 0) {
+    if (mCurrentSlicedDurationTrackerMap.size() == 0) {
         return;
     }
 
     fprintf(out, "DurationMetric %lld dimension size %lu\n", (long long)mMetricId,
-            (unsigned long)mCurrentSlicedDuration.size());
+            (unsigned long)mCurrentSlicedDurationTrackerMap.size());
     if (verbose) {
-        for (const auto& slice : mCurrentSlicedDuration) {
+        for (const auto& slice : mCurrentSlicedDurationTrackerMap) {
             fprintf(out, "\t%s\n", slice.first.c_str());
             slice.second->dumpStates(out, verbose);
         }
     }
 }
 
-bool DurationMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
+bool DurationMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
     // the key is not new, we are good.
-    if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) {
+    if (mCurrentSlicedDurationTrackerMap.find(newKey) != mCurrentSlicedDurationTrackerMap.end()) {
         return false;
     }
     // 1. Report the tuple count if the tuple count > soft limit
-    if (mCurrentSlicedDuration.size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) {
-        size_t newTupleCount = mCurrentSlicedDuration.size() + 1;
+    if (mCurrentSlicedDurationTrackerMap.size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) {
+        size_t newTupleCount = mCurrentSlicedDurationTrackerMap.size() + 1;
         StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
         if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) {
@@ -268,27 +310,26 @@
 }
 
 void DurationMetricProducer::onMatchedLogEventInternalLocked(
-        const size_t matcherIndex, const HashableDimensionKey& eventKey,
+        const size_t matcherIndex, const MetricDimensionKey& eventKey,
         const ConditionKey& conditionKeys, bool condition,
         const LogEvent& event) {
     flushIfNeededLocked(event.GetTimestampNs());
 
     if (matcherIndex == mStopAllIndex) {
-        for (auto& pair : mCurrentSlicedDuration) {
+        for (auto& pair : mCurrentSlicedDurationTrackerMap) {
             pair.second->noteStopAll(event.GetTimestampNs());
         }
         return;
     }
 
-
-    if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
+    if (mCurrentSlicedDurationTrackerMap.find(eventKey) == mCurrentSlicedDurationTrackerMap.end()) {
         if (hitGuardRailLocked(eventKey)) {
             return;
         }
-        mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey);
+        mCurrentSlicedDurationTrackerMap[eventKey] = createDurationTracker(eventKey);
     }
 
-    auto it = mCurrentSlicedDuration.find(eventKey);
+    auto it = mCurrentSlicedDurationTrackerMap.find(eventKey);
 
     std::vector<DimensionsValue> values;
     getDimensionKeys(event, mInternalDimensions, &values);
@@ -302,10 +343,11 @@
     } else {
         for (const DimensionsValue& value : values) {
             if (matcherIndex == mStartIndex) {
-                it->second->noteStart(HashableDimensionKey(value), condition,
-                                      event.GetTimestampNs(), conditionKeys);
+                it->second->noteStart(
+                    HashableDimensionKey(value), condition, event.GetTimestampNs(), conditionKeys);
             } else if (matcherIndex == mStopIndex) {
-                it->second->noteStop(HashableDimensionKey(value), event.GetTimestampNs(), false);
+                it->second->noteStop(
+                    HashableDimensionKey(value), event.GetTimestampNs(), false);
             }
         }
     }
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h
index d8cab92..152e570 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.h
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.h
@@ -50,7 +50,7 @@
 
 protected:
     void onMatchedLogEventInternalLocked(
-            const size_t matcherIndex, const HashableDimensionKey& eventKey,
+            const size_t matcherIndex, const MetricDimensionKey& eventKey,
             const ConditionKey& conditionKeys, bool condition,
             const LogEvent& event) override;
 
@@ -92,21 +92,21 @@
 
     // Save the past buckets and we can clear when the StatsLogReport is dumped.
     // TODO: Add a lock to mPastBuckets.
-    std::unordered_map<HashableDimensionKey, std::vector<DurationBucket>> mPastBuckets;
+    std::unordered_map<MetricDimensionKey, std::vector<DurationBucket>> mPastBuckets;
 
     // The current bucket.
-    std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>>
-            mCurrentSlicedDuration;
+    std::unordered_map<MetricDimensionKey, std::unique_ptr<DurationTracker>>
+            mCurrentSlicedDurationTrackerMap;
 
     // Helper function to create a duration tracker given the metric aggregation type.
     std::unique_ptr<DurationTracker> createDurationTracker(
-            const HashableDimensionKey& eventKey) const;
+            const MetricDimensionKey& eventKey) const;
 
     // This hides the base class's std::vector<sp<AnomalyTracker>> mAnomalyTrackers
     std::vector<sp<DurationAnomalyTracker>> mAnomalyTrackers;
 
     // Util function to check whether the specified dimension hits the guardrail.
-    bool hitGuardRailLocked(const HashableDimensionKey& newKey);
+    bool hitGuardRailLocked(const MetricDimensionKey& newKey);
 
     static const size_t kBucketSize = sizeof(DurationBucket{});
 
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 25c86d0..820d591 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -128,7 +128,7 @@
 }
 
 void EventMetricProducer::onMatchedLogEventInternalLocked(
-        const size_t matcherIndex, const HashableDimensionKey& eventKey,
+        const size_t matcherIndex, const MetricDimensionKey& eventKey,
         const ConditionKey& conditionKey, bool condition,
         const LogEvent& event) {
     if (!condition) {
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.h b/cmds/statsd/src/metrics/EventMetricProducer.h
index 9da0dd0..935f206 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.h
+++ b/cmds/statsd/src/metrics/EventMetricProducer.h
@@ -45,7 +45,7 @@
 
 private:
     void onMatchedLogEventInternalLocked(
-            const size_t matcherIndex, const HashableDimensionKey& eventKey,
+            const size_t matcherIndex, const MetricDimensionKey& eventKey,
             const ConditionKey& conditionKey, bool condition,
             const LogEvent& event) override;
 
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 1072c5a..d6cb189 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -82,13 +82,15 @@
     mFieldFilter = metric.gauge_fields_filter();
 
     // TODO: use UidMap if uid->pkg_name is required
-    mDimensions = metric.dimensions_in_what();
+    mDimensionsInWhat = metric.dimensions_in_what();
+    mDimensionsInCondition = metric.dimensions_in_condition();
 
     if (metric.links().size() > 0) {
         mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(),
                                metric.links().end());
-        mConditionSliced = true;
     }
+    mConditionSliced = (metric.links().size() > 0) ||
+        (mDimensionsInCondition.has_field() && mDimensionsInCondition.child_size() > 0);
 
     // Kicks off the puller immediately.
     if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
@@ -136,18 +138,27 @@
     long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
 
     for (const auto& pair : mPastBuckets) {
-        const HashableDimensionKey& hashableKey = pair.first;
+        const MetricDimensionKey& dimensionKey = pair.first;
 
-        VLOG("  dimension key %s", hashableKey.c_str());
+        VLOG("  dimension key %s", dimensionKey.c_str());
         long long wrapperToken =
                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
 
         // First fill dimension.
         long long dimensionToken = protoOutput->start(
                 FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
-        writeDimensionsValueProtoToStream(hashableKey.getDimensionsValue(), protoOutput);
+        writeDimensionsValueProtoToStream(
+            dimensionKey.getDimensionKeyInWhat().getDimensionsValue(), protoOutput);
         protoOutput->end(dimensionToken);
 
+        if (dimensionKey.hasDimensionKeyInCondition()) {
+            long long dimensionInConditionToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
+            writeDimensionsValueProtoToStream(
+                dimensionKey.getDimensionKeyInCondition().getDimensionsValue(), protoOutput);
+            protoOutput->end(dimensionInConditionToken);
+        }
+
         // Then fill bucket_info (GaugeBucketInfo).
         for (const auto& bucket : pair.second) {
             long long bucketInfoToken = protoOutput->start(
@@ -248,7 +259,7 @@
     }
 }
 
-bool GaugeMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
+bool GaugeMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
     if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
         return false;
     }
@@ -268,7 +279,7 @@
 }
 
 void GaugeMetricProducer::onMatchedLogEventInternalLocked(
-        const size_t matcherIndex, const HashableDimensionKey& eventKey,
+        const size_t matcherIndex, const MetricDimensionKey& eventKey,
         const ConditionKey& conditionKey, bool condition,
         const LogEvent& event) {
     if (condition == false) {
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 6c01347..86d0ccd 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -44,7 +44,7 @@
     uint64_t mBucketNum;
 };
 
-typedef std::unordered_map<HashableDimensionKey, std::vector<GaugeAtom>>
+typedef std::unordered_map<MetricDimensionKey, std::vector<GaugeAtom>>
     DimToGaugeAtomsMap;
 
 // This gauge metric producer first register the puller to automatically pull the gauge at the
@@ -64,7 +64,7 @@
 
 protected:
     void onMatchedLogEventInternalLocked(
-            const size_t matcherIndex, const HashableDimensionKey& eventKey,
+            const size_t matcherIndex, const MetricDimensionKey& eventKey,
             const ConditionKey& conditionKey, bool condition,
             const LogEvent& event) override;
 
@@ -99,7 +99,7 @@
 
     // Save the past buckets and we can clear when the StatsLogReport is dumped.
     // TODO: Add a lock to mPastBuckets.
-    std::unordered_map<HashableDimensionKey, std::vector<GaugeBucket>> mPastBuckets;
+    std::unordered_map<MetricDimensionKey, std::vector<GaugeBucket>> mPastBuckets;
 
     // The current bucket.
     std::shared_ptr<DimToGaugeAtomsMap> mCurrentSlicedBucket;
@@ -119,7 +119,7 @@
     std::shared_ptr<FieldValueMap> getGaugeFields(const LogEvent& event);
 
     // Util function to check whether the specified dimension hits the guardrail.
-    bool hitGuardRailLocked(const HashableDimensionKey& newKey);
+    bool hitGuardRailLocked(const MetricDimensionKey& newKey);
 
     static const size_t kBucketSize = sizeof(GaugeBucket{});
 
diff --git a/cmds/statsd/src/metrics/MetricProducer.cpp b/cmds/statsd/src/metrics/MetricProducer.cpp
index e74924a..85e655b 100644
--- a/cmds/statsd/src/metrics/MetricProducer.cpp
+++ b/cmds/statsd/src/metrics/MetricProducer.cpp
@@ -15,6 +15,8 @@
  */
 #include "MetricProducer.h"
 
+#include "dimension.h"
+
 namespace android {
 namespace os {
 namespace statsd {
@@ -30,29 +32,51 @@
 
     bool condition;
     ConditionKey conditionKey;
+
+    std::unordered_set<HashableDimensionKey> dimensionKeysInCondition;
     if (mConditionSliced) {
         for (const auto& link : mConditionLinks) {
             getDimensionKeysForCondition(event, link, &conditionKey[link.condition()]);
         }
-        if (mWizard->query(mConditionTrackerIndex, conditionKey) != ConditionState::kTrue) {
-            condition = false;
-        } else {
-            condition = true;
-        }
+        auto conditionState =
+            mWizard->query(mConditionTrackerIndex, conditionKey, mDimensionsInCondition,
+                           &dimensionKeysInCondition);
+        condition = (conditionState == ConditionState::kTrue);
     } else {
         condition = mCondition;
     }
 
-    if (mDimensions.has_field() && mDimensions.child_size() > 0) {
-        vector<DimensionsValue> dimensionValues;
-        getDimensionKeys(event, mDimensions, &dimensionValues);
-        for (const DimensionsValue& dimensionValue : dimensionValues) {
+    vector<DimensionsValue> dimensionInWhatValues;
+    if (mDimensionsInWhat.has_field() && mDimensionsInWhat.child_size() > 0) {
+        getDimensionKeys(event, mDimensionsInWhat, &dimensionInWhatValues);
+    }
+
+    if (dimensionInWhatValues.empty() && dimensionKeysInCondition.empty()) {
+        onMatchedLogEventInternalLocked(
+            matcherIndex, DEFAULT_METRIC_DIMENSION_KEY, conditionKey, condition, event);
+    } else if (dimensionKeysInCondition.empty()) {
+        for (const DimensionsValue& whatValue : dimensionInWhatValues) {
             onMatchedLogEventInternalLocked(
-                matcherIndex, HashableDimensionKey(dimensionValue), conditionKey, condition, event);
+                matcherIndex,
+                MetricDimensionKey(HashableDimensionKey(whatValue), DEFAULT_DIMENSION_KEY),
+                conditionKey, condition, event);
+        }
+    } else if (dimensionInWhatValues.empty()) {
+        for (const auto& conditionDimensionKey : dimensionKeysInCondition) {
+            onMatchedLogEventInternalLocked(
+                matcherIndex,
+                MetricDimensionKey(DEFAULT_DIMENSION_KEY, conditionDimensionKey),
+                conditionKey, condition, event);
         }
     } else {
-        onMatchedLogEventInternalLocked(
-            matcherIndex, DEFAULT_DIMENSION_KEY, conditionKey, condition, event);
+        for (const DimensionsValue& whatValue : dimensionInWhatValues) {
+            for (const auto& conditionDimensionKey : dimensionKeysInCondition) {
+                onMatchedLogEventInternalLocked(
+                    matcherIndex,
+                    MetricDimensionKey(HashableDimensionKey(whatValue), conditionDimensionKey),
+                    conditionKey, condition, event);
+            }
+        }
     }
 }
 
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index 6f33073..3b1498f 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -91,6 +91,7 @@
         std::lock_guard<std::mutex> lock(mMutex);
         return onDumpReportLocked(dumpTimeNs, protoOutput);
     }
+
     void onDumpReport(const uint64_t dumpTimeNs, StatsLogReport* report) {
         std::lock_guard<std::mutex> lock(mMutex);
         return onDumpReportLocked(dumpTimeNs, report);
@@ -156,7 +157,8 @@
 
     int mConditionTrackerIndex;
 
-    FieldMatcher mDimensions;  // The dimension defined in statsd_config
+    FieldMatcher mDimensionsInWhat;  // The dimensions_in_what defined in statsd_config
+    FieldMatcher mDimensionsInCondition;  // The dimensions_in_condition defined in statsd_config
 
     std::vector<MetricConditionLink> mConditionLinks;
 
@@ -178,7 +180,7 @@
      * [event]: the log event, just in case the metric needs its data, e.g., EventMetric.
      */
     virtual void onMatchedLogEventInternalLocked(
-            const size_t matcherIndex, const HashableDimensionKey& eventKey,
+            const size_t matcherIndex, const MetricDimensionKey& eventKey,
             const ConditionKey& conditionKey, bool condition,
             const LogEvent& event) = 0;
 
diff --git a/cmds/statsd/src/metrics/MetricsManager.h b/cmds/statsd/src/metrics/MetricsManager.h
index 9cdbafc..d4b9102 100644
--- a/cmds/statsd/src/metrics/MetricsManager.h
+++ b/cmds/statsd/src/metrics/MetricsManager.h
@@ -143,6 +143,10 @@
     FRIEND_TEST(MetricConditionLinkE2eTest, TestMultiplePredicatesAndLinks);
     FRIEND_TEST(AttributionE2eTest, TestAttributionMatchAndSlice);
     FRIEND_TEST(GaugeMetricE2eTest, TestMultipleFieldsForPushedEvent);
+    FRIEND_TEST(DimensionInConditionE2eTest, TestCountMetricNoLink);
+    FRIEND_TEST(DimensionInConditionE2eTest, TestCountMetricWithLink);
+    FRIEND_TEST(DimensionInConditionE2eTest, TestDurationMetricNoLink);
+    FRIEND_TEST(DimensionInConditionE2eTest, TestDurationMetricWithLink);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index ae0c673..c9cc7bb 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -81,13 +81,15 @@
     }
 
     mBucketSizeNs = bucketSizeMills * 1000000;
-    mDimensions = metric.dimensions_in_what();
+    mDimensionsInWhat = metric.dimensions_in_what();
+    mDimensionsInCondition = metric.dimensions_in_condition();
 
     if (metric.links().size() > 0) {
         mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(),
                                metric.links().end());
-        mConditionSliced = true;
     }
+    mConditionSliced = (metric.links().size() > 0) ||
+        (mDimensionsInCondition.has_field() && mDimensionsInCondition.child_size() > 0);
 
     if (!metric.has_condition() && mPullTagId != -1) {
         VLOG("Setting up periodic pulling for %d", mPullTagId);
@@ -124,7 +126,10 @@
     auto value_metrics = report->mutable_value_metrics();
     for (const auto& pair : mPastBuckets) {
         ValueMetricData* metricData = value_metrics->add_data();
-        *metricData->mutable_dimensions_in_what() = pair.first.getDimensionsValue();
+        *metricData->mutable_dimensions_in_what() =
+            pair.first.getDimensionKeyInWhat().getDimensionsValue();
+        *metricData->mutable_dimensions_in_condition() =
+            pair.first.getDimensionKeyInCondition().getDimensionsValue();
         for (const auto& bucket : pair.second) {
             ValueBucketInfo* bucketInfo = metricData->add_bucket_info();
             bucketInfo->set_start_bucket_nanos(bucket.mBucketStartNs);
@@ -146,16 +151,24 @@
     long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
 
     for (const auto& pair : mPastBuckets) {
-        const HashableDimensionKey& hashableKey = pair.first;
-        VLOG("  dimension key %s", hashableKey.c_str());
+        const MetricDimensionKey& dimensionKey = pair.first;
+        VLOG("  dimension key %s", dimensionKey.c_str());
         long long wrapperToken =
                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
 
         // First fill dimension.
         long long dimensionToken = protoOutput->start(
             FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
-        writeDimensionsValueProtoToStream(hashableKey.getDimensionsValue(), protoOutput);
+        writeDimensionsValueProtoToStream(
+            dimensionKey.getDimensionKeyInWhat().getDimensionsValue(), protoOutput);
         protoOutput->end(dimensionToken);
+        if (dimensionKey.hasDimensionKeyInCondition()) {
+            long long dimensionInConditionToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
+            writeDimensionsValueProtoToStream(
+                dimensionKey.getDimensionKeyInCondition().getDimensionsValue(), protoOutput);
+            protoOutput->end(dimensionInConditionToken);
+        }
 
         // Then fill bucket_info (ValueBucketInfo).
         for (const auto& bucket : pair.second) {
@@ -239,7 +252,7 @@
     }
 }
 
-bool ValueMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
+bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
     // ===========GuardRail==============
     // 1. Report the tuple count if the tuple count > soft limit
     if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
@@ -260,7 +273,7 @@
 }
 
 void ValueMetricProducer::onMatchedLogEventInternalLocked(
-        const size_t matcherIndex, const HashableDimensionKey& eventKey,
+        const size_t matcherIndex, const MetricDimensionKey& eventKey,
         const ConditionKey& conditionKey, bool condition,
         const LogEvent& event) {
     uint64_t eventTimeNs = event.GetTimestampNs();
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 9f750cf..121ec7d 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -49,7 +49,7 @@
 
 protected:
     void onMatchedLogEventInternalLocked(
-            const size_t matcherIndex, const HashableDimensionKey& eventKey,
+            const size_t matcherIndex, const MetricDimensionKey& eventKey,
             const ConditionKey& conditionKey, bool condition,
             const LogEvent& event) override;
 
@@ -99,16 +99,16 @@
         long sum;
     } Interval;
 
-    std::unordered_map<HashableDimensionKey, Interval> mCurrentSlicedBucket;
+    std::unordered_map<MetricDimensionKey, Interval> mCurrentSlicedBucket;
 
     // Save the past buckets and we can clear when the StatsLogReport is dumped.
     // TODO: Add a lock to mPastBuckets.
-    std::unordered_map<HashableDimensionKey, std::vector<ValueBucket>> mPastBuckets;
+    std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>> mPastBuckets;
 
     std::shared_ptr<FieldValueMap> getValueFields(const LogEvent& event);
 
     // Util function to check whether the specified dimension hits the guardrail.
-    bool hitGuardRailLocked(const HashableDimensionKey& newKey);
+    bool hitGuardRailLocked(const MetricDimensionKey& newKey);
 
     static const size_t kBucketSize = sizeof(ValueBucket{});
 
diff --git a/cmds/statsd/src/metrics/duration_helper/DurationTracker.h b/cmds/statsd/src/metrics/duration_helper/DurationTracker.h
index c2d2cea..45735a8 100644
--- a/cmds/statsd/src/metrics/duration_helper/DurationTracker.h
+++ b/cmds/statsd/src/metrics/duration_helper/DurationTracker.h
@@ -60,8 +60,9 @@
 
 class DurationTracker {
 public:
-    DurationTracker(const ConfigKey& key, const int64_t& id, const HashableDimensionKey& eventKey,
-                    sp<ConditionWizard> wizard, int conditionIndex, bool nesting,
+    DurationTracker(const ConfigKey& key, const int64_t& id, const MetricDimensionKey& eventKey,
+                    sp<ConditionWizard> wizard, int conditionIndex,
+                    const FieldMatcher& dimensionInCondition, bool nesting,
                     uint64_t currentBucketStartNs, uint64_t bucketSizeNs, bool conditionSliced,
                     const std::vector<sp<DurationAnomalyTracker>>& anomalyTrackers)
         : mConfigKey(key),
@@ -70,6 +71,7 @@
           mWizard(wizard),
           mConditionTrackerIndex(conditionIndex),
           mBucketSizeNs(bucketSizeNs),
+          mDimensionInCondition(dimensionInCondition),
           mNested(nesting),
           mCurrentBucketStartTimeNs(currentBucketStartNs),
           mDuration(0),
@@ -79,6 +81,8 @@
 
     virtual ~DurationTracker(){};
 
+    virtual unique_ptr<DurationTracker> clone(const uint64_t eventTime) = 0;
+
     virtual void noteStart(const HashableDimensionKey& key, bool condition,
                            const uint64_t eventTime, const ConditionKey& conditionKey) = 0;
     virtual void noteStop(const HashableDimensionKey& key, const uint64_t eventTime,
@@ -92,7 +96,7 @@
     // events, so that the owner can safely remove the tracker.
     virtual bool flushIfNeeded(
             uint64_t timestampNs,
-            std::unordered_map<HashableDimensionKey, std::vector<DurationBucket>>* output) = 0;
+            std::unordered_map<MetricDimensionKey, std::vector<DurationBucket>>* output) = 0;
 
     // Predict the anomaly timestamp given the current status.
     virtual int64_t predictAnomalyTimestampNs(const DurationAnomalyTracker& anomalyTracker,
@@ -100,6 +104,10 @@
     // Dump internal states for debugging
     virtual void dumpStates(FILE* out, bool verbose) const = 0;
 
+    void setEventKey(const MetricDimensionKey& eventKey) {
+        mEventKey = eventKey;
+    }
+
 protected:
     // Starts the anomaly alarm.
     void startAnomalyAlarm(const uint64_t eventTime) {
@@ -150,7 +158,7 @@
 
     const int64_t mTrackerId;
 
-    HashableDimensionKey mEventKey;
+    MetricDimensionKey mEventKey;
 
     sp<ConditionWizard> mWizard;
 
@@ -158,6 +166,8 @@
 
     const int64_t mBucketSizeNs;
 
+    const FieldMatcher mDimensionInCondition;
+
     const bool mNested;
 
     uint64_t mCurrentBucketStartTimeNs;
diff --git a/cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.cpp b/cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.cpp
index 412a0c9..db7dea4 100644
--- a/cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.cpp
+++ b/cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.cpp
@@ -25,13 +25,23 @@
 namespace statsd {
 
 MaxDurationTracker::MaxDurationTracker(const ConfigKey& key, const int64_t& id,
-                                       const HashableDimensionKey& eventKey,
-                                       sp<ConditionWizard> wizard, int conditionIndex, bool nesting,
+                                       const MetricDimensionKey& eventKey,
+                                       sp<ConditionWizard> wizard, int conditionIndex,
+                                       const FieldMatcher& dimensionInCondition, bool nesting,
                                        uint64_t currentBucketStartNs, uint64_t bucketSizeNs,
                                        bool conditionSliced,
                                        const vector<sp<DurationAnomalyTracker>>& anomalyTrackers)
-    : DurationTracker(key, id, eventKey, wizard, conditionIndex, nesting, currentBucketStartNs,
-                      bucketSizeNs, conditionSliced, anomalyTrackers) {
+    : DurationTracker(key, id, eventKey, wizard, conditionIndex, dimensionInCondition, nesting,
+                      currentBucketStartNs, bucketSizeNs, conditionSliced, anomalyTrackers) {
+}
+
+unique_ptr<DurationTracker> MaxDurationTracker::clone(const uint64_t eventTime) {
+    auto clonedTracker = make_unique<MaxDurationTracker>(*this);
+    for (auto it = clonedTracker->mInfos.begin(); it != clonedTracker->mInfos.end(); ++it) {
+        it->second.lastStartTime = eventTime;
+        it->second.lastDuration = 0;
+    }
+    return clonedTracker;
 }
 
 bool MaxDurationTracker::hitGuardRail(const HashableDimensionKey& newKey) {
@@ -44,7 +54,7 @@
     if (mInfos.size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) {
         size_t newTupleCount = mInfos.size() + 1;
         StatsdStats::getInstance().noteMetricDimensionSize(
-            mConfigKey, hashDimensionsValue(mTrackerId, mEventKey.getDimensionsValue()),
+            mConfigKey, hashMetricDimensionKey(mTrackerId, mEventKey),
             newTupleCount);
         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
         if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) {
@@ -149,7 +159,7 @@
 }
 
 bool MaxDurationTracker::flushIfNeeded(
-        uint64_t eventTime, unordered_map<HashableDimensionKey, vector<DurationBucket>>* output) {
+        uint64_t eventTime, unordered_map<MetricDimensionKey, vector<DurationBucket>>* output) {
     if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
         return false;
     }
@@ -236,8 +246,14 @@
         if (pair.second.state == kStopped) {
             continue;
         }
-        bool conditionMet = mWizard->query(mConditionTrackerIndex, pair.second.conditionKeys) ==
-                            ConditionState::kTrue;
+        std::unordered_set<HashableDimensionKey> conditionDimensionKeySet;
+        ConditionState conditionState = mWizard->query(
+            mConditionTrackerIndex, pair.second.conditionKeys, mDimensionInCondition,
+            &conditionDimensionKeySet);
+        bool conditionMet = (conditionState == ConditionState::kTrue) &&
+            (!mDimensionInCondition.has_field() ||
+             conditionDimensionKeySet.find(mEventKey.getDimensionKeyInCondition()) !=
+                conditionDimensionKeySet.end());
         VLOG("key: %s, condition: %d", pair.first.c_str(), conditionMet);
         noteConditionChanged(pair.first, conditionMet, timestamp);
     }
diff --git a/cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.h b/cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.h
index 661d131..4d32a06 100644
--- a/cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.h
+++ b/cmds/statsd/src/metrics/duration_helper/MaxDurationTracker.h
@@ -29,10 +29,15 @@
 class MaxDurationTracker : public DurationTracker {
 public:
     MaxDurationTracker(const ConfigKey& key, const int64_t& id,
-                       const HashableDimensionKey& eventKey, sp<ConditionWizard> wizard,
-                       int conditionIndex, bool nesting, uint64_t currentBucketStartNs,
-                       uint64_t bucketSizeNs, bool conditionSliced,
+                       const MetricDimensionKey& eventKey, sp<ConditionWizard> wizard,
+                       int conditionIndex, const FieldMatcher& dimensionInCondition, bool nesting,
+                       uint64_t currentBucketStartNs, uint64_t bucketSizeNs, bool conditionSliced,
                        const std::vector<sp<DurationAnomalyTracker>>& anomalyTrackers);
+
+    MaxDurationTracker(const MaxDurationTracker& tracker) = default;
+
+    unique_ptr<DurationTracker> clone(const uint64_t eventTime) override;
+
     void noteStart(const HashableDimensionKey& key, bool condition, const uint64_t eventTime,
                    const ConditionKey& conditionKey) override;
     void noteStop(const HashableDimensionKey& key, const uint64_t eventTime,
@@ -41,7 +46,7 @@
 
     bool flushIfNeeded(
             uint64_t timestampNs,
-            std::unordered_map<HashableDimensionKey, std::vector<DurationBucket>>* output) override;
+            std::unordered_map<MetricDimensionKey, std::vector<DurationBucket>>* output) override;
 
     void onSlicedConditionMayChange(const uint64_t timestamp) override;
     void onConditionChanged(bool condition, const uint64_t timestamp) override;
diff --git a/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.cpp b/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.cpp
index 75d7c08..0feae36 100644
--- a/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.cpp
+++ b/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.cpp
@@ -25,17 +25,25 @@
 using std::pair;
 
 OringDurationTracker::OringDurationTracker(
-        const ConfigKey& key, const int64_t& id, const HashableDimensionKey& eventKey,
-        sp<ConditionWizard> wizard, int conditionIndex, bool nesting, uint64_t currentBucketStartNs,
+        const ConfigKey& key, const int64_t& id, const MetricDimensionKey& eventKey,
+        sp<ConditionWizard> wizard, int conditionIndex,
+        const FieldMatcher& dimensionInCondition, bool nesting, uint64_t currentBucketStartNs,
         uint64_t bucketSizeNs, bool conditionSliced,
         const vector<sp<DurationAnomalyTracker>>& anomalyTrackers)
-    : DurationTracker(key, id, eventKey, wizard, conditionIndex, nesting, currentBucketStartNs,
-                      bucketSizeNs, conditionSliced, anomalyTrackers),
+    : DurationTracker(key, id, eventKey, wizard, conditionIndex, dimensionInCondition, nesting,
+                      currentBucketStartNs, bucketSizeNs, conditionSliced, anomalyTrackers),
       mStarted(),
       mPaused() {
     mLastStartTime = 0;
 }
 
+unique_ptr<DurationTracker> OringDurationTracker::clone(const uint64_t eventTime) {
+    auto clonedTracker = make_unique<OringDurationTracker>(*this);
+    clonedTracker->mLastStartTime = eventTime;
+    clonedTracker->mDuration = 0;
+    return clonedTracker;
+}
+
 bool OringDurationTracker::hitGuardRail(const HashableDimensionKey& newKey) {
     // ===========GuardRail==============
     // 1. Report the tuple count if the tuple count > soft limit
@@ -45,7 +53,7 @@
     if (mConditionKeyMap.size() > StatsdStats::kDimensionKeySizeSoftLimit - 1) {
         size_t newTupleCount = mConditionKeyMap.size() + 1;
         StatsdStats::getInstance().noteMetricDimensionSize(
-            mConfigKey, hashDimensionsValue(mTrackerId, mEventKey.getDimensionsValue()),
+            mConfigKey, hashMetricDimensionKey(mTrackerId, mEventKey),
             newTupleCount);
         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
         if (newTupleCount > StatsdStats::kDimensionKeySizeHardLimit) {
@@ -76,7 +84,6 @@
     if (mConditionSliced && mConditionKeyMap.find(key) == mConditionKeyMap.end()) {
         mConditionKeyMap[key] = conditionKey;
     }
-
     VLOG("Oring: %s start, condition %d", key.c_str(), condition);
 }
 
@@ -128,7 +135,7 @@
 }
 
 bool OringDurationTracker::flushIfNeeded(
-        uint64_t eventTime, unordered_map<HashableDimensionKey, vector<DurationBucket>>* output) {
+        uint64_t eventTime, unordered_map<MetricDimensionKey, vector<DurationBucket>>* output) {
     if (eventTime < mCurrentBucketStartTimeNs + mBucketSizeNs) {
         return false;
     }
@@ -184,8 +191,14 @@
                 ++it;
                 continue;
             }
-            if (mWizard->query(mConditionTrackerIndex, mConditionKeyMap[key]) !=
-                ConditionState::kTrue) {
+            std::unordered_set<HashableDimensionKey> conditionDimensionKeySet;
+            ConditionState conditionState =
+                mWizard->query(mConditionTrackerIndex, mConditionKeyMap[key],
+                               mDimensionInCondition, &conditionDimensionKeySet);
+            if (conditionState != ConditionState::kTrue ||
+                (mDimensionInCondition.has_field() &&
+                 conditionDimensionKeySet.find(mEventKey.getDimensionKeyInCondition()) ==
+                    conditionDimensionKeySet.end())) {
                 startedToPaused.push_back(*it);
                 it = mStarted.erase(it);
                 VLOG("Key %s started -> paused", key.c_str());
@@ -210,8 +223,14 @@
                 ++it;
                 continue;
             }
-            if (mWizard->query(mConditionTrackerIndex, mConditionKeyMap[key]) ==
-                ConditionState::kTrue) {
+            std::unordered_set<HashableDimensionKey> conditionDimensionKeySet;
+            ConditionState conditionState =
+                mWizard->query(mConditionTrackerIndex, mConditionKeyMap[key],
+                               mDimensionInCondition, &conditionDimensionKeySet);
+            if (conditionState == ConditionState::kTrue &&
+                (!mDimensionInCondition.has_field() ||
+                 conditionDimensionKeySet.find(mEventKey.getDimensionKeyInCondition())
+                    != conditionDimensionKeySet.end())) {
                 pausedToStarted.push_back(*it);
                 it = mPaused.erase(it);
                 VLOG("Key %s paused -> started", key.c_str());
diff --git a/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.h b/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.h
index 43469ca..75b5a81 100644
--- a/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.h
+++ b/cmds/statsd/src/metrics/duration_helper/OringDurationTracker.h
@@ -28,11 +28,15 @@
 class OringDurationTracker : public DurationTracker {
 public:
     OringDurationTracker(const ConfigKey& key, const int64_t& id,
-                         const HashableDimensionKey& eventKey, sp<ConditionWizard> wizard,
-                         int conditionIndex, bool nesting, uint64_t currentBucketStartNs,
-                         uint64_t bucketSizeNs, bool conditionSliced,
+                         const MetricDimensionKey& eventKey, sp<ConditionWizard> wizard,
+                         int conditionIndex, const FieldMatcher& dimensionInCondition, bool nesting,
+                         uint64_t currentBucketStartNs, uint64_t bucketSizeNs, bool conditionSliced,
                          const std::vector<sp<DurationAnomalyTracker>>& anomalyTrackers);
 
+    OringDurationTracker(const OringDurationTracker& tracker) = default;
+
+    unique_ptr<DurationTracker> clone(const uint64_t eventTime) override;
+
     void noteStart(const HashableDimensionKey& key, bool condition, const uint64_t eventTime,
                    const ConditionKey& conditionKey) override;
     void noteStop(const HashableDimensionKey& key, const uint64_t eventTime,
@@ -44,7 +48,7 @@
 
     bool flushIfNeeded(
             uint64_t timestampNs,
-            std::unordered_map<HashableDimensionKey, std::vector<DurationBucket>>* output) override;
+            std::unordered_map<MetricDimensionKey, std::vector<DurationBucket>>* output) override;
 
     int64_t predictAnomalyTimestampNs(const DurationAnomalyTracker& anomalyTracker,
                                       const uint64_t currentTimestamp) const override;