summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Olivier Gaillard <gaillard@google.com> 2019-02-06 15:10:36 +0000
committer Android (Google) Code Review <android-gerrit@google.com> 2019-02-06 15:10:36 +0000
commit8333721a67c317e0de5eb44395dd7150cf56e13c (patch)
treee34775f5c713d6468b716f0609d3c6f018aeccc4
parente8c4958b2f1bd08d87506eb6a86065daead2b74c (diff)
parent9a5d359879b5e2f7e7c0730b04242d81dd10a5c8 (diff)
Merge changes I300ab05c,I0910b7db
* changes: Adds the concept of invalid bucket. Reset the base when pull fails.
-rw-r--r--cmds/statsd/src/external/PullDataReceiver.h10
-rw-r--r--cmds/statsd/src/external/StatsPullerManager.cpp13
-rw-r--r--cmds/statsd/src/guardrail/StatsdStats.cpp5
-rw-r--r--cmds/statsd/src/guardrail/StatsdStats.h6
-rw-r--r--cmds/statsd/src/metrics/GaugeMetricProducer.cpp5
-rw-r--r--cmds/statsd/src/metrics/GaugeMetricProducer.h3
-rw-r--r--cmds/statsd/src/metrics/ValueMetricProducer.cpp128
-rw-r--r--cmds/statsd/src/metrics/ValueMetricProducer.h24
-rw-r--r--cmds/statsd/src/stats_log.proto1
-rw-r--r--cmds/statsd/src/stats_log_util.cpp3
-rw-r--r--cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp22
-rw-r--r--cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp407
12 files changed, 527 insertions, 100 deletions
diff --git a/cmds/statsd/src/external/PullDataReceiver.h b/cmds/statsd/src/external/PullDataReceiver.h
index 0d505cb49e8f..b071682f8a59 100644
--- a/cmds/statsd/src/external/PullDataReceiver.h
+++ b/cmds/statsd/src/external/PullDataReceiver.h
@@ -28,9 +28,15 @@ namespace statsd {
class PullDataReceiver : virtual public RefBase{
public:
virtual ~PullDataReceiver() {}
- virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) = 0;
+ /**
+ * @param data The pulled data.
+ * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the
+ * bucket should be invalidated.
+ */
+ virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+ bool pullSuccess) = 0;
};
} // namespace statsd
} // namespace os
-} // namespace android \ No newline at end of file
+} // namespace android
diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp
index c69384c7077f..9b603d6c7957 100644
--- a/cmds/statsd/src/external/StatsPullerManager.cpp
+++ b/cmds/statsd/src/external/StatsPullerManager.cpp
@@ -358,12 +358,13 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
for (const auto& pullInfo : needToPull) {
vector<shared_ptr<LogEvent>> data;
- if (!Pull(pullInfo.first, &data)) {
+ bool pullSuccess = Pull(pullInfo.first, &data);
+ if (pullSuccess) {
+ StatsdStats::getInstance().notePullDelay(
+ pullInfo.first, getElapsedRealtimeNs() - elapsedTimeNs);
+ } else {
VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
- continue;
}
- StatsdStats::getInstance().notePullDelay(pullInfo.first,
- getElapsedRealtimeNs() - elapsedTimeNs);
// Convention is to mark pull atom timestamp at request time.
// If we pull at t0, puller starts at t1, finishes at t2, and send back
@@ -380,8 +381,8 @@ void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
for (const auto& receiverInfo : pullInfo.second) {
sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
if (receiverPtr != nullptr) {
- receiverPtr->onDataPulled(data);
- // we may have just come out of a coma, compute next pull time
+ receiverPtr->onDataPulled(data, pullSuccess);
+ // We may have just come out of a coma, compute next pull time.
int numBucketsAhead =
(elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp
index 37ccad5f4a49..a5bd5c6b6364 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.cpp
+++ b/cmds/statsd/src/guardrail/StatsdStats.cpp
@@ -448,6 +448,11 @@ void StatsdStats::noteConditionChangeInNextBucket(int metricId) {
getAtomMetricStats(metricId).conditionChangeInNextBucket++;
}
+void StatsdStats::noteInvalidatedBucket(int metricId) {
+ lock_guard<std::mutex> lock(mLock);
+ getAtomMetricStats(metricId).invalidatedBucket++;
+}
+
StatsdStats::AtomMetricStats& StatsdStats::getAtomMetricStats(int metricId) {
auto atomMetricStatsIter = mAtomMetricStats.find(metricId);
if (atomMetricStatsIter != mAtomMetricStats.end()) {
diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h
index 01e9ca17e5fd..cb17061c1ed1 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.h
+++ b/cmds/statsd/src/guardrail/StatsdStats.h
@@ -365,6 +365,11 @@ public:
void noteConditionChangeInNextBucket(int atomId);
/**
+ * A bucket has been tagged as invalid.
+ */
+ void noteInvalidatedBucket(int metricId);
+
+ /**
* Reset the historical stats. Including all stats in icebox, and the tracked stats about
* metrics, matchers, and atoms. The active configs will be kept and StatsdStats will continue
* to collect stats after reset() has been called.
@@ -408,6 +413,7 @@ public:
long skippedForwardBuckets = 0;
long badValueType = 0;
long conditionChangeInNextBucket = 0;
+ long invalidatedBucket = 0;
} AtomMetricStats;
private:
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index c2878f02a691..c9b71659aa58 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -406,9 +406,10 @@ std::shared_ptr<vector<FieldValue>> GaugeMetricProducer::getGaugeFields(const Lo
return gaugeFields;
}
-void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
+void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+ bool pullSuccess) {
std::lock_guard<std::mutex> lock(mMutex);
- if (allData.size() == 0) {
+ if (!pullSuccess || allData.size() == 0) {
return;
}
for (const auto& data : allData) {
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index df0877954d70..d480941ed311 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -67,7 +67,8 @@ public:
virtual ~GaugeMetricProducer();
// Handles when the pulled data arrives.
- void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
+ void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+ bool pullSuccess) override;
// GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 6aa8e842b021..9fb78e780870 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -104,6 +104,7 @@ ValueMetricProducer::ValueMetricProducer(
mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
mUseZeroDefaultBase(metric.use_zero_default_base()),
mHasGlobalBase(false),
+ mCurrentBucketIsInvalid(false),
mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
: StatsdStats::kPullMaxDelayNs),
mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()) {
@@ -308,6 +309,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
}
}
+void ValueMetricProducer::invalidateCurrentBucket() {
+ if (!mCurrentBucketIsInvalid) {
+ // Only report once per invalid bucket.
+ StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
+ }
+ mCurrentBucketIsInvalid = true;
+ resetBase();
+}
+
void ValueMetricProducer::resetBase() {
for (auto& slice : mCurrentSlicedBucket) {
for (auto& interval : slice.second) {
@@ -323,6 +333,7 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
(long long)mCurrentBucketStartTimeNs);
StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
+ invalidateCurrentBucket();
return;
}
@@ -346,19 +357,20 @@ void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
vector<std::shared_ptr<LogEvent>> allData;
if (!mPullerManager->Pull(mPullTagId, &allData)) {
ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
- resetBase();
+ invalidateCurrentBucket();
return;
}
const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
if (pullDelayNs > mMaxPullDelayNs) {
ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
(long long)mMaxPullDelayNs);
StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
- StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
- resetBase();
+ // We are missing one pull from the bucket which means we will not have a complete view of
+ // what's going on.
+ invalidateCurrentBucket();
return;
}
- StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
if (timestampNs < mCurrentBucketStartTimeNs) {
// The data will be skipped in onMatchedLogEventInternalLocked, but we don't want to report
@@ -382,9 +394,16 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime
return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}
-void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
+void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+ bool pullSuccess) {
std::lock_guard<std::mutex> lock(mMutex);
if (mCondition) {
+ if (!pullSuccess) {
+ // If the pull failed, we won't be able to compute a diff.
+ invalidateCurrentBucket();
+ return;
+ }
+
if (allData.size() == 0) {
VLOG("Data pulled is empty");
StatsdStats::getInstance().noteEmptyData(mPullTagId);
@@ -399,12 +418,13 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
// if the diff base will be cleared and this new data will serve as new diff base.
int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
- if (bucketEndTime < mCurrentBucketStartTimeNs) {
+ bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs;
+ if (isEventLate) {
VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
(long long)mCurrentBucketStartTimeNs);
StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
- return;
}
+
for (const auto& data : allData) {
LogEvent localCopy = data->makeCopy();
if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
@@ -679,31 +699,13 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
(int)mCurrentSlicedBucket.size());
int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
-
int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
- if (bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+ bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
+ if (isBucketLargeEnough && !mCurrentBucketIsInvalid) {
// The current bucket is large enough to keep.
for (const auto& slice : mCurrentSlicedBucket) {
- ValueBucket bucket;
- bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
- bucket.mBucketEndNs = bucketEndTime;
- for (const auto& interval : slice.second) {
- if (interval.hasValue) {
- // skip the output if the diff is zero
- if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
- continue;
- }
- bucket.valueIndex.push_back(interval.valueIndex);
- if (mAggregationType != ValueMetric::AVG) {
- bucket.values.push_back(interval.value);
- } else {
- double sum = interval.value.type == LONG ? (double)interval.value.long_value
- : interval.value.double_value;
- bucket.values.push_back(Value((double)sum / interval.sampleSize));
- }
- }
- }
+ ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
// it will auto create new vector of ValuebucketInfo if the key is not found.
if (bucket.valueIndex.size() > 0) {
auto& bucketList = mPastBuckets[slice.first];
@@ -714,6 +716,58 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
}
+ if (!mCurrentBucketIsInvalid) {
+ appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
+ }
+ initCurrentSlicedBucket();
+ mCurrentBucketIsInvalid = false;
+}
+
+ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
+ const std::vector<Interval>& intervals) {
+ ValueBucket bucket;
+ bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
+ bucket.mBucketEndNs = bucketEndTime;
+ for (const auto& interval : intervals) {
+ if (interval.hasValue) {
+ // skip the output if the diff is zero
+ if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
+ continue;
+ }
+ bucket.valueIndex.push_back(interval.valueIndex);
+ if (mAggregationType != ValueMetric::AVG) {
+ bucket.values.push_back(interval.value);
+ } else {
+ double sum = interval.value.type == LONG ? (double)interval.value.long_value
+ : interval.value.double_value;
+ bucket.values.push_back(Value((double)sum / interval.sampleSize));
+ }
+ }
+ }
+ return bucket;
+}
+
+void ValueMetricProducer::initCurrentSlicedBucket() {
+ for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
+ bool obsolete = true;
+ for (auto& interval : it->second) {
+ interval.hasValue = false;
+ interval.sampleSize = 0;
+ if (interval.seenNewData) {
+ obsolete = false;
+ }
+ interval.seenNewData = false;
+ }
+
+ if (obsolete) {
+ it = mCurrentSlicedBucket.erase(it);
+ } else {
+ it++;
+ }
+ }
+}
+
+void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker.
// Accumulate partial buckets with current value and then send to anomaly tracker.
if (mCurrentFullBucket.size() > 0) {
@@ -751,24 +805,6 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
mCurrentFullBucket[slice.first] += slice.second[0].value.long_value;
}
}
-
- for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
- bool obsolete = true;
- for (auto& interval : it->second) {
- interval.hasValue = false;
- interval.sampleSize = 0;
- if (interval.seenNewData) {
- obsolete = false;
- }
- interval.seenNewData = false;
- }
-
- if (obsolete) {
- it = mCurrentSlicedBucket.erase(it);
- } else {
- it++;
- }
- }
}
size_t ValueMetricProducer::byteSizeLocked() const {
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index a8dfc5ba0e5d..d9bec5d588be 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -51,7 +51,8 @@ public:
virtual ~ValueMetricProducer();
// Process data pulled on bucket boundary.
- void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
+ void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+ bool pullSuccess) override;
// ValueMetric needs special logic if it's a pulled atom.
void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
@@ -102,6 +103,9 @@ private:
// Calculate previous bucket end time based on current time.
int64_t calcPreviousBucketEndTime(const int64_t currentTimeNs);
+ // Mark the data as invalid.
+ void invalidateCurrentBucket();
+
const int mWhatMatcherIndex;
sp<EventMatcherWizard> mEventMatcherWizard;
@@ -155,6 +159,11 @@ private:
void pullAndMatchEventsLocked(const int64_t timestampNs);
+ ValueBucket buildPartialBucket(int64_t bucketEndTime,
+ const std::vector<Interval>& intervals);
+ void initCurrentSlicedBucket();
+ void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs);
+
// Reset diff base and mHasGlobalBase
void resetBase();
@@ -186,6 +195,12 @@ private:
// diff against.
bool mHasGlobalBase;
+ // Invalid bucket. There was a problem in collecting data in the current bucket so we cannot
+ // trust any of the data in this bucket.
+ //
+ // For instance, one pull failed.
+ bool mCurrentBucketIsInvalid;
+
const int64_t mMaxPullDelayNs;
const bool mSplitBucketForAppUpgrade;
@@ -216,8 +231,13 @@ private:
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
- FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFail);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket);
FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
+ FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed);
+ FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed);
+ FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed);
};
} // namespace statsd
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index cca09ac017a3..5b9148283bdc 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -417,6 +417,7 @@ message StatsdStatsReport {
optional int64 skipped_forward_buckets = 4;
optional int64 bad_value_type = 5;
optional int64 condition_change_in_next_bucket = 6;
+ optional int64 invalidated_bucket = 7;
}
repeated AtomMetricStats atom_metric_stats = 17;
diff --git a/cmds/statsd/src/stats_log_util.cpp b/cmds/statsd/src/stats_log_util.cpp
index 9c9985ed271c..3cb7563b206b 100644
--- a/cmds/statsd/src/stats_log_util.cpp
+++ b/cmds/statsd/src/stats_log_util.cpp
@@ -78,6 +78,7 @@ const int FIELD_ID_LATE_LOG_EVENT_SKIPPED = 3;
const int FIELD_ID_SKIPPED_FORWARD_BUCKETS = 4;
const int FIELD_ID_BAD_VALUE_TYPE = 5;
const int FIELD_ID_CONDITION_CHANGE_IN_NEXT_BUCKET = 6;
+const int FIELD_ID_INVALIDATED_BUCKET = 7;
namespace {
@@ -494,6 +495,8 @@ void writeAtomMetricStatsToStream(const std::pair<int, StatsdStats::AtomMetricSt
(long long)pair.second.badValueType);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_CHANGE_IN_NEXT_BUCKET,
(long long)pair.second.conditionChangeInNextBucket);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_INVALIDATED_BUCKET,
+ (long long)pair.second.invalidatedBucket);
protoOutput->end(token);
}
diff --git a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
index 0ffbb54c8d48..1725160c00c7 100644
--- a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
@@ -133,7 +133,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsNoCondition) {
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
auto it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
EXPECT_EQ(INT, it->mValue.getType());
@@ -151,7 +151,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsNoCondition) {
event2->write(25);
event2->init();
allData.push_back(event2);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
EXPECT_EQ(INT, it->mValue.getType());
@@ -305,7 +305,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithUpgrade) {
event->write(1);
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin()
->second.front()
@@ -328,7 +328,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithUpgrade) {
event->write(3);
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(3, gaugeProducer.mCurrentSlicedBucket->begin()
@@ -371,7 +371,7 @@ TEST(GaugeMetricProducerTest, TestPulledWithAppUpgradeDisabled) {
event->write(1);
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin()
->second.front()
@@ -440,7 +440,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsWithCondition) {
event->write(110);
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(110, gaugeProducer.mCurrentSlicedBucket->begin()
@@ -541,7 +541,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsWithSlicedCondition) {
event->write(110);
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
@@ -590,7 +590,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) {
event1->write(13);
event1->init();
- gaugeProducer.onDataPulled({event1});
+ gaugeProducer.onDataPulled({event1}, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()
->second.front()
@@ -604,7 +604,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) {
event2->write(15);
event2->init();
- gaugeProducer.onDataPulled({event2});
+ gaugeProducer.onDataPulled({event2}, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()
->second.front()
@@ -619,7 +619,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) {
event3->write(26);
event3->init();
- gaugeProducer.onDataPulled({event3});
+ gaugeProducer.onDataPulled({event3}, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(26L, gaugeProducer.mCurrentSlicedBucket->begin()
->second.front()
@@ -633,7 +633,7 @@ TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) {
std::make_shared<LogEvent>(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 10);
event4->write("some value");
event4->init();
- gaugeProducer.onDataPulled({event4});
+ gaugeProducer.onDataPulled({event4}, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_TRUE(gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->empty());
}
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index c0648ee70032..64045520fb0b 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -160,7 +160,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) {
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -177,7 +177,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) {
event->write(23);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -196,7 +196,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) {
event->write(36);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -256,7 +256,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) {
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval =
@@ -274,7 +274,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) {
event->write(23);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -292,7 +292,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) {
event->write(36);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -341,7 +341,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) {
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -357,7 +357,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) {
event->write(10);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -373,7 +373,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset) {
event->write(36);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(true, curInterval.hasBase);
@@ -420,7 +420,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) {
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -436,7 +436,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) {
event->write(10);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -451,7 +451,7 @@ TEST(ValueMetricProducerTest, TestPulledEventsTakeZeroOnReset) {
event->write(36);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(true, curInterval.hasBase);
@@ -525,7 +525,7 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
event->write(110);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
@@ -635,7 +635,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) {
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
@@ -650,7 +650,7 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) {
event->write(150);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
EXPECT_EQ(bucket2StartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs);
EXPECT_EQ(20L,
@@ -689,7 +689,7 @@ TEST(ValueMetricProducerTest, TestPulledWithAppUpgradeDisabled) {
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
@@ -993,7 +993,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -1011,7 +1011,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
event->write(23);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -1032,7 +1032,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
event->write(36);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
// startUpdated:false sum:12
@@ -1120,7 +1120,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) {
event->write(110);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(false, curInterval.hasBase);
@@ -1224,7 +1224,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) {
event->write(110);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(true, curInterval.hasBase);
@@ -1677,7 +1677,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBase) {
allData.push_back(event1);
allData.push_back(event2);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval1.hasBase);
EXPECT_EQ(11, interval1.base.long_value);
@@ -1762,7 +1762,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) {
allData.push_back(event1);
allData.push_back(event2);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval1.hasBase);
EXPECT_EQ(11, interval1.base.long_value);
@@ -1791,7 +1791,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) {
event1->write(5);
event1->init();
allData.push_back(event1);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval2.hasBase);
@@ -1813,7 +1813,7 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) {
event2->write(5);
event2->init();
allData.push_back(event2);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval2.hasBase);
@@ -1888,7 +1888,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
allData.push_back(event1);
allData.push_back(event2);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval1.hasBase);
EXPECT_EQ(11, interval1.base.long_value);
@@ -1918,7 +1918,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
event1->write(5);
event1->init();
allData.push_back(event1);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
@@ -1941,7 +1941,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
event1->write(13);
event1->init();
allData.push_back(event1);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval2.hasBase);
@@ -1951,7 +1951,60 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
}
-TEST(ValueMetricProducerTest, TestResetBaseOnPullFail) {
+TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(INT_MAX);
+
+ UidMap uidMap;
+ SimpleAtomMatcher atomMatcher;
+ atomMatcher.set_atom_id(tagId);
+ sp<EventMatcherWizard> eventMatcherWizard =
+ new EventMatcherWizard({new SimpleLogMatchingTracker(
+ atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+ // Used by onConditionChanged.
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+ event->write(tagId);
+ event->write(100);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+
+ ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+ eventMatcherWizard, tagId, bucketStartTimeNs,
+ bucketStartTimeNs, pullerManager);
+
+ valueProducer.onConditionChanged(true, bucketStartTimeNs + 8);
+ // has one slice
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ ValueMetricProducer::Interval& curInterval =
+ valueProducer.mCurrentSlicedBucket.begin()->second[0];
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(100, curInterval.base.long_value);
+ EXPECT_EQ(false, curInterval.hasValue);
+
+ vector<shared_ptr<LogEvent>> allData;
+ valueProducer.onDataPulled(allData, /** succeed */ false);
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, valueProducer.mHasGlobalBase);
+}
+
+TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange) {
ValueMetric metric;
metric.set_id(metricId);
metric.set_bucket(ONE_MINUTE);
@@ -2007,6 +2060,56 @@ TEST(ValueMetricProducerTest, TestResetBaseOnPullFail) {
EXPECT_EQ(false, valueProducer.mHasGlobalBase);
}
+TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(INT_MAX);
+
+ UidMap uidMap;
+ SimpleAtomMatcher atomMatcher;
+ atomMatcher.set_atom_id(tagId);
+ sp<EventMatcherWizard> eventMatcherWizard =
+ new EventMatcherWizard({new SimpleLogMatchingTracker(
+ atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+ event->write(tagId);
+ event->write(100);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+
+ ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+ eventMatcherWizard, tagId, bucketStartTimeNs,
+ bucketStartTimeNs, pullerManager);
+
+ valueProducer.mCondition = true;
+
+ vector<shared_ptr<LogEvent>> allData;
+ valueProducer.onDataPulled(allData, /** succeed */ false);
+ EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
+
+ valueProducer.onConditionChanged(false, bucketStartTimeNs + 1);
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ ValueMetricProducer::Interval& curInterval =
+ valueProducer.mCurrentSlicedBucket.begin()->second[0];
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, valueProducer.mHasGlobalBase);
+}
+
TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) {
ValueMetric metric;
metric.set_id(metricId);
@@ -2052,7 +2155,7 @@ TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) {
event->write(110);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
@@ -2073,6 +2176,250 @@ TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) {
EXPECT_EQ(false, valueProducer.mHasGlobalBase);
}
+TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(INT_MAX);
+
+ UidMap uidMap;
+ SimpleAtomMatcher atomMatcher;
+ atomMatcher.set_atom_id(tagId);
+ sp<EventMatcherWizard> eventMatcherWizard =
+ new EventMatcherWizard({new SimpleLogMatchingTracker(
+ atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ // First onConditionChanged
+ .WillOnce(Return(false))
+ // Second onConditionChanged
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+ event->write(tagId);
+ event->write(130);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+
+ ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+ eventMatcherWizard, tagId, bucketStartTimeNs,
+ bucketStartTimeNs, pullerManager);
+
+ valueProducer.mCondition = true;
+
+ // Bucket start.
+ vector<shared_ptr<LogEvent>> allData;
+ allData.clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
+ event->write(1);
+ event->write(110);
+ event->init();
+ allData.push_back(event);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
+
+ // This will fail and should invalidate the whole bucket since we do not have all the data
+ // needed to compute the metric value when the screen was on.
+ valueProducer.onConditionChanged(false, bucketStartTimeNs + 2);
+ valueProducer.onConditionChanged(true, bucketStartTimeNs + 3);
+
+ // Bucket end.
+ allData.clear();
+ shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ event2->write(1);
+ event2->write(140);
+ event2->init();
+ allData.push_back(event2);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
+
+ valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
+
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+ // Contains base from last pull which was successful.
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ ValueMetricProducer::Interval& curInterval =
+ valueProducer.mCurrentSlicedBucket.begin()->second[0];
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(140, curInterval.base.long_value);
+ EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+}
+
+TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(INT_MAX);
+
+ UidMap uidMap;
+ SimpleAtomMatcher atomMatcher;
+ atomMatcher.set_atom_id(tagId);
+ sp<EventMatcherWizard> eventMatcherWizard =
+ new EventMatcherWizard({new SimpleLogMatchingTracker(
+ atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ // First onConditionChanged
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+ event->write(tagId);
+ event->write(120);
+ event->init();
+ data->push_back(event);
+ return true;
+ }))
+ // Second onConditionChanged
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+ event->write(tagId);
+ event->write(130);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+
+ ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+ eventMatcherWizard, tagId, bucketStartTimeNs,
+ bucketStartTimeNs, pullerManager);
+
+ valueProducer.mCondition = true;
+
+ // Bucket start.
+ vector<shared_ptr<LogEvent>> allData;
+ allData.clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
+ event->write(1);
+ event->write(110);
+ event->init();
+ allData.push_back(event);
+ valueProducer.onDataPulled(allData, /** succeed */ false);
+
+ valueProducer.onConditionChanged(false, bucketStartTimeNs + 2);
+ valueProducer.onConditionChanged(true, bucketStartTimeNs + 3);
+
+ // Bucket end.
+ allData.clear();
+ shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ event2->write(1);
+ event2->write(140);
+ event2->init();
+ allData.push_back(event2);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
+
+ valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
+
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+ // Contains base from last pull which was successful.
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ ValueMetricProducer::Interval& curInterval =
+ valueProducer.mCurrentSlicedBucket.begin()->second[0];
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(140, curInterval.base.long_value);
+ EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+}
+
+TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(INT_MAX);
+
+ UidMap uidMap;
+ SimpleAtomMatcher atomMatcher;
+ atomMatcher.set_atom_id(tagId);
+ sp<EventMatcherWizard> eventMatcherWizard =
+ new EventMatcherWizard({new SimpleLogMatchingTracker(
+ atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ // First onConditionChanged
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+ event->write(tagId);
+ event->write(120);
+ event->init();
+ data->push_back(event);
+ return true;
+ }))
+ // Second onConditionChanged
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+ event->write(tagId);
+ event->write(130);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+
+ ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+ eventMatcherWizard, tagId, bucketStartTimeNs,
+ bucketStartTimeNs, pullerManager);
+
+ valueProducer.mCondition = true;
+
+ // Bucket start.
+ vector<shared_ptr<LogEvent>> allData;
+ allData.clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
+ event->write(1);
+ event->write(110);
+ event->init();
+ allData.push_back(event);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
+
+ // This will fail and should invalidate the whole bucket since we do not have all the data
+ // needed to compute the metric value when the screen was on.
+ valueProducer.onConditionChanged(false, bucketStartTimeNs + 2);
+ valueProducer.onConditionChanged(true, bucketStartTimeNs + 3);
+
+ // Bucket end.
+ allData.clear();
+ shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ event2->write(1);
+ event2->write(140);
+ event2->init();
+ allData.push_back(event2);
+ valueProducer.onDataPulled(allData, /** succeed */ false);
+
+ valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
+
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+ // Last pull failed so based has been reset.
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ ValueMetricProducer::Interval& curInterval =
+ valueProducer.mCurrentSlicedBucket.begin()->second[0];
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, valueProducer.mHasGlobalBase);
+}
+
} // namespace statsd
} // namespace os
} // namespace android