summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmds/statsd/src/FieldValue.cpp20
-rw-r--r--cmds/statsd/src/FieldValue.h2
-rw-r--r--cmds/statsd/src/guardrail/StatsdStats.cpp15
-rw-r--r--cmds/statsd/src/guardrail/StatsdStats.h6
-rw-r--r--cmds/statsd/src/metrics/ValueMetricProducer.cpp259
-rw-r--r--cmds/statsd/src/metrics/ValueMetricProducer.h57
-rw-r--r--cmds/statsd/src/stats_log.proto1
-rw-r--r--cmds/statsd/src/stats_log_util.cpp2
-rw-r--r--cmds/statsd/src/stats_log_util.h5
-rw-r--r--cmds/statsd/src/statsd_config.proto11
-rw-r--r--cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp1
-rw-r--r--cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp425
12 files changed, 417 insertions, 387 deletions
diff --git a/cmds/statsd/src/FieldValue.cpp b/cmds/statsd/src/FieldValue.cpp
index fc1a61cac558..80ed80776829 100644
--- a/cmds/statsd/src/FieldValue.cpp
+++ b/cmds/statsd/src/FieldValue.cpp
@@ -18,6 +18,7 @@
#include "Log.h"
#include "FieldValue.h"
#include "HashableDimensionKey.h"
+#include "math.h"
namespace android {
namespace os {
@@ -174,6 +175,25 @@ std::string Value::toString() const {
}
}
+bool Value::isZero() const {
+ switch (type) {
+ case INT:
+ return int_value == 0;
+ case LONG:
+ return long_value == 0;
+ case FLOAT:
+ return fabs(float_value) <= std::numeric_limits<float>::epsilon();
+ case DOUBLE:
+ return fabs(double_value) <= std::numeric_limits<double>::epsilon();
+ case STRING:
+ return str_value.size() == 0;
+ case STORAGE:
+ return storage_value.size() == 0;
+ default:
+ return false;
+ }
+}
+
bool Value::operator==(const Value& that) const {
if (type != that.getType()) return false;
diff --git a/cmds/statsd/src/FieldValue.h b/cmds/statsd/src/FieldValue.h
index 77163f9d8619..a5d00ac4e72b 100644
--- a/cmds/statsd/src/FieldValue.h
+++ b/cmds/statsd/src/FieldValue.h
@@ -331,6 +331,8 @@ struct Value {
std::string toString() const;
+ bool isZero() const;
+
Type getType() const {
return type;
}
diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp
index d2919c51a65e..a0d77d6f922d 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.cpp
+++ b/cmds/statsd/src/guardrail/StatsdStats.cpp
@@ -362,11 +362,17 @@ void StatsdStats::notePullDelay(int pullAtomId, int64_t pullDelayNs) {
lock_guard<std::mutex> lock(mLock);
auto& pullStats = mPulledAtomStats[pullAtomId];
pullStats.maxPullDelayNs = std::max(pullStats.maxPullDelayNs, pullDelayNs);
- pullStats.avgPullDelayNs = (pullStats.avgPullDelayNs * pullStats.numPullDelay + pullDelayNs) /
- (pullStats.numPullDelay + 1);
+ pullStats.avgPullDelayNs =
+ (pullStats.avgPullDelayNs * pullStats.numPullDelay + pullDelayNs) /
+ (pullStats.numPullDelay + 1);
pullStats.numPullDelay += 1;
}
+void StatsdStats::notePullDataError(int pullAtomId) {
+ lock_guard<std::mutex> lock(mLock);
+ mPulledAtomStats[pullAtomId].dataError++;
+}
+
void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) {
lock_guard<std::mutex> lock(mLock);
@@ -422,6 +428,7 @@ void StatsdStats::resetInternalLocked() {
pullStats.second.avgPullDelayNs = 0;
pullStats.second.maxPullDelayNs = 0;
pullStats.second.numPullDelay = 0;
+ pullStats.second.dataError = 0;
}
}
@@ -530,11 +537,11 @@ void StatsdStats::dumpStats(int out) const {
dprintf(out,
"Atom %d->(total pull)%ld, (pull from cache)%ld, (min pull interval)%ld, (average "
"pull time nanos)%lld, (max pull time nanos)%lld, (average pull delay nanos)%lld, "
- "(max pull delay nanos)%lld\n",
+ "(max pull delay nanos)%lld, (data error)%ld\n",
(int)pair.first, (long)pair.second.totalPull, (long)pair.second.totalPullFromCache,
(long)pair.second.minPullIntervalSec, (long long)pair.second.avgPullTimeNs,
(long long)pair.second.maxPullTimeNs, (long long)pair.second.avgPullDelayNs,
- (long long)pair.second.maxPullDelayNs);
+ (long long)pair.second.maxPullDelayNs, pair.second.dataError);
}
if (mAnomalyAlarmRegisteredStats > 0) {
diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h
index 777d8652d2b6..2008abdb2345 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.h
+++ b/cmds/statsd/src/guardrail/StatsdStats.h
@@ -279,6 +279,11 @@ public:
void notePullFromCache(int pullAtomId);
/*
+ * Notify data error for pulled atom.
+ */
+ void notePullDataError(int pullAtomId);
+
+ /*
* Records time for actual pulling, not including those served from cache and not including
* statsd processing delays.
*/
@@ -329,6 +334,7 @@ public:
int64_t avgPullDelayNs = 0;
int64_t maxPullDelayNs = 0;
long numPullDelay = 0;
+ long dataError = 0;
} PulledAtomStats;
private:
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 636747985526..c8b1cf07eb32 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -92,7 +92,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
: StatsdStats::kDimensionKeySizeHardLimit),
mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
mAggregationType(metric.aggregation_type()),
- mValueType(metric.aggregation_type() == ValueMetric::AVG ? DOUBLE : LONG) {
+ mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
+ mValueDirection(metric.value_direction()),
+ mSkipZeroDiffOutput(metric.skip_zero_diff_output()) {
int64_t bucketSizeMills = 0;
if (metric.has_bucket()) {
bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -125,24 +127,25 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
}
mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
- HasPositionALL(metric.dimensions_in_condition());
+ HasPositionALL(metric.dimensions_in_condition());
flushIfNeededLocked(startTimeNs);
- // Kicks off the puller immediately.
+
if (mIsPulled) {
mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(),
mBucketSizeNs);
}
- // TODO: Only do this for partial buckets like first bucket. All other buckets should use
+ // Only do this for partial buckets like first bucket. All other buckets should use
// flushIfNeeded to adjust start and end to bucket boundaries.
// Adjust start for partial bucket
mCurrentBucketStartTimeNs = startTimeNs;
- if (mIsPulled) {
+ // Kicks off the puller immediately if condition is true and diff based.
+ if (mIsPulled && mCondition && mUseDiff) {
pullLocked(startTimeNs);
}
- VLOG("value metric %lld created. bucket size %lld start_time: %lld",
- (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs);
+ VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
+ (long long)mBucketSizeNs, (long long)mTimeBaseNs);
}
ValueMetricProducer::~ValueMetricProducer() {
@@ -188,14 +191,14 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
// Fills the dimension path if not slicing by ALL.
if (!mSliceByPositionALL) {
if (!mDimensionsInWhat.empty()) {
- uint64_t dimenPathToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
+ uint64_t dimenPathToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
protoOutput->end(dimenPathToken);
}
if (!mDimensionsInCondition.empty()) {
- uint64_t dimenPathToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
+ uint64_t dimenPathToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
writeDimensionPathToProto(mDimensionsInCondition, protoOutput);
protoOutput->end(dimenPathToken);
}
@@ -221,15 +224,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
// First fill dimension.
if (mSliceByPositionALL) {
- uint64_t dimensionToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
+ uint64_t dimensionToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
protoOutput->end(dimensionToken);
if (dimensionKey.hasDimensionKeyInCondition()) {
- uint64_t dimensionInConditionToken = protoOutput->start(
- FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
- writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(),
- str_set, protoOutput);
+ uint64_t dimensionInConditionToken =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
+ writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set,
+ protoOutput);
protoOutput->end(dimensionInConditionToken);
}
} else {
@@ -237,8 +240,8 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
if (dimensionKey.hasDimensionKeyInCondition()) {
writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(),
- FIELD_ID_DIMENSION_LEAF_IN_CONDITION,
- str_set, protoOutput);
+ FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set,
+ protoOutput);
}
}
@@ -256,15 +259,20 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
(long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
}
- if (mValueType == LONG) {
+ if (bucket.value.getType() == LONG) {
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
- (long long)bucket.mValueLong);
+ (long long)bucket.value.long_value);
+ VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
+ (long long)bucket.mBucketEndNs, (long long)bucket.value.long_value);
+ } else if (bucket.value.getType() == DOUBLE) {
+ protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
+ bucket.value.double_value);
+ VLOG("\t bucket [%lld - %lld] count: %.2f", (long long)bucket.mBucketStartNs,
+ (long long)bucket.mBucketEndNs, bucket.value.double_value);
} else {
- protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.mValueDouble);
+ VLOG("Wrong value type for ValueMetric output: %d", bucket.value.getType());
}
protoOutput->end(bucketInfoToken);
- VLOG("\t bucket [%lld - %lld] count: %lld, %.2f", (long long)bucket.mBucketStartNs,
- (long long)bucket.mBucketEndNs, (long long)bucket.mValueLong, bucket.mValueDouble);
}
protoOutput->end(wrapperToken);
}
@@ -279,8 +287,6 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
void ValueMetricProducer::onConditionChangedLocked(const bool condition,
const int64_t eventTimeNs) {
- mCondition = condition;
-
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
(long long)mCurrentBucketStartTimeNs);
@@ -289,9 +295,19 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
flushIfNeededLocked(eventTimeNs);
- if (mIsPulled) {
+ // Pull on condition changes.
+ if (mIsPulled && (mCondition != condition)) {
pullLocked(eventTimeNs);
}
+
+ // when condition change from true to false, clear diff base
+ if (mUseDiff && mCondition && !condition) {
+ for (auto& slice : mCurrentSlicedBucket) {
+ slice.second.hasBase = false;
+ }
+ }
+
+ mCondition = condition;
}
void ValueMetricProducer::pullLocked(const int64_t timestampNs) {
@@ -306,30 +322,33 @@ void ValueMetricProducer::pullLocked(const int64_t timestampNs) {
}
}
+int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
+ return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
+}
+
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
std::lock_guard<std::mutex> lock(mMutex);
- if (mCondition == true || mConditionTrackerIndex < 0) {
+ if (mCondition) {
if (allData.size() == 0) {
return;
}
// For scheduled pulled data, the effective event time is snap to the nearest
- // bucket boundary to make bucket finalize.
+ // bucket end. In the case of waking up from a deep sleep state, we will
+ // attribute the data to the previous bucket end. If the sleep was long but not
+ // very long, we will be in the immediate next bucket. The previous bucket may get
+ // a larger number as we pull at a later time than the real bucket end.
+ // If the sleep was very long, we skip more than one bucket. In this case,
+ // the diff base will be cleared and this new data will serve as the new diff base.
int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
- int64_t eventTime = mTimeBaseNs +
- ((realEventTime - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
-
- // close the end of the bucket
- mCondition = false;
- for (const auto& data : allData) {
- data->setElapsedTimestampNs(eventTime - 1);
- onMatchedLogEventLocked(0, *data);
+ int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
+ if (bucketEndTime < mCurrentBucketStartTimeNs) {
+ VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
+ (long long)mCurrentBucketStartTimeNs);
+ return;
}
-
- // start a new bucket
- mCondition = true;
for (const auto& data : allData) {
- data->setElapsedTimestampNs(eventTime);
+ data->setElapsedTimestampNs(bucketEndTime);
onMatchedLogEventLocked(0, *data);
}
}
@@ -363,8 +382,8 @@ bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
// 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
if (newTupleCount > mDimensionHardLimit) {
- ALOGE("ValueMetric %lld dropping data for dimension key %s",
- (long long)mMetricId, newKey.toString().c_str());
+ ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId,
+ newKey.toString().c_str());
return true;
}
}
@@ -393,10 +412,10 @@ const Value getDoubleOrLong(const Value& value) {
return v;
}
-void ValueMetricProducer::onMatchedLogEventInternalLocked(
- const size_t matcherIndex, const MetricDimensionKey& eventKey,
- const ConditionKey& conditionKey, bool condition,
- const LogEvent& event) {
+void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
+ const MetricDimensionKey& eventKey,
+ const ConditionKey& conditionKey,
+ bool condition, const LogEvent& event) {
int64_t eventTimeNs = event.GetElapsedTimestampNs();
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
@@ -406,6 +425,14 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(
flushIfNeededLocked(eventTimeNs);
+ // For pulled data, we already check condition when we decide to pull or
+ // in onDataPulled. So take all of them.
+ // For pushed data, just check condition.
+ if (!(mIsPulled || condition)) {
+ VLOG("ValueMetric skip event because condition is false");
+ return;
+ }
+
if (hitGuardRailLocked(eventKey)) {
return;
}
@@ -418,71 +445,70 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(
}
Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue);
- Value diff;
- bool hasDiff = false;
- if (mIsPulled) {
- // Always require condition for pulled events. In the case of no condition, only pull
- // on bucket boundaries, in which we fake condition changes.
- if (mCondition == true) {
- if (!interval.startUpdated) {
- interval.start = value;
- interval.startUpdated = true;
- } else {
- // Skip it if there is already value recorded for the start. Happens when puller
- // takes too long to finish. In this case we take the previous value.
- VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str());
- }
- } else {
- // Generally we expect value to be monotonically increasing.
- // If not, take absolute value or drop it, based on config.
- if (interval.startUpdated) {
- if (value >= interval.start) {
- diff = (value - interval.start);
- hasDiff = true;
+ if (mUseDiff) {
+ // no base. just update base and return.
+ if (!interval.hasBase) {
+ interval.base = value;
+ interval.hasBase = true;
+ return;
+ }
+ Value diff;
+ switch (mValueDirection) {
+ case ValueMetric::INCREASING:
+ if (value >= interval.base) {
+ diff = value - interval.base;
+ } else if (mUseAbsoluteValueOnReset) {
+ diff = value;
} else {
- if (mUseAbsoluteValueOnReset) {
- diff = value;
- hasDiff = true;
- } else {
- VLOG("Dropping data for atom %d, prev: %s, now: %s", mPullTagId,
- interval.start.toString().c_str(), value.toString().c_str());
- }
+ VLOG("Unexpected decreasing value");
+ StatsdStats::getInstance().notePullDataError(mPullTagId);
+ interval.base = value;
+ return;
}
- interval.startUpdated = false;
- } else {
- VLOG("No start for matching end %s", value.toString().c_str());
- }
- }
- } else {
- // for pushed events, only aggregate when sliced condition is true
- if (condition == true || mConditionTrackerIndex < 0) {
- diff = value;
- hasDiff = true;
+ break;
+ case ValueMetric::DECREASING:
+ if (interval.base >= value) {
+ diff = interval.base - value;
+ } else if (mUseAbsoluteValueOnReset) {
+ diff = value;
+ } else {
+ VLOG("Unexpected increasing value");
+ StatsdStats::getInstance().notePullDataError(mPullTagId);
+ interval.base = value;
+ return;
+ }
+ break;
+ case ValueMetric::ANY:
+ diff = value - interval.base;
+ break;
+ default:
+ break;
}
+ interval.base = value;
+ value = diff;
}
- if (hasDiff) {
- if (interval.hasValue) {
- switch (mAggregationType) {
- case ValueMetric::SUM:
+
+ if (interval.hasValue) {
+ switch (mAggregationType) {
+ case ValueMetric::SUM:
// for AVG, we add up and take average when flushing the bucket
- case ValueMetric::AVG:
- interval.value += diff;
- break;
- case ValueMetric::MIN:
- interval.value = diff < interval.value ? diff : interval.value;
- break;
- case ValueMetric::MAX:
- interval.value = diff > interval.value ? diff : interval.value;
- break;
- default:
- break;
- }
- } else {
- interval.value = diff;
- interval.hasValue = true;
+ case ValueMetric::AVG:
+ interval.value += value;
+ break;
+ case ValueMetric::MIN:
+ interval.value = std::min(value, interval.value);
+ break;
+ case ValueMetric::MAX:
+ interval.value = std::max(value, interval.value);
+ break;
+ default:
+ break;
}
- interval.sampleSize += 1;
+ } else {
+ interval.value = value;
+ interval.hasValue = true;
}
+ interval.sampleSize += 1;
// TODO: propgate proper values down stream when anomaly support doubles
long wholeBucketVal = interval.value.long_value;
@@ -512,6 +538,10 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
if (numBucketsForward > 1) {
VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
+ // take base again in future good bucket.
+ for (auto& slice : mCurrentSlicedBucket) {
+ slice.second.hasBase = false;
+ }
}
VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
(long long)mCurrentBucketStartTimeNs);
@@ -534,8 +564,18 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
// The current bucket is large enough to keep.
for (const auto& slice : mCurrentSlicedBucket) {
if (slice.second.hasValue) {
- info.mValueLong = slice.second.value.long_value;
- info.mValueDouble = (double)slice.second.value.long_value / slice.second.sampleSize;
+ // skip the output if the diff is zero
+ if (mSkipZeroDiffOutput && mUseDiff && slice.second.value.isZero()) {
+ continue;
+ }
+ if (mAggregationType != ValueMetric::AVG) {
+ info.value = slice.second.value;
+ } else {
+ double sum = slice.second.value.type == LONG
+ ? (double)slice.second.value.long_value
+ : slice.second.value.double_value;
+ info.value.setDouble(sum / slice.second.sampleSize);
+ }
// it will auto create new vector of ValuebucketInfo if the key is not found.
auto& bucketList = mPastBuckets[slice.first];
bucketList.push_back(info);
@@ -581,7 +621,10 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
}
// Reset counters
- mCurrentSlicedBucket.clear();
+ for (auto& slice : mCurrentSlicedBucket) {
+ slice.second.hasValue = false;
+ slice.second.sampleSize = 0;
+ }
}
size_t ValueMetricProducer::byteSizeLocked() const {
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 8db2d9553c2f..3416afe06b81 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -34,8 +34,7 @@ namespace statsd {
struct ValueBucket {
int64_t mBucketStartNs;
int64_t mBucketEndNs;
- int64_t mValueLong;
- double mValueDouble;
+ Value value;
};
class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver {
@@ -54,35 +53,11 @@ public:
void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
const int64_t version) override {
std::lock_guard<std::mutex> lock(mMutex);
-
- if (mIsPulled && (mCondition == true || mConditionTrackerIndex < 0)) {
- vector<shared_ptr<LogEvent>> allData;
- mPullerManager->Pull(mPullTagId, eventTimeNs, &allData);
- if (allData.size() == 0) {
- // This shouldn't happen since this valuemetric is not useful now.
- }
-
- // Pretend the pulled data occurs right before the app upgrade event.
- mCondition = false;
- for (const auto& data : allData) {
- data->setElapsedTimestampNs(eventTimeNs - 1);
- onMatchedLogEventLocked(0, *data);
- }
-
- flushCurrentBucketLocked(eventTimeNs);
- mCurrentBucketStartTimeNs = eventTimeNs;
-
- mCondition = true;
- for (const auto& data : allData) {
- data->setElapsedTimestampNs(eventTimeNs);
- onMatchedLogEventLocked(0, *data);
- }
- } else {
- // For pushed value metric or pulled metric where condition is not true,
- // we simply flush and reset the current bucket start.
- flushCurrentBucketLocked(eventTimeNs);
- mCurrentBucketStartTimeNs = eventTimeNs;
+ if (mIsPulled && mCondition) {
+ pullLocked(eventTimeNs - 1);
}
+ flushCurrentBucketLocked(eventTimeNs);
+ mCurrentBucketStartTimeNs = eventTimeNs;
};
protected:
@@ -117,6 +92,9 @@ private:
void dropDataLocked(const int64_t dropTimeNs) override;
+ // Calculate previous bucket end time based on current time.
+ int64_t calcPreviousBucketEndTime(const int64_t currentTimeNs);
+
sp<StatsPullerManager> mPullerManager;
const FieldMatcher mValueField;
@@ -131,11 +109,10 @@ private:
// internal state of a bucket.
typedef struct {
- // Pulled data always come in pair of <start, end>. This holds the value
- // for start. The diff (end - start) is taken as the real value.
- Value start;
- // Whether the start data point is updated
- bool startUpdated;
+ // Holds current base value of the dimension. Take diff and update if necessary.
+ Value base;
+ // Whether there is a base to diff to.
+ bool hasBase;
// Current value, depending on the aggregation type.
Value value;
// Number of samples collected.
@@ -172,7 +149,11 @@ private:
const ValueMetric::AggregationType mAggregationType;
- const Type mValueType;
+ const bool mUseDiff;
+
+ const ValueMetric::ValueDirection mValueDirection;
+
+ const bool mSkipZeroDiffOutput;
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
@@ -187,13 +168,13 @@ private:
FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition);
FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition);
FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2);
- FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3);
FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin);
FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax);
FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateAvg);
FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSum);
- FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSumSliced);
FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket);
+ FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime);
+ FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput);
};
} // namespace statsd
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index 8bfa36059e9a..4da3828ff88d 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -374,6 +374,7 @@ message StatsdStatsReport {
optional int64 max_pull_time_nanos = 6;
optional int64 average_pull_delay_nanos = 7;
optional int64 max_pull_delay_nanos = 8;
+ optional int64 data_error = 9;
}
repeated PulledAtomStats pulled_atom_stats = 10;
diff --git a/cmds/statsd/src/stats_log_util.cpp b/cmds/statsd/src/stats_log_util.cpp
index 44fa72e77a0d..504c5864f2ec 100644
--- a/cmds/statsd/src/stats_log_util.cpp
+++ b/cmds/statsd/src/stats_log_util.cpp
@@ -63,6 +63,7 @@ const int FIELD_ID_AVERAGE_PULL_TIME_NANOS = 5;
const int FIELD_ID_MAX_PULL_TIME_NANOS = 6;
const int FIELD_ID_AVERAGE_PULL_DELAY_NANOS = 7;
const int FIELD_ID_MAX_PULL_DELAY_NANOS = 8;
+const int FIELD_ID_DATA_ERROR = 9;
namespace {
@@ -446,6 +447,7 @@ void writePullerStatsToStream(const std::pair<int, StatsdStats::PulledAtomStats>
(long long)pair.second.avgPullDelayNs);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_MAX_PULL_DELAY_NANOS,
(long long)pair.second.maxPullDelayNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DATA_ERROR, (long long)pair.second.dataError);
protoOutput->end(token);
}
diff --git a/cmds/statsd/src/stats_log_util.h b/cmds/statsd/src/stats_log_util.h
index b8f6850ddc29..61f31eb3fa17 100644
--- a/cmds/statsd/src/stats_log_util.h
+++ b/cmds/statsd/src/stats_log_util.h
@@ -21,6 +21,7 @@
#include "HashableDimensionKey.h"
#include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
#include "guardrail/StatsdStats.h"
+#include "statslog.h"
namespace android {
namespace os {
@@ -87,6 +88,10 @@ bool parseProtoOutputStream(util::ProtoOutputStream& protoOutput, T* message) {
// Returns the truncated timestamp.
int64_t truncateTimestampNsToFiveMinutes(int64_t timestampNs);
+inline bool isPushedAtom(int atomId) {
+ return atomId <= util::kMaxPushedAtomId && atomId > 1;
+}
+
} // namespace statsd
} // namespace os
} // namespace android
diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto
index d5f81a593082..5c46a296b9bf 100644
--- a/cmds/statsd/src/statsd_config.proto
+++ b/cmds/statsd/src/statsd_config.proto
@@ -270,6 +270,17 @@ message ValueMetric {
optional int64 min_bucket_size_nanos = 10;
optional bool use_absolute_value_on_reset = 11 [default = false];
+
+ optional bool use_diff = 12;
+
+ enum ValueDirection {
+ INCREASING = 1;
+ DECREASING = 2;
+ ANY = 3;
+ }
+ optional ValueDirection value_direction = 13 [default = INCREASING];
+
+ optional bool skip_zero_diff_output = 14 [default = true];
}
message Alert {
diff --git a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
index fed5a3fb4277..095b4017b440 100644
--- a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
+++ b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
@@ -49,6 +49,7 @@ StatsdConfig CreateStatsdConfig() {
CreateDimensions(android::util::TEMPERATURE, {2/* sensor name field */ });
valueMetric->set_bucket(FIVE_MINUTES);
valueMetric->set_use_absolute_value_on_reset(true);
+ valueMetric->set_skip_zero_diff_output(false);
return config;
}
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 57aab971eaaa..ffa07081c781 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -47,7 +47,35 @@ const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs;
const int64_t bucket5StartTimeNs = bucketStartTimeNs + 4 * bucketSizeNs;
const int64_t bucket6StartTimeNs = bucketStartTimeNs + 5 * bucketSizeNs;
-const int64_t eventUpgradeTimeNs = bucketStartTimeNs + 15 * NS_PER_SEC;
+double epsilon = 0.001;
+
+/*
+ * Tests that calcPreviousBucketEndTime correctly maps a timestamp to the end of
+ * the previous bucket, relative to the time base.
+ */
+TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+
+ int64_t startTimeBase = 11;
+
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+ // statsd started long ago.
+ // The metric starts in the middle of the bucket
+ ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
+ -1, startTimeBase, 22, pullerManager);
+
+ EXPECT_EQ(startTimeBase, valueProducer.calcPreviousBucketEndTime(60 * NS_PER_SEC + 10));
+ EXPECT_EQ(startTimeBase, valueProducer.calcPreviousBucketEndTime(60 * NS_PER_SEC + 10));
+ EXPECT_EQ(60 * NS_PER_SEC + startTimeBase,
+ valueProducer.calcPreviousBucketEndTime(2 * 60 * NS_PER_SEC));
+ EXPECT_EQ(2 * 60 * NS_PER_SEC + startTimeBase,
+ valueProducer.calcPreviousBucketEndTime(3 * 60 * NS_PER_SEC));
+}
/*
* Tests that the first bucket works correctly
@@ -90,7 +118,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) {
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
event->write(tagId);
event->write(3);
event->init();
@@ -114,12 +142,11 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) {
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:true sum:0 start:11
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(11, curInterval.start.long_value);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(8, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(11, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(8, curInterval.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
allData.clear();
event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1);
@@ -131,12 +158,14 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) {
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // tartUpdated:false sum:12
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(23, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(12, curInterval.value.long_value);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+ EXPECT_EQ(8, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
allData.clear();
event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1);
@@ -147,12 +176,14 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) {
valueProducer.onDataPulled(allData);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:12
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(36, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(13, curInterval.value.long_value);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(3UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(13, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
+ EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
/*
@@ -170,7 +201,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) {
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(false));
+ EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(true));
ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager);
@@ -188,9 +219,9 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) {
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(11, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(11, curInterval.start.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
allData.clear();
@@ -203,11 +234,11 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) {
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(10, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(10, curInterval.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
allData.clear();
event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1);
@@ -218,11 +249,13 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) {
valueProducer.onDataPulled(allData);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(36, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(26, curInterval.value.long_value);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+ EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
/*
@@ -257,9 +290,9 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) {
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(11, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(11, curInterval.start.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
allData.clear();
@@ -272,7 +305,8 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) {
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(10, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
@@ -285,11 +319,11 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) {
valueProducer.onDataPulled(allData);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(36, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(26, curInterval.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
}
/*
@@ -309,21 +343,10 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- // should not take effect
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
- data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
- event->write(tagId);
- event->write(3);
- event->init();
- data->push_back(event);
- return true;
- }))
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
event->write(tagId);
event->write(100);
event->init();
@@ -333,7 +356,7 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
event->write(120);
event->init();
@@ -349,8 +372,8 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
// startUpdated:false sum:0 start:100
- EXPECT_EQ(100, curInterval.start.long_value);
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(100, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
@@ -366,20 +389,20 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:0 start:110
- EXPECT_EQ(110, curInterval.start.long_value);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(110, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(10, curInterval.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:0 start:110
+ EXPECT_EQ(true, curInterval.hasValue);
EXPECT_EQ(10, curInterval.value.long_value);
- EXPECT_EQ(false, curInterval.startUpdated);
+ EXPECT_EQ(false, curInterval.hasBase);
}
TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade) {
@@ -401,9 +424,9 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade) {
valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
- valueProducer.notifyAppUpgrade(eventUpgradeTimeNs, "ANY.APP", 1, 1);
+ valueProducer.notifyAppUpgrade(bucketStartTimeNs + 150, "ANY.APP", 1, 1);
EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
- EXPECT_EQ(eventUpgradeTimeNs, valueProducer.mCurrentBucketStartTimeNs);
+ EXPECT_EQ(bucketStartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs);
shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 59 * NS_PER_SEC);
event2->write(1);
@@ -411,7 +434,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade) {
event2->init();
valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
- EXPECT_EQ(eventUpgradeTimeNs, valueProducer.mCurrentBucketStartTimeNs);
+ EXPECT_EQ(bucketStartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs);
// Next value should create a new bucket.
shared_ptr<LogEvent> event3 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 65 * NS_PER_SEC);
@@ -435,11 +458,11 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) {
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Return(false))
+ .WillOnce(Return(true))
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 149);
event->write(tagId);
event->write(120);
event->init();
@@ -451,7 +474,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) {
vector<shared_ptr<LogEvent>> allData;
allData.clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
event->write(100);
event->init();
@@ -460,21 +483,21 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) {
valueProducer.onDataPulled(allData);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
- valueProducer.notifyAppUpgrade(eventUpgradeTimeNs, "ANY.APP", 1, 1);
+ valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
- EXPECT_EQ(eventUpgradeTimeNs, valueProducer.mCurrentBucketStartTimeNs);
- EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValueLong);
+ EXPECT_EQ(bucket2StartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs);
+ EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].value.long_value);
allData.clear();
- event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1);
event->write(tagId);
event->write(150);
event->init();
allData.push_back(event);
valueProducer.onDataPulled(allData);
- EXPECT_EQ(2UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
- EXPECT_EQ(bucket2StartTimeNs, valueProducer.mCurrentBucketStartTimeNs);
- EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValueLong);
+ EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
+ EXPECT_EQ(bucket2StartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs);
+ EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].value.long_value);
}
TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) {
@@ -490,11 +513,10 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) {
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Return(false))
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
event->write(tagId);
event->write(100);
event->init();
@@ -504,7 +526,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) {
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs - 100);
event->write(tagId);
event->write(120);
event->init();
@@ -523,7 +545,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) {
EXPECT_EQ(bucket2StartTimeNs-50, valueProducer.mCurrentBucketStartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
EXPECT_EQ(bucketStartTimeNs, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs);
- EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValueLong);
+ EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].value.long_value);
EXPECT_FALSE(valueProducer.mCondition);
}
@@ -565,7 +587,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) {
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) {
@@ -587,9 +609,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) {
event1->init();
valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
// has 1 slice
- EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
- ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
valueProducer.onConditionChangedLocked(true, bucketStartTimeNs + 15);
shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
@@ -600,6 +620,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) {
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
EXPECT_EQ(20, curInterval.value.long_value);
@@ -629,7 +650,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) {
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
TEST(ValueMetricProducerTest, TestAnomalyDetection) {
@@ -727,7 +748,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(false));
+ EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(true));
ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager);
@@ -747,9 +768,9 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
// startUpdated:true sum:0 start:11
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(11, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(11, curInterval.start.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// pull 2 at correct time
@@ -764,11 +785,11 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
// tartUpdated:false sum:12
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(23, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(12, curInterval.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// pull 3 come late.
// The previous bucket gets closed with error. (Has start value 23, no ending)
@@ -784,12 +805,12 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
// startUpdated:false sum:12
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(36, curInterval.start.long_value);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(36, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
/*
@@ -810,12 +831,11 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) {
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Return(false))
// condition becomes true
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
event->write(tagId);
event->write(100);
event->init();
@@ -826,7 +846,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) {
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
event->write(120);
event->init();
@@ -841,17 +861,17 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) {
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:0 start:100
- EXPECT_EQ(100, curInterval.start.long_value);
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(100, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// pull on bucket boundary come late, condition change happens before it
valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(20, curInterval.value.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// Now the alarm is delivered.
@@ -866,8 +886,9 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) {
valueProducer.onDataPulled(allData);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(20, curInterval.value.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
}
@@ -889,12 +910,11 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) {
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Return(false))
// condition becomes true
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
event->write(tagId);
event->write(100);
event->init();
@@ -905,7 +925,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) {
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
event->write(120);
event->init();
@@ -916,7 +936,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) {
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 30);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 25);
event->write(tagId);
event->write(130);
event->init();
@@ -932,24 +952,26 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) {
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
// startUpdated:false sum:0 start:100
- EXPECT_EQ(100, curInterval.start.long_value);
- EXPECT_EQ(true, curInterval.startUpdated);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(100, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// pull on bucket boundary come late, condition change happens before it
valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(20, curInterval.value.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// condition changed to true again, before the pull alarm is delivered
valueProducer.onConditionChanged(true, bucket2StartTimeNs + 25);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(130, curInterval.start.long_value);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(130, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(20, curInterval.value.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
// Now the alarm is delivered, but it is considered late, it has no effect
@@ -963,89 +985,10 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) {
valueProducer.onDataPulled(allData);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(130, curInterval.start.long_value);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
-}
-
-/*
- * Test pulled event with non sliced condition. The pull on boundary come late because the puller is
- * very slow.
- */
-TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3) {
- ValueMetric metric;
- metric.set_id(metricId);
- metric.set_bucket(ONE_MINUTE);
- metric.mutable_value_field()->set_field(tagId);
- metric.mutable_value_field()->add_child()->set_field(2);
- metric.set_condition(StringToId("SCREEN_ON"));
-
- sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
- sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
- EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
-
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Return(false))
- // condition becomes true
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
- data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
- event->write(tagId);
- event->write(100);
- event->init();
- data->push_back(event);
- return true;
- }))
- // condition becomes false
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
- data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 20);
- event->write(tagId);
- event->write(120);
- event->init();
- data->push_back(event);
- return true;
- }));
-
- ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs,
- bucketStartTimeNs, pullerManager);
- valueProducer.onConditionChanged(true, bucketStartTimeNs + 8);
-
- // has one slice
- EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
- ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- // startUpdated:false sum:0 start:100
- EXPECT_EQ(100, curInterval.start.long_value);
- EXPECT_EQ(true, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
-
- // pull on bucket boundary come late, condition change happens before it.
- // But puller is very slow in this one, so the data come after bucket finish
- valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
- curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
-
- // Alarm is delivered in time, but the pull is very slow, and pullers are called in order,
- // so this one comes even later
- vector<shared_ptr<LogEvent>> allData;
- allData.clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 30);
- event->write(1);
- event->write(110);
- event->init();
- allData.push_back(event);
- valueProducer.onDataPulled(allData);
-
- curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(false, curInterval.startUpdated);
- EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(130, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(20, curInterval.value.long_value);
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
}
@@ -1088,7 +1031,7 @@ TEST(ValueMetricProducerTest, TestPushedAggregateMin) {
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
TEST(ValueMetricProducerTest, TestPushedAggregateMax) {
@@ -1130,7 +1073,7 @@ TEST(ValueMetricProducerTest, TestPushedAggregateMax) {
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
TEST(ValueMetricProducerTest, TestPushedAggregateAvg) {
@@ -1175,7 +1118,7 @@ TEST(ValueMetricProducerTest, TestPushedAggregateAvg) {
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(12.5, valueProducer.mPastBuckets.begin()->second.back().mValueDouble);
+ EXPECT_TRUE(std::abs(valueProducer.mPastBuckets.begin()->second.back().value.double_value - 12.5) < epsilon);
}
TEST(ValueMetricProducerTest, TestPushedAggregateSum) {
@@ -1217,67 +1160,75 @@ TEST(ValueMetricProducerTest, TestPushedAggregateSum) {
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(25, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(25, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
-TEST(ValueMetricProducerTest, TestPushedAggregateSumSliced) {
- string slicedConditionName = "UID";
- const int conditionTagId = 2;
+TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput) {
ValueMetric metric;
metric.set_id(metricId);
metric.set_bucket(ONE_MINUTE);
metric.mutable_value_field()->set_field(tagId);
- metric.mutable_value_field()->add_child()->set_field(1);
- metric.set_aggregation_type(ValueMetric::SUM);
-
- metric.set_condition(StringToId(slicedConditionName));
- MetricConditionLink* link = metric.add_links();
- link->set_condition(StringToId(slicedConditionName));
- buildSimpleAtomFieldMatcher(tagId, 2, link->mutable_fields_in_what());
- buildSimpleAtomFieldMatcher(conditionTagId, 2, link->mutable_fields_in_condition());
-
- LogEvent event1(tagId, bucketStartTimeNs + 10);
- event1.write(10); // value
- event1.write("111"); // uid
- event1.init();
- ConditionKey key1;
- key1[StringToId(slicedConditionName)] =
- {getMockedDimensionKey(conditionTagId, 2, "111")};
-
- LogEvent event2(tagId, bucketStartTimeNs + 20);
- event2.write(15);
- event2.write("222");
- event2.init();
- ConditionKey key2;
- key2[StringToId(slicedConditionName)] =
- {getMockedDimensionKey(conditionTagId, 2, "222")};
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_aggregation_type(ValueMetric::MIN);
+ metric.set_use_diff(true);
sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
- EXPECT_CALL(*wizard, query(_, key1, _, _, _, _)).WillOnce(Return(ConditionState::kFalse));
- EXPECT_CALL(*wizard, query(_, key2, _, _, _, _)).WillOnce(Return(ConditionState::kTrue));
-
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
- ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, -1, bucketStartTimeNs,
+ ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, -1, bucketStartTimeNs,
bucketStartTimeNs, pullerManager);
- valueProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
-
+ shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+ event1->write(1);
+ event1->write(10);
+ event1->init();
+ shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 15);
+ event2->write(1);
+ event2->write(15);
+ event2->init();
+ valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
+ // has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(10, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
- valueProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
+ valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
- EXPECT_EQ(15, curInterval.value.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(5, curInterval.value.long_value);
+
+ // no change in data.
+ shared_ptr<LogEvent> event3 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 10);
+ event3->write(1);
+ event3->write(15);
+ event3->init();
+ valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event3);
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(15, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
+
+ shared_ptr<LogEvent> event4 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 15);
+ event4->write(1);
+ event4->write(15);
+ event4->init();
+ valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event4);
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(15, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasValue);
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(15, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+ EXPECT_EQ(5, valueProducer.mPastBuckets.begin()->second.back().value.long_value);
}
} // namespace statsd