summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmds/statsd/src/metrics/ValueMetricProducer.cpp47
-rw-r--r--cmds/statsd/src/metrics/ValueMetricProducer.h81
-rw-r--r--cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp84
3 files changed, 153 insertions, 59 deletions
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 8902774503f3..9de62a2cce03 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -155,7 +155,7 @@ ValueMetricProducer::ValueMetricProducer(
mCurrentBucketStartTimeNs = startTimeNs;
// Kicks off the puller immediately if condition is true and diff based.
if (mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
- pullAndMatchEventsLocked(startTimeNs);
+ pullAndMatchEventsLocked(startTimeNs, mCondition);
}
VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
(long long)mBucketSizeNs, (long long)mTimeBaseNs);
@@ -208,7 +208,7 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
invalidateCurrentBucket();
break;
case NO_TIME_CONSTRAINTS:
- pullAndMatchEventsLocked(dumpTimeNs);
+ pullAndMatchEventsLocked(dumpTimeNs, mCondition);
break;
}
}
@@ -364,9 +364,10 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
}
// Pull on condition changes.
+ ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
bool conditionChanged =
- (mCondition == ConditionState::kTrue && condition == ConditionState::kFalse)
- || (mCondition == ConditionState::kFalse && condition == ConditionState::kTrue);
+ (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)
+ || (mCondition == ConditionState::kFalse && newCondition == ConditionState::kTrue);
// We do not need to pull when we go from unknown to false.
//
// We also pull if the condition was already true in order to be able to flush the bucket at
@@ -375,16 +376,16 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
// onConditionChangedLocked might happen on bucket boundaries if this is called before
// #onDataPulled.
if (mIsPulled && (conditionChanged || condition)) {
- pullAndMatchEventsLocked(eventTimeNs);
+ pullAndMatchEventsLocked(eventTimeNs, newCondition);
}
// When condition change from true to false, clear diff base but don't
// reset other counters as we may accumulate more value in the bucket.
if (mUseDiff && mCondition == ConditionState::kTrue
- && condition == ConditionState::kFalse) {
+ && newCondition == ConditionState::kFalse) {
resetBase();
}
- mCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
+ mCondition = newCondition;
} else {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
@@ -400,7 +401,7 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
flushIfNeededLocked(eventTimeNs);
}
-void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
+void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs, ConditionState condition) {
vector<std::shared_ptr<LogEvent>> allData;
if (!mPullerManager->Pull(mPullTagId, &allData)) {
ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
@@ -408,7 +409,7 @@ void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
return;
}
- accumulateEvents(allData, timestampNs, timestampNs);
+ accumulateEvents(allData, timestampNs, timestampNs, condition);
}
int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
@@ -430,7 +431,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
if (isEventLate) {
// If the event is late, we are in the middle of a bucket. Just
// process the data without trying to snap the data to the nearest bucket.
- accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
+ accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs, mCondition);
} else {
// For scheduled pulled data, the effective event time is snap to the nearest
// bucket end. In the case of waking up from a deep sleep state, we will
@@ -444,7 +445,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
StatsdStats::getInstance().noteBucketBoundaryDelayNs(
mMetricId, originalPullTimeNs - bucketEndTime);
- accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
+ accumulateEvents(allData, originalPullTimeNs, bucketEndTime, mCondition);
}
}
}
@@ -455,7 +456,8 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
}
void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
- int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) {
+ int64_t originalPullTimeNs, int64_t eventElapsedTimeNs,
+ ConditionState condition) {
bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
if (isEventLate) {
VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
@@ -817,12 +819,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
if (!mCurrentBucketIsInvalid) {
appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
}
- StatsdStats::getInstance().noteBucketCount(mMetricId);
- initCurrentSlicedBucket();
- mCurrentBucketIsInvalid = false;
- mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
- VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
- (long long)mCurrentBucketStartTimeNs);
+ initCurrentSlicedBucket(nextBucketStartTimeNs);
}
ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
@@ -849,7 +846,9 @@ ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
return bucket;
}
-void ValueMetricProducer::initCurrentSlicedBucket() {
+void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) {
+ StatsdStats::getInstance().noteBucketCount(mMetricId);
+ // Cleanup data structure to aggregate values.
for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
bool obsolete = true;
for (auto& interval : it->second) {
@@ -867,6 +866,16 @@ void ValueMetricProducer::initCurrentSlicedBucket() {
it++;
}
}
+
+ mCurrentBucketIsInvalid = false;
+ // If we do not have a global base when the condition is true,
+ // we will have incomplete bucket for the next bucket.
+ if (mUseDiff && !mHasGlobalBase && mCondition) {
+ mCurrentBucketIsInvalid = false;
+ }
+ mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
+ VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
+ (long long)mCurrentBucketStartTimeNs);
}
void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 632479797cd6..f317c3768dd9 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -69,7 +69,7 @@ public:
return;
}
if (mIsPulled && mCondition) {
- pullAndMatchEventsLocked(eventTimeNs);
+ pullAndMatchEventsLocked(eventTimeNs, mCondition);
}
flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
};
@@ -100,7 +100,7 @@ private:
void dumpStatesLocked(FILE* out, bool verbose) const override;
- // For pulled metrics, this method should only be called if a pulled have be done. Else we will
+ // For pulled metrics, this method should only be called if a pull has be done. Else we will
// not have complete data for the bucket.
void flushIfNeededLocked(const int64_t& eventTime) override;
@@ -176,14 +176,15 @@ private:
bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey);
- void pullAndMatchEventsLocked(const int64_t timestampNs);
+ void pullAndMatchEventsLocked(const int64_t timestampNs, ConditionState condition);
void accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
- int64_t originalPullTimeNs, int64_t eventElapsedTimeNs);
+ int64_t originalPullTimeNs, int64_t eventElapsedTimeNs,
+ ConditionState condition);
ValueBucket buildPartialBucket(int64_t bucketEndTime,
const std::vector<Interval>& intervals);
- void initCurrentSlicedBucket();
+ void initCurrentSlicedBucket(int64_t nextBucketStartTimeNs);
void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs);
// Reset diff base and mHasGlobalBase
@@ -227,52 +228,54 @@ private:
const bool mSplitBucketForAppUpgrade;
+ FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection);
+ FRIEND_TEST(ValueMetricProducerTest, TestBaseSetOnConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade);
+ FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition);
+ FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition);
+ FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2);
+ FRIEND_TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid);
+ FRIEND_TEST(ValueMetricProducerTest, TestBucketInvalidIfGlobalBaseIsNotSet);
+ FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime);
+ FRIEND_TEST(ValueMetricProducerTest, TestDataIsNotUpdatedWhenNoConditionChanged);
+ FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary);
+ FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onConditionChanged);
+ FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled);
+ FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition);
+ FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket);
+ FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit);
+ FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed);
+ FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed);
+ FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed);
+ FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff);
+ FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff);
+ FRIEND_TEST(ValueMetricProducerTest, TestPartialBucketCreated);
+ FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition);
- FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset);
- FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition);
- FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade);
+ FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering);
FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade);
- FRIEND_TEST(ValueMetricProducerTest, TestPartialBucketCreated);
FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse);
FRIEND_TEST(ValueMetricProducerTest, TestPulledWithAppUpgradeDisabled);
- FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition);
- FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithCondition);
- FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection);
- FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition);
- FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition);
- FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2);
- FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin);
- FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax);
FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateAvg);
+ FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax);
+ FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin);
FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSum);
- FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket);
- FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime);
+ FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithCondition);
+ FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade);
+ FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullDelayExceeded);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput);
FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutputMultiValue);
+ FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
- FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
- FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange);
- FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
- FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket);
- FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
- FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed);
- FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit);
- FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed);
- FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed);
- FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullDelayExceeded);
- FRIEND_TEST(ValueMetricProducerTest, TestBaseSetOnConditionChange);
- FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled);
- FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onConditionChanged);
- FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary);
- FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries);
- FRIEND_TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid);
- FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange);
- FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade);
- FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff);
- FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff);
friend class ValueMetricProducerTestHelper;
};
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 01a0c91836d0..e5e453490159 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -55,8 +55,12 @@ double epsilon = 0.001;
static void assertPastBucketValuesSingleKey(
const std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>>& mPastBuckets,
const std::initializer_list<int>& expectedValuesList) {
-
std::vector<int> expectedValues(expectedValuesList);
+ if (expectedValues.size() == 0) {
+ ASSERT_EQ(0, mPastBuckets.size());
+ return;
+ }
+
ASSERT_EQ(1, mPastBuckets.size());
ASSERT_EQ(expectedValues.size(), mPastBuckets.begin()->second.size());
@@ -2613,6 +2617,84 @@ TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade) {
assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {9});
}
+TEST(ValueMetricProducerTest, TestDataIsNotUpdatedWhenNoConditionChanged) {
+ ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ // First on condition changed.
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1));
+ return true;
+ }))
+ // Second on condition changed.
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 3));
+ return true;
+ }));
+
+ sp<ValueMetricProducer> valueProducer =
+ ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
+
+ valueProducer->onConditionChanged(true, bucketStartTimeNs + 8);
+ valueProducer->onConditionChanged(false, bucketStartTimeNs + 10);
+ valueProducer->onConditionChanged(false, bucketStartTimeNs + 10);
+
+ EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size());
+ auto curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0];
+ EXPECT_EQ(true, curInterval.hasValue);
+ EXPECT_EQ(2, curInterval.value.long_value);
+}
+
+TEST(ValueMetricProducerTest, TestBucketInvalidIfGlobalBaseIsNotSet) {
+ ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ // First condition change.
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1));
+ return true;
+ }))
+ // 2nd condition change.
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ data->push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 1));
+ return true;
+ }))
+ // 3rd condition change.
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ data->push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 1));
+ return true;
+ }));
+
+ sp<ValueMetricProducer> valueProducer =
+ ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
+ valueProducer->onConditionChanged(true, bucket2StartTimeNs + 10);
+
+ vector<shared_ptr<LogEvent>> allData;
+ allData.push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs + 3, 10));
+ valueProducer->onDataPulled(allData, /** succeed */ false, bucketStartTimeNs + 3);
+
+ allData.clear();
+ allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 20));
+ valueProducer->onDataPulled(allData, /** succeed */ false, bucket2StartTimeNs);
+
+ valueProducer->onConditionChanged(false, bucket2StartTimeNs + 8);
+ valueProducer->onConditionChanged(true, bucket2StartTimeNs + 10);
+
+ allData.clear();
+ allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket3StartTimeNs, 30));
+ valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+
+ // There was not global base available so all buckets are invalid.
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {});
+}
+
static StatsLogReport outputStreamToProto(ProtoOutputStream* proto) {
vector<uint8_t> bytes;
bytes.resize(proto->size());