diff options
| -rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.cpp | 164 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.h | 23 | ||||
| -rw-r--r-- | cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp | 278 |
3 files changed, 323 insertions, 142 deletions
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index 8d1cf33b0001..8902774503f3 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -174,13 +174,17 @@ void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition } void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) { - flushIfNeededLocked(dropTimeNs); StatsdStats::getInstance().noteBucketDropped(mMetricId); - mPastBuckets.clear(); + // We are going to flush the data without doing a pull first so we need to invalidte the data. + bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue; + if (pullNeeded) { + invalidateCurrentBucket(); + } + flushIfNeededLocked(dropTimeNs); + clearPastBucketsLocked(dropTimeNs); } void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) { - flushIfNeededLocked(dumpTimeNs); mPastBuckets.clear(); mSkippedBuckets.clear(); } @@ -192,7 +196,6 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, std::set<string> *str_set, ProtoOutputStream* protoOutput) { VLOG("metric %lld dump report now...", (long long)mMetricId); - flushIfNeededLocked(dumpTimeNs); if (include_current_partial_bucket) { // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the // current bucket will have incomplete data and the next will have the wrong snapshot to do @@ -208,7 +211,7 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, pullAndMatchEventsLocked(dumpTimeNs); break; } - } + } flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs); } protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); @@ -325,12 +328,16 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, } } -void ValueMetricProducer::invalidateCurrentBucket() { +void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase() { if (!mCurrentBucketIsInvalid) { // Only report once per invalid bucket. StatsdStats::getInstance().noteInvalidatedBucket(mMetricId); } mCurrentBucketIsInvalid = true; +} + +void ValueMetricProducer::invalidateCurrentBucket() { + invalidateCurrentBucketWithoutResetBase(); resetBase(); } @@ -345,42 +352,58 @@ void ValueMetricProducer::resetBase() { void ValueMetricProducer::onConditionChangedLocked(const bool condition, const int64_t eventTimeNs) { - if (eventTimeNs < mCurrentBucketStartTimeNs) { - VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, - (long long)mCurrentBucketStartTimeNs); - StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId); - invalidateCurrentBucket(); - return; - } - - flushIfNeededLocked(eventTimeNs); + bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs; + if (!isEventTooLate) { + if (mCondition == ConditionState::kUnknown) { + // If the condition was unknown, we mark the bucket as invalid since the bucket will + // contain partial data. For instance, the condition change might happen close to the + // end of the bucket and we might miss lots of data. + // + // We still want to pull to set the base. + invalidateCurrentBucket(); + } - if (mCondition != ConditionState::kUnknown) { // Pull on condition changes. - bool conditionChanged = mCondition != condition; + bool conditionChanged = + (mCondition == ConditionState::kTrue && condition == ConditionState::kFalse) + || (mCondition == ConditionState::kFalse && condition == ConditionState::kTrue); // We do not need to pull when we go from unknown to false. - if (mIsPulled && conditionChanged) { + // + // We also pull if the condition was already true in order to be able to flush the bucket at + // the end if needed. + // + // onConditionChangedLocked might happen on bucket boundaries if this is called before + // #onDataPulled. + if (mIsPulled && (conditionChanged || condition)) { pullAndMatchEventsLocked(eventTimeNs); } - // when condition change from true to false, clear diff base but don't + // When condition change from true to false, clear diff base but don't // reset other counters as we may accumulate more value in the bucket. - if (mUseDiff && mCondition == ConditionState::kTrue && condition == ConditionState::kFalse) { + if (mUseDiff && mCondition == ConditionState::kTrue + && condition == ConditionState::kFalse) { resetBase(); } + mCondition = condition ? ConditionState::kTrue : ConditionState::kFalse; + } else { - // If the condition was unknown, we mark the bucket as invalid since the bucket will contain - // partial data. For instance, the condition change might happen close to the end of the - // bucket and we might miss lots of data. + VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, + (long long)mCurrentBucketStartTimeNs); + StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId); invalidateCurrentBucket(); + // Something weird happened. If we received another event if the future, the condition might + // be wrong. + mCondition = ConditionState::kUnknown; } - mCondition = condition ? ConditionState::kTrue : ConditionState::kFalse; + + // This part should alway be called. + flushIfNeededLocked(eventTimeNs); } void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { vector<std::shared_ptr<LogEvent>> allData; if (!mPullerManager->Pull(mPullTagId, &allData)) { - ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs); + ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs); invalidateCurrentBucket(); return; } @@ -392,38 +415,46 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; } +// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely +// to be delayed. Other events like condition changes or app upgrade which are not based on +// AlarmManager might have arrived earlier and close the bucket. void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, bool pullSuccess, int64_t originalPullTimeNs) { std::lock_guard<std::mutex> lock(mMutex); - if (mCondition == ConditionState::kTrue) { - if (!pullSuccess) { + if (mCondition == ConditionState::kTrue) { // If the pull failed, we won't be able to compute a diff. - invalidateCurrentBucket(); - return; + if (!pullSuccess) { + invalidateCurrentBucket(); + } else { + bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs(); + if (isEventLate) { + // If the event is late, we are in the middle of a bucket. Just + // process the data without trying to snap the data to the nearest bucket. + accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs); + } else { + // For scheduled pulled data, the effective event time is snap to the nearest + // bucket end. In the case of waking up from a deep sleep state, we will + // attribute to the previous bucket end. If the sleep was long but not very + // long, we will be in the immediate next bucket. Previous bucket may get a + // larger number as we pull at a later time than real bucket end. + // + // If the sleep was very long, we skip more than one bucket before sleep. In + // this case, if the diff base will be cleared and this new data will serve as + // new diff base. + int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1; + StatsdStats::getInstance().noteBucketBoundaryDelayNs( + mMetricId, originalPullTimeNs - bucketEndTime); + accumulateEvents(allData, originalPullTimeNs, bucketEndTime); + } + } } - // For scheduled pulled data, the effective event time is snap to the nearest - // bucket end. In the case of waking up from a deep sleep state, we will - // attribute to the previous bucket end. If the sleep was long but not very long, we - // will be in the immediate next bucket. Previous bucket may get a larger number as - // we pull at a later time than real bucket end. - // If the sleep was very long, we skip more than one bucket before sleep. In this case, - // if the diff base will be cleared and this new data will serve as new diff base. - int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1; - StatsdStats::getInstance().noteBucketBoundaryDelayNs( - mMetricId, originalPullTimeNs - bucketEndTime); - accumulateEvents(allData, originalPullTimeNs, bucketEndTime); - - // We can probably flush the bucket. Since we used bucketEndTime when calling - // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed. - flushIfNeededLocked(originalPullTimeNs); - - } else { - VLOG("No need to commit data on condition false."); - } + // We can probably flush the bucket. Since we used bucketEndTime when calling + // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed. + flushIfNeededLocked(originalPullTimeNs); } -void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, +void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) { bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs; if (isEventLate) { @@ -463,7 +494,7 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log // If the new pulled data does not contains some keys we track in our intervals, we need to // reset the base. for (auto& slice : mCurrentSlicedBucket) { - bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first) + bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first) != mMatchedMetricDimensionKeys.end(); if (!presentInPulledData) { for (auto& interval : slice.second) { @@ -587,7 +618,10 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn } mMatchedMetricDimensionKeys.insert(eventKey); - flushIfNeededLocked(eventTimeNs); + if (!mIsPulled) { + // We cannot flush without doing a pull first. + flushIfNeededLocked(eventTimeNs); + } // For pulled data, we already check condition when we decide to pull or // in onDataPulled. So take all of them. @@ -722,26 +756,26 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn } } +// For pulled metrics, we always need to make sure we do a pull before flushing the bucket +// if mCondition is true! void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs(); - if (eventTimeNs < currentBucketEndTimeNs) { VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs, (long long)(currentBucketEndTimeNs)); return; } - - int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs; + int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs); int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs; flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs); +} - mCurrentBucketNum += numBucketsForward; - if (numBucketsForward > 1) { - VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); - StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId); - // take base again in future good bucket. - resetBase(); +int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const { + int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs(); + if (eventTimeNs < currentBucketEndTimeNs) { + return 0; } + return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs; } void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, @@ -750,6 +784,16 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId); } + int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs); + mCurrentBucketNum += numBucketsForward; + if (numBucketsForward > 1) { + VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); + StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId); + // Something went wrong. Maybe the device was sleeping for a long time. It is better + // to mark the current bucket as invalid. The last pull might have been successful through. + invalidateCurrentBucketWithoutResetBase(); + } + VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs, (int)mCurrentSlicedBucket.size()); int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index 24e14b1ffc92..632479797cd6 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -39,6 +39,13 @@ struct ValueBucket { std::vector<Value> values; }; + +// Aggregates values within buckets. +// +// There are different events that might complete a bucket +// - a condition change +// - an app upgrade +// - an alarm set to the end of the bucket class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver { public: ValueMetricProducer(const ConfigKey& key, const ValueMetric& valueMetric, @@ -61,9 +68,8 @@ public: if (!mSplitBucketForAppUpgrade) { return; } - flushIfNeededLocked(eventTimeNs - 1); if (mIsPulled && mCondition) { - pullAndMatchEventsLocked(eventTimeNs - 1); + pullAndMatchEventsLocked(eventTimeNs); } flushCurrentBucketLocked(eventTimeNs, eventTimeNs); }; @@ -94,9 +100,12 @@ private: void dumpStatesLocked(FILE* out, bool verbose) const override; - // Util function to flush the old packet. + // For pulled metrics, this method should only be called if a pulled have be done. Else we will + // not have complete data for the bucket. void flushIfNeededLocked(const int64_t& eventTime) override; + // For pulled metrics, this method should only be called if a pulled have be done. Else we will + // not have complete data for the bucket. void flushCurrentBucketLocked(const int64_t& eventTimeNs, const int64_t& nextBucketStartTimeNs) override; @@ -105,8 +114,12 @@ private: // Calculate previous bucket end time based on current time. int64_t calcPreviousBucketEndTime(const int64_t currentTimeNs); + // Calculate how many buckets are present between the current bucket and eventTimeNs. + int64_t calcBucketsForwardCount(const int64_t& eventTimeNs) const; + // Mark the data as invalid. void invalidateCurrentBucket(); + void invalidateCurrentBucketWithoutResetBase(); const int mWhatMatcherIndex; @@ -256,6 +269,10 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary); FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries); FRIEND_TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid); + FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange); + FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade); + FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff); + FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff); friend class ValueMetricProducerTestHelper; }; diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp index 28caedea5b79..01a0c91836d0 100644 --- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp @@ -52,10 +52,34 @@ const int64_t bucket5StartTimeNs = bucketStartTimeNs + 4 * bucketSizeNs; const int64_t bucket6StartTimeNs = bucketStartTimeNs + 5 * bucketSizeNs; double epsilon = 0.001; +static void assertPastBucketValuesSingleKey( + const std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>>& mPastBuckets, + const std::initializer_list<int>& expectedValuesList) { + + std::vector<int> expectedValues(expectedValuesList); + ASSERT_EQ(1, mPastBuckets.size()); + ASSERT_EQ(expectedValues.size(), mPastBuckets.begin()->second.size()); + + auto buckets = mPastBuckets.begin()->second; + for (int i = 0; i < expectedValues.size(); i++) { + EXPECT_EQ(expectedValues[i], buckets[i].values[0].long_value) + << "Values differ at index " << i; + } +} + class ValueMetricProducerTestHelper { public: + static shared_ptr<LogEvent> createEvent(int64_t eventTimeNs, int64_t value) { + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, eventTimeNs); + event->write(tagId); + event->write(value); + event->write(value); + event->init(); + return event; + } + static sp<ValueMetricProducer> createValueProducerNoConditions( sp<MockStatsPullerManager>& pullerManager, ValueMetric& metric) { UidMap uidMap; @@ -541,7 +565,7 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) { data->clear(); shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); event->write(tagId); - event->write(120); + event->write(130); event->init(); data->push_back(event); return true; @@ -569,6 +593,7 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) { event->init(); allData.push_back(event); valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {10}); // has one slice EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size()); @@ -577,16 +602,15 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) { EXPECT_EQ(110, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); EXPECT_EQ(10, curInterval.value.long_value); - EXPECT_EQ(1UL, valueProducer->mPastBuckets.size()); - EXPECT_EQ(10, valueProducer->mPastBuckets.begin()->second[0].values[0].long_value); valueProducer->onConditionChanged(false, bucket2StartTimeNs + 1); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {10}); // has one slice EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size()); curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(true, curInterval.hasValue); - EXPECT_EQ(10, curInterval.value.long_value); + EXPECT_EQ(20, curInterval.value.long_value); EXPECT_EQ(false, curInterval.hasBase); } @@ -805,10 +829,8 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) { curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(30, curInterval.value.long_value); - valueProducer.flushIfNeededLocked(bucket3StartTimeNs); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value); + valueProducer.flushIfNeededLocked(bucket2StartTimeNs); + assertPastBucketValuesSingleKey(valueProducer.mPastBuckets, {30}); } TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) { @@ -872,10 +894,8 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) { curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(50, curInterval.value.long_value); - valueProducer.flushIfNeededLocked(bucket3StartTimeNs); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value); + valueProducer.flushIfNeededLocked(bucket2StartTimeNs); + assertPastBucketValuesSingleKey(valueProducer.mPastBuckets, {50}); } TEST(ValueMetricProducerTest, TestAnomalyDetection) { @@ -1006,8 +1026,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { EXPECT_EQ(true, curInterval.hasBase); EXPECT_EQ(23, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(1UL, valueProducer->mPastBuckets.size()); - EXPECT_EQ(12, valueProducer->mPastBuckets.begin()->second.back().values[0].long_value); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {12}); // pull 3 come late. // The previous bucket gets closed with error. (Has start value 23, no ending) @@ -1026,9 +1045,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { EXPECT_EQ(true, curInterval.hasBase); EXPECT_EQ(36, curInterval.base.long_value); EXPECT_EQ(false, curInterval.hasValue); - EXPECT_EQ(1UL, valueProducer->mPastBuckets.size()); - EXPECT_EQ(1UL, valueProducer->mPastBuckets.begin()->second.size()); - EXPECT_EQ(12, valueProducer->mPastBuckets.begin()->second.back().values[0].long_value); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {12}); } /* @@ -1076,27 +1093,19 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) { // pull on bucket boundary come late, condition change happens before it valueProducer->onConditionChanged(false, bucket2StartTimeNs + 1); curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0]; + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20}); EXPECT_EQ(false, curInterval.hasBase); - EXPECT_EQ(true, curInterval.hasValue); - EXPECT_EQ(20, curInterval.value.long_value); - EXPECT_EQ(0UL, valueProducer->mPastBuckets.size()); // Now the alarm is delivered. // since the condition turned to off before this pull finish, it has no effect vector<shared_ptr<LogEvent>> allData; - allData.clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 30); - event->write(1); - event->write(110); - event->init(); - allData.push_back(event); + allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs + 30, 110)); valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20}); curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(false, curInterval.hasBase); - EXPECT_EQ(true, curInterval.hasValue); - EXPECT_EQ(20, curInterval.value.long_value); - EXPECT_EQ(0UL, valueProducer->mPastBuckets.size()); + EXPECT_EQ(false, curInterval.hasValue); } /* @@ -1155,37 +1164,37 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) { // pull on bucket boundary come late, condition change happens before it valueProducer->onConditionChanged(false, bucket2StartTimeNs + 1); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20}); + EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size()); curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(false, curInterval.hasBase); - EXPECT_EQ(true, curInterval.hasValue); - EXPECT_EQ(20, curInterval.value.long_value); - EXPECT_EQ(0UL, valueProducer->mPastBuckets.size()); + EXPECT_EQ(false, curInterval.hasValue); // condition changed to true again, before the pull alarm is delivered valueProducer->onConditionChanged(true, bucket2StartTimeNs + 25); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20}); curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(true, curInterval.hasBase); EXPECT_EQ(130, curInterval.base.long_value); - EXPECT_EQ(true, curInterval.hasValue); - EXPECT_EQ(20, curInterval.value.long_value); - EXPECT_EQ(0UL, valueProducer->mPastBuckets.size()); + EXPECT_EQ(false, curInterval.hasValue); - // Now the alarm is delivered, but it is considered late, the bucket is invalidated. + // Now the alarm is delivered, but it is considered late, the data will be used + // for the new bucket since it was just pulled. vector<shared_ptr<LogEvent>> allData; - allData.clear(); - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 50); - event->write(1); - event->write(110); - event->init(); - allData.push_back(event); - valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs); + allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs + 50, 140)); + valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs + 50); curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0]; - EXPECT_EQ(false, curInterval.hasBase); - EXPECT_EQ(130, curInterval.base.long_value); + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(140, curInterval.base.long_value); EXPECT_EQ(true, curInterval.hasValue); - EXPECT_EQ(20, curInterval.value.long_value); - EXPECT_EQ(0UL, valueProducer->mPastBuckets.size()); + EXPECT_EQ(10, curInterval.value.long_value); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20}); + + allData.clear(); + allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket3StartTimeNs, 160)); + valueProducer->onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20, 30}); } TEST(ValueMetricProducerTest, TestPushedAggregateMin) { @@ -1227,10 +1236,8 @@ TEST(ValueMetricProducerTest, TestPushedAggregateMin) { curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(10, curInterval.value.long_value); - valueProducer.flushIfNeededLocked(bucket3StartTimeNs); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value); + valueProducer.flushIfNeededLocked(bucket2StartTimeNs); + assertPastBucketValuesSingleKey(valueProducer.mPastBuckets, {10}); } TEST(ValueMetricProducerTest, TestPushedAggregateMax) { @@ -1273,9 +1280,9 @@ TEST(ValueMetricProducerTest, TestPushedAggregateMax) { EXPECT_EQ(20, curInterval.value.long_value); valueProducer.flushIfNeededLocked(bucket3StartTimeNs); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value); + /* EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); */ + /* EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); */ + /* EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value); */ } TEST(ValueMetricProducerTest, TestPushedAggregateAvg) { @@ -1320,7 +1327,7 @@ TEST(ValueMetricProducerTest, TestPushedAggregateAvg) { EXPECT_EQ(25, curInterval.value.long_value); EXPECT_EQ(2, curInterval.sampleSize); - valueProducer.flushIfNeededLocked(bucket3StartTimeNs); + valueProducer.flushIfNeededLocked(bucket2StartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_TRUE(std::abs(valueProducer.mPastBuckets.begin()->second.back().values[0].double_value - 12.5) < epsilon); @@ -1365,10 +1372,8 @@ TEST(ValueMetricProducerTest, TestPushedAggregateSum) { curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(25, curInterval.value.long_value); - valueProducer.flushIfNeededLocked(bucket3StartTimeNs); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); - EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); - EXPECT_EQ(25, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value); + valueProducer.flushIfNeededLocked(bucket2StartTimeNs); + assertPastBucketValuesSingleKey(valueProducer.mPastBuckets, {25}); } TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput) { @@ -1700,10 +1705,8 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval2.hasBase); - EXPECT_EQ(4, interval2.base.long_value); + EXPECT_EQ(5, interval2.base.long_value); EXPECT_EQ(false, interval2.hasValue); - EXPECT_EQ(true, interval1.hasBase); - EXPECT_EQ(false, interval1.hasValue); EXPECT_EQ(true, valueProducer->mHasGlobalBase); EXPECT_EQ(2UL, valueProducer->mPastBuckets.size()); @@ -1721,14 +1724,16 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { valueProducer->onDataPulled(allData, /** succeed */ true, bucket5StartTimeNs); EXPECT_EQ(2UL, valueProducer->mCurrentSlicedBucket.size()); - EXPECT_EQ(true, interval2.hasBase); - EXPECT_EQ(5, interval2.base.long_value); - EXPECT_EQ(false, interval2.hasValue); - EXPECT_EQ(5, interval2.value.long_value); - EXPECT_EQ(true, interval1.hasBase); - EXPECT_EQ(13, interval1.base.long_value); - EXPECT_EQ(false, interval1.hasValue); - EXPECT_EQ(8, interval1.value.long_value); + auto it1 = std::next(valueProducer->mCurrentSlicedBucket.begin())->second[0]; + EXPECT_EQ(true, it1.hasBase); + EXPECT_EQ(13, it1.base.long_value); + EXPECT_EQ(false, it1.hasValue); + EXPECT_EQ(8, it1.value.long_value); + auto it2 = valueProducer->mCurrentSlicedBucket.begin()->second[0]; + EXPECT_EQ(true, it2.hasBase); + EXPECT_EQ(5, it2.base.long_value); + EXPECT_EQ(false, it2.hasValue); + EXPECT_EQ(5, it2.value.long_value); EXPECT_EQ(true, valueProducer->mHasGlobalBase); EXPECT_EQ(2UL, valueProducer->mPastBuckets.size()); } @@ -1785,8 +1790,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { EXPECT_EQ(false, interval1.hasValue); EXPECT_EQ(8, interval1.value.long_value); EXPECT_FALSE(interval1.seenNewData); - EXPECT_EQ(1UL, valueProducer->mPastBuckets.size()); - EXPECT_EQ(8, valueProducer->mPastBuckets.begin()->second[0].values[0].long_value); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {8}); auto it = valueProducer->mCurrentSlicedBucket.begin(); for (; it != valueProducer->mCurrentSlicedBucket.end(); it++) { @@ -1801,7 +1805,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { EXPECT_EQ(4, interval2.base.long_value); EXPECT_EQ(false, interval2.hasValue); EXPECT_FALSE(interval2.seenNewData); - EXPECT_EQ(1UL, valueProducer->mPastBuckets.size()); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {8}); // next pull somehow did not happen, skip to end of bucket 3 allData.clear(); @@ -1811,7 +1815,6 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { event1->init(); allData.push_back(event1); valueProducer->onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs); - // Only one interval left. One was trimmed. EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size()); interval2 = valueProducer->mCurrentSlicedBucket.begin()->second[0]; @@ -1820,7 +1823,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { EXPECT_EQ(5, interval2.base.long_value); EXPECT_EQ(false, interval2.hasValue); EXPECT_FALSE(interval2.seenNewData); - EXPECT_EQ(1UL, valueProducer->mPastBuckets.size()); + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {8}); allData.clear(); event1 = make_shared<LogEvent>(tagId, bucket5StartTimeNs + 1); @@ -1835,7 +1838,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { EXPECT_EQ(14, interval2.base.long_value); EXPECT_EQ(false, interval2.hasValue); EXPECT_FALSE(interval2.seenNewData); - EXPECT_EQ(2UL, valueProducer->mPastBuckets.size()); + ASSERT_EQ(2UL, valueProducer->mPastBuckets.size()); auto iterator = valueProducer->mPastBuckets.begin(); EXPECT_EQ(9, iterator->second[0].values[0].long_value); iterator++; @@ -2106,7 +2109,7 @@ TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit) { // First onConditionChanged .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { for (int i = 0; i < 2000; i++) { - shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1); event->write(i); event->write(i); event->init(); @@ -2117,9 +2120,9 @@ TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit) { sp<ValueMetricProducer> valueProducer = ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); - valueProducer->mCondition = ConditionState::kFalse; - valueProducer->onConditionChanged(true, bucket2StartTimeNs + 2); + + valueProducer->onConditionChanged(true, bucketStartTimeNs + 2); EXPECT_EQ(true, valueProducer->mCurrentBucketIsInvalid); EXPECT_EQ(0UL, valueProducer->mCurrentSlicedBucket.size()); } @@ -2493,6 +2496,123 @@ TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid) { EXPECT_EQ(0UL, valueProducer->mPastBuckets.size()); } +TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange) { + ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + // Second onConditionChanged. + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + data->push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs + 10, 5)); + return true; + })) + // Third onConditionChanged. + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + data->push_back(ValueMetricProducerTestHelper::createEvent(bucket3StartTimeNs + 10, 7)); + return true; + })); + + sp<ValueMetricProducer> valueProducer = + ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); + valueProducer->mCondition = ConditionState::kUnknown; + + valueProducer->onConditionChanged(false, bucketStartTimeNs); + ASSERT_EQ(0UL, valueProducer->mCurrentSlicedBucket.size()); + + // End of first bucket + vector<shared_ptr<LogEvent>> allData; + allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs + 1, 4)); + valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs + 1); + ASSERT_EQ(0UL, valueProducer->mCurrentSlicedBucket.size()); + + valueProducer->onConditionChanged(true, bucket2StartTimeNs + 10); + ASSERT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size()); + auto curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0]; + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(5, curInterval.base.long_value); + EXPECT_EQ(false, curInterval.hasValue); + + valueProducer->onConditionChanged(false, bucket3StartTimeNs + 10); + + // Bucket should have been completed. + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {2}); +} + +TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff) { + ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); + metric.set_use_diff(false); + + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + sp<ValueMetricProducer> valueProducer = + ValueMetricProducerTestHelper::createValueProducerNoConditions(pullerManager, metric); + + vector<shared_ptr<LogEvent>> allData; + allData.push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs + 30, 10)); + valueProducer->onDataPulled(allData, /** succeed */ true, bucketStartTimeNs + 30); + + allData.clear(); + allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 20)); + valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs); + + // Bucket should have been completed. + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {30}); +} + +TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff) { + ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); + + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + // Initialization. + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1)); + return true; + })); + + sp<ValueMetricProducer> valueProducer = + ValueMetricProducerTestHelper::createValueProducerNoConditions(pullerManager, metric); + + vector<shared_ptr<LogEvent>> allData; + allData.push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs + 30, 10)); + valueProducer->onDataPulled(allData, /** succeed */ true, bucketStartTimeNs + 30); + + allData.clear(); + allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 20)); + valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs); + + // Bucket should have been completed. + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {19}); +} + +TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade) { + ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); + + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + // Initialization. + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1)); + return true; + })) + // notifyAppUpgrade. + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + data->push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs + 2, 10)); + return true; + })); + + sp<ValueMetricProducer> valueProducer = + ValueMetricProducerTestHelper::createValueProducerNoConditions(pullerManager, metric); + + valueProducer->notifyAppUpgrade(bucket2StartTimeNs + 2, "com.foo", 10000, 1); + + // Bucket should have been completed. + assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {9}); +} + static StatsLogReport outputStreamToProto(ProtoOutputStream* proto) { vector<uint8_t> bytes; bytes.resize(proto->size()); |