diff options
| author | 2019-02-06 15:10:36 +0000 | |
|---|---|---|
| committer | 2019-02-06 15:10:36 +0000 | |
| commit | 8333721a67c317e0de5eb44395dd7150cf56e13c (patch) | |
| tree | e34775f5c713d6468b716f0609d3c6f018aeccc4 | |
| parent | e8c4958b2f1bd08d87506eb6a86065daead2b74c (diff) | |
| parent | 9a5d359879b5e2f7e7c0730b04242d81dd10a5c8 (diff) | |
Merge changes I300ab05c,I0910b7db
* changes:
Adds the concept of invalid bucket.
Reset the base when pull fails.
| -rw-r--r-- | cmds/statsd/src/external/PullDataReceiver.h | 10 | ||||
| -rw-r--r-- | cmds/statsd/src/external/StatsPullerManager.cpp | 13 | ||||
| -rw-r--r-- | cmds/statsd/src/guardrail/StatsdStats.cpp | 5 | ||||
| -rw-r--r-- | cmds/statsd/src/guardrail/StatsdStats.h | 6 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/GaugeMetricProducer.cpp | 5 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/GaugeMetricProducer.h | 3 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.cpp | 128 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.h | 24 | ||||
| -rw-r--r-- | cmds/statsd/src/stats_log.proto | 1 | ||||
| -rw-r--r-- | cmds/statsd/src/stats_log_util.cpp | 3 | ||||
| -rw-r--r-- | cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp | 22 | ||||
| -rw-r--r-- | cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp | 407 |
12 files changed, 527 insertions, 100 deletions
diff --git a/cmds/statsd/src/external/PullDataReceiver.h b/cmds/statsd/src/external/PullDataReceiver.h index 0d505cb49e8f..b071682f8a59 100644 --- a/cmds/statsd/src/external/PullDataReceiver.h +++ b/cmds/statsd/src/external/PullDataReceiver.h @@ -28,9 +28,15 @@ namespace statsd { class PullDataReceiver : virtual public RefBase{ public: virtual ~PullDataReceiver() {} - virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) = 0; + /** + * @param data The pulled data. + * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the + * bucket should be invalidated. + */ + virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, + bool pullSuccess) = 0; }; } // namespace statsd } // namespace os -} // namespace android
\ No newline at end of file +} // namespace android diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp index c69384c7077f..9b603d6c7957 100644 --- a/cmds/statsd/src/external/StatsPullerManager.cpp +++ b/cmds/statsd/src/external/StatsPullerManager.cpp @@ -358,12 +358,13 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) { for (const auto& pullInfo : needToPull) { vector<shared_ptr<LogEvent>> data; - if (!Pull(pullInfo.first, &data)) { + bool pullSuccess = Pull(pullInfo.first, &data); + if (pullSuccess) { + StatsdStats::getInstance().notePullDelay( + pullInfo.first, getElapsedRealtimeNs() - elapsedTimeNs); + } else { VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs); - continue; } - StatsdStats::getInstance().notePullDelay(pullInfo.first, - getElapsedRealtimeNs() - elapsedTimeNs); // Convention is to mark pull atom timestamp at request time. // If we pull at t0, puller starts at t1, finishes at t2, and send back @@ -380,8 +381,8 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) { for (const auto& receiverInfo : pullInfo.second) { sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote(); if (receiverPtr != nullptr) { - receiverPtr->onDataPulled(data); - // we may have just come out of a coma, compute next pull time + receiverPtr->onDataPulled(data, pullSuccess); + // We may have just come out of a coma, compute next pull time. int numBucketsAhead = (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs; receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs; diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp index 37ccad5f4a49..a5bd5c6b6364 100644 --- a/cmds/statsd/src/guardrail/StatsdStats.cpp +++ b/cmds/statsd/src/guardrail/StatsdStats.cpp @@ -448,6 +448,11 @@ void StatsdStats::noteConditionChangeInNextBucket(int metricId) { getAtomMetricStats(metricId).conditionChangeInNextBucket++; } +void StatsdStats::noteInvalidatedBucket(int metricId) { + lock_guard<std::mutex> lock(mLock); + getAtomMetricStats(metricId).invalidatedBucket++; +} + StatsdStats::AtomMetricStats& StatsdStats::getAtomMetricStats(int metricId) { auto atomMetricStatsIter = mAtomMetricStats.find(metricId); if (atomMetricStatsIter != mAtomMetricStats.end()) { diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h index 01e9ca17e5fd..cb17061c1ed1 100644 --- a/cmds/statsd/src/guardrail/StatsdStats.h +++ b/cmds/statsd/src/guardrail/StatsdStats.h @@ -365,6 +365,11 @@ public: void noteConditionChangeInNextBucket(int atomId); /** + * A bucket has been tagged as invalid. + */ + void noteInvalidatedBucket(int metricId); + + /** * Reset the historical stats. Including all stats in icebox, and the tracked stats about * metrics, matchers, and atoms. The active configs will be kept and StatsdStats will continue * to collect stats after reset() has been called. @@ -408,6 +413,7 @@ public: long skippedForwardBuckets = 0; long badValueType = 0; long conditionChangeInNextBucket = 0; + long invalidatedBucket = 0; } AtomMetricStats; private: diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp index c2878f02a691..c9b71659aa58 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp @@ -406,9 +406,10 @@ std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const Lo return gaugeFields; } -void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { +void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, + bool pullSuccess) { std::lock_guard<std::mutex> lock(mMutex); - if (allData.size() == 0) { + if (!pullSuccess || allData.size() == 0) { return; } for (const auto& data : allData) { diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h index df0877954d70..d480941ed311 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.h +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h @@ -67,7 +67,8 @@ public: virtual ~GaugeMetricProducer(); // Handles when the pulled data arrives. - void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override; + void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, + bool pullSuccess) override; // GaugeMetric needs to immediately trigger another pull when we create the partial bucket. void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid, diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index 6aa8e842b021..9fb78e780870 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -104,6 +104,7 @@ ValueMetricProducer::ValueMetricProducer( mSkipZeroDiffOutput(metric.skip_zero_diff_output()), mUseZeroDefaultBase(metric.use_zero_default_base()), mHasGlobalBase(false), + mCurrentBucketIsInvalid(false), mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC : StatsdStats::kPullMaxDelayNs), mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()) { @@ -308,6 +309,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, } } +void ValueMetricProducer::invalidateCurrentBucket() { + if (!mCurrentBucketIsInvalid) { + // Only report once per invalid bucket. + StatsdStats::getInstance().noteInvalidatedBucket(mMetricId); + } + mCurrentBucketIsInvalid = true; + resetBase(); +} + void ValueMetricProducer::resetBase() { for (auto& slice : mCurrentSlicedBucket) { for (auto& interval : slice.second) { @@ -323,6 +333,7 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs, (long long)mCurrentBucketStartTimeNs); StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId); + invalidateCurrentBucket(); return; } @@ -346,19 +357,20 @@ void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { vector<std::shared_ptr<LogEvent>> allData; if (!mPullerManager->Pull(mPullTagId, &allData)) { ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs); - resetBase(); + invalidateCurrentBucket(); return; } const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs; + StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); if (pullDelayNs > mMaxPullDelayNs) { ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId, (long long)mMaxPullDelayNs); StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId); - StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); - resetBase(); + // We are missing one pull from the bucket which means we will not have a complete view of + // what's going on. + invalidateCurrentBucket(); return; } - StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs); if (timestampNs < mCurrentBucketStartTimeNs) { // The data will be skipped in onMatchedLogEventInternalLocked, but we don't want to report @@ -382,9 +394,16 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; } -void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { +void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, + bool pullSuccess) { std::lock_guard<std::mutex> lock(mMutex); if (mCondition) { + if (!pullSuccess) { + // If the pull failed, we won't be able to compute a diff. + invalidateCurrentBucket(); + return; + } + if (allData.size() == 0) { VLOG("Data pulled is empty"); StatsdStats::getInstance().noteEmptyData(mPullTagId); @@ -399,12 +418,13 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven // if the diff base will be cleared and this new data will serve as new diff base. int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs(); int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1; - if (bucketEndTime < mCurrentBucketStartTimeNs) { + bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs; + if (isEventLate) { VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime, (long long)mCurrentBucketStartTimeNs); StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId); - return; } + for (const auto& data : allData) { LogEvent localCopy = data->makeCopy(); if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) == @@ -679,31 +699,13 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs, (int)mCurrentSlicedBucket.size()); int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); - int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs; - if (bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { + bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs; + if (isBucketLargeEnough && !mCurrentBucketIsInvalid) { // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { - ValueBucket bucket; - bucket.mBucketStartNs = mCurrentBucketStartTimeNs; - bucket.mBucketEndNs = bucketEndTime; - for (const auto& interval : slice.second) { - if (interval.hasValue) { - // skip the output if the diff is zero - if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) { - continue; - } - bucket.valueIndex.push_back(interval.valueIndex); - if (mAggregationType != ValueMetric::AVG) { - bucket.values.push_back(interval.value); - } else { - double sum = interval.value.type == LONG ? (double)interval.value.long_value - : interval.value.double_value; - bucket.values.push_back(Value((double)sum / interval.sampleSize)); - } - } - } + ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second); // it will auto create new vector of ValuebucketInfo if the key is not found. if (bucket.valueIndex.size() > 0) { auto& bucketList = mPastBuckets[slice.first]; @@ -714,6 +716,58 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime); } + if (!mCurrentBucketIsInvalid) { + appendToFullBucket(eventTimeNs, fullBucketEndTimeNs); + } + initCurrentSlicedBucket(); + mCurrentBucketIsInvalid = false; +} + +ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime, + const std::vector<Interval>& intervals) { + ValueBucket bucket; + bucket.mBucketStartNs = mCurrentBucketStartTimeNs; + bucket.mBucketEndNs = bucketEndTime; + for (const auto& interval : intervals) { + if (interval.hasValue) { + // skip the output if the diff is zero + if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) { + continue; + } + bucket.valueIndex.push_back(interval.valueIndex); + if (mAggregationType != ValueMetric::AVG) { + bucket.values.push_back(interval.value); + } else { + double sum = interval.value.type == LONG ? (double)interval.value.long_value + : interval.value.double_value; + bucket.values.push_back(Value((double)sum / interval.sampleSize)); + } + } + } + return bucket; +} + +void ValueMetricProducer::initCurrentSlicedBucket() { + for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) { + bool obsolete = true; + for (auto& interval : it->second) { + interval.hasValue = false; + interval.sampleSize = 0; + if (interval.seenNewData) { + obsolete = false; + } + interval.seenNewData = false; + } + + if (obsolete) { + it = mCurrentSlicedBucket.erase(it); + } else { + it++; + } + } +} + +void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) { if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker. // Accumulate partial buckets with current value and then send to anomaly tracker. if (mCurrentFullBucket.size() > 0) { @@ -751,24 +805,6 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { mCurrentFullBucket[slice.first] += slice.second[0].value.long_value; } } - - for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) { - bool obsolete = true; - for (auto& interval : it->second) { - interval.hasValue = false; - interval.sampleSize = 0; - if (interval.seenNewData) { - obsolete = false; - } - interval.seenNewData = false; - } - - if (obsolete) { - it = mCurrentSlicedBucket.erase(it); - } else { - it++; - } - } } size_t ValueMetricProducer::byteSizeLocked() const { diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index a8dfc5ba0e5d..d9bec5d588be 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -51,7 +51,8 @@ public: virtual ~ValueMetricProducer(); // Process data pulled on bucket boundary. - void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override; + void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, + bool pullSuccess) override; // ValueMetric needs special logic if it's a pulled atom. void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid, @@ -102,6 +103,9 @@ private: // Calculate previous bucket end time based on current time. int64_t calcPreviousBucketEndTime(const int64_t currentTimeNs); + // Mark the data as invalid. + void invalidateCurrentBucket(); + const int mWhatMatcherIndex; sp<EventMatcherWizard> mEventMatcherWizard; @@ -155,6 +159,11 @@ private: void pullAndMatchEventsLocked(const int64_t timestampNs); + ValueBucket buildPartialBucket(int64_t bucketEndTime, + const std::vector<Interval>& intervals); + void initCurrentSlicedBucket(); + void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs); + // Reset diff base and mHasGlobalBase void resetBase(); @@ -186,6 +195,12 @@ private: // diff against. bool mHasGlobalBase; + // Invalid bucket. There was a problem in collecting data in the current bucket so we cannot + // trust any of the data in this bucket. + // + // For instance, one pull failed. + bool mCurrentBucketIsInvalid; + const int64_t mMaxPullDelayNs; const bool mSplitBucketForAppUpgrade; @@ -216,8 +231,13 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey); - FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFail); + FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange); + FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange); + FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket); FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate); + FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed); + FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed); + FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed); }; } // namespace statsd diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto index cca09ac017a3..5b9148283bdc 100644 --- a/cmds/statsd/src/stats_log.proto +++ b/cmds/statsd/src/stats_log.proto @@ -417,6 +417,7 @@ message StatsdStatsReport { optional int64 skipped_forward_buckets = 4; optional int64 bad_value_type = 5; optional int64 condition_change_in_next_bucket = 6; + optional int64 invalidated_bucket = 7; } repeated AtomMetricStats atom_metric_stats = 17; diff --git a/cmds/statsd/src/stats_log_util.cpp b/cmds/statsd/src/stats_log_util.cpp index 9c9985ed271c..3cb7563b206b 100644 --- a/cmds/statsd/src/stats_log_util.cpp +++ b/cmds/statsd/src/stats_log_util.cpp @@ -78,6 +78,7 @@ const int FIELD_ID_LATE_LOG_EVENT_SKIPPED = 3; const int FIELD_ID_SKIPPED_FORWARD_BUCKETS = 4; const int FIELD_ID_BAD_VALUE_TYPE = 5; const int FIELD_ID_CONDITION_CHANGE_IN_NEXT_BUCKET = 6; +const int FIELD_ID_INVALIDATED_BUCKET = 7; namespace { @@ -494,6 +495,8 @@ void writeAtomMetricStatsToStream(const std::pair<int, StatsdStats::AtomMetricSt (long long)pair.second.badValueType); protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_CHANGE_IN_NEXT_BUCKET, (long long)pair.second.conditionChangeInNextBucket); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_INVALIDATED_BUCKET, + (long long)pair.second.invalidatedBucket); protoOutput->end(token); } diff --git a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp index 0ffbb54c8d48..1725160c00c7 100644 --- a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp @@ -133,7 +133,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsNoCondition) { event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); auto it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin(); EXPECT_EQ(INT, it->mValue.getType()); @@ -151,7 +151,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsNoCondition) { event2->write(25); event2->init(); allData.push_back(event2); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin(); EXPECT_EQ(INT, it->mValue.getType()); @@ -305,7 +305,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithUpgrade) { event->write(1); event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin() ->second.front() @@ -328,7 +328,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithUpgrade) { event->write(3); event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(3, gaugeProducer.mCurrentSlicedBucket->begin() @@ -371,7 +371,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithAppUpgradeDisabled) { event->write(1); event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin() ->second.front() @@ -440,7 +440,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsWithCondition) { event->write(110); event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(110, gaugeProducer.mCurrentSlicedBucket->begin() @@ -541,7 +541,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsWithSlicedCondition) { event->write(110); event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size()); @@ -590,7 +590,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) { event1->write(13); event1->init(); - gaugeProducer.onDataPulled({event1}); + gaugeProducer.onDataPulled({event1}, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin() ->second.front() @@ -604,7 +604,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) { event2->write(15); event2->init(); - gaugeProducer.onDataPulled({event2}); + gaugeProducer.onDataPulled({event2}, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin() ->second.front() @@ -619,7 +619,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) { event3->write(26); event3->init(); - gaugeProducer.onDataPulled({event3}); + gaugeProducer.onDataPulled({event3}, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(26L, gaugeProducer.mCurrentSlicedBucket->begin() ->second.front() @@ -633,7 +633,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) { std::make_shared<LogEvent>(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 10); event4->write("some value"); event4->init(); - gaugeProducer.onDataPulled({event4}); + gaugeProducer.onDataPulled({event4}, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_TRUE(gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->empty()); } diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp index c0648ee70032..64045520fb0b 100644 --- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp @@ -160,7 +160,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -177,7 +177,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) { event->write(23); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -196,7 +196,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) { event->write(36); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -256,7 +256,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = @@ -274,7 +274,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) { event->write(23); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -292,7 +292,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) { event->write(36); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -341,7 +341,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -357,7 +357,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) { event->write(10); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -373,7 +373,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) { event->write(36); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(true, curInterval.hasBase); @@ -420,7 +420,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -436,7 +436,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) { event->write(10); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -451,7 +451,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) { event->write(36); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(true, curInterval.hasBase); @@ -525,7 +525,7 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) { event->write(110); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); @@ -635,7 +635,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1); @@ -650,7 +650,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) { event->write(150); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); EXPECT_EQ(bucket2StartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs); EXPECT_EQ(20L, @@ -689,7 +689,7 @@ TEST(ValueMetricProducerTest, TestPulledWithAppUpgradeDisabled) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1); @@ -993,7 +993,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -1011,7 +1011,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { event->write(23); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -1032,7 +1032,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { event->write(36); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; // startUpdated:false sum:12 @@ -1120,7 +1120,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) { event->write(110); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(false, curInterval.hasBase); @@ -1224,7 +1224,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) { event->write(110); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(true, curInterval.hasBase); @@ -1677,7 +1677,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBase) { allData.push_back(event1); allData.push_back(event2); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval1.hasBase); EXPECT_EQ(11, interval1.base.long_value); @@ -1762,7 +1762,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { allData.push_back(event1); allData.push_back(event2); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval1.hasBase); EXPECT_EQ(11, interval1.base.long_value); @@ -1791,7 +1791,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { event1->write(5); event1->init(); allData.push_back(event1); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval2.hasBase); @@ -1813,7 +1813,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { event2->write(5); event2->init(); allData.push_back(event2); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval2.hasBase); @@ -1888,7 +1888,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { allData.push_back(event1); allData.push_back(event2); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval1.hasBase); EXPECT_EQ(11, interval1.base.long_value); @@ -1918,7 +1918,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { event1->write(5); event1->init(); allData.push_back(event1); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); @@ -1941,7 +1941,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { event1->write(13); event1->init(); allData.push_back(event1); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval2.hasBase); @@ -1951,7 +1951,60 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); } -TEST(ValueMetricProducerTest, TestResetBaseOnPullFail) { +TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.set_condition(StringToId("SCREEN_ON")); + metric.set_max_pull_delay_sec(INT_MAX); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp<EventMatcherWizard> eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); + + // Used by onConditionChanged. + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); + event->write(tagId); + event->write(100); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex, + eventMatcherWizard, tagId, bucketStartTimeNs, + bucketStartTimeNs, pullerManager); + + valueProducer.onConditionChanged(true, bucketStartTimeNs + 8); + // has one slice + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + ValueMetricProducer::Interval& curInterval = + valueProducer.mCurrentSlicedBucket.begin()->second[0]; + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(100, curInterval.base.long_value); + EXPECT_EQ(false, curInterval.hasValue); + + vector<shared_ptr<LogEvent>> allData; + valueProducer.onDataPulled(allData, /** succeed */ false); + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + EXPECT_EQ(false, curInterval.hasBase); + EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(false, valueProducer.mHasGlobalBase); +} + +TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); @@ -2007,6 +2060,56 @@ TEST(ValueMetricProducerTest, TestResetBaseOnPullFail) { EXPECT_EQ(false, valueProducer.mHasGlobalBase); } +TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.set_condition(StringToId("SCREEN_ON")); + metric.set_max_pull_delay_sec(INT_MAX); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp<EventMatcherWizard> eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); + + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); + event->write(tagId); + event->write(100); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex, + eventMatcherWizard, tagId, bucketStartTimeNs, + bucketStartTimeNs, pullerManager); + + valueProducer.mCondition = true; + + vector<shared_ptr<LogEvent>> allData; + valueProducer.onDataPulled(allData, /** succeed */ false); + EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size()); + + valueProducer.onConditionChanged(false, bucketStartTimeNs + 1); + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + ValueMetricProducer::Interval& curInterval = + valueProducer.mCurrentSlicedBucket.begin()->second[0]; + EXPECT_EQ(false, curInterval.hasBase); + EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(false, valueProducer.mHasGlobalBase); +} + TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) { ValueMetric metric; metric.set_id(metricId); @@ -2052,7 +2155,7 @@ TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) { event->write(110); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); @@ -2073,6 +2176,250 @@ TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) { EXPECT_EQ(false, valueProducer.mHasGlobalBase); } +TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.set_condition(StringToId("SCREEN_ON")); + metric.set_max_pull_delay_sec(INT_MAX); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp<EventMatcherWizard> eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); + + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + // First onConditionChanged + .WillOnce(Return(false)) + // Second onConditionChanged + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); + event->write(tagId); + event->write(130); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex, + eventMatcherWizard, tagId, bucketStartTimeNs, + bucketStartTimeNs, pullerManager); + + valueProducer.mCondition = true; + + // Bucket start. + vector<shared_ptr<LogEvent>> allData; + allData.clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1); + event->write(1); + event->write(110); + event->init(); + allData.push_back(event); + valueProducer.onDataPulled(allData, /** succeed */ true); + + // This will fail and should invalidate the whole bucket since we do not have all the data + // needed to compute the metric value when the screen was on. + valueProducer.onConditionChanged(false, bucketStartTimeNs + 2); + valueProducer.onConditionChanged(true, bucketStartTimeNs + 3); + + // Bucket end. + allData.clear(); + shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); + event2->write(1); + event2->write(140); + event2->init(); + allData.push_back(event2); + valueProducer.onDataPulled(allData, /** succeed */ true); + + valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1); + + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); + // Contains base from last pull which was successful. + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + ValueMetricProducer::Interval& curInterval = + valueProducer.mCurrentSlicedBucket.begin()->second[0]; + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(140, curInterval.base.long_value); + EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(true, valueProducer.mHasGlobalBase); +} + +TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.set_condition(StringToId("SCREEN_ON")); + metric.set_max_pull_delay_sec(INT_MAX); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp<EventMatcherWizard> eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); + + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + // First onConditionChanged + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); + event->write(tagId); + event->write(120); + event->init(); + data->push_back(event); + return true; + })) + // Second onConditionChanged + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); + event->write(tagId); + event->write(130); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex, + eventMatcherWizard, tagId, bucketStartTimeNs, + bucketStartTimeNs, pullerManager); + + valueProducer.mCondition = true; + + // Bucket start. + vector<shared_ptr<LogEvent>> allData; + allData.clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1); + event->write(1); + event->write(110); + event->init(); + allData.push_back(event); + valueProducer.onDataPulled(allData, /** succeed */ false); + + valueProducer.onConditionChanged(false, bucketStartTimeNs + 2); + valueProducer.onConditionChanged(true, bucketStartTimeNs + 3); + + // Bucket end. + allData.clear(); + shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); + event2->write(1); + event2->write(140); + event2->init(); + allData.push_back(event2); + valueProducer.onDataPulled(allData, /** succeed */ true); + + valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1); + + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); + // Contains base from last pull which was successful. + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + ValueMetricProducer::Interval& curInterval = + valueProducer.mCurrentSlicedBucket.begin()->second[0]; + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(140, curInterval.base.long_value); + EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(true, valueProducer.mHasGlobalBase); +} + +TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.set_condition(StringToId("SCREEN_ON")); + metric.set_max_pull_delay_sec(INT_MAX); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp<EventMatcherWizard> eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); + + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + // First onConditionChanged + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); + event->write(tagId); + event->write(120); + event->init(); + data->push_back(event); + return true; + })) + // Second onConditionChanged + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); + event->write(tagId); + event->write(130); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex, + eventMatcherWizard, tagId, bucketStartTimeNs, + bucketStartTimeNs, pullerManager); + + valueProducer.mCondition = true; + + // Bucket start. + vector<shared_ptr<LogEvent>> allData; + allData.clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1); + event->write(1); + event->write(110); + event->init(); + allData.push_back(event); + valueProducer.onDataPulled(allData, /** succeed */ true); + + // This will fail and should invalidate the whole bucket since we do not have all the data + // needed to compute the metric value when the screen was on. + valueProducer.onConditionChanged(false, bucketStartTimeNs + 2); + valueProducer.onConditionChanged(true, bucketStartTimeNs + 3); + + // Bucket end. + allData.clear(); + shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); + event2->write(1); + event2->write(140); + event2->init(); + allData.push_back(event2); + valueProducer.onDataPulled(allData, /** succeed */ false); + + valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1); + + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); + // Last pull failed so based has been reset. + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + ValueMetricProducer::Interval& curInterval = + valueProducer.mCurrentSlicedBucket.begin()->second[0]; + EXPECT_EQ(false, curInterval.hasBase); + EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(false, valueProducer.mHasGlobalBase); +} + } // namespace statsd } // namespace os } // namespace android |