diff options
| -rw-r--r-- | cmds/statsd/src/FieldValue.cpp | 20 | ||||
| -rw-r--r-- | cmds/statsd/src/FieldValue.h | 2 | ||||
| -rw-r--r-- | cmds/statsd/src/guardrail/StatsdStats.cpp | 15 | ||||
| -rw-r--r-- | cmds/statsd/src/guardrail/StatsdStats.h | 6 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.cpp | 259 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.h | 57 | ||||
| -rw-r--r-- | cmds/statsd/src/stats_log.proto | 1 | ||||
| -rw-r--r-- | cmds/statsd/src/stats_log_util.cpp | 2 | ||||
| -rw-r--r-- | cmds/statsd/src/stats_log_util.h | 5 | ||||
| -rw-r--r-- | cmds/statsd/src/statsd_config.proto | 11 | ||||
| -rw-r--r-- | cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp | 1 | ||||
| -rw-r--r-- | cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp | 425 |
12 files changed, 417 insertions, 387 deletions
diff --git a/cmds/statsd/src/FieldValue.cpp b/cmds/statsd/src/FieldValue.cpp index fc1a61cac558..80ed80776829 100644 --- a/cmds/statsd/src/FieldValue.cpp +++ b/cmds/statsd/src/FieldValue.cpp @@ -18,6 +18,7 @@ #include "Log.h" #include "FieldValue.h" #include "HashableDimensionKey.h" +#include "math.h" namespace android { namespace os { @@ -174,6 +175,25 @@ std::string Value::toString() const { } } +bool Value::isZero() const { + switch (type) { + case INT: + return int_value == 0; + case LONG: + return long_value == 0; + case FLOAT: + return fabs(float_value) <= std::numeric_limits<float>::epsilon(); + case DOUBLE: + return fabs(double_value) <= std::numeric_limits<double>::epsilon(); + case STRING: + return str_value.size() == 0; + case STORAGE: + return storage_value.size() == 0; + default: + return false; + } +} + bool Value::operator==(const Value& that) const { if (type != that.getType()) return false; diff --git a/cmds/statsd/src/FieldValue.h b/cmds/statsd/src/FieldValue.h index 77163f9d8619..a5d00ac4e72b 100644 --- a/cmds/statsd/src/FieldValue.h +++ b/cmds/statsd/src/FieldValue.h @@ -331,6 +331,8 @@ struct Value { std::string toString() const; + bool isZero() const; + Type getType() const { return type; } diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp index d2919c51a65e..a0d77d6f922d 100644 --- a/cmds/statsd/src/guardrail/StatsdStats.cpp +++ b/cmds/statsd/src/guardrail/StatsdStats.cpp @@ -362,11 +362,17 @@ void StatsdStats::notePullDelay(int pullAtomId, int64_t pullDelayNs) { lock_guard<std::mutex> lock(mLock); auto& pullStats = mPulledAtomStats[pullAtomId]; pullStats.maxPullDelayNs = std::max(pullStats.maxPullDelayNs, pullDelayNs); - pullStats.avgPullDelayNs = (pullStats.avgPullDelayNs * pullStats.numPullDelay + pullDelayNs) / - (pullStats.numPullDelay + 1); + pullStats.avgPullDelayNs = + (pullStats.avgPullDelayNs * pullStats.numPullDelay + pullDelayNs) / + (pullStats.numPullDelay + 1); pullStats.numPullDelay += 1; } +void StatsdStats::notePullDataError(int pullAtomId) { + lock_guard<std::mutex> lock(mLock); + mPulledAtomStats[pullAtomId].dataError++; +} + void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) { lock_guard<std::mutex> lock(mLock); @@ -422,6 +428,7 @@ void StatsdStats::resetInternalLocked() { pullStats.second.avgPullDelayNs = 0; pullStats.second.maxPullDelayNs = 0; pullStats.second.numPullDelay = 0; + pullStats.second.dataError = 0; } } @@ -530,11 +537,11 @@ void StatsdStats::dumpStats(int out) const { dprintf(out, "Atom %d->(total pull)%ld, (pull from cache)%ld, (min pull interval)%ld, (average " "pull time nanos)%lld, (max pull time nanos)%lld, (average pull delay nanos)%lld, " - "(max pull delay nanos)%lld\n", + "(max pull delay nanos)%lld, (data error)%ld\n", (int)pair.first, (long)pair.second.totalPull, (long)pair.second.totalPullFromCache, (long)pair.second.minPullIntervalSec, (long long)pair.second.avgPullTimeNs, (long long)pair.second.maxPullTimeNs, (long long)pair.second.avgPullDelayNs, - (long long)pair.second.maxPullDelayNs); + (long long)pair.second.maxPullDelayNs, pair.second.dataError); } if (mAnomalyAlarmRegisteredStats > 0) { diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h index 777d8652d2b6..2008abdb2345 100644 --- a/cmds/statsd/src/guardrail/StatsdStats.h +++ b/cmds/statsd/src/guardrail/StatsdStats.h @@ -279,6 +279,11 @@ public: void notePullFromCache(int pullAtomId); /* + * Notify data error for pulled atom. + */ + void notePullDataError(int pullAtomId); + + /* * Records time for actual pulling, not including those served from cache and not including * statsd processing delays. */ @@ -329,6 +334,7 @@ public: int64_t avgPullDelayNs = 0; int64_t maxPullDelayNs = 0; long numPullDelay = 0; + long dataError = 0; } PulledAtomStats; private: diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index 636747985526..c8b1cf07eb32 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -92,7 +92,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric : StatsdStats::kDimensionKeySizeHardLimit), mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()), mAggregationType(metric.aggregation_type()), - mValueType(metric.aggregation_type() == ValueMetric::AVG ? DOUBLE : LONG) { + mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)), + mValueDirection(metric.value_direction()), + mSkipZeroDiffOutput(metric.skip_zero_diff_output()) { int64_t bucketSizeMills = 0; if (metric.has_bucket()) { bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket()); @@ -125,24 +127,25 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric } mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0); mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) || - HasPositionALL(metric.dimensions_in_condition()); + HasPositionALL(metric.dimensions_in_condition()); flushIfNeededLocked(startTimeNs); - // Kicks off the puller immediately. + if (mIsPulled) { mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(), mBucketSizeNs); } - // TODO: Only do this for partial buckets like first bucket. All other buckets should use + // Only do this for partial buckets like first bucket. All other buckets should use // flushIfNeeded to adjust start and end to bucket boundaries. // Adjust start for partial bucket mCurrentBucketStartTimeNs = startTimeNs; - if (mIsPulled) { + // Kicks off the puller immediately if condition is true and diff based. + if (mIsPulled && mCondition && mUseDiff) { pullLocked(startTimeNs); } - VLOG("value metric %lld created. bucket size %lld start_time: %lld", - (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs); + VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(), + (long long)mBucketSizeNs, (long long)mTimeBaseNs); } ValueMetricProducer::~ValueMetricProducer() { @@ -188,14 +191,14 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, // Fills the dimension path if not slicing by ALL. if (!mSliceByPositionALL) { if (!mDimensionsInWhat.empty()) { - uint64_t dimenPathToken = protoOutput->start( - FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT); + uint64_t dimenPathToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT); writeDimensionPathToProto(mDimensionsInWhat, protoOutput); protoOutput->end(dimenPathToken); } if (!mDimensionsInCondition.empty()) { - uint64_t dimenPathToken = protoOutput->start( - FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); + uint64_t dimenPathToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION); writeDimensionPathToProto(mDimensionsInCondition, protoOutput); protoOutput->end(dimenPathToken); } @@ -221,15 +224,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, // First fill dimension. if (mSliceByPositionALL) { - uint64_t dimensionToken = protoOutput->start( - FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); + uint64_t dimensionToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT); writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput); protoOutput->end(dimensionToken); if (dimensionKey.hasDimensionKeyInCondition()) { - uint64_t dimensionInConditionToken = protoOutput->start( - FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); - writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), - str_set, protoOutput); + uint64_t dimensionInConditionToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION); + writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set, + protoOutput); protoOutput->end(dimensionInConditionToken); } } else { @@ -237,8 +240,8 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput); if (dimensionKey.hasDimensionKeyInCondition()) { writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(), - FIELD_ID_DIMENSION_LEAF_IN_CONDITION, - str_set, protoOutput); + FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set, + protoOutput); } } @@ -256,15 +259,20 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM, (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs))); } - if (mValueType == LONG) { + if (bucket.value.getType() == LONG) { protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, - (long long)bucket.mValueLong); + (long long)bucket.value.long_value); + VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs, + (long long)bucket.mBucketEndNs, (long long)bucket.value.long_value); + } else if (bucket.value.getType() == DOUBLE) { + protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, + bucket.value.double_value); + VLOG("\t bucket [%lld - %lld] count: %.2f", (long long)bucket.mBucketStartNs, + (long long)bucket.mBucketEndNs, bucket.value.double_value); } else { - protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.mValueDouble); + VLOG("Wrong value type for ValueMetric output: %d", bucket.value.getType()); } protoOutput->end(bucketInfoToken); - VLOG("\t bucket [%lld - %lld] count: %lld, %.2f", (long long)bucket.mBucketStartNs, - (long long)bucket.mBucketEndNs, (long long)bucket.mValueLong, bucket.mValueDouble); } protoOutput->end(wrapperToken); } @@ -279,8 +287,6 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, void ValueMetricProducer::onConditionChangedLocked(const bool condition, const int64_t eventTimeNs) { - mCondition = condition; - if (eventTimeNs < mCurrentBucketStartTimeNs) { VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, (long long)mCurrentBucketStartTimeNs); @@ -289,9 +295,19 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, flushIfNeededLocked(eventTimeNs); - if (mIsPulled) { + // Pull on condition changes. + if (mIsPulled && (mCondition != condition)) { pullLocked(eventTimeNs); } + + // when condition change from true to false, clear diff base + if (mUseDiff && mCondition && !condition) { + for (auto& slice : mCurrentSlicedBucket) { + slice.second.hasBase = false; + } + } + + mCondition = condition; } void ValueMetricProducer::pullLocked(const int64_t timestampNs) { @@ -306,30 +322,33 @@ void ValueMetricProducer::pullLocked(const int64_t timestampNs) { } } +int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) { + return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; +} + void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { std::lock_guard<std::mutex> lock(mMutex); - if (mCondition == true || mConditionTrackerIndex < 0) { + if (mCondition) { if (allData.size() == 0) { return; } // For scheduled pulled data, the effective event time is snap to the nearest - // bucket boundary to make bucket finalize. + // bucket end. In the case of waking up from a deep sleep state, we will + // attribute to the previous bucket end. If the sleep was long but not very long, we + // will be in the immediate next bucket. Previous bucket may get a larger number as + // we pull at a later time than real bucket end. + // If the sleep was very long, we skip more than one bucket before sleep. In this case, + // if the diff base will be cleared and this new data will serve as new diff base. int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs(); - int64_t eventTime = mTimeBaseNs + - ((realEventTime - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; - - // close the end of the bucket - mCondition = false; - for (const auto& data : allData) { - data->setElapsedTimestampNs(eventTime - 1); - onMatchedLogEventLocked(0, *data); + int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1; + if (bucketEndTime < mCurrentBucketStartTimeNs) { + VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime, + (long long)mCurrentBucketStartTimeNs); + return; } - - // start a new bucket - mCondition = true; for (const auto& data : allData) { - data->setElapsedTimestampNs(eventTime); + data->setElapsedTimestampNs(bucketEndTime); onMatchedLogEventLocked(0, *data); } } @@ -363,8 +382,8 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) { StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount); // 2. Don't add more tuples, we are above the allowed threshold. Drop the data. if (newTupleCount > mDimensionHardLimit) { - ALOGE("ValueMetric %lld dropping data for dimension key %s", - (long long)mMetricId, newKey.toString().c_str()); + ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId, + newKey.toString().c_str()); return true; } } @@ -393,10 +412,10 @@ const Value getDoubleOrLong(const Value& value) { return v; } -void ValueMetricProducer::onMatchedLogEventInternalLocked( - const size_t matcherIndex, const MetricDimensionKey& eventKey, - const ConditionKey& conditionKey, bool condition, - const LogEvent& event) { +void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex, + const MetricDimensionKey& eventKey, + const ConditionKey& conditionKey, + bool condition, const LogEvent& event) { int64_t eventTimeNs = event.GetElapsedTimestampNs(); if (eventTimeNs < mCurrentBucketStartTimeNs) { VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, @@ -406,6 +425,14 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( flushIfNeededLocked(eventTimeNs); + // For pulled data, we already check condition when we decide to pull or + // in onDataPulled. So take all of them. + // For pushed data, just check condition. + if (!(mIsPulled || condition)) { + VLOG("ValueMetric skip event because condition is false"); + return; + } + if (hitGuardRailLocked(eventKey)) { return; } @@ -418,71 +445,70 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked( } Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue); - Value diff; - bool hasDiff = false; - if (mIsPulled) { - // Always require condition for pulled events. In the case of no condition, only pull - // on bucket boundaries, in which we fake condition changes. - if (mCondition == true) { - if (!interval.startUpdated) { - interval.start = value; - interval.startUpdated = true; - } else { - // Skip it if there is already value recorded for the start. Happens when puller - // takes too long to finish. In this case we take the previous value. - VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str()); - } - } else { - // Generally we expect value to be monotonically increasing. - // If not, take absolute value or drop it, based on config. - if (interval.startUpdated) { - if (value >= interval.start) { - diff = (value - interval.start); - hasDiff = true; + if (mUseDiff) { + // no base. just update base and return. + if (!interval.hasBase) { + interval.base = value; + interval.hasBase = true; + return; + } + Value diff; + switch (mValueDirection) { + case ValueMetric::INCREASING: + if (value >= interval.base) { + diff = value - interval.base; + } else if (mUseAbsoluteValueOnReset) { + diff = value; } else { - if (mUseAbsoluteValueOnReset) { - diff = value; - hasDiff = true; - } else { - VLOG("Dropping data for atom %d, prev: %s, now: %s", mPullTagId, - interval.start.toString().c_str(), value.toString().c_str()); - } + VLOG("Unexpected decreasing value"); + StatsdStats::getInstance().notePullDataError(mPullTagId); + interval.base = value; + return; } - interval.startUpdated = false; - } else { - VLOG("No start for matching end %s", value.toString().c_str()); - } - } - } else { - // for pushed events, only aggregate when sliced condition is true - if (condition == true || mConditionTrackerIndex < 0) { - diff = value; - hasDiff = true; + break; + case ValueMetric::DECREASING: + if (interval.base >= value) { + diff = interval.base - value; + } else if (mUseAbsoluteValueOnReset) { + diff = value; + } else { + VLOG("Unexpected increasing value"); + StatsdStats::getInstance().notePullDataError(mPullTagId); + interval.base = value; + return; + } + break; + case ValueMetric::ANY: + diff = value - interval.base; + break; + default: + break; } + interval.base = value; + value = diff; } - if (hasDiff) { - if (interval.hasValue) { - switch (mAggregationType) { - case ValueMetric::SUM: + + if (interval.hasValue) { + switch (mAggregationType) { + case ValueMetric::SUM: // for AVG, we add up and take average when flushing the bucket - case ValueMetric::AVG: - interval.value += diff; - break; - case ValueMetric::MIN: - interval.value = diff < interval.value ? diff : interval.value; - break; - case ValueMetric::MAX: - interval.value = diff > interval.value ? diff : interval.value; - break; - default: - break; - } - } else { - interval.value = diff; - interval.hasValue = true; + case ValueMetric::AVG: + interval.value += value; + break; + case ValueMetric::MIN: + interval.value = std::min(value, interval.value); + break; + case ValueMetric::MAX: + interval.value = std::max(value, interval.value); + break; + default: + break; } - interval.sampleSize += 1; + } else { + interval.value = value; + interval.hasValue = true; } + interval.sampleSize += 1; // TODO: propgate proper values down stream when anomaly support doubles long wholeBucketVal = interval.value.long_value; @@ -512,6 +538,10 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); + // take base again in future good bucket. + for (auto& slice : mCurrentSlicedBucket) { + slice.second.hasBase = false; + } } VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, (long long)mCurrentBucketStartTimeNs); @@ -534,8 +564,18 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { if (slice.second.hasValue) { - info.mValueLong = slice.second.value.long_value; - info.mValueDouble = (double)slice.second.value.long_value / slice.second.sampleSize; + // skip the output if the diff is zero + if (mSkipZeroDiffOutput && mUseDiff && slice.second.value.isZero()) { + continue; + } + if (mAggregationType != ValueMetric::AVG) { + info.value = slice.second.value; + } else { + double sum = slice.second.value.type == LONG + ? (double)slice.second.value.long_value + : slice.second.value.double_value; + info.value.setDouble(sum / slice.second.sampleSize); + } // it will auto create new vector of ValuebucketInfo if the key is not found. auto& bucketList = mPastBuckets[slice.first]; bucketList.push_back(info); @@ -581,7 +621,10 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { } // Reset counters - mCurrentSlicedBucket.clear(); + for (auto& slice : mCurrentSlicedBucket) { + slice.second.hasValue = false; + slice.second.sampleSize = 0; + } } size_t ValueMetricProducer::byteSizeLocked() const { diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index 8db2d9553c2f..3416afe06b81 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -34,8 +34,7 @@ namespace statsd { struct ValueBucket { int64_t mBucketStartNs; int64_t mBucketEndNs; - int64_t mValueLong; - double mValueDouble; + Value value; }; class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver { @@ -54,35 +53,11 @@ public: void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid, const int64_t version) override { std::lock_guard<std::mutex> lock(mMutex); - - if (mIsPulled && (mCondition == true || mConditionTrackerIndex < 0)) { - vector<shared_ptr<LogEvent>> allData; - mPullerManager->Pull(mPullTagId, eventTimeNs, &allData); - if (allData.size() == 0) { - // This shouldn't happen since this valuemetric is not useful now. - } - - // Pretend the pulled data occurs right before the app upgrade event. - mCondition = false; - for (const auto& data : allData) { - data->setElapsedTimestampNs(eventTimeNs - 1); - onMatchedLogEventLocked(0, *data); - } - - flushCurrentBucketLocked(eventTimeNs); - mCurrentBucketStartTimeNs = eventTimeNs; - - mCondition = true; - for (const auto& data : allData) { - data->setElapsedTimestampNs(eventTimeNs); - onMatchedLogEventLocked(0, *data); - } - } else { - // For pushed value metric or pulled metric where condition is not true, - // we simply flush and reset the current bucket start. - flushCurrentBucketLocked(eventTimeNs); - mCurrentBucketStartTimeNs = eventTimeNs; + if (mIsPulled && mCondition) { + pullLocked(eventTimeNs - 1); } + flushCurrentBucketLocked(eventTimeNs); + mCurrentBucketStartTimeNs = eventTimeNs; }; protected: @@ -117,6 +92,9 @@ private: void dropDataLocked(const int64_t dropTimeNs) override; + // Calculate previous bucket end time based on current time. + int64_t calcPreviousBucketEndTime(const int64_t currentTimeNs); + sp<StatsPullerManager> mPullerManager; const FieldMatcher mValueField; @@ -131,11 +109,10 @@ private: // internal state of a bucket. typedef struct { - // Pulled data always come in pair of <start, end>. This holds the value - // for start. The diff (end - start) is taken as the real value. - Value start; - // Whether the start data point is updated - bool startUpdated; + // Holds current base value of the dimension. Take diff and update if necessary. + Value base; + // Whether there is a base to diff to. + bool hasBase; // Current value, depending on the aggregation type. Value value; // Number of samples collected. @@ -172,7 +149,11 @@ private: const ValueMetric::AggregationType mAggregationType; - const Type mValueType; + const bool mUseDiff; + + const ValueMetric::ValueDirection mValueDirection; + + const bool mSkipZeroDiffOutput; FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset); @@ -187,13 +168,13 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition); FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2); - FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateAvg); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSum); - FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSumSliced); FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket); + FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime); + FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput); }; } // namespace statsd diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto index 8bfa36059e9a..4da3828ff88d 100644 --- a/cmds/statsd/src/stats_log.proto +++ b/cmds/statsd/src/stats_log.proto @@ -374,6 +374,7 @@ message StatsdStatsReport { optional int64 max_pull_time_nanos = 6; optional int64 average_pull_delay_nanos = 7; optional int64 max_pull_delay_nanos = 8; + optional int64 data_error = 9; } repeated PulledAtomStats pulled_atom_stats = 10; diff --git a/cmds/statsd/src/stats_log_util.cpp b/cmds/statsd/src/stats_log_util.cpp index 44fa72e77a0d..504c5864f2ec 100644 --- a/cmds/statsd/src/stats_log_util.cpp +++ b/cmds/statsd/src/stats_log_util.cpp @@ -63,6 +63,7 @@ const int FIELD_ID_AVERAGE_PULL_TIME_NANOS = 5; const int FIELD_ID_MAX_PULL_TIME_NANOS = 6; const int FIELD_ID_AVERAGE_PULL_DELAY_NANOS = 7; const int FIELD_ID_MAX_PULL_DELAY_NANOS = 8; +const int FIELD_ID_DATA_ERROR = 9; namespace { @@ -446,6 +447,7 @@ void writePullerStatsToStream(const std::pair<int, StatsdStats::PulledAtomStats> (long long)pair.second.avgPullDelayNs); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_MAX_PULL_DELAY_NANOS, (long long)pair.second.maxPullDelayNs); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DATA_ERROR, (long long)pair.second.dataError); protoOutput->end(token); } diff --git a/cmds/statsd/src/stats_log_util.h b/cmds/statsd/src/stats_log_util.h index b8f6850ddc29..61f31eb3fa17 100644 --- a/cmds/statsd/src/stats_log_util.h +++ b/cmds/statsd/src/stats_log_util.h @@ -21,6 +21,7 @@ #include "HashableDimensionKey.h" #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h" #include "guardrail/StatsdStats.h" +#include "statslog.h" namespace android { namespace os { @@ -87,6 +88,10 @@ bool parseProtoOutputStream(util::ProtoOutputStream& protoOutput, T* message) { // Returns the truncated timestamp. int64_t truncateTimestampNsToFiveMinutes(int64_t timestampNs); +inline bool isPushedAtom(int atomId) { + return atomId <= util::kMaxPushedAtomId && atomId > 1; +} + } // namespace statsd } // namespace os } // namespace android diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto index d5f81a593082..5c46a296b9bf 100644 --- a/cmds/statsd/src/statsd_config.proto +++ b/cmds/statsd/src/statsd_config.proto @@ -270,6 +270,17 @@ message ValueMetric { optional int64 min_bucket_size_nanos = 10; optional bool use_absolute_value_on_reset = 11 [default = false]; + + optional bool use_diff = 12; + + enum ValueDirection { + INCREASING = 1; + DECREASING = 2; + ANY = 3; + } + optional ValueDirection value_direction = 13 [default = INCREASING]; + + optional bool skip_zero_diff_output = 14 [default = true]; } message Alert { diff --git a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp index fed5a3fb4277..095b4017b440 100644 --- a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp +++ b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp @@ -49,6 +49,7 @@ StatsdConfig CreateStatsdConfig() { CreateDimensions(android::util::TEMPERATURE, {2/* sensor name field */ }); valueMetric->set_bucket(FIVE_MINUTES); valueMetric->set_use_absolute_value_on_reset(true); + valueMetric->set_skip_zero_diff_output(false); return config; } diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp index 57aab971eaaa..ffa07081c781 100644 --- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp @@ -47,7 +47,35 @@ const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs; const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs; const int64_t bucket5StartTimeNs = bucketStartTimeNs + 4 * bucketSizeNs; const int64_t bucket6StartTimeNs = bucketStartTimeNs + 5 * bucketSizeNs; -const int64_t eventUpgradeTimeNs = bucketStartTimeNs + 15 * NS_PER_SEC; +double epsilon = 0.001; + +/* + * Tests that the first bucket works correctly + */ +TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + + int64_t startTimeBase = 11; + + sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + + // statsd started long ago. + // The metric starts in the middle of the bucket + ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard, + -1, startTimeBase, 22, pullerManager); + + EXPECT_EQ(startTimeBase, valueProducer.calcPreviousBucketEndTime(60 * NS_PER_SEC + 10)); + EXPECT_EQ(startTimeBase, valueProducer.calcPreviousBucketEndTime(60 * NS_PER_SEC + 10)); + EXPECT_EQ(60 * NS_PER_SEC + startTimeBase, + valueProducer.calcPreviousBucketEndTime(2 * 60 * NS_PER_SEC)); + EXPECT_EQ(2 * 60 * NS_PER_SEC + startTimeBase, + valueProducer.calcPreviousBucketEndTime(3 * 60 * NS_PER_SEC)); +} /* * Tests that the first bucket works correctly @@ -90,7 +118,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) { .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs); event->write(tagId); event->write(3); event->init(); @@ -114,12 +142,11 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) { EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - // startUpdated:true sum:0 start:11 - EXPECT_EQ(true, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(11, curInterval.start.long_value); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(8, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(11, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(8, curInterval.value.long_value); + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); allData.clear(); event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1); @@ -131,12 +158,14 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) { // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - // tartUpdated:false sum:12 - EXPECT_EQ(true, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); + + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(23, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(12, curInterval.value.long_value); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); + EXPECT_EQ(8, valueProducer.mPastBuckets.begin()->second.back().value.long_value); allData.clear(); event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1); @@ -147,12 +176,14 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) { valueProducer.onDataPulled(allData); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - // startUpdated:false sum:12 - EXPECT_EQ(true, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); + + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(36, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(13, curInterval.value.long_value); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(3UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(13, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size()); + EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().value.long_value); } /* @@ -170,7 +201,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) { sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); - EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(false)); + EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(true)); ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard, tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager); @@ -188,9 +219,9 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) { EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(true, curInterval.startUpdated); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(11, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(11, curInterval.start.long_value); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); allData.clear(); @@ -203,11 +234,11 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) { // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(true, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(10, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(10, curInterval.value.long_value); + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); allData.clear(); event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1); @@ -218,11 +249,13 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) { valueProducer.onDataPulled(allData); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(true, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(36, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(26, curInterval.value.long_value); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); + EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().value.long_value); } /* @@ -257,9 +290,9 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) { EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(true, curInterval.startUpdated); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(11, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(11, curInterval.start.long_value); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); allData.clear(); @@ -272,7 +305,8 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) { // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(true, curInterval.startUpdated); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(10, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); @@ -285,11 +319,11 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) { valueProducer.onDataPulled(allData); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(true, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(36, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(26, curInterval.value.long_value); + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); } /* @@ -309,21 +343,10 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) { EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) - // should not take effect - .WillOnce(Invoke([](int tagId, int64_t timeNs, - vector<std::shared_ptr<LogEvent>>* data) { - data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); - event->write(tagId); - event->write(3); - event->init(); - data->push_back(event); - return true; - })) .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); event->write(tagId); event->write(100); event->init(); @@ -333,7 +356,7 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) { .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 10); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); event->write(tagId); event->write(120); event->init(); @@ -349,8 +372,8 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) { EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:false sum:0 start:100 - EXPECT_EQ(100, curInterval.start.long_value); - EXPECT_EQ(true, curInterval.startUpdated); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(100, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); @@ -366,20 +389,20 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) { // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - // startUpdated:false sum:0 start:110 - EXPECT_EQ(110, curInterval.start.long_value); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(110, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(10, curInterval.value.long_value); + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - // startUpdated:false sum:0 start:110 + EXPECT_EQ(true, curInterval.hasValue); EXPECT_EQ(10, curInterval.value.long_value); - EXPECT_EQ(false, curInterval.startUpdated); + EXPECT_EQ(false, curInterval.hasBase); } TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade) { @@ -401,9 +424,9 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade) { valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); - valueProducer.notifyAppUpgrade(eventUpgradeTimeNs, "ANY.APP", 1, 1); + valueProducer.notifyAppUpgrade(bucketStartTimeNs + 150, "ANY.APP", 1, 1); EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); - EXPECT_EQ(eventUpgradeTimeNs, valueProducer.mCurrentBucketStartTimeNs); + EXPECT_EQ(bucketStartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs); shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 59 * NS_PER_SEC); event2->write(1); @@ -411,7 +434,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade) { event2->init(); valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2); EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); - EXPECT_EQ(eventUpgradeTimeNs, valueProducer.mCurrentBucketStartTimeNs); + EXPECT_EQ(bucketStartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs); // Next value should create a new bucket. shared_ptr<LogEvent> event3 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 65 * NS_PER_SEC); @@ -435,11 +458,11 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) { EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) - .WillOnce(Return(false)) + .WillOnce(Return(true)) .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 149); event->write(tagId); event->write(120); event->init(); @@ -451,7 +474,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) { vector<shared_ptr<LogEvent>> allData; allData.clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); event->write(tagId); event->write(100); event->init(); @@ -460,21 +483,21 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) { valueProducer.onDataPulled(allData); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); - valueProducer.notifyAppUpgrade(eventUpgradeTimeNs, "ANY.APP", 1, 1); + valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1); EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); - EXPECT_EQ(eventUpgradeTimeNs, valueProducer.mCurrentBucketStartTimeNs); - EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValueLong); + EXPECT_EQ(bucket2StartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs); + EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].value.long_value); allData.clear(); - event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); + event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1); event->write(tagId); event->write(150); event->init(); allData.push_back(event); valueProducer.onDataPulled(allData); - EXPECT_EQ(2UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); - EXPECT_EQ(bucket2StartTimeNs, valueProducer.mCurrentBucketStartTimeNs); - EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValueLong); + EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); + EXPECT_EQ(bucket2StartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs); + EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].value.long_value); } TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) { @@ -490,11 +513,10 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) { EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) - .WillOnce(Return(false)) .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1); event->write(tagId); event->write(100); event->init(); @@ -504,7 +526,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) { .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs - 100); event->write(tagId); event->write(120); event->init(); @@ -523,7 +545,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) { EXPECT_EQ(bucket2StartTimeNs-50, valueProducer.mCurrentBucketStartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); EXPECT_EQ(bucketStartTimeNs, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs); - EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValueLong); + EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].value.long_value); EXPECT_FALSE(valueProducer.mCondition); } @@ -565,7 +587,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) { valueProducer.flushIfNeededLocked(bucket3StartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().value.long_value); } TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) { @@ -587,9 +609,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) { event1->init(); valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1); // has 1 slice - EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); - ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size()); valueProducer.onConditionChangedLocked(true, bucketStartTimeNs + 15); shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20); @@ -600,6 +620,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) { // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; EXPECT_EQ(20, curInterval.value.long_value); @@ -629,7 +650,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) { valueProducer.flushIfNeededLocked(bucket3StartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().value.long_value); } TEST(ValueMetricProducerTest, TestAnomalyDetection) { @@ -727,7 +748,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); - EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(false)); + EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(true)); ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard, tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager); @@ -747,9 +768,9 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:true sum:0 start:11 - EXPECT_EQ(true, curInterval.startUpdated); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(11, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(11, curInterval.start.long_value); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // pull 2 at correct time @@ -764,11 +785,11 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // tartUpdated:false sum:12 - EXPECT_EQ(true, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(23, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(12, curInterval.value.long_value); + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // pull 3 come late. // The previous bucket gets closed with error. (Has start value 23, no ending) @@ -784,12 +805,12 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:false sum:12 - EXPECT_EQ(true, curInterval.startUpdated); - EXPECT_EQ(36, curInterval.start.long_value); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(36, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().value.long_value); } /* @@ -810,12 +831,11 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) { EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) - .WillOnce(Return(false)) // condition becomes true .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); event->write(tagId); event->write(100); event->init(); @@ -826,7 +846,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) { .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); event->write(tagId); event->write(120); event->init(); @@ -841,17 +861,17 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) { // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - // startUpdated:false sum:0 start:100 - EXPECT_EQ(100, curInterval.start.long_value); - EXPECT_EQ(true, curInterval.startUpdated); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(100, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // pull on bucket boundary come late, condition change happens before it valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(false, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(false, curInterval.hasBase); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(20, curInterval.value.long_value); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // Now the alarm is delivered. @@ -866,8 +886,9 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) { valueProducer.onDataPulled(allData); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(false, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(false, curInterval.hasBase); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(20, curInterval.value.long_value); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); } @@ -889,12 +910,11 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) { EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) - .WillOnce(Return(false)) // condition becomes true .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); event->write(tagId); event->write(100); event->init(); @@ -905,7 +925,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) { .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); event->write(tagId); event->write(120); event->init(); @@ -916,7 +936,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) { .WillOnce(Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 30); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 25); event->write(tagId); event->write(130); event->init(); @@ -932,24 +952,26 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) { EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; // startUpdated:false sum:0 start:100 - EXPECT_EQ(100, curInterval.start.long_value); - EXPECT_EQ(true, curInterval.startUpdated); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(100, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // pull on bucket boundary come late, condition change happens before it valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(false, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(false, curInterval.hasBase); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(20, curInterval.value.long_value); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // condition changed to true again, before the pull alarm is delivered valueProducer.onConditionChanged(true, bucket2StartTimeNs + 25); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(true, curInterval.startUpdated); - EXPECT_EQ(130, curInterval.start.long_value); - EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(130, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(20, curInterval.value.long_value); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); // Now the alarm is delivered, but it is considered late, it has no effect @@ -963,89 +985,10 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) { valueProducer.onDataPulled(allData); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(true, curInterval.startUpdated); - EXPECT_EQ(130, curInterval.start.long_value); - EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); -} - -/* - * Test pulled event with non sliced condition. The pull on boundary come late because the puller is - * very slow. - */ -TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3) { - ValueMetric metric; - metric.set_id(metricId); - metric.set_bucket(ONE_MINUTE); - metric.mutable_value_field()->set_field(tagId); - metric.mutable_value_field()->add_child()->set_field(2); - metric.set_condition(StringToId("SCREEN_ON")); - - sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); - sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); - EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); - EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); - - EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) - .WillOnce(Return(false)) - // condition becomes true - .WillOnce(Invoke([](int tagId, int64_t timeNs, - vector<std::shared_ptr<LogEvent>>* data) { - data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); - event->write(tagId); - event->write(100); - event->init(); - data->push_back(event); - return true; - })) - // condition becomes false - .WillOnce(Invoke([](int tagId, int64_t timeNs, - vector<std::shared_ptr<LogEvent>>* data) { - data->clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 20); - event->write(tagId); - event->write(120); - event->init(); - data->push_back(event); - return true; - })); - - ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs, - bucketStartTimeNs, pullerManager); - valueProducer.onConditionChanged(true, bucketStartTimeNs + 8); - - // has one slice - EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); - ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - // startUpdated:false sum:0 start:100 - EXPECT_EQ(100, curInterval.start.long_value); - EXPECT_EQ(true, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); - - // pull on bucket boundary come late, condition change happens before it. - // But puller is very slow in this one, so the data come after bucket finish - valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1); - curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(false, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); - - // Alarm is delivered in time, but the pull is very slow, and pullers are called in order, - // so this one comes even later - vector<shared_ptr<LogEvent>> allData; - allData.clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 30); - event->write(1); - event->write(110); - event->init(); - allData.push_back(event); - valueProducer.onDataPulled(allData); - - curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(false, curInterval.startUpdated); - EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(130, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(20, curInterval.value.long_value); EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); } @@ -1088,7 +1031,7 @@ TEST(ValueMetricProducerTest, TestPushedAggregateMin) { valueProducer.flushIfNeededLocked(bucket3StartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().value.long_value); } TEST(ValueMetricProducerTest, TestPushedAggregateMax) { @@ -1130,7 +1073,7 @@ TEST(ValueMetricProducerTest, TestPushedAggregateMax) { valueProducer.flushIfNeededLocked(bucket3StartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().value.long_value); } TEST(ValueMetricProducerTest, TestPushedAggregateAvg) { @@ -1175,7 +1118,7 @@ TEST(ValueMetricProducerTest, TestPushedAggregateAvg) { valueProducer.flushIfNeededLocked(bucket3StartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(12.5, valueProducer.mPastBuckets.begin()->second.back().mValueDouble); + EXPECT_TRUE(std::abs(valueProducer.mPastBuckets.begin()->second.back().value.double_value - 12.5) < epsilon); } TEST(ValueMetricProducerTest, TestPushedAggregateSum) { @@ -1217,67 +1160,75 @@ TEST(ValueMetricProducerTest, TestPushedAggregateSum) { valueProducer.flushIfNeededLocked(bucket3StartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(25, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(25, valueProducer.mPastBuckets.begin()->second.back().value.long_value); } -TEST(ValueMetricProducerTest, TestPushedAggregateSumSliced) { - string slicedConditionName = "UID"; - const int conditionTagId = 2; +TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); metric.mutable_value_field()->set_field(tagId); - metric.mutable_value_field()->add_child()->set_field(1); - metric.set_aggregation_type(ValueMetric::SUM); - - metric.set_condition(StringToId(slicedConditionName)); - MetricConditionLink* link = metric.add_links(); - link->set_condition(StringToId(slicedConditionName)); - buildSimpleAtomFieldMatcher(tagId, 2, link->mutable_fields_in_what()); - buildSimpleAtomFieldMatcher(conditionTagId, 2, link->mutable_fields_in_condition()); - - LogEvent event1(tagId, bucketStartTimeNs + 10); - event1.write(10); // value - event1.write("111"); // uid - event1.init(); - ConditionKey key1; - key1[StringToId(slicedConditionName)] = - {getMockedDimensionKey(conditionTagId, 2, "111")}; - - LogEvent event2(tagId, bucketStartTimeNs + 20); - event2.write(15); - event2.write("222"); - event2.init(); - ConditionKey key2; - key2[StringToId(slicedConditionName)] = - {getMockedDimensionKey(conditionTagId, 2, "222")}; + metric.mutable_value_field()->add_child()->set_field(2); + metric.set_aggregation_type(ValueMetric::MIN); + metric.set_use_diff(true); sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); - EXPECT_CALL(*wizard, query(_, key1, _, _, _, _)).WillOnce(Return(ConditionState::kFalse)); - EXPECT_CALL(*wizard, query(_, key2, _, _, _, _)).WillOnce(Return(ConditionState::kTrue)); - sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); - ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, -1, bucketStartTimeNs, + ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, -1, bucketStartTimeNs, bucketStartTimeNs, pullerManager); - valueProducer.onMatchedLogEvent(1 /*log matcher index*/, event1); - + shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10); + event1->write(1); + event1->write(10); + event1->init(); + shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 15); + event2->write(1); + event2->write(15); + event2->init(); + valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1); + // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(10, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); - valueProducer.onMatchedLogEvent(1 /*log matcher index*/, event2); + valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; - EXPECT_EQ(15, curInterval.value.long_value); + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(5, curInterval.value.long_value); + + // no change in data. + shared_ptr<LogEvent> event3 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 10); + event3->write(1); + event3->write(15); + event3->init(); + valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event3); + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(15, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); + + shared_ptr<LogEvent> event4 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 15); + event4->write(1); + event4->write(15); + event4->init(); + valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event4); + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + curInterval = valueProducer.mCurrentSlicedBucket.begin()->second; + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(15, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasValue); valueProducer.flushIfNeededLocked(bucket3StartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(15, valueProducer.mPastBuckets.begin()->second.back().mValueLong); + EXPECT_EQ(5, valueProducer.mPastBuckets.begin()->second.back().value.long_value); } } // namespace statsd |