diff options
| -rw-r--r-- | cmds/statsd/src/external/StatsPuller.cpp | 8 | ||||
| -rw-r--r-- | cmds/statsd/src/external/StatsPuller.h | 1 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.cpp | 70 | ||||
| -rw-r--r-- | cmds/statsd/src/metrics/ValueMetricProducer.h | 17 | ||||
| -rw-r--r-- | cmds/statsd/src/statsd_config.proto | 2 | ||||
| -rw-r--r-- | cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp | 213 |
6 files changed, 281 insertions, 30 deletions
diff --git a/cmds/statsd/src/external/StatsPuller.cpp b/cmds/statsd/src/external/StatsPuller.cpp index f501574d6afc..7043d663eb2c 100644 --- a/cmds/statsd/src/external/StatsPuller.cpp +++ b/cmds/statsd/src/external/StatsPuller.cpp @@ -59,15 +59,21 @@ bool StatsPuller::Pull(const int64_t elapsedTimeNs, std::vector<std::shared_ptr< mLastPullTimeNs = elapsedTimeNs; int64_t pullStartTimeNs = getElapsedRealtimeNs(); bool ret = PullInternal(&mCachedData); + if (!ret) { + mCachedData.clear(); + return false; + } StatsdStats::getInstance().notePullTime(mTagId, getElapsedRealtimeNs() - pullStartTimeNs); for (const shared_ptr<LogEvent>& data : mCachedData) { data->setElapsedTimestampNs(elapsedTimeNs); data->setLogdWallClockTimestampNs(wallClockTimeNs); } - if (ret && mCachedData.size() > 0) { + + if (mCachedData.size() > 0) { mapAndMergeIsolatedUidsToHostUid(mCachedData, mUidMap, mTagId); (*data) = mCachedData; } + StatsdStats::getInstance().notePullDelay(mTagId, getElapsedRealtimeNs() - elapsedTimeNs); return ret; } diff --git a/cmds/statsd/src/external/StatsPuller.h b/cmds/statsd/src/external/StatsPuller.h index 22cb2f5c2175..cafd7979601a 100644 --- a/cmds/statsd/src/external/StatsPuller.h +++ b/cmds/statsd/src/external/StatsPuller.h @@ -39,6 +39,7 @@ public: // Pulls the data. The returned data will have elapsedTimeNs set as timeNs // and will have wallClockTimeNs set as current wall clock time. + // Return true if the pull is successful. bool Pull(const int64_t timeNs, std::vector<std::shared_ptr<LogEvent>>* data); // Clear cache immediately diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index a34df8aabea2..bff23345a060 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -72,17 +72,15 @@ const int FIELD_ID_BUCKET_NUM = 4; const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5; const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6; +const Value ZERO_LONG((int64_t)0); +const Value ZERO_DOUBLE((int64_t)0); + // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently -ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, - const ValueMetric& metric, - const int conditionIndex, - const sp<ConditionWizard>& conditionWizard, - const int whatMatcherIndex, - const sp<EventMatcherWizard>& matcherWizard, - const int pullTagId, - const int64_t timeBaseNs, - const int64_t startTimeNs, - const sp<StatsPullerManager>& pullerManager) +ValueMetricProducer::ValueMetricProducer( + const ConfigKey& key, const ValueMetric& metric, const int conditionIndex, + const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex, + const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs, + const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager) : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard), mWhatMatcherIndex(whatMatcherIndex), mEventMatcherWizard(matcherWizard), @@ -102,7 +100,9 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, mAggregationType(metric.aggregation_type()), mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)), mValueDirection(metric.value_direction()), - mSkipZeroDiffOutput(metric.skip_zero_diff_output()) { + mSkipZeroDiffOutput(metric.skip_zero_diff_output()), + mUseZeroDefaultBase(metric.use_zero_default_base()), + mHasGlobalBase(false) { int64_t bucketSizeMills = 0; if (metric.has_bucket()) { bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket()); @@ -302,6 +302,15 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, } } +void ValueMetricProducer::resetBase() { + for (auto& slice : mCurrentSlicedBucket) { + for (auto& interval : slice.second) { + interval.hasBase = false; + } + } + mHasGlobalBase = false; +} + void ValueMetricProducer::onConditionChangedLocked(const bool condition, const int64_t eventTimeNs) { if (eventTimeNs < mCurrentBucketStartTimeNs) { @@ -317,13 +326,10 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, pullAndMatchEventsLocked(eventTimeNs); } - // when condition change from true to false, clear diff base + // when condition change from true to false, clear diff base but don't + // reset other counters as we may accumulate more value in the bucket. if (mUseDiff && mCondition && !condition) { - for (auto& slice : mCurrentSlicedBucket) { - for (auto& interval : slice.second) { - interval.hasBase = false; - } - } + resetBase(); } mCondition = condition; @@ -332,15 +338,17 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) { vector<std::shared_ptr<LogEvent>> allData; if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) { - if (allData.size() == 0) { - return; - } for (const auto& data : allData) { if (mEventMatcherWizard->matchLogEvent( *data, mWhatMatcherIndex) == MatchingState::kMatched) { onMatchedLogEventLocked(mWhatMatcherIndex, *data); } } + mHasGlobalBase = true; + } else { + // for pulled data, every pull is needed. So we reset the base if any + // pull fails. + resetBase(); } } @@ -376,6 +384,7 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven onMatchedLogEventLocked(mWhatMatcherIndex, *data); } } + mHasGlobalBase = true; } else { VLOG("No need to commit data on condition false."); } @@ -486,11 +495,18 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIn } if (mUseDiff) { - // no base. just update base and return. if (!interval.hasBase) { - interval.base = value; - interval.hasBase = true; - return; + if (mHasGlobalBase && mUseZeroDefaultBase) { + // The bucket has global base. This key does not. + // Optionally use zero as base. + interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE); + interval.hasBase = true; + } else { + // no base. just update base and return. + interval.base = value; + interval.hasBase = true; + return; + } } Value diff; switch (mValueDirection) { @@ -580,11 +596,7 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); // take base again in future good bucket. - for (auto& slice : mCurrentSlicedBucket) { - for (auto& interval : slice.second) { - interval.hasBase = false; - } - } + resetBase(); } VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, (long long)mCurrentBucketStartTimeNs); diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index 9fe84dcf93aa..36ae2146f465 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -148,6 +148,9 @@ private: void pullAndMatchEventsLocked(const int64_t timestampNs); + // Reset diff base and mHasGlobalBase + void resetBase(); + static const size_t kBucketSize = sizeof(ValueBucket{}); const size_t mDimensionSoftLimit; @@ -164,6 +167,18 @@ private: const bool mSkipZeroDiffOutput; + // If true, use a zero value as base to compute the diff. + // This is used for new keys which are present in the new data but was not + // present in the base data. + // The default base will only be used if we have a global base. + const bool mUseZeroDefaultBase; + + // For pulled metrics, this is always set to true whenever a pull succeeds. + // It is set to false when a pull fails, or upon condition change to false. + // This is used to decide if we have the right base data to compute the + // diff against. + bool mHasGlobalBase; + FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering); FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset); @@ -185,6 +200,8 @@ private: FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket); FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime); FRIEND_TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput); + FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase); + FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures); }; } // namespace statsd diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto index 61854a446e80..f485185cc9c2 100644 --- a/cmds/statsd/src/statsd_config.proto +++ b/cmds/statsd/src/statsd_config.proto @@ -274,6 +274,8 @@ message ValueMetric { optional bool use_diff = 12; + optional bool use_zero_default_base = 15 [default = false]; + enum ValueDirection { UNKNOWN = 0; INCREASING = 1; diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp index 44aa00b3046c..1bd34f52a64e 100644 --- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp @@ -1468,6 +1468,219 @@ TEST(ValueMetricProducerTest, TestSkipZeroDiffOutput) { EXPECT_EQ(5, valueProducer.mPastBuckets.begin()->second.back().values[0].long_value); } +/* + * Tests pulled atoms with no conditions + */ +TEST(ValueMetricProducerTest, TestUseZeroDefaultBase) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.mutable_dimensions_in_what()->set_field(tagId); + metric.mutable_dimensions_in_what()->add_child()->set_field(1); + metric.set_use_zero_default_base(true); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp<EventMatcherWizard> eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) + .WillOnce(Invoke([](int tagId, int64_t timeNs, + vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs); + event->write(1); + event->write(3); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard, + logEventMatcherIndex, eventMatcherWizard, tagId, + bucketStartTimeNs, bucketStartTimeNs, pullerManager); + + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + auto iter = valueProducer.mCurrentSlicedBucket.begin(); + auto& interval1 = iter->second[0]; + EXPECT_EQ(1, iter->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value); + EXPECT_EQ(true, interval1.hasBase); + EXPECT_EQ(3, interval1.base.long_value); + EXPECT_EQ(false, interval1.hasValue); + EXPECT_EQ(true, valueProducer.mHasGlobalBase); + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); + vector<shared_ptr<LogEvent>> allData; + + allData.clear(); + shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); + event1->write(2); + event1->write(4); + event1->init(); + shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); + event2->write(1); + event2->write(11); + event2->init(); + allData.push_back(event1); + allData.push_back(event2); + + valueProducer.onDataPulled(allData); + EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); + EXPECT_EQ(true, interval1.hasBase); + EXPECT_EQ(11, interval1.base.long_value); + EXPECT_EQ(true, interval1.hasValue); + EXPECT_EQ(8, interval1.value.long_value); + + auto it = valueProducer.mCurrentSlicedBucket.begin(); + for (; it != valueProducer.mCurrentSlicedBucket.end(); it++) { + if (it != iter) { + break; + } + } + EXPECT_TRUE(it != iter); + auto& interval2 = it->second[0]; + EXPECT_EQ(2, it->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value); + EXPECT_EQ(true, interval2.hasBase); + EXPECT_EQ(4, interval2.base.long_value); + EXPECT_EQ(true, interval2.hasValue); + EXPECT_EQ(4, interval2.value.long_value); + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); +} + +/* + * Tests pulled atoms with no conditions + */ +TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.mutable_dimensions_in_what()->set_field(tagId); + metric.mutable_dimensions_in_what()->add_child()->set_field(1); + metric.set_use_zero_default_base(true); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp<EventMatcherWizard> eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>(); + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, Pull(tagId, _, _)) + .WillOnce(Invoke([](int tagId, int64_t timeNs, + vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs); + event->write(1); + event->write(3); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard, + logEventMatcherIndex, eventMatcherWizard, tagId, + bucketStartTimeNs, bucketStartTimeNs, pullerManager); + + EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size()); + auto iter = valueProducer.mCurrentSlicedBucket.begin(); + auto& interval1 = iter->second[0]; + EXPECT_EQ(1, iter->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value); + EXPECT_EQ(true, interval1.hasBase); + EXPECT_EQ(3, interval1.base.long_value); + EXPECT_EQ(false, interval1.hasValue); + EXPECT_EQ(true, valueProducer.mHasGlobalBase); + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); + vector<shared_ptr<LogEvent>> allData; + + allData.clear(); + shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); + event1->write(2); + event1->write(4); + event1->init(); + shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1); + event2->write(1); + event2->write(11); + event2->init(); + allData.push_back(event1); + allData.push_back(event2); + + valueProducer.onDataPulled(allData); + EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); + EXPECT_EQ(true, interval1.hasBase); + EXPECT_EQ(11, interval1.base.long_value); + EXPECT_EQ(true, interval1.hasValue); + EXPECT_EQ(8, interval1.value.long_value); + + auto it = valueProducer.mCurrentSlicedBucket.begin(); + for (; it != valueProducer.mCurrentSlicedBucket.end(); it++) { + if (it != iter) { + break; + } + } + EXPECT_TRUE(it != iter); + auto& interval2 = it->second[0]; + EXPECT_EQ(2, it->first.getDimensionKeyInWhat().getValues()[0].mValue.int_value); + EXPECT_EQ(true, interval2.hasBase); + EXPECT_EQ(4, interval2.base.long_value); + EXPECT_EQ(true, interval2.hasValue); + EXPECT_EQ(4, interval2.value.long_value); + EXPECT_EQ(0UL, valueProducer.mPastBuckets.size()); + + // next pull somehow did not happen, skip to end of bucket 3 + allData.clear(); + event1 = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1); + event1->write(2); + event1->write(5); + event1->init(); + allData.push_back(event1); + valueProducer.onDataPulled(allData); + + EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); + EXPECT_EQ(true, interval2.hasBase); + EXPECT_EQ(5, interval2.base.long_value); + EXPECT_EQ(false, interval2.hasValue); + EXPECT_EQ(false, interval1.hasBase); + EXPECT_EQ(false, interval1.hasValue); + EXPECT_EQ(true, valueProducer.mHasGlobalBase); + EXPECT_EQ(2UL, valueProducer.mPastBuckets.size()); + + allData.clear(); + event1 = make_shared<LogEvent>(tagId, bucket5StartTimeNs + 1); + event1->write(2); + event1->write(13); + event1->init(); + allData.push_back(event1); + event2 = make_shared<LogEvent>(tagId, bucket5StartTimeNs + 1); + event2->write(1); + event2->write(5); + event2->init(); + allData.push_back(event2); + valueProducer.onDataPulled(allData); + + EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size()); + EXPECT_EQ(true, interval2.hasBase); + EXPECT_EQ(13, interval2.base.long_value); + EXPECT_EQ(true, interval2.hasValue); + EXPECT_EQ(8, interval2.value.long_value); + EXPECT_EQ(true, interval1.hasBase); + EXPECT_EQ(5, interval1.base.long_value); + EXPECT_EQ(true, interval1.hasValue); + EXPECT_EQ(5, interval1.value.long_value); + EXPECT_EQ(true, valueProducer.mHasGlobalBase); + EXPECT_EQ(2UL, valueProducer.mPastBuckets.size()); +} + } // namespace statsd } // namespace os } // namespace android |