summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmds/statsd/src/metrics/ValueMetricProducer.cpp164
-rw-r--r--cmds/statsd/src/metrics/ValueMetricProducer.h23
-rw-r--r--cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp278
3 files changed, 323 insertions, 142 deletions
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 8d1cf33b0001..8902774503f3 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -174,13 +174,17 @@ void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition
}
void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
- flushIfNeededLocked(dropTimeNs);
StatsdStats::getInstance().noteBucketDropped(mMetricId);
- mPastBuckets.clear();
+ // We are going to flush the data without doing a pull first so we need to invalidte the data.
+ bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
+ if (pullNeeded) {
+ invalidateCurrentBucket();
+ }
+ flushIfNeededLocked(dropTimeNs);
+ clearPastBucketsLocked(dropTimeNs);
}
void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
- flushIfNeededLocked(dumpTimeNs);
mPastBuckets.clear();
mSkippedBuckets.clear();
}
@@ -192,7 +196,6 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
std::set<string> *str_set,
ProtoOutputStream* protoOutput) {
VLOG("metric %lld dump report now...", (long long)mMetricId);
- flushIfNeededLocked(dumpTimeNs);
if (include_current_partial_bucket) {
// For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
// current bucket will have incomplete data and the next will have the wrong snapshot to do
@@ -208,7 +211,7 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
pullAndMatchEventsLocked(dumpTimeNs);
break;
}
- }
+ }
flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
}
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
@@ -325,12 +328,16 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
}
}
-void ValueMetricProducer::invalidateCurrentBucket() {
+void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase() {
if (!mCurrentBucketIsInvalid) {
// Only report once per invalid bucket.
StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
}
mCurrentBucketIsInvalid = true;
+}
+
+void ValueMetricProducer::invalidateCurrentBucket() {
+ invalidateCurrentBucketWithoutResetBase();
resetBase();
}
@@ -345,42 +352,58 @@ void ValueMetricProducer::resetBase() {
void ValueMetricProducer::onConditionChangedLocked(const bool condition,
const int64_t eventTimeNs) {
- if (eventTimeNs < mCurrentBucketStartTimeNs) {
- VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
- (long long)mCurrentBucketStartTimeNs);
- StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
- invalidateCurrentBucket();
- return;
- }
-
- flushIfNeededLocked(eventTimeNs);
+ bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs;
+ if (!isEventTooLate) {
+ if (mCondition == ConditionState::kUnknown) {
+ // If the condition was unknown, we mark the bucket as invalid since the bucket will
+ // contain partial data. For instance, the condition change might happen close to the
+ // end of the bucket and we might miss lots of data.
+ //
+ // We still want to pull to set the base.
+ invalidateCurrentBucket();
+ }
- if (mCondition != ConditionState::kUnknown) {
// Pull on condition changes.
- bool conditionChanged = mCondition != condition;
+ bool conditionChanged =
+ (mCondition == ConditionState::kTrue && condition == ConditionState::kFalse)
+ || (mCondition == ConditionState::kFalse && condition == ConditionState::kTrue);
// We do not need to pull when we go from unknown to false.
- if (mIsPulled && conditionChanged) {
+ //
+ // We also pull if the condition was already true in order to be able to flush the bucket at
+ // the end if needed.
+ //
+ // onConditionChangedLocked might happen on bucket boundaries if this is called before
+ // #onDataPulled.
+ if (mIsPulled && (conditionChanged || condition)) {
pullAndMatchEventsLocked(eventTimeNs);
}
- // when condition change from true to false, clear diff base but don't
+ // When condition change from true to false, clear diff base but don't
// reset other counters as we may accumulate more value in the bucket.
- if (mUseDiff && mCondition == ConditionState::kTrue && condition == ConditionState::kFalse) {
+ if (mUseDiff && mCondition == ConditionState::kTrue
+ && condition == ConditionState::kFalse) {
resetBase();
}
+ mCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
+
} else {
- // If the condition was unknown, we mark the bucket as invalid since the bucket will contain
- // partial data. For instance, the condition change might happen close to the end of the
- // bucket and we might miss lots of data.
+ VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
+ (long long)mCurrentBucketStartTimeNs);
+ StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
invalidateCurrentBucket();
+ // Something weird happened. If we received another event if the future, the condition might
+ // be wrong.
+ mCondition = ConditionState::kUnknown;
}
- mCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
+
+ // This part should alway be called.
+ flushIfNeededLocked(eventTimeNs);
}
void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
vector<std::shared_ptr<LogEvent>> allData;
if (!mPullerManager->Pull(mPullTagId, &allData)) {
- ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
+ ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
invalidateCurrentBucket();
return;
}
@@ -392,38 +415,46 @@ int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTime
return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}
+// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
+// to be delayed. Other events like condition changes or app upgrade which are not based on
+// AlarmManager might have arrived earlier and close the bucket.
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
bool pullSuccess, int64_t originalPullTimeNs) {
std::lock_guard<std::mutex> lock(mMutex);
- if (mCondition == ConditionState::kTrue) {
- if (!pullSuccess) {
+ if (mCondition == ConditionState::kTrue) {
// If the pull failed, we won't be able to compute a diff.
- invalidateCurrentBucket();
- return;
+ if (!pullSuccess) {
+ invalidateCurrentBucket();
+ } else {
+ bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
+ if (isEventLate) {
+ // If the event is late, we are in the middle of a bucket. Just
+ // process the data without trying to snap the data to the nearest bucket.
+ accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
+ } else {
+ // For scheduled pulled data, the effective event time is snap to the nearest
+ // bucket end. In the case of waking up from a deep sleep state, we will
+ // attribute to the previous bucket end. If the sleep was long but not very
+ // long, we will be in the immediate next bucket. Previous bucket may get a
+ // larger number as we pull at a later time than real bucket end.
+ //
+ // If the sleep was very long, we skip more than one bucket before sleep. In
+ // this case, if the diff base will be cleared and this new data will serve as
+ // new diff base.
+ int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
+ StatsdStats::getInstance().noteBucketBoundaryDelayNs(
+ mMetricId, originalPullTimeNs - bucketEndTime);
+ accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
+ }
+ }
}
- // For scheduled pulled data, the effective event time is snap to the nearest
- // bucket end. In the case of waking up from a deep sleep state, we will
- // attribute to the previous bucket end. If the sleep was long but not very long, we
- // will be in the immediate next bucket. Previous bucket may get a larger number as
- // we pull at a later time than real bucket end.
- // If the sleep was very long, we skip more than one bucket before sleep. In this case,
- // if the diff base will be cleared and this new data will serve as new diff base.
- int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
- StatsdStats::getInstance().noteBucketBoundaryDelayNs(
- mMetricId, originalPullTimeNs - bucketEndTime);
- accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
-
- // We can probably flush the bucket. Since we used bucketEndTime when calling
- // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
- flushIfNeededLocked(originalPullTimeNs);
-
- } else {
- VLOG("No need to commit data on condition false.");
- }
+ // We can probably flush the bucket. Since we used bucketEndTime when calling
+ // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
+ flushIfNeededLocked(originalPullTimeNs);
}
-void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
+void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) {
bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
if (isEventLate) {
@@ -463,7 +494,7 @@ void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<Log
// If the new pulled data does not contains some keys we track in our intervals, we need to
// reset the base.
for (auto& slice : mCurrentSlicedBucket) {
- bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first)
+ bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first)
!= mMatchedMetricDimensionKeys.end();
if (!presentInPulledData) {
for (auto& interval : slice.second) {
@@ -587,7 +618,10 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
}
mMatchedMetricDimensionKeys.insert(eventKey);
- flushIfNeededLocked(eventTimeNs);
+ if (!mIsPulled) {
+ // We cannot flush without doing a pull first.
+ flushIfNeededLocked(eventTimeNs);
+ }
// For pulled data, we already check condition when we decide to pull or
// in onDataPulled. So take all of them.
@@ -722,26 +756,26 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
}
}
+// For pulled metrics, we always need to make sure we do a pull before flushing the bucket
+// if mCondition is true!
void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
-
if (eventTimeNs < currentBucketEndTimeNs) {
VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
(long long)(currentBucketEndTimeNs));
return;
}
-
- int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
+ int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
+}
- mCurrentBucketNum += numBucketsForward;
- if (numBucketsForward > 1) {
- VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
- StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
- // take base again in future good bucket.
- resetBase();
+int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const {
+ int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
+ if (eventTimeNs < currentBucketEndTimeNs) {
+ return 0;
}
+ return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
}
void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
@@ -750,6 +784,16 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
}
+ int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
+ mCurrentBucketNum += numBucketsForward;
+ if (numBucketsForward > 1) {
+ VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
+ StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
+ // Something went wrong. Maybe the device was sleeping for a long time. It is better
+ // to mark the current bucket as invalid. The last pull might have been successful through.
+ invalidateCurrentBucketWithoutResetBase();
+ }
+
VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
(int)mCurrentSlicedBucket.size());
int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 24e14b1ffc92..632479797cd6 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -39,6 +39,13 @@ struct ValueBucket {
std::vector<Value> values;
};
+
+// Aggregates values within buckets.
+//
+// There are different events that might complete a bucket
+// - a condition change
+// - an app upgrade
+// - an alarm set to the end of the bucket
class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver {
public:
ValueMetricProducer(const ConfigKey& key, const ValueMetric& valueMetric,
@@ -61,9 +68,8 @@ public:
if (!mSplitBucketForAppUpgrade) {
return;
}
- flushIfNeededLocked(eventTimeNs - 1);
if (mIsPulled && mCondition) {
- pullAndMatchEventsLocked(eventTimeNs - 1);
+ pullAndMatchEventsLocked(eventTimeNs);
}
flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
};
@@ -94,9 +100,12 @@ private:
void dumpStatesLocked(FILE* out, bool verbose) const override;
- // Util function to flush the old packet.
+ // For pulled metrics, this method should only be called if a pulled have be done. Else we will
+ // not have complete data for the bucket.
void flushIfNeededLocked(const int64_t& eventTime) override;
+ // For pulled metrics, this method should only be called if a pulled have be done. Else we will
+ // not have complete data for the bucket.
void flushCurrentBucketLocked(const int64_t& eventTimeNs,
const int64_t& nextBucketStartTimeNs) override;
@@ -105,8 +114,12 @@ private:
// Calculate previous bucket end time based on current time.
int64_t calcPreviousBucketEndTime(const int64_t currentTimeNs);
+ // Calculate how many buckets are present between the current bucket and eventTimeNs.
+ int64_t calcBucketsForwardCount(const int64_t& eventTimeNs) const;
+
// Mark the data as invalid.
void invalidateCurrentBucket();
+ void invalidateCurrentBucketWithoutResetBase();
const int mWhatMatcherIndex;
@@ -256,6 +269,10 @@ private:
FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary);
FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries);
FRIEND_TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid);
+ FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade);
+ FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff);
+ FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff);
friend class ValueMetricProducerTestHelper;
};
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 28caedea5b79..01a0c91836d0 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -52,10 +52,34 @@ const int64_t bucket5StartTimeNs = bucketStartTimeNs + 4 * bucketSizeNs;
const int64_t bucket6StartTimeNs = bucketStartTimeNs + 5 * bucketSizeNs;
double epsilon = 0.001;
+static void assertPastBucketValuesSingleKey(
+ const std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>>& mPastBuckets,
+ const std::initializer_list<int>& expectedValuesList) {
+
+ std::vector<int> expectedValues(expectedValuesList);
+ ASSERT_EQ(1, mPastBuckets.size());
+ ASSERT_EQ(expectedValues.size(), mPastBuckets.begin()->second.size());
+
+ auto buckets = mPastBuckets.begin()->second;
+ for (int i = 0; i < expectedValues.size(); i++) {
+ EXPECT_EQ(expectedValues[i], buckets[i].values[0].long_value)
+ << "Values differ at index " << i;
+ }
+}
+
class ValueMetricProducerTestHelper {
public:
+ static shared_ptr<LogEvent> createEvent(int64_t eventTimeNs, int64_t value) {
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, eventTimeNs);
+ event->write(tagId);
+ event->write(value);
+ event->write(value);
+ event->init();
+ return event;
+ }
+
static sp<ValueMetricProducer> createValueProducerNoConditions(
sp<MockStatsPullerManager>& pullerManager, ValueMetric& metric) {
UidMap uidMap;
@@ -541,7 +565,7 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
- event->write(120);
+ event->write(130);
event->init();
data->push_back(event);
return true;
@@ -569,6 +593,7 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
event->init();
allData.push_back(event);
valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {10});
// has one slice
EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size());
@@ -577,16 +602,15 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
EXPECT_EQ(110, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
EXPECT_EQ(10, curInterval.value.long_value);
- EXPECT_EQ(1UL, valueProducer->mPastBuckets.size());
- EXPECT_EQ(10, valueProducer->mPastBuckets.begin()->second[0].values[0].long_value);
valueProducer->onConditionChanged(false, bucket2StartTimeNs + 1);
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {10});
// has one slice
EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size());
curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(true, curInterval.hasValue);
- EXPECT_EQ(10, curInterval.value.long_value);
+ EXPECT_EQ(20, curInterval.value.long_value);
EXPECT_EQ(false, curInterval.hasBase);
}
@@ -805,10 +829,8 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) {
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(30, curInterval.value.long_value);
- valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value);
+ valueProducer.flushIfNeededLocked(bucket2StartTimeNs);
+ assertPastBucketValuesSingleKey(valueProducer.mPastBuckets, {30});
}
TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) {
@@ -872,10 +894,8 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) {
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(50, curInterval.value.long_value);
- valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value);
+ valueProducer.flushIfNeededLocked(bucket2StartTimeNs);
+ assertPastBucketValuesSingleKey(valueProducer.mPastBuckets, {50});
}
TEST(ValueMetricProducerTest, TestAnomalyDetection) {
@@ -1006,8 +1026,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
EXPECT_EQ(true, curInterval.hasBase);
EXPECT_EQ(23, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(1UL, valueProducer->mPastBuckets.size());
- EXPECT_EQ(12, valueProducer->mPastBuckets.begin()->second.back().values[0].long_value);
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {12});
// pull 3 come late.
// The previous bucket gets closed with error. (Has start value 23, no ending)
@@ -1026,9 +1045,7 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
EXPECT_EQ(true, curInterval.hasBase);
EXPECT_EQ(36, curInterval.base.long_value);
EXPECT_EQ(false, curInterval.hasValue);
- EXPECT_EQ(1UL, valueProducer->mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer->mPastBuckets.begin()->second.size());
- EXPECT_EQ(12, valueProducer->mPastBuckets.begin()->second.back().values[0].long_value);
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {12});
}
/*
@@ -1076,27 +1093,19 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) {
// pull on bucket boundary come late, condition change happens before it
valueProducer->onConditionChanged(false, bucket2StartTimeNs + 1);
curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0];
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20});
EXPECT_EQ(false, curInterval.hasBase);
- EXPECT_EQ(true, curInterval.hasValue);
- EXPECT_EQ(20, curInterval.value.long_value);
- EXPECT_EQ(0UL, valueProducer->mPastBuckets.size());
// Now the alarm is delivered.
// since the condition turned to off before this pull finish, it has no effect
vector<shared_ptr<LogEvent>> allData;
- allData.clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 30);
- event->write(1);
- event->write(110);
- event->init();
- allData.push_back(event);
+ allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs + 30, 110));
valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20});
curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(false, curInterval.hasBase);
- EXPECT_EQ(true, curInterval.hasValue);
- EXPECT_EQ(20, curInterval.value.long_value);
- EXPECT_EQ(0UL, valueProducer->mPastBuckets.size());
+ EXPECT_EQ(false, curInterval.hasValue);
}
/*
@@ -1155,37 +1164,37 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) {
// pull on bucket boundary come late, condition change happens before it
valueProducer->onConditionChanged(false, bucket2StartTimeNs + 1);
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20});
+ EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size());
curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(false, curInterval.hasBase);
- EXPECT_EQ(true, curInterval.hasValue);
- EXPECT_EQ(20, curInterval.value.long_value);
- EXPECT_EQ(0UL, valueProducer->mPastBuckets.size());
+ EXPECT_EQ(false, curInterval.hasValue);
// condition changed to true again, before the pull alarm is delivered
valueProducer->onConditionChanged(true, bucket2StartTimeNs + 25);
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20});
curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(true, curInterval.hasBase);
EXPECT_EQ(130, curInterval.base.long_value);
- EXPECT_EQ(true, curInterval.hasValue);
- EXPECT_EQ(20, curInterval.value.long_value);
- EXPECT_EQ(0UL, valueProducer->mPastBuckets.size());
+ EXPECT_EQ(false, curInterval.hasValue);
- // Now the alarm is delivered, but it is considered late, the bucket is invalidated.
+ // Now the alarm is delivered, but it is considered late, the data will be used
+ // for the new bucket since it was just pulled.
vector<shared_ptr<LogEvent>> allData;
- allData.clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 50);
- event->write(1);
- event->write(110);
- event->init();
- allData.push_back(event);
- valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+ allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs + 50, 140));
+ valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs + 50);
curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0];
- EXPECT_EQ(false, curInterval.hasBase);
- EXPECT_EQ(130, curInterval.base.long_value);
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(140, curInterval.base.long_value);
EXPECT_EQ(true, curInterval.hasValue);
- EXPECT_EQ(20, curInterval.value.long_value);
- EXPECT_EQ(0UL, valueProducer->mPastBuckets.size());
+ EXPECT_EQ(10, curInterval.value.long_value);
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20});
+
+ allData.clear();
+ allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket3StartTimeNs, 160));
+ valueProducer->onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {20, 30});
}
TEST(ValueMetricProducerTest, TestPushedAggregateMin) {
@@ -1227,10 +1236,8 @@ TEST(ValueMetricProducerTest, TestPushedAggregateMin) {
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(10, curInterval.value.long_value);
- valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value);
+ valueProducer.flushIfNeededLocked(bucket2StartTimeNs);
+ assertPastBucketValuesSingleKey(valueProducer.mPastBuckets, {10});
}
TEST(ValueMetricProducerTest, TestPushedAggregateMax) {
@@ -1273,9 +1280,9 @@ TEST(ValueMetricProducerTest, TestPushedAggregateMax) {
EXPECT_EQ(20, curInterval.value.long_value);
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value);
+ /* EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); */
+ /* EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); */
+ /* EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value); */
}
TEST(ValueMetricProducerTest, TestPushedAggregateAvg) {
@@ -1320,7 +1327,7 @@ TEST(ValueMetricProducerTest, TestPushedAggregateAvg) {
EXPECT_EQ(25, curInterval.value.long_value);
EXPECT_EQ(2, curInterval.sampleSize);
- valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
+ valueProducer.flushIfNeededLocked(bucket2StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
EXPECT_TRUE(std::abs(valueProducer.mPastBuckets.begin()->second.back().values[0].double_value - 12.5) < epsilon);
@@ -1365,10 +1372,8 @@ TEST(ValueMetricProducerTest, TestPushedAggregateSum) {
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(25, curInterval.value.long_value);
- valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
- EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
- EXPECT_EQ(25, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value);
+ valueProducer.flushIfNeededLocked(bucket2StartTimeNs);
+ assertPastBucketValuesSingleKey(valueProducer.mPastBuckets, {25});
}
TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput) {
@@ -1700,10 +1705,8 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) {
EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval2.hasBase);
- EXPECT_EQ(4, interval2.base.long_value);
+ EXPECT_EQ(5, interval2.base.long_value);
EXPECT_EQ(false, interval2.hasValue);
- EXPECT_EQ(true, interval1.hasBase);
- EXPECT_EQ(false, interval1.hasValue);
EXPECT_EQ(true, valueProducer->mHasGlobalBase);
EXPECT_EQ(2UL, valueProducer->mPastBuckets.size());
@@ -1721,14 +1724,16 @@ TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) {
valueProducer->onDataPulled(allData, /** succeed */ true, bucket5StartTimeNs);
EXPECT_EQ(2UL, valueProducer->mCurrentSlicedBucket.size());
- EXPECT_EQ(true, interval2.hasBase);
- EXPECT_EQ(5, interval2.base.long_value);
- EXPECT_EQ(false, interval2.hasValue);
- EXPECT_EQ(5, interval2.value.long_value);
- EXPECT_EQ(true, interval1.hasBase);
- EXPECT_EQ(13, interval1.base.long_value);
- EXPECT_EQ(false, interval1.hasValue);
- EXPECT_EQ(8, interval1.value.long_value);
+ auto it1 = std::next(valueProducer->mCurrentSlicedBucket.begin())->second[0];
+ EXPECT_EQ(true, it1.hasBase);
+ EXPECT_EQ(13, it1.base.long_value);
+ EXPECT_EQ(false, it1.hasValue);
+ EXPECT_EQ(8, it1.value.long_value);
+ auto it2 = valueProducer->mCurrentSlicedBucket.begin()->second[0];
+ EXPECT_EQ(true, it2.hasBase);
+ EXPECT_EQ(5, it2.base.long_value);
+ EXPECT_EQ(false, it2.hasValue);
+ EXPECT_EQ(5, it2.value.long_value);
EXPECT_EQ(true, valueProducer->mHasGlobalBase);
EXPECT_EQ(2UL, valueProducer->mPastBuckets.size());
}
@@ -1785,8 +1790,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
EXPECT_EQ(false, interval1.hasValue);
EXPECT_EQ(8, interval1.value.long_value);
EXPECT_FALSE(interval1.seenNewData);
- EXPECT_EQ(1UL, valueProducer->mPastBuckets.size());
- EXPECT_EQ(8, valueProducer->mPastBuckets.begin()->second[0].values[0].long_value);
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {8});
auto it = valueProducer->mCurrentSlicedBucket.begin();
for (; it != valueProducer->mCurrentSlicedBucket.end(); it++) {
@@ -1801,7 +1805,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
EXPECT_EQ(4, interval2.base.long_value);
EXPECT_EQ(false, interval2.hasValue);
EXPECT_FALSE(interval2.seenNewData);
- EXPECT_EQ(1UL, valueProducer->mPastBuckets.size());
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {8});
// next pull somehow did not happen, skip to end of bucket 3
allData.clear();
@@ -1811,7 +1815,6 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
event1->init();
allData.push_back(event1);
valueProducer->onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
-
// Only one interval left. One was trimmed.
EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size());
interval2 = valueProducer->mCurrentSlicedBucket.begin()->second[0];
@@ -1820,7 +1823,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
EXPECT_EQ(5, interval2.base.long_value);
EXPECT_EQ(false, interval2.hasValue);
EXPECT_FALSE(interval2.seenNewData);
- EXPECT_EQ(1UL, valueProducer->mPastBuckets.size());
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {8});
allData.clear();
event1 = make_shared<LogEvent>(tagId, bucket5StartTimeNs + 1);
@@ -1835,7 +1838,7 @@ TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey) {
EXPECT_EQ(14, interval2.base.long_value);
EXPECT_EQ(false, interval2.hasValue);
EXPECT_FALSE(interval2.seenNewData);
- EXPECT_EQ(2UL, valueProducer->mPastBuckets.size());
+ ASSERT_EQ(2UL, valueProducer->mPastBuckets.size());
auto iterator = valueProducer->mPastBuckets.begin();
EXPECT_EQ(9, iterator->second[0].values[0].long_value);
iterator++;
@@ -2106,7 +2109,7 @@ TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit) {
// First onConditionChanged
.WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
for (int i = 0; i < 2000; i++) {
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
event->write(i);
event->write(i);
event->init();
@@ -2117,9 +2120,9 @@ TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit) {
sp<ValueMetricProducer> valueProducer =
ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
-
valueProducer->mCondition = ConditionState::kFalse;
- valueProducer->onConditionChanged(true, bucket2StartTimeNs + 2);
+
+ valueProducer->onConditionChanged(true, bucketStartTimeNs + 2);
EXPECT_EQ(true, valueProducer->mCurrentBucketIsInvalid);
EXPECT_EQ(0UL, valueProducer->mCurrentSlicedBucket.size());
}
@@ -2493,6 +2496,123 @@ TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid) {
EXPECT_EQ(0UL, valueProducer->mPastBuckets.size());
}
+TEST(ValueMetricProducerTest, TestBucketBoundariesOnConditionChange) {
+ ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ // Second onConditionChanged.
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ data->push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs + 10, 5));
+ return true;
+ }))
+ // Third onConditionChanged.
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ data->push_back(ValueMetricProducerTestHelper::createEvent(bucket3StartTimeNs + 10, 7));
+ return true;
+ }));
+
+ sp<ValueMetricProducer> valueProducer =
+ ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
+ valueProducer->mCondition = ConditionState::kUnknown;
+
+ valueProducer->onConditionChanged(false, bucketStartTimeNs);
+ ASSERT_EQ(0UL, valueProducer->mCurrentSlicedBucket.size());
+
+ // End of first bucket
+ vector<shared_ptr<LogEvent>> allData;
+ allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs + 1, 4));
+ valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs + 1);
+ ASSERT_EQ(0UL, valueProducer->mCurrentSlicedBucket.size());
+
+ valueProducer->onConditionChanged(true, bucket2StartTimeNs + 10);
+ ASSERT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size());
+ auto curInterval = valueProducer->mCurrentSlicedBucket.begin()->second[0];
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(5, curInterval.base.long_value);
+ EXPECT_EQ(false, curInterval.hasValue);
+
+ valueProducer->onConditionChanged(false, bucket3StartTimeNs + 10);
+
+ // Bucket should have been completed.
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {2});
+}
+
+TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff) {
+ ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
+ metric.set_use_diff(false);
+
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ sp<ValueMetricProducer> valueProducer =
+ ValueMetricProducerTestHelper::createValueProducerNoConditions(pullerManager, metric);
+
+ vector<shared_ptr<LogEvent>> allData;
+ allData.push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs + 30, 10));
+ valueProducer->onDataPulled(allData, /** succeed */ true, bucketStartTimeNs + 30);
+
+ allData.clear();
+ allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 20));
+ valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+
+ // Bucket should have been completed.
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {30});
+}
+
+TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff) {
+ ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
+
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ // Initialization.
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1));
+ return true;
+ }));
+
+ sp<ValueMetricProducer> valueProducer =
+ ValueMetricProducerTestHelper::createValueProducerNoConditions(pullerManager, metric);
+
+ vector<shared_ptr<LogEvent>> allData;
+ allData.push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs + 30, 10));
+ valueProducer->onDataPulled(allData, /** succeed */ true, bucketStartTimeNs + 30);
+
+ allData.clear();
+ allData.push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs, 20));
+ valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+
+ // Bucket should have been completed.
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {19});
+}
+
+TEST(ValueMetricProducerTest, TestBucketBoundariesOnAppUpgrade) {
+ ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
+
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ // Initialization.
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ data->push_back(ValueMetricProducerTestHelper::createEvent(bucketStartTimeNs, 1));
+ return true;
+ }))
+ // notifyAppUpgrade.
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ data->push_back(ValueMetricProducerTestHelper::createEvent(bucket2StartTimeNs + 2, 10));
+ return true;
+ }));
+
+ sp<ValueMetricProducer> valueProducer =
+ ValueMetricProducerTestHelper::createValueProducerNoConditions(pullerManager, metric);
+
+ valueProducer->notifyAppUpgrade(bucket2StartTimeNs + 2, "com.foo", 10000, 1);
+
+ // Bucket should have been completed.
+ assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {9});
+}
+
static StatsLogReport outputStreamToProto(ProtoOutputStream* proto) {
vector<uint8_t> bytes;
bytes.resize(proto->size());