-rw-r--r--  cmds/statsd/src/external/PullDataReceiver.h             |   3
-rw-r--r--  cmds/statsd/src/external/StatsPullerManager.cpp         |   2
-rw-r--r--  cmds/statsd/src/metrics/GaugeMetricProducer.cpp         |   2
-rw-r--r--  cmds/statsd/src/metrics/GaugeMetricProducer.h           |   2
-rw-r--r--  cmds/statsd/src/metrics/ValueMetricProducer.cpp         | 111
-rw-r--r--  cmds/statsd/src/metrics/ValueMetricProducer.h           |  12
-rw-r--r--  cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp  |  22
-rw-r--r--  cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp  | 340
8 files changed, 387 insertions, 107 deletions
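The change threads the time at which a scheduled pull was initiated (originalPullTimeNs) through PullDataReceiver::onDataPulled, so that a receiver attributes the pulled data to the bucket boundary derived from that pull time rather than from each LogEvent's own, possibly later, timestamp. A minimal standalone sketch of that snapping, with illustrative constants and a simplified stand-in for calcPreviousBucketEndTime (not statsd code):

#include <cstdint>
#include <iostream>

// Simplified stand-in: snap a timestamp to the end of the bucket that precedes it.
int64_t calcPreviousBucketEndTime(int64_t firstBucketStartNs, int64_t bucketSizeNs,
                                  int64_t currentTimeNs) {
    int64_t numBuckets = (currentTimeNs - firstBucketStartNs) / bucketSizeNs;
    return firstBucketStartNs + numBuckets * bucketSizeNs;
}

int main() {
    const int64_t firstBucketStartNs = 10000000000LL;  // hypothetical start of the first bucket
    const int64_t bucketSizeNs = 60LL * 1000000000LL;  // a one-minute bucket
    // The pull alarm fires around the boundary; individual pulled events may carry
    // later timestamps, but everything is attributed using the original pull time.
    int64_t originalPullTimeNs = firstBucketStartNs + bucketSizeNs + 37;
    int64_t eventElapsedTimeNs =
            calcPreviousBucketEndTime(firstBucketStartNs, bucketSizeNs, originalPullTimeNs) - 1;
    std::cout << "attribute pulled data to " << eventElapsedTimeNs << " ns" << std::endl;
    return 0;
}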
diff --git a/cmds/statsd/src/external/PullDataReceiver.h b/cmds/statsd/src/external/PullDataReceiver.h
index b071682f8a59..d2193f41b80a 100644
--- a/cmds/statsd/src/external/PullDataReceiver.h
+++ b/cmds/statsd/src/external/PullDataReceiver.h
@@ -32,9 +32,10 @@ class PullDataReceiver : virtual public RefBase{
   * @param data The pulled data.
   * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the
   * bucket should be invalidated.
+  * @param originalPullTimeNs This is when all the pulls have been initiated (elapsed time).
   */
  virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
-                           bool pullSuccess) = 0;
+                           bool pullSuccess, int64_t originalPullTimeNs) = 0;
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp
index 991bef4b14b2..ecdcd21d44dd 100644
--- a/cmds/statsd/src/external/StatsPullerManager.cpp
+++ b/cmds/statsd/src/external/StatsPullerManager.cpp
@@ -384,7 +384,7 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
         for (const auto& receiverInfo : pullInfo.second) {
             sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
             if (receiverPtr != nullptr) {
-                receiverPtr->onDataPulled(data, pullSuccess);
+                receiverPtr->onDataPulled(data, pullSuccess, elapsedTimeNs);
                 // We may have just come out of a coma, compute next pull time.
                 int numBucketsAhead = (elapsedTimeNs - receiverInfo->nextPullTimeNs) /
                                       receiverInfo->intervalNs;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 2609937924ac..d56a355f15d6 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -407,7 +407,7 @@ std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const Lo
 }
 
 void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
-                                       bool pullSuccess) {
+                                       bool pullSuccess, int64_t originalPullTimeNs) {
     std::lock_guard<std::mutex> lock(mMutex);
     if (!pullSuccess || allData.size() == 0) {
         return;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index d480941ed311..64a18337481b 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -68,7 +68,7 @@ public:
     // Handles when the pulled data arrives.
     void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
-                      bool pullSuccess) override;
+                      bool pullSuccess, int64_t originalPullTimeNs) override;
 
     // GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
     void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 1bd3ef20a578..4fc9c37a5537 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -361,34 +361,8 @@ void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
         invalidateCurrentBucket();
         return;
     }
-    const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
-    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
-    if (pullDelayNs > mMaxPullDelayNs) {
-        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
-              (long long)mMaxPullDelayNs);
-        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
-        // We are missing one pull from the bucket which means we will not have a complete view of
-        // what's going on.
-        invalidateCurrentBucket();
-        return;
-    }
-    if (timestampNs < mCurrentBucketStartTimeNs) {
-        // The data will be skipped in onMatchedLogEventInternalLocked, but we don't want to report
-        // for every event, just the pull
-        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
-    }
-
-    for (const auto& data : allData) {
-        // make a copy before doing and changes
-        LogEvent localCopy = data->makeCopy();
-        localCopy.setElapsedTimestampNs(timestampNs);
-        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
-            MatchingState::kMatched) {
-            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
-        }
-    }
-    mHasGlobalBase = true;
+    accumulateEvents(allData, timestampNs, timestampNs);
 }
 
 int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
@@ -396,7 +370,7 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime
 }
 
 void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
-                                       bool pullSuccess) {
+                                       bool pullSuccess, int64_t originalPullTimeNs) {
     std::lock_guard<std::mutex> lock(mMutex);
     if (mCondition) {
         if (!pullSuccess) {
@@ -405,11 +379,6 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
             return;
         }
-        if (allData.size() == 0) {
-            VLOG("Data pulled is empty");
-            StatsdStats::getInstance().noteEmptyData(mPullTagId);
-            return;
-        }
         // For scheduled pulled data, the effective event time is snap to the nearest
         // bucket end. In the case of waking up from a deep sleep state, we will
         // attribute to the previous bucket end. If the sleep was long but not very long, we
@@ -417,33 +386,70 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
         // we pull at a later time than real bucket end.
         // If the sleep was very long, we skip more than one bucket before sleep. In this case,
         // if the diff base will be cleared and this new data will serve as new diff base.
-        int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
-        int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
-        bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs;
-        if (isEventLate) {
-            VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
-                 (long long)mCurrentBucketStartTimeNs);
-            StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
-        }
-
-        for (const auto& data : allData) {
-            LogEvent localCopy = data->makeCopy();
-            if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
-                MatchingState::kMatched) {
-                localCopy.setElapsedTimestampNs(bucketEndTime);
-                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
-            }
-        }
-        mHasGlobalBase = true;
+        int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
+        accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
         // We can probably flush the bucket. Since we used bucketEndTime when calling
         // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
-        flushIfNeededLocked(realEventTime);
+        flushIfNeededLocked(originalPullTimeNs);
+
+    } else {
         VLOG("No need to commit data on condition false.");
     }
 }
 
+void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
+                                           int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) {
+    bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
+    if (isEventLate) {
+        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
+             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
+        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
+        invalidateCurrentBucket();
+        return;
+    }
+
+    const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
+    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+    if (pullDelayNs > mMaxPullDelayNs) {
+        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
+              (long long)mMaxPullDelayNs);
+        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
+        // We are missing one pull from the bucket which means we will not have a complete view of
+        // what's going on.
+        invalidateCurrentBucket();
+        return;
+    }
+
+    if (allData.size() == 0) {
+        VLOG("Data pulled is empty");
+        StatsdStats::getInstance().noteEmptyData(mPullTagId);
+    }
+
+    mMatchedMetricDimensionKeys.clear();
+    for (const auto& data : allData) {
+        LogEvent localCopy = data->makeCopy();
+        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
+            MatchingState::kMatched) {
+            localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
+            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
+        }
+    }
+    // If the new pulled data does not contain some keys we track in our intervals, we need to
+    // reset the base.
+    for (auto& slice : mCurrentSlicedBucket) {
+        bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first)
+                != mMatchedMetricDimensionKeys.end();
+        if (!presentInPulledData) {
+            for (auto& interval : slice.second) {
+                interval.hasBase = false;
+            }
+        }
+    }
+    mMatchedMetricDimensionKeys.clear();
+    mHasGlobalBase = true;
+}
+
 void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
     if (mCurrentSlicedBucket.size() == 0) {
         return;
@@ -539,6 +545,7 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
              (long long)mCurrentBucketStartTimeNs);
         return;
     }
+    mMatchedMetricDimensionKeys.insert(eventKey);
 
     flushIfNeededLocked(eventTimeNs);
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 0cfefa9e3eb4..d1c2315b28be 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -52,7 +52,7 @@ public:
     // Process data pulled on bucket boundary.
     void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
-                      bool pullSuccess) override;
+                      bool pullSuccess, int64_t originalPullTimeNs) override;
 
     // ValueMetric needs special logic if it's a pulled atom.
     void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
@@ -116,6 +116,9 @@ private:
     // Value fields for matching.
     std::vector<Matcher> mFieldMatchers;
 
+    // Dimension keys that were matched in the most recently pulled data.
+    std::set<MetricDimensionKey> mMatchedMetricDimensionKeys;
+
     // tagId for pulled data. -1 if this is not pulled
     const int mPullTagId;
 
@@ -160,6 +163,9 @@ private:
 
     void pullAndMatchEventsLocked(const int64_t timestampNs);
 
+    void accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
+                          int64_t originalPullTimeNs, int64_t eventElapsedTimeNs);
+
     ValueBucket buildPartialBucket(int64_t bucketEndTime, const std::vector<Interval>& intervals);
 
     void initCurrentSlicedBucket();
@@ -242,6 +248,10 @@ private:
     FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed);
     FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullDelayExceeded);
     FRIEND_TEST(ValueMetricProducerTest, TestBaseSetOnConditionChange);
+    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled);
+    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onConditionChanged);
+    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary);
+    FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
index 1725160c00c7..62868232d8e7 100644
--- a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
@@ -133,7 +133,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsNoCondition) {
     event->init();
     allData.push_back(event);
 
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     auto it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
     EXPECT_EQ(INT, it->mValue.getType());
@@ -151,7 +151,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsNoCondition) {
     event2->write(25);
     event2->init();
     allData.push_back(event2);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
     EXPECT_EQ(INT, it->mValue.getType());
@@ -305,7 +305,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithUpgrade) {
     event->write(1);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucketStartTimeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin()
                          ->second.front()
@@ -328,7 +328,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithUpgrade) {
     event->write(3);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucketStartTimeNs + bucketSizeNs);
     EXPECT_EQ(2UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(3, gaugeProducer.mCurrentSlicedBucket->begin()
@@ -371,7 +371,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithAppUpgradeDisabled) {
     event->write(1);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucketStartTimeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin()
                          ->second.front()
@@ -440,7 +440,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsWithCondition) {
     event->write(110);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(110, gaugeProducer.mCurrentSlicedBucket->begin()
@@ -541,7 +541,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsWithSlicedCondition) {
     event->write(110);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
@@ -590,7 +590,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) {
     event1->write(13);
     event1->init();
 
-    gaugeProducer.onDataPulled({event1}, /** succeed */ true);
+    gaugeProducer.onDataPulled({event1}, /** succeed */ true, bucketStartTimeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()
                            ->second.front()
@@ -604,7 +604,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) {
     event2->write(15);
     event2->init();
 
-    gaugeProducer.onDataPulled({event2}, /** succeed */ true);
+    gaugeProducer.onDataPulled({event2}, /** succeed */ true, bucketStartTimeNs + bucketSizeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()
                            ->second.front()
@@ -619,7 +619,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) {
     event3->write(26);
     event3->init();
 
-    gaugeProducer.onDataPulled({event3}, /** succeed */ true);
+    gaugeProducer.onDataPulled({event3}, /** succeed */ true, bucket2StartTimeNs + 2 * bucketSizeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(26L, gaugeProducer.mCurrentSlicedBucket->begin()
                            ->second.front()
@@ -633,7 +633,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) {
             std::make_shared<LogEvent>(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 10);
     event4->write("some value");
     event4->init();
 
-    gaugeProducer.onDataPulled({event4}, /** succeed */ true);
+    gaugeProducer.onDataPulled({event4}, /** succeed */ true, bucketStartTimeNs + 3 * bucketSizeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_TRUE(gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->empty());
 }
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 91b98ecffa29..ae3cdbcb5eb4 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -160,7 +160,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) {
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -178,7 +178,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) {
     event->write(23);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -198,7 +198,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) {
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
 
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -265,7 +265,7 @@ TEST(ValueMetricProducerTest, TestPartialBucketCreated) {
     event->write(2);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** success */ true);
+    valueProducer.onDataPulled(allData, /** success */ true, bucket2StartTimeNs);
 
     // Partial buckets created in 2nd bucket.
     valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 2, "com.foo", 10000, 1);
@@ -327,7 +327,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) {
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval =
@@ -346,7 +346,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) {
     event->write(23);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     // No new data seen, so data has been cleared.
     EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
@@ -363,7 +363,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) {
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
 
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -412,7 +412,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) {
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -428,7 +428,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) {
     event->write(10);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -445,7 +445,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) {
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(true, curInterval.hasBase);
@@ -493,7 +493,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) {
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -509,7 +509,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) {
     event->write(10);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -524,7 +524,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) {
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(true, curInterval.hasBase);
@@ -599,7 +599,7 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
@@ -710,7 +710,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) {
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
 
     valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
@@ -725,7 +725,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) {
     event->write(150);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     EXPECT_EQ(2UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(bucket3StartTimeNs, valueProducer.mCurrentBucketStartTimeNs);
     EXPECT_EQ(20L,
@@ -764,7 +764,7 @@ TEST(ValueMetricProducerTest, TestPulledWithAppUpgradeDisabled) {
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
 
     valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
@@ -1068,7 +1068,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -1086,7 +1086,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
     event->write(23);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -1107,7 +1107,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket6StartTimeNs);
 
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     // startUpdated:false sum:12
@@ -1195,7 +1195,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) {
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(false, curInterval.hasBase);
@@ -1291,7 +1291,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) {
     EXPECT_EQ(20, curInterval.value.long_value);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
-    // Now the alarm is delivered, but it is considered late, it has no effect
+    // Now the alarm is delivered, but it is considered late, the bucket is invalidated.
     vector<shared_ptr<LogEvent>> allData;
     allData.clear();
     shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 50);
@@ -1299,10 +1299,10 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) {
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
-    EXPECT_EQ(true, curInterval.hasBase);
+    EXPECT_EQ(false, curInterval.hasBase);
     EXPECT_EQ(130, curInterval.base.long_value);
     EXPECT_EQ(true, curInterval.hasValue);
     EXPECT_EQ(20, curInterval.value.long_value);
@@ -1752,7 +1752,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBase) {
     allData.push_back(event1);
     allData.push_back(event2);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval1.hasBase);
     EXPECT_EQ(11, interval1.base.long_value);
@@ -1842,7 +1842,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) {
     allData.push_back(event1);
     allData.push_back(event2);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval1.hasBase);
     EXPECT_EQ(11, interval1.base.long_value);
@@ -1871,7 +1871,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) {
     event1->write(5);
     event1->init();
     allData.push_back(event1);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
 
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval2.hasBase);
@@ -1893,7 +1893,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) {
     event2->write(5);
     event2->init();
     allData.push_back(event2);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket5StartTimeNs);
 
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval2.hasBase);
@@ -1968,7 +1968,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
     allData.push_back(event1);
     allData.push_back(event2);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval1.hasBase);
     EXPECT_EQ(11, interval1.base.long_value);
@@ -2000,7 +2000,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
     event1->write(5);
     event1->init();
     allData.push_back(event1);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
 
     // Only one interval left. One was trimmed.
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
@@ -2018,7 +2018,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
     event1->write(14);
    event1->init();
     allData.push_back(event1);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket5StartTimeNs);
 
     interval2 = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(true, interval2.hasBase);
@@ -2078,7 +2078,7 @@ TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfB
     EXPECT_EQ(false, curInterval.hasValue);
 
     vector<shared_ptr<LogEvent>> allData;
-    valueProducer.onDataPulled(allData, /** succeed */ false);
+    valueProducer.onDataPulled(allData, /** succeed */ false, bucket2StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(false, curInterval.hasBase);
     EXPECT_EQ(false, curInterval.hasValue);
@@ -2179,7 +2179,7 @@ TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange) {
     valueProducer.mCondition = true;
 
     vector<shared_ptr<LogEvent>> allData;
-    valueProducer.onDataPulled(allData, /** succeed */ false);
+    valueProducer.onDataPulled(allData, /** succeed */ false, bucketStartTimeNs);
     EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
 
     valueProducer.onConditionChanged(false, bucketStartTimeNs + 1);
@@ -2361,7 +2361,7 @@ TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed) {
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucketStartTimeNs);
 
     // This will fail and should invalidate the whole bucket since we do not have all the data
     // needed to compute the metric value when the screen was on.
@@ -2375,7 +2375,7 @@ TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed) {
     event2->write(140);
     event2->init();
     allData.push_back(event2);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
@@ -2446,7 +2446,7 @@ TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed) {
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ false);
+    valueProducer.onDataPulled(allData, /** succeed */ false, bucketStartTimeNs);
 
     valueProducer.onConditionChanged(false, bucketStartTimeNs + 2);
     valueProducer.onConditionChanged(true, bucketStartTimeNs + 3);
@@ -2458,7 +2458,7 @@ TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed) {
     event2->write(140);
     event2->init();
     allData.push_back(event2);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
@@ -2529,7 +2529,7 @@ TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed) {
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucketStartTimeNs);
 
     // This will fail and should invalidate the whole bucket since we do not have all the data
     // needed to compute the metric value when the screen was on.
@@ -2543,7 +2543,7 @@ TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed) {
     event2->write(140);
     event2->init();
     allData.push_back(event2);
-    valueProducer.onDataPulled(allData, /** succeed */ false);
+    valueProducer.onDataPulled(allData, /** succeed */ false, bucket2StartTimeNs);
 
     valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
@@ -2557,6 +2557,268 @@ TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed) {
     EXPECT_EQ(false, valueProducer.mHasGlobalBase);
 }
 
+TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Start bucket.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(3);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
+                                      logEventMatcherIndex, eventMatcherWizard, tagId,
+                                      bucketStartTimeNs, bucketStartTimeNs, pullerManager);
+
+    // Bucket 2 start.
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event->write(tagId);
+    event->write(110);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+
+    // Bucket 3 empty.
+    allData.clear();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1);
+    event2->init();
+    allData.push_back(event2);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
+    // Data has been trimmed.
+    EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+}
+
+TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onConditionChanged) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // First onConditionChanged
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(3);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 10);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval& curInterval =
+            valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    EXPECT_EQ(true, curInterval.hasBase);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+
+    // Empty pull.
+    valueProducer.onConditionChanged(false, bucketStartTimeNs + 10);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    EXPECT_EQ(false, curInterval.hasBase);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(false, valueProducer.mHasGlobalBase);
+}
+
+TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // First onConditionChanged
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(1);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(2);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(5);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 10);
+    valueProducer.onConditionChanged(false, bucketStartTimeNs + 11);
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 12);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval& curInterval =
+            valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    EXPECT_EQ(true, curInterval.hasBase);
+    EXPECT_EQ(true, curInterval.hasValue);
+    EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+
+    // End of bucket
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    // Data is empty, base should be reset.
+    EXPECT_EQ(false, curInterval.hasBase);
+    EXPECT_EQ(5, curInterval.base.long_value);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1, valueProducer.mPastBuckets.begin()->second[0].values[0].long_value);
+}
+
+
+TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.mutable_dimensions_in_what()->set_field(tagId);
+    metric.mutable_dimensions_in_what()->add_child()->set_field(1);
+    metric.set_condition(StringToId("SCREEN_ON"));
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // First onConditionChanged
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(1);
+                event->write(1);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 10);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+
+    // End of bucket
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event->write(2);
+    event->write(2);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+
+    // Key 1 should be reset since it is not present in the most recent pull.
+    EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
+    auto iterator = valueProducer.mCurrentSlicedBucket.begin();
+    EXPECT_EQ(true, iterator->second[0].hasBase);
+    EXPECT_EQ(2, iterator->second[0].base.long_value);
+    EXPECT_EQ(false, iterator->second[0].hasValue);
+    iterator++;
+    EXPECT_EQ(false, iterator->second[0].hasBase);
+    EXPECT_EQ(1, iterator->second[0].base.long_value);
+    EXPECT_EQ(false, iterator->second[0].hasValue);
+
+    EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android
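The partial-reset behavior exercised by TestPartialResetOnBucketBoundaries can be pictured in isolation: after a pull is accumulated, any dimension key tracked in the current bucket but absent from mMatchedMetricDimensionKeys loses its diff base, so the next pull re-establishes it instead of producing a misleading diff. A self-contained sketch with assumed stand-in types (std::string keys instead of MetricDimensionKey, a trimmed-down Interval), not statsd code:

#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

// Trimmed-down stand-in for ValueMetricProducer::Interval.
struct Interval {
    bool hasBase;
    long base;
};

int main() {
    // Two keys were being diffed in the current bucket.
    std::map<std::string, std::vector<Interval>> currentSlicedBucket = {
            {"key1", {{true, 1}}},
            {"key2", {{true, 2}}}};
    // Only key2 appeared in the newly pulled data.
    std::set<std::string> matchedMetricDimensionKeys = {"key2"};

    // Mirror of the reset loop added to accumulateEvents(): keys missing from the
    // pull lose their base so the next pull starts a fresh diff for them.
    for (auto& slice : currentSlicedBucket) {
        bool presentInPulledData = matchedMetricDimensionKeys.find(slice.first) !=
                                   matchedMetricDimensionKeys.end();
        if (!presentInPulledData) {
            for (auto& interval : slice.second) {
                interval.hasBase = false;
            }
        }
    }

    for (const auto& slice : currentSlicedBucket) {
        std::cout << slice.first << " hasBase=" << std::boolalpha
                  << slice.second[0].hasBase << "\n";
    }
    return 0;
}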