-rw-r--r--  cmds/statsd/src/external/StatsPuller.cpp               |   8
-rw-r--r--  cmds/statsd/src/external/StatsPuller.h                 |   1
-rw-r--r--  cmds/statsd/src/metrics/ValueMetricProducer.cpp        |  70
-rw-r--r--  cmds/statsd/src/metrics/ValueMetricProducer.h          |  17
-rw-r--r--  cmds/statsd/src/statsd_config.proto                    |   2
-rw-r--r--  cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp | 213
6 files changed, 281 insertions(+), 30 deletions(-)
diff --git a/cmds/statsd/src/external/StatsPuller.cpp b/cmds/statsd/src/external/StatsPuller.cpp
index f501574d6afc..7043d663eb2c 100644
--- a/cmds/statsd/src/external/StatsPuller.cpp
+++ b/cmds/statsd/src/external/StatsPuller.cpp
@@ -59,15 +59,21 @@ bool StatsPuller::Pull(const int64_t elapsedTimeNs, std::vector<std::shared_ptr<
mLastPullTimeNs = elapsedTimeNs;
int64_t pullStartTimeNs = getElapsedRealtimeNs();
bool ret = PullInternal(&mCachedData);
+ if (!ret) {
+ mCachedData.clear();
+ return false;
+ }
StatsdStats::getInstance().notePullTime(mTagId, getElapsedRealtimeNs() - pullStartTimeNs);
for (const shared_ptr<LogEvent>& data : mCachedData) {
data->setElapsedTimestampNs(elapsedTimeNs);
data->setLogdWallClockTimestampNs(wallClockTimeNs);
}
- if (ret && mCachedData.size() > 0) {
+
+ if (mCachedData.size() > 0) {
mapAndMergeIsolatedUidsToHostUid(mCachedData, mUidMap, mTagId);
(*data) = mCachedData;
}
+
StatsdStats::getInstance().notePullDelay(mTagId, getElapsedRealtimeNs() - elapsedTimeNs);
return ret;
}
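
Note on the new failure path: a failed PullInternal() now clears the cached data and returns before the per-event timestamps, the pull-time stat, or the output vector are touched, so callers can rely on the return value alone. A minimal, self-contained sketch of that contract (FakePuller and FakeEvent are illustrative stand-ins, not the real statsd types):

    #include <cstdint>
    #include <functional>
    #include <vector>

    struct FakeEvent { int64_t value; };

    struct FakePuller {
        // Stand-in for PullInternal(), injected so the sketch is self-contained.
        std::function<bool(std::vector<FakeEvent>*)> pullInternal;
        std::vector<FakeEvent> cache;

        bool Pull(std::vector<FakeEvent>* data) {
            bool ok = pullInternal(&cache);
            if (!ok) {
                cache.clear();  // a failed pull must not leave stale events behind
                return false;   // bail out before output or stats are touched
            }
            if (!cache.empty()) {
                *data = cache;  // output is written only on success
            }
            return true;
        }
    };

The old code only consulted ret when deciding whether to copy the cache into the output; the early return additionally skips the timestamp stamping and the pull-time stat for a failed pull.
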
diff --git a/cmds/statsd/src/external/StatsPuller.h b/cmds/statsd/src/external/StatsPuller.h
index 22cb2f5c2175..cafd7979601a 100644
--- a/cmds/statsd/src/external/StatsPuller.h
+++ b/cmds/statsd/src/external/StatsPuller.h
@@ -39,6 +39,7 @@ public:
// Pulls the data. The returned data will have elapsedTimeNs set as timeNs
// and will have wallClockTimeNs set as current wall clock time.
+ // Return true if the pull is successful.
bool Pull(const int64_t timeNs, std::vector<std::shared_ptr<LogEvent>>* data);
// Clear cache immediately
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index a34df8aabea2..bff23345a060 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -72,17 +72,15 @@ const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
+const Value ZERO_LONG((int64_t)0);
+const Value ZERO_DOUBLE(0.0);
+
// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
-ValueMetricProducer::ValueMetricProducer(const ConfigKey& key,
- const ValueMetric& metric,
- const int conditionIndex,
- const sp<ConditionWizard>& conditionWizard,
- const int whatMatcherIndex,
- const sp<EventMatcherWizard>& matcherWizard,
- const int pullTagId,
- const int64_t timeBaseNs,
- const int64_t startTimeNs,
- const sp<StatsPullerManager>& pullerManager)
+ValueMetricProducer::ValueMetricProducer(
+ const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
+ const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex,
+ const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs,
+ const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager)
: MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard),
mWhatMatcherIndex(whatMatcherIndex),
mEventMatcherWizard(matcherWizard),
@@ -102,7 +100,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key,
mAggregationType(metric.aggregation_type()),
mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
mValueDirection(metric.value_direction()),
- mSkipZeroDiffOutput(metric.skip_zero_diff_output()) {
+ mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
+ mUseZeroDefaultBase(metric.use_zero_default_base()),
+ mHasGlobalBase(false) {
int64_t bucketSizeMills = 0;
if (metric.has_bucket()) {
bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -302,6 +302,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
}
}
+void ValueMetricProducer::resetBase() {
+ for (auto& slice : mCurrentSlicedBucket) {
+ for (auto& interval : slice.second) {
+ interval.hasBase = false;
+ }
+ }
+ mHasGlobalBase = false;
+}
+
void ValueMetricProducer::onConditionChangedLocked(const bool condition,
const int64_t eventTimeNs) {
if (eventTimeNs < mCurrentBucketStartTimeNs) {
@@ -317,13 +326,10 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
pullAndMatchEventsLocked(eventTimeNs);
}
- // when condition change from true to false, clear diff base
+ // when the condition changes from true to false, clear the diff base but don't
+ // reset other counters as we may accumulate more value in the bucket.
if (mUseDiff && mCondition && !condition) {
- for (auto& slice : mCurrentSlicedBucket) {
- for (auto& interval : slice.second) {
- interval.hasBase = false;
- }
- }
+ resetBase();
}
mCondition = condition;
@@ -332,15 +338,17 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition,
void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
vector<std::shared_ptr<LogEvent>> allData;
if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
- if (allData.size() == 0) {
- return;
- }
for (const auto& data : allData) {
if (mEventMatcherWizard->matchLogEvent(
*data, mWhatMatcherIndex) == MatchingState::kMatched) {
onMatchedLogEventLocked(mWhatMatcherIndex, *data);
}
}
+ mHasGlobalBase = true;
+ } else {
+ // for pulled data, every pull is needed. So we reset the base if any
+ // pull fails.
+ resetBase();
}
}
@@ -376,6 +384,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
onMatchedLogEventLocked(mWhatMatcherIndex, *data);
}
}
+ mHasGlobalBase = true;
} else {
VLOG("No need to commit data on condition false.");
}
@@ -486,11 +495,18 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn
}
if (mUseDiff) {
- // no base. just update base and return.
if (!interval.hasBase) {
- interval.base = value;
- interval.hasBase = true;
- return;
+ if (mHasGlobalBase && mUseZeroDefaultBase) {
+ // The bucket has global base. This key does not.
+ // Optionally use zero as base.
+ interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
+ interval.hasBase = true;
+ } else {
+ // no base. just update base and return.
+ interval.base = value;
+ interval.hasBase = true;
+ return;
+ }
}
Value diff;
switch (mValueDirection) {
@@ -580,11 +596,7 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
if (numBucketsForward > 1) {
VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
// take base again in future good bucket.
- for (auto& slice : mCurrentSlicedBucket) {
- for (auto& interval : slice.second) {
- interval.hasBase = false;
- }
- }
+ resetBase();
}
VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
(long long)mCurrentBucketStartTimeNs);
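
The core of the change is the new branch in onMatchedLogEventInternalLocked: when a key appears for the first time while the bucket already has a trustworthy global base (the previous pull succeeded) and the metric opts in via use_zero_default_base, the key is treated as if its previous reading were zero instead of merely seeding its base. A condensed, self-contained sketch of that decision, with simplified types (string keys, long values, SUM aggregation) rather than the real statsd classes:

    #include <cstdint>
    #include <map>
    #include <optional>
    #include <string>

    struct Interval {
        bool hasBase = false;
        int64_t base = 0;
        bool hasValue = false;
        int64_t value = 0;  // accumulated diff for the current bucket
    };

    // Returns the diff contributed by `newValue` for `key`, or nothing if this
    // reading only establishes a base. `useZeroDefaultBase` and `hasGlobalBase`
    // mirror mUseZeroDefaultBase / mHasGlobalBase.
    std::optional<int64_t> applyReading(std::map<std::string, Interval>& slices,
                                        const std::string& key, int64_t newValue,
                                        bool useZeroDefaultBase, bool hasGlobalBase) {
        Interval& interval = slices[key];
        if (!interval.hasBase) {
            if (hasGlobalBase && useZeroDefaultBase) {
                interval.base = 0;        // new key: pretend the old reading was 0
                interval.hasBase = true;  // fall through and produce a diff
            } else {
                interval.base = newValue; // first reading ever: just remember it
                interval.hasBase = true;
                return std::nullopt;
            }
        }
        int64_t diff = newValue - interval.base;  // INCREASING direction only
        interval.base = newValue;
        interval.value += diff;
        interval.hasValue = true;
        return diff;
    }

In practice this lets a dimension that shows up mid-stream (for example a newly started process in a per-uid pulled atom) contribute its full first reading, instead of producing nothing until the following pull.
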
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 9fe84dcf93aa..36ae2146f465 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -148,6 +148,9 @@ private:
void pullAndMatchEventsLocked(const int64_t timestampNs);
+ // Reset diff base and mHasGlobalBase
+ void resetBase();
+
static const size_t kBucketSize = sizeof(ValueBucket{});
const size_t mDimensionSoftLimit;
@@ -164,6 +167,18 @@ private:
const bool mSkipZeroDiffOutput;
+ // If true, use a zero value as base to compute the diff.
+ // This is used for new keys which are present in the new data but were
+ // not present in the base data.
+ // The default base will only be used if we have a global base.
+ const bool mUseZeroDefaultBase;
+
+ // For pulled metrics, this is always set to true whenever a pull succeeds.
+ // It is set to false when a pull fails, or upon condition change to false.
+ // This is used to decide if we have the right base data to compute the
+ // diff against.
+ bool mHasGlobalBase;
+
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
@@ -185,6 +200,8 @@ private:
FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket);
FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime);
FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput);
+ FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
+ FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
};
} // namespace statsd
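
Taken together, the two new members gate when the zero default base may be used; mHasGlobalBase is the only piece of mutable state, and the comments above pin down its transitions. A small sketch of that lifecycle as implied by this change (illustrative method names, not the real ValueMetricProducer entry points):

    struct GlobalBaseState {
        bool hasGlobalBase = false;  // mirrors mHasGlobalBase

        void onPullSucceeded()      { hasGlobalBase = true;  }  // pullAndMatchEventsLocked / onDataPulled
        void onPullFailed()         { hasGlobalBase = false; }  // resetBase()
        void onConditionWentFalse() { hasGlobalBase = false; }  // resetBase() when diffing
        void onBucketsSkipped()     { hasGlobalBase = false; }  // resetBase() in flushIfNeededLocked
    };

Only while hasGlobalBase is true is the zero default base applied to new keys; otherwise a new key's first reading simply becomes its base.
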
diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto
index 61854a446e80..f485185cc9c2 100644
--- a/cmds/statsd/src/statsd_config.proto
+++ b/cmds/statsd/src/statsd_config.proto
@@ -274,6 +274,8 @@ message ValueMetric {
optional bool use_diff = 12;
+ optional bool use_zero_default_base = 15 [default = false];
+
enum ValueDirection {
UNKNOWN = 0;
INCREASING = 1;
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 44aa00b3046c..1bd34f52a64e 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -1468,6 +1468,219 @@ TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput) {
EXPECT_EQ(5, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value);
}
+/*
+ * Tests a pulled metric that uses zero as the default base for new keys.
+ */
+TEST(ValueMetricProducerTest, TestUseZeroDefaultBase) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.mutable_dimensions_in_what()->set_field(tagId);
+ metric.mutable_dimensions_in_what()->add_child()->set_field(1);
+ metric.set_use_zero_default_base(true);
+
+ UidMap uidMap;
+ SimpleAtomMatcher atomMatcher;
+ atomMatcher.set_atom_id(tagId);
+ sp<EventMatcherWizard> eventMatcherWizard =
+ new EventMatcherWizard({new SimpleLogMatchingTracker(
+ atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
+ .WillOnce(Invoke([](int tagId, int64_t timeNs,
+ vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+ event->write(1);
+ event->write(3);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+
+ ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
+ logEventMatcherIndex, eventMatcherWizard, tagId,
+ bucketStartTimeNs, bucketStartTimeNs, pullerManager);
+
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ auto iter = valueProducer.mCurrentSlicedBucket.begin();
+ auto& interval1 = iter->second[0];
+ EXPECT_EQ(1, iter->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value);
+ EXPECT_EQ(true, interval1.hasBase);
+ EXPECT_EQ(3, interval1.base.long_value);
+ EXPECT_EQ(false, interval1.hasValue);
+ EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+ vector<shared_ptr<LogEvent>> allData;
+
+ allData.clear();
+ shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ event1->write(2);
+ event1->write(4);
+ event1->init();
+ shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ event2->write(1);
+ event2->write(11);
+ event2->init();
+ allData.push_back(event1);
+ allData.push_back(event2);
+
+ valueProducer.onDataPulled(allData);
+ EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
+ EXPECT_EQ(true, interval1.hasBase);
+ EXPECT_EQ(11, interval1.base.long_value);
+ EXPECT_EQ(true, interval1.hasValue);
+ EXPECT_EQ(8, interval1.value.long_value);
+
+ auto it = valueProducer.mCurrentSlicedBucket.begin();
+ for (; it != valueProducer.mCurrentSlicedBucket.end(); it++) {
+ if (it != iter) {
+ break;
+ }
+ }
+ EXPECT_TRUE(it != iter);
+ auto& interval2 = it->second[0];
+ EXPECT_EQ(2, it->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value);
+ EXPECT_EQ(true, interval2.hasBase);
+ EXPECT_EQ(4, interval2.base.long_value);
+ EXPECT_EQ(true, interval2.hasValue);
+ EXPECT_EQ(4, interval2.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+}
+
+/*
+ * Tests the zero default base when a pull is missed and the base is reset.
+ */
+TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.mutable_dimensions_in_what()->set_field(tagId);
+ metric.mutable_dimensions_in_what()->add_child()->set_field(1);
+ metric.set_use_zero_default_base(true);
+
+ UidMap uidMap;
+ SimpleAtomMatcher atomMatcher;
+ atomMatcher.set_atom_id(tagId);
+ sp<EventMatcherWizard> eventMatcherWizard =
+ new EventMatcherWizard({new SimpleLogMatchingTracker(
+ atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
+ .WillOnce(Invoke([](int tagId, int64_t timeNs,
+ vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+ event->write(1);
+ event->write(3);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+
+ ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
+ logEventMatcherIndex, eventMatcherWizard, tagId,
+ bucketStartTimeNs, bucketStartTimeNs, pullerManager);
+
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ auto iter = valueProducer.mCurrentSlicedBucket.begin();
+ auto& interval1 = iter->second[0];
+ EXPECT_EQ(1, iter->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value);
+ EXPECT_EQ(true, interval1.hasBase);
+ EXPECT_EQ(3, interval1.base.long_value);
+ EXPECT_EQ(false, interval1.hasValue);
+ EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+ vector<shared_ptr<LogEvent>> allData;
+
+ allData.clear();
+ shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ event1->write(2);
+ event1->write(4);
+ event1->init();
+ shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ event2->write(1);
+ event2->write(11);
+ event2->init();
+ allData.push_back(event1);
+ allData.push_back(event2);
+
+ valueProducer.onDataPulled(allData);
+ EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
+ EXPECT_EQ(true, interval1.hasBase);
+ EXPECT_EQ(11, interval1.base.long_value);
+ EXPECT_EQ(true, interval1.hasValue);
+ EXPECT_EQ(8, interval1.value.long_value);
+
+ auto it = valueProducer.mCurrentSlicedBucket.begin();
+ for (; it != valueProducer.mCurrentSlicedBucket.end(); it++) {
+ if (it != iter) {
+ break;
+ }
+ }
+ EXPECT_TRUE(it != iter);
+ auto& interval2 = it->second[0];
+ EXPECT_EQ(2, it->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value);
+ EXPECT_EQ(true, interval2.hasBase);
+ EXPECT_EQ(4, interval2.base.long_value);
+ EXPECT_EQ(true, interval2.hasValue);
+ EXPECT_EQ(4, interval2.value.long_value);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+
+ // next pull somehow did not happen, skip to end of bucket 3
+ allData.clear();
+ event1 = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1);
+ event1->write(2);
+ event1->write(5);
+ event1->init();
+ allData.push_back(event1);
+ valueProducer.onDataPulled(allData);
+
+ EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
+ EXPECT_EQ(true, interval2.hasBase);
+ EXPECT_EQ(5, interval2.base.long_value);
+ EXPECT_EQ(false, interval2.hasValue);
+ EXPECT_EQ(false, interval1.hasBase);
+ EXPECT_EQ(false, interval1.hasValue);
+ EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+ EXPECT_EQ(2UL, valueProducer.mPastBuckets.size());
+
+ allData.clear();
+ event1 = make_shared<LogEvent>(tagId, bucket5StartTimeNs + 1);
+ event1->write(2);
+ event1->write(13);
+ event1->init();
+ allData.push_back(event1);
+ event2 = make_shared<LogEvent>(tagId, bucket5StartTimeNs + 1);
+ event2->write(1);
+ event2->write(5);
+ event2->init();
+ allData.push_back(event2);
+ valueProducer.onDataPulled(allData);
+
+ EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
+ EXPECT_EQ(true, interval2.hasBase);
+ EXPECT_EQ(13, interval2.base.long_value);
+ EXPECT_EQ(true, interval2.hasValue);
+ EXPECT_EQ(8, interval2.value.long_value);
+ EXPECT_EQ(true, interval1.hasBase);
+ EXPECT_EQ(5, interval1.base.long_value);
+ EXPECT_EQ(true, interval1.hasValue);
+ EXPECT_EQ(5, interval1.value.long_value);
+ EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+ EXPECT_EQ(2UL, valueProducer.mPastBuckets.size());
+}
+
} // namespace statsd
} // namespace os
} // namespace android