diff options
| -rw-r--r-- | cmds/statsd/src/external/PullDataReceiver.h | 10 | ||||
| -rw-r--r-- | cmds/statsd/src/external/StatsPullerManager.cpp | 13 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/GaugeMetricProducer.cpp | 5 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/GaugeMetricProducer.h | 3 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.cpp | 14 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.h | 7 | ||||
| -rw-r--r-- | cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp | 22 | ||||
| -rw-r--r-- | cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp | 160 |
8 files changed, 177 insertions, 57 deletions
diff --git a/cmds/statsd/src/external/PullDataReceiver.h b/cmds/statsd/src/external/PullDataReceiver.h index 0d505cb49e8f..b071682f8a59 100644 --- a/cmds/statsd/src/external/PullDataReceiver.h +++ b/cmds/statsd/src/external/PullDataReceiver.h @@ -28,9 +28,15 @@ namespace statsd { class PullDataReceiver : virtual public RefBase{ public: virtual ~PullDataReceiver() {} - virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) = 0; + /** + * @param data The pulled data. + * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the + * bucket should be invalidated. + */ + virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, + bool pullSuccess) = 0; }; } // namespace statsd } // namespace os -} // namespace android
\ No newline at end of file +} // namespace android diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp index c69384c7077f..9b603d6c7957 100644 --- a/cmds/statsd/src/external/StatsPullerManager.cpp +++ b/cmds/statsd/src/external/StatsPullerManager.cpp @@ -358,12 +358,13 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) { for (const auto& pullInfo : needToPull) { vector<shared_ptr<LogEvent>> data; - if (!Pull(pullInfo.first, &data)) { + bool pullSuccess = Pull(pullInfo.first, &data); + if (pullSuccess) { + StatsdStats::getInstance().notePullDelay( + pullInfo.first, getElapsedRealtimeNs() - elapsedTimeNs); + } else { VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs); - continue; } - StatsdStats::getInstance().notePullDelay(pullInfo.first, - getElapsedRealtimeNs() - elapsedTimeNs); // Convention is to mark pull atom timestamp at request time. // If we pull at t0, puller starts at t1, finishes at t2, and send back @@ -380,8 +381,8 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) { for (const auto& receiverInfo : pullInfo.second) { sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote(); if (receiverPtr != nullptr) { - receiverPtr->onDataPulled(data); - // we may have just come out of a coma, compute next pull time + receiverPtr->onDataPulled(data, pullSuccess); + // We may have just come out of a coma, compute next pull time. int numBucketsAhead = (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs; receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs; diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp index c2878f02a691..c9b71659aa58 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp @@ -406,9 +406,10 @@ std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const Lo return gaugeFields; } -void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { +void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, + bool pullSuccess) { std::lock_guard<std::mutex> lock(mMutex); - if (allData.size() == 0) { + if (!pullSuccess || allData.size() == 0) { return; } for (const auto& data : allData) { diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h index df0877954d70..d480941ed311 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.h +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h @@ -67,7 +67,8 @@ public: virtual ~GaugeMetricProducer(); // Handles when the pulled data arrives. - void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override; + void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, + bool pullSuccess) override; // GaugeMetric needs to immediately trigger another pull when we create the partial bucket. void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid, diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index 6aa8e842b021..9fe07e12133f 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -382,9 +382,16 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs; } -void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) { +void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData, + bool pullSuccess) { std::lock_guard<std::mutex> lock(mMutex); if (mCondition) { + if (!pullSuccess) { + // If the pull failed, we won't be able to compute a diff. + resetBase(); + return; + } + if (allData.size() == 0) { VLOG("Data pulled is empty"); StatsdStats::getInstance().noteEmptyData(mPullTagId); @@ -399,12 +406,13 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven // if the diff base will be cleared and this new data will serve as new diff base. int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs(); int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1; - if (bucketEndTime < mCurrentBucketStartTimeNs) { + bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs; + if (isEventLate) { VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime, (long long)mCurrentBucketStartTimeNs); StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId); - return; } + for (const auto& data : allData) { LogEvent localCopy = data->makeCopy(); if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) == diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index a8dfc5ba0e5d..cab50aaef55f 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -51,7 +51,8 @@ public: virtual ~ValueMetricProducer(); // Process data pulled on bucket boundary. - void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override; + void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, + bool pullSuccess) override; // ValueMetric needs special logic if it's a pulled atom. void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid, @@ -216,7 +217,9 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey); - FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFail); + FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange); + FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange); + FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket); FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate); }; diff --git a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp index 0ffbb54c8d48..1725160c00c7 100644 --- a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp @@ -133,7 +133,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsNoCondition) { event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); auto it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin(); EXPECT_EQ(INT, it->mValue.getType()); @@ -151,7 +151,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsNoCondition) { event2->write(25); event2->init(); allData.push_back(event2); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin(); EXPECT_EQ(INT, it->mValue.getType()); @@ -305,7 +305,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithUpgrade) { event->write(1); event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin() ->second.front() @@ -328,7 +328,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithUpgrade) { event->write(3); event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(3, gaugeProducer.mCurrentSlicedBucket->begin() @@ -371,7 +371,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithAppUpgradeDisabled) { event->write(1); event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin() ->second.front() @@ -440,7 +440,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsWithCondition) { event->write(110); event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(110, gaugeProducer.mCurrentSlicedBucket->begin() @@ -541,7 +541,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsWithSlicedCondition) { event->write(110); event->init(); allData.push_back(event); - gaugeProducer.onDataPulled(allData); + gaugeProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size()); @@ -590,7 +590,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) { event1->write(13); event1->init(); - gaugeProducer.onDataPulled({event1}); + gaugeProducer.onDataPulled({event1}, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin() ->second.front() @@ -604,7 +604,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) { event2->write(15); event2->init(); - gaugeProducer.onDataPulled({event2}); + gaugeProducer.onDataPulled({event2}, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin() ->second.front() @@ -619,7 +619,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) { event3->write(26); event3->init(); - gaugeProducer.onDataPulled({event3}); + gaugeProducer.onDataPulled({event3}, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(26L, gaugeProducer.mCurrentSlicedBucket->begin() ->second.front() @@ -633,7 +633,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) { std::make_shared<LogEvent>(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 10); event4->write("some value"); event4->init(); - gaugeProducer.onDataPulled({event4}); + gaugeProducer.onDataPulled({event4}, /** succeed */ true); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_TRUE(gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->empty()); } diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp index c0648ee70032..df0ae382a5c4 100644 --- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp @@ -160,7 +160,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -177,7 +177,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) { event->write(23); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -196,7 +196,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) { event->write(36); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -256,7 +256,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = @@ -274,7 +274,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) { event->write(23); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -292,7 +292,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) { event->write(36); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -341,7 +341,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -357,7 +357,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) { event->write(10); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -373,7 +373,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) { event->write(36); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(true, curInterval.hasBase); @@ -420,7 +420,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -436,7 +436,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) { event->write(10); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -451,7 +451,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) { event->write(36); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(true, curInterval.hasBase); @@ -525,7 +525,7 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) { event->write(110); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); @@ -635,7 +635,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1); @@ -650,7 +650,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) { event->write(150); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size()); EXPECT_EQ(bucket2StartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs); EXPECT_EQ(20L, @@ -689,7 +689,7 @@ TEST(ValueMetricProducerTest, TestPulledWithAppUpgradeDisabled) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1); @@ -993,7 +993,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -1011,7 +1011,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { event->write(23); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; @@ -1032,7 +1032,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) { event->write(36); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; // startUpdated:false sum:12 @@ -1120,7 +1120,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) { event->write(110); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(false, curInterval.hasBase); @@ -1224,7 +1224,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) { event->write(110); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0]; EXPECT_EQ(true, curInterval.hasBase); @@ -1677,7 +1677,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBase) { allData.push_back(event1); allData.push_back(event2); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval1.hasBase); EXPECT_EQ(11, interval1.base.long_value); @@ -1762,7 +1762,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { allData.push_back(event1); allData.push_back(event2); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval1.hasBase); EXPECT_EQ(11, interval1.base.long_value); @@ -1791,7 +1791,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { event1->write(5); event1->init(); allData.push_back(event1); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval2.hasBase); @@ -1813,7 +1813,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { event2->write(5); event2->init(); allData.push_back(event2); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval2.hasBase); @@ -1888,7 +1888,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { allData.push_back(event1); allData.push_back(event2); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval1.hasBase); EXPECT_EQ(11, interval1.base.long_value); @@ -1918,7 +1918,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { event1->write(5); event1->init(); allData.push_back(event1); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); @@ -1941,7 +1941,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { event1->write(13); event1->init(); allData.push_back(event1); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); EXPECT_EQ(true, interval2.hasBase); @@ -1951,7 +1951,60 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) { EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); } -TEST(ValueMetricProducerTest, TestResetBaseOnPullFail) { +TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.set_condition(StringToId("SCREEN_ON")); + metric.set_max_pull_delay_sec(INT_MAX); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp<EventMatcherWizard> eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); + + // Used by onConditionChanged. + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); + event->write(tagId); + event->write(100); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex, + eventMatcherWizard, tagId, bucketStartTimeNs, + bucketStartTimeNs, pullerManager); + + valueProducer.onConditionChanged(true, bucketStartTimeNs + 8); + // has one slice + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + ValueMetricProducer::Interval& curInterval = + valueProducer.mCurrentSlicedBucket.begin()->second[0]; + EXPECT_EQ(true, curInterval.hasBase); + EXPECT_EQ(100, curInterval.base.long_value); + EXPECT_EQ(false, curInterval.hasValue); + + vector<shared_ptr<LogEvent>> allData; + valueProducer.onDataPulled(allData, /** succeed */ false); + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + EXPECT_EQ(false, curInterval.hasBase); + EXPECT_EQ(false, curInterval.hasValue); + EXPECT_EQ(false, valueProducer.mHasGlobalBase); +} + +TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange) { ValueMetric metric; metric.set_id(metricId); metric.set_bucket(ONE_MINUTE); @@ -2007,6 +2060,53 @@ TEST(ValueMetricProducerTest, TestResetBaseOnPullFail) { EXPECT_EQ(false, valueProducer.mHasGlobalBase); } +TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.set_condition(StringToId("SCREEN_ON")); + metric.set_max_pull_delay_sec(0); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp<EventMatcherWizard> eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); + + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8); + event->write(tagId); + event->write(100); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex, + eventMatcherWizard, tagId, bucketStartTimeNs, + bucketStartTimeNs, pullerManager); + + valueProducer.mCondition = true; + + vector<shared_ptr<LogEvent>> allData; + valueProducer.onDataPulled(allData, /** succeed */ false); + EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size()); + ValueMetricProducer::Interval& curInterval = + valueProducer.mCurrentSlicedBucket.begin()->second[0]; + + valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1); + EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size()); +} + TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) { ValueMetric metric; metric.set_id(metricId); @@ -2052,7 +2152,7 @@ TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) { event->write(110); event->init(); allData.push_back(event); - valueProducer.onDataPulled(allData); + valueProducer.onDataPulled(allData, /** succeed */ true); // has one slice EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); |