ValueMetric supports multiple aggregation types

1. Add support for MIN, MAX, AVG
2. ValueMetric also allow floats now, in addition to long data type.
AnomalyDetection still takes long only. I am not sure if it makes
sense to do anomaly on AVG. I will leave that for later.
3. ValueMetric supports sliced condition change for pushed events.
I don't think it makes sense for pulled events to have sliced condition
changes so leave it for now.

Test: unit test
Change-Id: I8bc510d98ea9b8a6eb16d04ff99dce6b574249cd
diff --git a/cmds/statsd/src/FieldValue.cpp b/cmds/statsd/src/FieldValue.cpp
index f150f07..7b6d29b 100644
--- a/cmds/statsd/src/FieldValue.cpp
+++ b/cmds/statsd/src/FieldValue.cpp
@@ -141,6 +141,9 @@
         case FLOAT:
             float_value = from.float_value;
             break;
+        case DOUBLE:
+            double_value = from.double_value;
+            break;
         case STRING:
             str_value = from.str_value;
             break;
@@ -157,6 +160,8 @@
             return std::to_string(long_value) + "[L]";
         case FLOAT:
             return std::to_string(float_value) + "[F]";
+        case DOUBLE:
+            return std::to_string(double_value) + "[D]";
         case STRING:
             return str_value + "[S]";
         default:
@@ -174,6 +179,8 @@
             return long_value == that.long_value;
         case FLOAT:
             return float_value == that.float_value;
+        case DOUBLE:
+            return double_value == that.double_value;
         case STRING:
             return str_value == that.str_value;
         default:
@@ -190,6 +197,8 @@
             return long_value != that.long_value;
         case FLOAT:
             return float_value != that.float_value;
+        case DOUBLE:
+            return double_value != that.double_value;
         case STRING:
             return str_value != that.str_value;
         default:
@@ -207,6 +216,8 @@
             return long_value < that.long_value;
         case FLOAT:
             return float_value < that.float_value;
+        case DOUBLE:
+            return double_value < that.double_value;
         case STRING:
             return str_value < that.str_value;
         default:
@@ -214,6 +225,142 @@
     }
 }
 
+bool Value::operator>(const Value& that) const {
+    if (type != that.getType()) return type > that.getType();
+
+    switch (type) {
+        case INT:
+            return int_value > that.int_value;
+        case LONG:
+            return long_value > that.long_value;
+        case FLOAT:
+            return float_value > that.float_value;
+        case DOUBLE:
+            return double_value > that.double_value;
+        case STRING:
+            return str_value > that.str_value;
+        default:
+            return false;
+    }
+}
+
+bool Value::operator>=(const Value& that) const {
+    if (type != that.getType()) return type >= that.getType();
+
+    switch (type) {
+        case INT:
+            return int_value >= that.int_value;
+        case LONG:
+            return long_value >= that.long_value;
+        case FLOAT:
+            return float_value >= that.float_value;
+        case DOUBLE:
+            return double_value >= that.double_value;
+        case STRING:
+            return str_value >= that.str_value;
+        default:
+            return false;
+    }
+}
+
+Value Value::operator-(const Value& that) const {
+    Value v;
+    if (type != that.type) {
+        ALOGE("Can't operate on different value types, %d, %d", type, that.type);
+        return v;
+    }
+    if (type == STRING) {
+        ALOGE("Can't operate on string value type");
+        return v;
+    }
+
+    switch (type) {
+        case INT:
+            v.setInt(int_value - that.int_value);
+            break;
+        case LONG:
+            v.setLong(long_value - that.long_value);
+            break;
+        case FLOAT:
+            v.setFloat(float_value - that.float_value);
+            break;
+        case DOUBLE:
+            v.setDouble(double_value - that.double_value);
+            break;
+        default:
+            break;
+    }
+    return v;
+}
+
+Value& Value::operator=(const Value& that) {
+    type = that.type;
+    switch (type) {
+        case INT:
+            int_value = that.int_value;
+            break;
+        case LONG:
+            long_value = that.long_value;
+            break;
+        case FLOAT:
+            float_value = that.float_value;
+            break;
+        case DOUBLE:
+            double_value = that.double_value;
+            break;
+        case STRING:
+            str_value = that.str_value;
+            break;
+        default:
+            break;
+    }
+    return *this;
+}
+
+Value& Value::operator+=(const Value& that) {
+    if (type != that.type) {
+        ALOGE("Can't operate on different value types, %d, %d", type, that.type);
+        return *this;
+    }
+    if (type == STRING) {
+        ALOGE("Can't operate on string value type");
+        return *this;
+    }
+
+    switch (type) {
+        case INT:
+            int_value += that.int_value;
+            break;
+        case LONG:
+            long_value += that.long_value;
+            break;
+        case FLOAT:
+            float_value += that.float_value;
+            break;
+        case DOUBLE:
+            double_value += that.double_value;
+            break;
+        default:
+            break;
+    }
+    return *this;
+}
+
+double Value::getDouble() const {
+    switch (type) {
+        case INT:
+            return int_value;
+        case LONG:
+            return long_value;
+        case FLOAT:
+            return float_value;
+        case DOUBLE:
+            return double_value;
+        default:
+            return 0;
+    }
+}
+
 bool equalDimensions(const std::vector<Matcher>& dimension_a,
                      const std::vector<Matcher>& dimension_b) {
     bool eq = dimension_a.size() == dimension_b.size();
diff --git a/cmds/statsd/src/FieldValue.h b/cmds/statsd/src/FieldValue.h
index b1d6ab3..b1b885e 100644
--- a/cmds/statsd/src/FieldValue.h
+++ b/cmds/statsd/src/FieldValue.h
@@ -32,7 +32,7 @@
 const int32_t kClearLastBitDeco = 0x7f;
 const int32_t kClearAllPositionMatcherMask = 0xffff00ff;
 
-enum Type { UNKNOWN, INT, LONG, FLOAT, STRING };
+enum Type { UNKNOWN, INT, LONG, FLOAT, DOUBLE, STRING };
 
 int32_t getEncodedField(int32_t pos[], int32_t depth, bool includeDepth);
 
@@ -283,6 +283,11 @@
         type = FLOAT;
     }
 
+    Value(double v) {
+        double_value = v;
+        type = DOUBLE;
+    }
+
     Value(const std::string& v) {
         str_value = v;
         type = STRING;
@@ -298,10 +303,21 @@
         type = LONG;
     }
 
+    void setFloat(float v) {
+        float_value = v;
+        type = FLOAT;
+    }
+
+    void setDouble(double v) {
+        double_value = v;
+        type = DOUBLE;
+    }
+
     union {
         int32_t int_value;
         int64_t long_value;
         float float_value;
+        double double_value;
     };
     std::string str_value;
 
@@ -313,12 +329,19 @@
         return type;
     }
 
+    double getDouble() const;
+
     Value(const Value& from);
 
     bool operator==(const Value& that) const;
     bool operator!=(const Value& that) const;
 
     bool operator<(const Value& that) const;
+    bool operator>(const Value& that) const;
+    bool operator>=(const Value& that) const;
+    Value operator-(const Value& that) const;
+    Value& operator+=(const Value& that);
+    Value& operator=(const Value& that);
 };
 
 /**
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index f5e953a..c6f7bb4 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-#define DEBUG false  // STOPSHIP if true
+#define DEBUG true  // STOPSHIP if true
 #include "Log.h"
 
 #include "ValueMetricProducer.h"
@@ -27,7 +27,7 @@
 
 using android::util::FIELD_COUNT_REPEATED;
 using android::util::FIELD_TYPE_BOOL;
-using android::util::FIELD_TYPE_FLOAT;
+using android::util::FIELD_TYPE_DOUBLE;
 using android::util::FIELD_TYPE_INT32;
 using android::util::FIELD_TYPE_INT64;
 using android::util::FIELD_TYPE_MESSAGE;
@@ -64,7 +64,8 @@
 const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
 const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
 // for ValueBucketInfo
-const int FIELD_ID_VALUE = 3;
+const int FIELD_ID_VALUE_LONG = 3;
+const int FIELD_ID_VALUE_DOUBLE = 7;
 const int FIELD_ID_BUCKET_NUM = 4;
 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
@@ -79,6 +80,7 @@
       mPullerManager(pullerManager),
       mValueField(metric.value_field()),
       mPullTagId(pullTagId),
+      mIsPulled(pullTagId != -1),
       mMinBucketSizeNs(metric.min_bucket_size_nanos()),
       mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
@@ -88,7 +90,9 @@
                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
                                   : StatsdStats::kDimensionKeySizeHardLimit),
-      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()) {
+      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
+      mAggregationType(metric.aggregation_type()),
+      mValueType(metric.aggregation_type() == ValueMetric::AVG ? DOUBLE : LONG) {
     int64_t bucketSizeMills = 0;
     if (metric.has_bucket()) {
         bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -123,9 +127,9 @@
     mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
             HasPositionALL(metric.dimensions_in_condition());
 
-    // Kicks off the puller immediately.
     flushIfNeededLocked(startTimestampNs);
-    if (mPullTagId != -1) {
+    // Kicks off the puller immediately.
+    if (mIsPulled) {
         mPullerManager->RegisterReceiver(mPullTagId, this,
                                          mCurrentBucketStartTimeNs + mBucketSizeNs, mBucketSizeNs);
     }
@@ -136,7 +140,7 @@
 
 ValueMetricProducer::~ValueMetricProducer() {
     VLOG("~ValueMetricProducer() called");
-    if (mPullTagId != -1) {
+    if (mIsPulled) {
         mPullerManager->UnRegisterReceiver(mPullTagId, this);
     }
 }
@@ -245,11 +249,15 @@
                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                    (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
             }
-
-            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue);
+            if (mValueType == LONG) {
+                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
+                                   (long long)bucket.mValueLong);
+            } else {
+                protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.mValueDouble);
+            }
             protoOutput->end(bucketInfoToken);
-            VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
-                 (long long)bucket.mBucketEndNs, (long long)bucket.mValue);
+            VLOG("\t bucket [%lld - %lld] count: %lld, %.2f", (long long)bucket.mBucketStartNs,
+                 (long long)bucket.mBucketEndNs, (long long)bucket.mValueLong, bucket.mValueDouble);
         }
         protoOutput->end(wrapperToken);
     }
@@ -271,7 +279,7 @@
 
     flushIfNeededLocked(eventTimeNs);
 
-    if (mPullTagId != -1) {
+    if (mIsPulled) {
         vector<shared_ptr<LogEvent>> allData;
         if (mPullerManager->Pull(mPullTagId, eventTimeNs, &allData)) {
             if (allData.size() == 0) {
@@ -321,10 +329,10 @@
             (unsigned long)mCurrentSlicedBucket.size());
     if (verbose) {
         for (const auto& it : mCurrentSlicedBucket) {
-            fprintf(out, "\t(what)%s\t(condition)%s  (value)%lld\n",
-                it.first.getDimensionKeyInWhat().toString().c_str(),
-                it.first.getDimensionKeyInCondition().toString().c_str(),
-                (unsigned long long)it.second.sum);
+            fprintf(out, "\t(what)%s\t(condition)%s  (value)%s\n",
+                    it.first.getDimensionKeyInWhat().toString().c_str(),
+                    it.first.getDimensionKeyInCondition().toString().c_str(),
+                    it.second.value.toString().c_str());
         }
     }
 }
@@ -349,6 +357,27 @@
     return false;
 }
 
+const Value getDoubleOrLong(const Value& value) {
+    Value v;
+    switch (value.type) {
+        case INT:
+            v.setLong(value.int_value);
+            break;
+        case LONG:
+            v.setLong(value.long_value);
+            break;
+        case FLOAT:
+            v.setDouble(value.float_value);
+            break;
+        case DOUBLE:
+            v.setDouble(value.double_value);
+            break;
+        default:
+            break;
+    }
+    return v;
+}
+
 void ValueMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const MetricDimensionKey& eventKey,
         const ConditionKey& conditionKey, bool condition,
@@ -367,19 +396,25 @@
     }
     Interval& interval = mCurrentSlicedBucket[eventKey];
 
-    int error = 0;
-    const int64_t value = event.GetLong(mField, &error);
-    if (error < 0) {
+    if (mField > event.size()) {
+        VLOG("Failed to extract value field %d from atom %s. %d", mField, event.ToString().c_str(),
+             (int)event.size());
         return;
     }
+    Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue);
 
-    if (mPullTagId != -1) { // for pulled events
+    Value diff;
+    bool hasDiff = false;
+    if (mIsPulled) {
+        // Always require condition for pulled events. In the case of no condition, only pull
+        // on bucket boundaries, in which we fake condition changes.
         if (mCondition == true) {
             if (!interval.startUpdated) {
                 interval.start = value;
                 interval.startUpdated = true;
             } else {
-                // skip it if there is already value recorded for the start
+                // Skip it if there is already value recorded for the start. Happens when puller
+                // takes too long to finish. In this case we take the previous value.
                 VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str());
             }
         } else {
@@ -387,31 +422,55 @@
             // If not, take absolute value or drop it, based on config.
             if (interval.startUpdated) {
                 if (value >= interval.start) {
-                    interval.sum += (value - interval.start);
-                    interval.hasValue = true;
+                    diff = (value - interval.start);
+                    hasDiff = true;
                 } else {
                     if (mUseAbsoluteValueOnReset) {
-                        interval.sum += value;
-                        interval.hasValue = true;
+                        diff = value;
+                        hasDiff = true;
                     } else {
-                        VLOG("Dropping data for atom %d, prev: %lld, now: %lld", mPullTagId,
-                             (long long)interval.start, (long long)value);
+                        VLOG("Dropping data for atom %d, prev: %s, now: %s", mPullTagId,
+                             interval.start.toString().c_str(), value.toString().c_str());
                     }
                 }
                 interval.startUpdated = false;
             } else {
-                VLOG("No start for matching end %lld", (long long)value);
-                interval.tainted += 1;
+                VLOG("No start for matching end %s", value.toString().c_str());
             }
         }
-    } else {    // for pushed events, only accumulate when condition is true
-        if (mCondition == true || mConditionTrackerIndex < 0) {
-            interval.sum += value;
-            interval.hasValue = true;
+    } else {
+        // for pushed events, only aggregate when sliced condition is true
+        if (condition == true || mConditionTrackerIndex < 0) {
+            diff = value;
+            hasDiff = true;
         }
     }
+    if (hasDiff) {
+        if (interval.hasValue) {
+            switch (mAggregationType) {
+                case ValueMetric::SUM:
+                // for AVG, we add up and take average when flushing the bucket
+                case ValueMetric::AVG:
+                    interval.value += diff;
+                    break;
+                case ValueMetric::MIN:
+                    interval.value = diff < interval.value ? diff : interval.value;
+                    break;
+                case ValueMetric::MAX:
+                    interval.value = diff > interval.value ? diff : interval.value;
+                    break;
+                default:
+                    break;
+            }
+        } else {
+            interval.value = diff;
+            interval.hasValue = true;
+        }
+        interval.sampleSize += 1;
+    }
 
-    long wholeBucketVal = interval.sum;
+    // TODO: propgate proper values down stream when anomaly support doubles
+    long wholeBucketVal = interval.value.long_value;
     auto prev = mCurrentFullBucket.find(eventKey);
     if (prev != mCurrentFullBucket.end()) {
         wholeBucketVal += prev->second;
@@ -458,18 +517,15 @@
 
     if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
         // The current bucket is large enough to keep.
-        int tainted = 0;
         for (const auto& slice : mCurrentSlicedBucket) {
-            tainted += slice.second.tainted;
-            tainted += slice.second.startUpdated;
             if (slice.second.hasValue) {
-                info.mValue = slice.second.sum;
+                info.mValueLong = slice.second.value.long_value;
+                info.mValueDouble = (double)slice.second.value.long_value / slice.second.sampleSize;
                 // it will auto create new vector of ValuebucketInfo if the key is not found.
                 auto& bucketList = mPastBuckets[slice.first];
                 bucketList.push_back(info);
             }
         }
-        VLOG("%d tainted pairs in the bucket", tainted);
     } else {
         mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
     }
@@ -478,7 +534,8 @@
         // Accumulate partial buckets with current value and then send to anomaly tracker.
         if (mCurrentFullBucket.size() > 0) {
             for (const auto& slice : mCurrentSlicedBucket) {
-                mCurrentFullBucket[slice.first] += slice.second.sum;
+                // TODO: fix this when anomaly can accept double values
+                mCurrentFullBucket[slice.first] += slice.second.value.long_value;
             }
             for (const auto& slice : mCurrentFullBucket) {
                 for (auto& tracker : mAnomalyTrackers) {
@@ -493,7 +550,9 @@
             for (const auto& slice : mCurrentSlicedBucket) {
                 for (auto& tracker : mAnomalyTrackers) {
                     if (tracker != nullptr) {
-                        tracker->addPastBucket(slice.first, slice.second.sum, mCurrentBucketNum);
+                        // TODO: fix this when anomaly can accept double values
+                        tracker->addPastBucket(slice.first, slice.second.value.long_value,
+                                               mCurrentBucketNum);
                     }
                 }
             }
@@ -501,7 +560,8 @@
     } else {
         // Accumulate partial bucket.
         for (const auto& slice : mCurrentSlicedBucket) {
-            mCurrentFullBucket[slice.first] += slice.second.sum;
+            // TODO: fix this when anomaly can accept double values
+            mCurrentFullBucket[slice.first] += slice.second.value.long_value;
         }
     }
 
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 943d6dc..188e3de 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -23,6 +23,7 @@
 #include "../condition/ConditionTracker.h"
 #include "../external/PullDataReceiver.h"
 #include "../external/StatsPullerManager.h"
+#include "../stats_log_util.h"
 #include "MetricProducer.h"
 #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
 
@@ -33,7 +34,8 @@
 struct ValueBucket {
     int64_t mBucketStartNs;
     int64_t mBucketEndNs;
-    int64_t mValue;
+    int64_t mValueLong;
+    double mValueDouble;
 };
 
 class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver {
@@ -45,6 +47,7 @@
 
     virtual ~ValueMetricProducer();
 
+    // Process data pulled on bucket boundary.
     void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
 
     // ValueMetric needs special logic if it's a pulled atom.
@@ -120,20 +123,22 @@
     // tagId for pulled data. -1 if this is not pulled
     const int mPullTagId;
 
+    // if this is pulled metric
+    const bool mIsPulled;
+
     int mField;
 
     // internal state of a bucket.
     typedef struct {
         // Pulled data always come in pair of <start, end>. This holds the value
-        // for start. The diff (end - start) is added to sum.
-        int64_t start;
+        // for start. The diff (end - start) is taken as the real value.
+        Value start;
         // Whether the start data point is updated
         bool startUpdated;
-        // If end data point comes before the start, record this pair as tainted
-        // and the value is not added to the running sum.
-        int tainted;
-        // Running sum of known pairs in this bucket
-        int64_t sum;
+        // Current value, depending on the aggregation type.
+        Value value;
+        // Number of samples collected.
+        int sampleSize;
         // If this dimension has any non-tainted value. If not, don't report the
         // dimension.
         bool hasValue;
@@ -162,6 +167,10 @@
 
     const bool mUseAbsoluteValueOnReset;
 
+    const ValueMetric::AggregationType mAggregationType;
+
+    const Type mValueType;
+
     FRIEND_TEST(ValueMetricProducerTest, TestNonDimensionalEvents);
     FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
     FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset);
@@ -176,6 +185,11 @@
     FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition);
     FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2);
     FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3);
+    FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin);
+    FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax);
+    FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateAvg);
+    FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSum);
+    FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSumSliced);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index 2fe17da..cfd6269 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -106,7 +106,11 @@
 
   optional int64 end_bucket_elapsed_nanos = 2;
 
-  optional int64 value = 3;
+  oneof values {
+      int64 value_long = 3;
+
+      double value_double = 7;
+  }
 
   optional int64 bucket_num = 4;
 
diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto
index 638fbb9..fabc5f9 100644
--- a/cmds/statsd/src/statsd_config.proto
+++ b/cmds/statsd/src/statsd_config.proto
@@ -262,6 +262,9 @@
 
   enum AggregationType {
     SUM = 1;
+    MIN = 2;
+    MAX = 3;
+    AVG = 4;
   }
   optional AggregationType aggregation_type = 8 [default = SUM];
 
diff --git a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
index 744828e..f2e8f58 100644
--- a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
+++ b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
@@ -141,23 +141,23 @@
 
     EXPECT_EQ(baseTimeNs + 2 * bucketSizeNs, data.bucket_info(0).start_bucket_elapsed_nanos());
     EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(0).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(0).has_value());
+    EXPECT_TRUE(data.bucket_info(0).has_value_long());
 
     EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(1).start_bucket_elapsed_nanos());
     EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(1).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(1).has_value());
+    EXPECT_TRUE(data.bucket_info(1).has_value_long());
 
     EXPECT_EQ(baseTimeNs + 4 * bucketSizeNs, data.bucket_info(2).start_bucket_elapsed_nanos());
     EXPECT_EQ(baseTimeNs + 5 * bucketSizeNs, data.bucket_info(2).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(2).has_value());
+    EXPECT_TRUE(data.bucket_info(2).has_value_long());
 
     EXPECT_EQ(baseTimeNs + 6 * bucketSizeNs, data.bucket_info(3).start_bucket_elapsed_nanos());
     EXPECT_EQ(baseTimeNs + 7 * bucketSizeNs, data.bucket_info(3).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(3).has_value());
+    EXPECT_TRUE(data.bucket_info(3).has_value_long());
 
     EXPECT_EQ(baseTimeNs + 7 * bucketSizeNs, data.bucket_info(4).start_bucket_elapsed_nanos());
     EXPECT_EQ(baseTimeNs + 8 * bucketSizeNs, data.bucket_info(4).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(4).has_value());
+    EXPECT_TRUE(data.bucket_info(4).has_value_long());
 }
 
 TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm) {
@@ -248,15 +248,15 @@
 
     EXPECT_EQ(baseTimeNs + 2 * bucketSizeNs, data.bucket_info(0).start_bucket_elapsed_nanos());
     EXPECT_EQ(baseTimeNs + 3 * bucketSizeNs, data.bucket_info(0).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(0).has_value());
+    EXPECT_TRUE(data.bucket_info(0).has_value_long());
 
     EXPECT_EQ(baseTimeNs + 8 * bucketSizeNs, data.bucket_info(1).start_bucket_elapsed_nanos());
     EXPECT_EQ(baseTimeNs + 9 * bucketSizeNs, data.bucket_info(1).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(1).has_value());
+    EXPECT_TRUE(data.bucket_info(1).has_value_long());
 
     EXPECT_EQ(baseTimeNs + 9 * bucketSizeNs, data.bucket_info(2).start_bucket_elapsed_nanos());
     EXPECT_EQ(baseTimeNs + 10 * bucketSizeNs, data.bucket_info(2).end_bucket_elapsed_nanos());
-    EXPECT_TRUE(data.bucket_info(2).has_value());
+    EXPECT_TRUE(data.bucket_info(2).has_value_long());
 }
 
 #else
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 5195f01..3559a7c 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -80,13 +80,11 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    valueProducer.setBucketSize(60 * NS_PER_SEC);
 
-    // startUpdated:true tainted:0 sum:0 start:11
+    // startUpdated:true sum:0 start:11
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
-    EXPECT_EQ(11, curInterval.start);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(11, curInterval.start.long_value);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     allData.clear();
@@ -99,13 +97,12 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // tartUpdated:false tainted:0 sum:12
+    // tartUpdated:false sum:12
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(0, curInterval.value.long_value);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 
     allData.clear();
     event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1);
@@ -116,13 +113,12 @@
     valueProducer.onDataPulled(allData);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:12
+    // startUpdated:false sum:12
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(0, curInterval.value.long_value);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(13, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(13, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 /*
@@ -157,12 +153,10 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    valueProducer.setBucketSize(60 * NS_PER_SEC);
 
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
-    EXPECT_EQ(11, curInterval.start);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(11, curInterval.start.long_value);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     allData.clear();
@@ -176,11 +170,10 @@
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 
     allData.clear();
     event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1);
@@ -192,11 +185,10 @@
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 /*
@@ -230,12 +222,10 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    valueProducer.setBucketSize(60 * NS_PER_SEC);
 
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
-    EXPECT_EQ(11, curInterval.start);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(11, curInterval.start.long_value);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     allData.clear();
@@ -249,8 +239,7 @@
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     allData.clear();
@@ -263,11 +252,10 @@
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 /*
@@ -316,11 +304,10 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:100
-    EXPECT_EQ(100, curInterval.start);
+    // startUpdated:false sum:0 start:100
+    EXPECT_EQ(100, curInterval.start.long_value);
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     vector<shared_ptr<LogEvent>> allData;
@@ -335,19 +322,19 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:110
-    EXPECT_EQ(110, curInterval.start);
+    // startUpdated:false sum:0 start:110
+    EXPECT_EQ(110, curInterval.start.long_value);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 
     valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
 
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:110
-    EXPECT_EQ(10, curInterval.sum);
+    // startUpdated:false sum:0 start:110
+    EXPECT_EQ(10, curInterval.value.long_value);
     EXPECT_EQ(false, curInterval.startUpdated);
 }
 
@@ -433,7 +420,7 @@
     valueProducer.notifyAppUpgrade(eventUpgradeTimeNs, "ANY.APP", 1, 1);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(eventUpgradeTimeNs, valueProducer.mCurrentBucketStartTimeNs);
-    EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValue);
+    EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValueLong);
 
     allData.clear();
     event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
@@ -444,7 +431,7 @@
     valueProducer.onDataPulled(allData);
     EXPECT_EQ(2UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(bucket2StartTimeNs, valueProducer.mCurrentBucketStartTimeNs);
-    EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValue);
+    EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValueLong);
 }
 
 TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) {
@@ -493,7 +480,7 @@
     EXPECT_EQ(bucket2StartTimeNs-50, valueProducer.mCurrentBucketStartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(bucketStartTimeNs, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs);
-    EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValue);
+    EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValueLong);
     EXPECT_FALSE(valueProducer.mCondition);
 }
 
@@ -523,19 +510,20 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    EXPECT_EQ(10, curInterval.sum);
+    EXPECT_EQ(10, curInterval.value.long_value);
+    EXPECT_EQ(true, curInterval.hasValue);
 
     valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
 
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    EXPECT_EQ(30, curInterval.sum);
+    EXPECT_EQ(30, curInterval.value.long_value);
 
     valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) {
@@ -572,7 +560,7 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    EXPECT_EQ(20, curInterval.sum);
+    EXPECT_EQ(20, curInterval.value.long_value);
 
     shared_ptr<LogEvent> event3 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 30);
     event3->write(1);
@@ -583,7 +571,7 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    EXPECT_EQ(50, curInterval.sum);
+    EXPECT_EQ(50, curInterval.value.long_value);
 
     valueProducer.onConditionChangedLocked(false, bucketStartTimeNs + 35);
     shared_ptr<LogEvent> event4 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 40);
@@ -595,12 +583,12 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    EXPECT_EQ(50, curInterval.sum);
+    EXPECT_EQ(50, curInterval.value.long_value);
 
     valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 TEST(ValueMetricProducerTest, TestAnomalyDetection) {
@@ -718,11 +706,10 @@
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
 
-    // startUpdated:true tainted:0 sum:0 start:11
+    // startUpdated:true sum:0 start:11
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
-    EXPECT_EQ(11, curInterval.start);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(11, curInterval.start.long_value);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // pull 2 at correct time
@@ -736,13 +723,12 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // tartUpdated:false tainted:0 sum:12
+    // tartUpdated:false sum:12
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 
     // pull 3 come late.
     // The previous bucket gets closed with error. (Has start value 23, no ending)
@@ -757,14 +743,13 @@
     valueProducer.onDataPulled(allData);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:12
+    // startUpdated:false sum:12
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(36, curInterval.start);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(36, curInterval.start.long_value);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 /*
@@ -816,19 +801,17 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:100
-    EXPECT_EQ(100, curInterval.start);
+    // startUpdated:false sum:0 start:100
+    EXPECT_EQ(100, curInterval.start.long_value);
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // pull on bucket boundary come late, condition change happens before it
     valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(false, curInterval.startUpdated);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // Now the alarm is delivered.
@@ -844,8 +827,7 @@
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(false, curInterval.startUpdated);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 }
 
@@ -909,28 +891,25 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:100
-    EXPECT_EQ(100, curInterval.start);
+    // startUpdated:false sum:0 start:100
+    EXPECT_EQ(100, curInterval.start.long_value);
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // pull on bucket boundary come late, condition change happens before it
     valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(false, curInterval.startUpdated);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // condition changed to true again, before the pull alarm is delivered
     valueProducer.onConditionChanged(true, bucket2StartTimeNs + 25);
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(130, curInterval.start);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(130, curInterval.start.long_value);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // Now the alarm is delivered, but it is considered late, it has no effect
@@ -945,9 +924,8 @@
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(130, curInterval.start);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(130, curInterval.start.long_value);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 }
 
@@ -1000,11 +978,10 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:100
-    EXPECT_EQ(100, curInterval.start);
+    // startUpdated:false sum:0 start:100
+    EXPECT_EQ(100, curInterval.start.long_value);
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // pull on bucket boundary come late, condition change happens before it.
@@ -1012,8 +989,7 @@
     valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(false, curInterval.startUpdated);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // Alarm is delivered in time, but the pull is very slow, and pullers are called in order,
@@ -1029,11 +1005,247 @@
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(false, curInterval.startUpdated);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 }
 
+TEST(ValueMetricProducerTest, TestPushedAggregateMin) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_aggregation_type(ValueMetric::MIN);
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, -1, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+    event1->write(1);
+    event1->write(10);
+    event1->init();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
+    event2->write(1);
+    event2->write(20);
+    event2->init();
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(10, curInterval.value.long_value);
+    EXPECT_EQ(true, curInterval.hasValue);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(10, curInterval.value.long_value);
+
+    valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+}
+
+TEST(ValueMetricProducerTest, TestPushedAggregateMax) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_aggregation_type(ValueMetric::MAX);
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, -1, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+    event1->write(1);
+    event1->write(10);
+    event1->init();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
+    event2->write(1);
+    event2->write(20);
+    event2->init();
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(10, curInterval.value.long_value);
+    EXPECT_EQ(true, curInterval.hasValue);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(20, curInterval.value.long_value);
+
+    valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+}
+
+TEST(ValueMetricProducerTest, TestPushedAggregateAvg) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_aggregation_type(ValueMetric::AVG);
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, -1, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+    event1->write(1);
+    event1->write(10);
+    event1->init();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
+    event2->write(1);
+    event2->write(15);
+    event2->init();
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval;
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(10, curInterval.value.long_value);
+    EXPECT_EQ(true, curInterval.hasValue);
+    EXPECT_EQ(1, curInterval.sampleSize);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(25, curInterval.value.long_value);
+    EXPECT_EQ(2, curInterval.sampleSize);
+
+    valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(12.5, valueProducer.mPastBuckets.begin()->second.back().mValueDouble);
+}
+
+TEST(ValueMetricProducerTest, TestPushedAggregateSum) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_aggregation_type(ValueMetric::SUM);
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, -1, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+    event1->write(1);
+    event1->write(10);
+    event1->init();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
+    event2->write(1);
+    event2->write(15);
+    event2->init();
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(10, curInterval.value.long_value);
+    EXPECT_EQ(true, curInterval.hasValue);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(25, curInterval.value.long_value);
+
+    valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(25, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+}
+
+TEST(ValueMetricProducerTest, TestPushedAggregateSumSliced) {
+    string slicedConditionName = "UID";
+    const int conditionTagId = 2;
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(1);
+    metric.set_aggregation_type(ValueMetric::SUM);
+
+    metric.set_condition(StringToId(slicedConditionName));
+    MetricConditionLink* link = metric.add_links();
+    link->set_condition(StringToId(slicedConditionName));
+    buildSimpleAtomFieldMatcher(tagId, 2, link->mutable_fields_in_what());
+    buildSimpleAtomFieldMatcher(conditionTagId, 2, link->mutable_fields_in_condition());
+
+    LogEvent event1(tagId, bucketStartTimeNs + 10);
+    event1.write(10);  // value
+    event1.write("111"); // uid
+    event1.init();
+    ConditionKey key1;
+    key1[StringToId(slicedConditionName)] =
+        {getMockedDimensionKey(conditionTagId, 2, "111")};
+
+    LogEvent event2(tagId, bucketStartTimeNs + 20);
+    event2.write(15);
+    event2.write("222");
+    event2.init();
+    ConditionKey key2;
+    key2[StringToId(slicedConditionName)] =
+        {getMockedDimensionKey(conditionTagId, 2, "222")};
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    EXPECT_CALL(*wizard, query(_, key1, _, _, _, _)).WillOnce(Return(ConditionState::kFalse));
+
+    EXPECT_CALL(*wizard, query(_, key2, _, _, _, _)).WillOnce(Return(ConditionState::kTrue));
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, -1, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
+
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(false, curInterval.hasValue);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(15, curInterval.value.long_value);
+
+    valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(15, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android