diff options
author | 2019-02-25 11:24:23 +0000 | |
---|---|---|
committer | 2019-02-25 14:00:20 +0000 | |
commit | a8b7011597f668815fc6c8868327d4241a46e477 (patch) | |
tree | 2a69f7936ea96461bc5e0e03f91b3e1e03ee31bb | |
parent | 47a9efce8fe53dc4084aeafdacac5a980e47c5c1 (diff) |
Invalidate the bucket when global base is missing.
For diffs, we need a global base at the beginning of the bucket. If we
do not have a global base, it means the bucket will contain incomplete
data.
Test: atest statsd_test
Change-Id: Ifea7ce09e31d7c5c44b1820b528dfda492dd0dc9
-rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.cpp | 47 | ||||
-rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.h | 81 | ||||
-rw-r--r-- | cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp | 84 |
3 files changed, 153 insertions, 59 deletions
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index 8902774503f3..9de62a2cce03 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -155,7 +155,7 @@ ValueMetricProducer::ValueMetricProducer( mCurrentBucketStartTimeNs = startTimeNs; // Kicks off the puller immediately if condition is true and diff based. if (mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) { - pullAndMatchEventsLocked(startTimeNs); + pullAndMatchEventsLocked(startTimeNs, mCondition); } VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(), (long long)mBucketSizeNs, (long long)mTimeBaseNs); @@ -208,7 +208,7 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, invalidateCurrentBucket(); break; case NO_TIME_CONSTRAINTS: - pullAndMatchEventsLocked(dumpTimeNs); + pullAndMatchEventsLocked(dumpTimeNs, mCondition); break; } } @@ -364,9 +364,10 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, } // Pull on condition changes. + ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse; bool conditionChanged = - (mCondition == ConditionState::kTrue && condition == ConditionState::kFalse) - || (mCondition == ConditionState::kFalse && condition == ConditionState::kTrue); + (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse) + || (mCondition == ConditionState::kFalse && newCondition == ConditionState::kTrue); // We do not need to pull when we go from unknown to false. // // We also pull if the condition was already true in order to be able to flush the bucket at @@ -375,16 +376,16 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, // onConditionChangedLocked might happen on bucket boundaries if this is called before // #onDataPulled. if (mIsPulled && (conditionChanged || condition)) { - pullAndMatchEventsLocked(eventTimeNs); + pullAndMatchEventsLocked(eventTimeNs, newCondition); } // When condition change from true to false, clear diff base but don't // reset other counters as we may accumulate more value in the bucket. if (mUseDiff && mCondition == ConditionState::kTrue - && condition == ConditionState::kFalse) { + && newCondition == ConditionState::kFalse) { resetBase(); } - mCondition = condition ? ConditionState::kTrue : ConditionState::kFalse; + mCondition = newCondition; } else { VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, @@ -400,7 +401,7 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, flushIfNeededLocked(eventTimeNs); } -void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { +void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs, ConditionState condition) { vector<std::shared_ptr<LogEvent>> allData; if (!mPullerManager->Pull(mPullTagId, &allData)) { ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs); @@ -408,7 +409,7 @@ void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { return; } - accumulateEvents(allData, timestampNs, timestampNs); + accumulateEvents(allData, timestampNs, timestampNs, condition); } int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) { @@ -430,7 +431,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven if (isEventLate) { // If the event is late, we are in the middle of a bucket. Just // process the data without trying to snap the data to the nearest bucket. - accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs); + accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs, mCondition); } else { // For scheduled pulled data, the effective event time is snap to the nearest // bucket end. In the case of waking up from a deep sleep state, we will @@ -444,7 +445,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1; StatsdStats::getInstance().noteBucketBoundaryDelayNs( mMetricId, originalPullTimeNs - bucketEndTime); - accumulateEvents(allData, originalPullTimeNs, bucketEndTime); + accumulateEvents(allData, originalPullTimeNs, bucketEndTime, mCondition); } } } @@ -455,7 +456,8 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven } void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, - int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) { + int64_t originalPullTimeNs, int64_t eventElapsedTimeNs, + ConditionState condition) { bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs; if (isEventLate) { VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", @@ -817,12 +819,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, if (!mCurrentBucketIsInvalid) { appendToFullBucket(eventTimeNs, fullBucketEndTimeNs); } - StatsdStats::getInstance().noteBucketCount(mMetricId); - initCurrentSlicedBucket(); - mCurrentBucketIsInvalid = false; - mCurrentBucketStartTimeNs = nextBucketStartTimeNs; - VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, - (long long)mCurrentBucketStartTimeNs); + initCurrentSlicedBucket(nextBucketStartTimeNs); } ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime, @@ -849,7 +846,9 @@ ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime, return bucket; } -void ValueMetricProducer::initCurrentSlicedBucket() { +void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) { + StatsdStats::getInstance().noteBucketCount(mMetricId); + // Cleanup data structure to aggregate values. for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) { bool obsolete = true; for (auto& interval : it->second) { @@ -867,6 +866,16 @@ void ValueMetricProducer::initCurrentSlicedBucket() { it++; } } + + mCurrentBucketIsInvalid = false; + // If we do not have a global base when the condition is true, + // we will have incomplete bucket for the next bucket. + if (mUseDiff && !mHasGlobalBase && mCondition) { + mCurrentBucketIsInvalid = false; + } + mCurrentBucketStartTimeNs = nextBucketStartTimeNs; + VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, + (long long)mCurrentBucketStartTimeNs); } void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) { diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index 632479797cd6..f317c3768dd9 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -69,7 +69,7 @@ public: return; } if (mIsPulled && mCondition) { - pullAndMatchEventsLocked(eventTimeNs); + pullAndMatchEventsLocked(eventTimeNs, mCondition); } flushCurrentBucketLocked(eventTimeNs, eventTimeNs); }; @@ -100,7 +100,7 @@ private: void dumpStatesLocked(FILE* out, bool verbose) const override; - // For pulled metrics, this method should only be called if a pulled have be done. Else we will + // For pulled metrics, this method should only be called if a pull has be done. Else we will // not have complete data for the bucket. void flushIfNeededLocked(const int64_t& eventTime) override; @@ -176,14 +176,15 @@ private: bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey); - void pullAndMatchEventsLocked(const int64_t timestampNs); + void pullAndMatchEventsLocked(const int64_t timestampNs, ConditionState condition); void accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, - int64_t originalPullTimeNs, int64_t eventElapsedTimeNs); + int64_t originalPullTimeNs, int64_t eventElapsedTimeNs, + ConditionState condition); ValueBucket buildPartialBucket(int64_t bucketEndTime, const std::vector<Interval>& intervals); - void initCurrentSlicedBucket(); + void initCurrentSlicedBucket(int64_t nextBucketStartTimeNs); void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs); // Reset diff base and mHasGlobalBase @@ -227,52 +228,54 @@ private: const bool mSplitBucketForAppUpgrade; + FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection); + FRIEND_TEST(ValueMetricProducerTest, TestBaseSetOnConditionChange); + FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade); + FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange); + FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition); + FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition); + FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2); + FRIEND_TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid); + FRIEND_TEST(ValueMetricProducerTest, TestBucketInvalidIfGlobalBaseIsNotSet); + FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime); + FRIEND_TEST(ValueMetricProducerTest, TestDataIsNotUpdatedWhenNoConditionChanged); + FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary); + FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onConditionChanged); + FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled); + FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition); + FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket); + FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit); + FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed); + FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed); + FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed); + FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff); + FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff); + FRIEND_TEST(ValueMetricProducerTest, TestPartialBucketCreated); + FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition); - FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset); - FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition); - FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade); + FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering); FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade); - FRIEND_TEST(ValueMetricProducerTest, TestPartialBucketCreated); FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse); FRIEND_TEST(ValueMetricProducerTest, TestPulledWithAppUpgradeDisabled); - FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition); - FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithCondition); - FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection); - FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition); - FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition); - FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2); - FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin); - FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateAvg); + FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMax); + FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateMin); FRIEND_TEST(ValueMetricProducerTest, TestPushedAggregateSum); - FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket); - FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime); + FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithCondition); + FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithUpgrade); + FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition); + FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullDelayExceeded); + FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange); + FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket); + FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange); + FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate); FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput); FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutputMultiValue); + FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); - FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey); - FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange); - FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange); - FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket); - FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate); - FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed); - FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit); - FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed); - FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed); - FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullDelayExceeded); - FRIEND_TEST(ValueMetricProducerTest, TestBaseSetOnConditionChange); - FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled); - FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onConditionChanged); - FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary); - FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries); - FRIEND_TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid); - FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange); - FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade); - FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff); - FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff); friend class ValueMetricProducerTestHelper; }; diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp index 01a0c91836d0..e5e453490159 100644 --- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp @@ -55,8 +55,12 @@ double epsilon = 0.001; static void assertPastBucketValuesSingleKey( const std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>>& mPastBuckets, const std::initializer_list<int>& expectedValuesList) { - std::vector<int> expectedValues(expectedValuesList); + if (expectedValues.size() == 0) { + ASSERT_EQ(0, mPastBuckets.size()); + return; + } + ASSERT_EQ(1, mPastBuckets.size()); ASSERT_EQ(expectedValues.size(), mPastBuckets.begin()->second.size()); @@ -2613,6 +2617,84 @@ TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade) { assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {9}); } +TEST(ValueMetricProducerTest, TestDataIsNotUpdatedWhenNoConditionChanged) { + ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); + + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + // First on condition changed. + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1)); + return true; + })) + // Second on condition changed. + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 3)); + return true; + })); + + sp<ValueMetricProducer> valueProducer = + ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); + + valueProducer->onConditionChanged(true, bucketStartTimeNs + 8); + valueProducer->onConditionChanged(false, bucketStartTimeNs + 10); + valueProducer->onConditionChanged(false, bucketStartTimeNs + 10); + + EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size()); + auto curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0]; + EXPECT_EQ(true, curInterval.hasValue); + EXPECT_EQ(2, curInterval.value.long_value); +} + +TEST(ValueMetricProducerTest, TestBucketInvalidIfGlobalBaseIsNotSet) { + ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); + + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + // First condition change. + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1)); + return true; + })) + // 2nd condition change. + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + data->push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 1)); + return true; + })) + // 3rd condition change. + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + data->push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 1)); + return true; + })); + + sp<ValueMetricProducer> valueProducer = + ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); + valueProducer->onConditionChanged(true, bucket2StartTimeNs + 10); + + vector<shared_ptr<LogEvent>> allData; + allData.push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs + 3, 10)); + valueProducer->onDataPulled(allData, /** succeed */ false, bucketStartTimeNs + 3); + + allData.clear(); + allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 20)); + valueProducer->onDataPulled(allData, /** succeed */ false, bucket2StartTimeNs); + + valueProducer->onConditionChanged(false, bucket2StartTimeNs + 8); + valueProducer->onConditionChanged(true, bucket2StartTimeNs + 10); + + allData.clear(); + allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket3StartTimeNs, 30)); + valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs); + + // There was not global base available so all buckets are invalid. + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {}); +} + static StatsLogReport outputStreamToProto(ProtoOutputStream* proto) { vector<uint8_t> bytes; bytes.resize(proto->size()); |