diff options
 cmds/statsd/src/metrics/MetricProducer.h                |   4 +-
 cmds/statsd/src/metrics/ValueMetricProducer.cpp         |  54 ++++---
 cmds/statsd/src/metrics/ValueMetricProducer.h           |  12 +-
 cmds/statsd/src/stats_log.proto                         |   2 +
 cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp  | 130 ++++++++++++---
 5 files changed, 170 insertions(+), 32 deletions(-)
| diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h index e86fdf06e836..673f668cda82 100644 --- a/cmds/statsd/src/metrics/MetricProducer.h +++ b/cmds/statsd/src/metrics/MetricProducer.h @@ -82,7 +82,9 @@ enum BucketDropReason {      DIMENSION_GUARDRAIL_REACHED = 6,      MULTIPLE_BUCKETS_SKIPPED = 7,      // Not an invalid bucket case, but the bucket is dropped. -    BUCKET_TOO_SMALL = 8 +    BUCKET_TOO_SMALL = 8, +    // Not an invalid bucket case, but the bucket is skipped. +    NO_DATA = 9  };  struct Activation { diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index f03ce4550bc4..dbec24bf3f6c 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -108,7 +108,7 @@ ValueMetricProducer::ValueMetricProducer(        mSkipZeroDiffOutput(metric.skip_zero_diff_output()),        mUseZeroDefaultBase(metric.use_zero_default_base()),        mHasGlobalBase(false), -      mCurrentBucketIsInvalid(false), +      mCurrentBucketIsSkipped(false),        mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC                                                        : StatsdStats::kPullMaxDelayNs),        mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()), @@ -383,15 +383,12 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,  void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,                                                                    const BucketDropReason reason) { -    if (!mCurrentBucketIsInvalid) { +    if (!mCurrentBucketIsSkipped) {          // Only report to StatsdStats once per invalid bucket.          
StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);      } -    if (!maxDropEventsReached()) { -        mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason)); -    } -    mCurrentBucketIsInvalid = true; +    skipCurrentBucket(dropTimeNs, reason);  }  void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs, @@ -400,6 +397,14 @@ void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,      resetBase();  } +void ValueMetricProducer::skipCurrentBucket(const int64_t dropTimeNs, +                                            const BucketDropReason reason) { +    if (!maxDropEventsReached()) { +        mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason)); +    } +    mCurrentBucketIsSkipped = true; +} +  void ValueMetricProducer::resetBase() {      for (auto& slice : mCurrentBaseInfo) {          for (auto& baseInfo : slice.second) { @@ -961,12 +966,10 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,      int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);      bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;      if (!isBucketLargeEnough) { -        if (!maxDropEventsReached()) { -            mCurrentSkippedBucket.dropEvents.emplace_back( -                    buildDropEvent(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL)); -        } +        skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);      } -    if (isBucketLargeEnough && !mCurrentBucketIsInvalid) { +    bool bucketHasData = false; +    if (!mCurrentBucketIsSkipped) {          // The current bucket is large enough to keep.          
for (const auto& slice : mCurrentSlicedBucket) {              ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second); @@ -975,14 +978,33 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,              if (bucket.valueIndex.size() > 0) {                  auto& bucketList = mPastBuckets[slice.first];                  bucketList.push_back(bucket); +                bucketHasData = true;              }          } -    } else { +    } + +    if (!bucketHasData && !mCurrentBucketIsSkipped) { +        skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA); +    } + +    if (mCurrentBucketIsSkipped) {          mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs; -        mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime; +        // Fill in the gap if we skipped multiple buckets. +        mCurrentSkippedBucket.bucketEndTimeNs = +                numBucketsForward > 1 ? nextBucketStartTimeNs : bucketEndTime;          mSkippedBuckets.emplace_back(mCurrentSkippedBucket);      } +    // This means that the current bucket was not flushed before a forced bucket split. +    if (bucketEndTime < nextBucketStartTimeNs && numBucketsForward <= 1) { +        SkippedBucket bucketInGap; +        bucketInGap.bucketStartTimeNs = bucketEndTime; +        bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs; +        bucketInGap.dropEvents.emplace_back( +                buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA)); +        mSkippedBuckets.emplace_back(bucketInGap); +    } +      appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);      initCurrentSlicedBucket(nextBucketStartTimeNs);      // Update the condition timer again, in case we skipped buckets. 
@@ -1036,13 +1058,13 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs)          // TODO: remove mCurrentBaseInfo entries when obsolete      } -    mCurrentBucketIsInvalid = false; +    mCurrentBucketIsSkipped = false;      mCurrentSkippedBucket.reset();      // If we do not have a global base when the condition is true,      // we will have incomplete bucket for the next bucket.      if (mUseDiff && !mHasGlobalBase && mCondition) { -        mCurrentBucketIsInvalid = false; +        mCurrentBucketIsSkipped = false;      }      mCurrentBucketStartTimeNs = nextBucketStartTimeNs;      VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, @@ -1051,7 +1073,7 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs)  void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {      bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs; -    if (mCurrentBucketIsInvalid) { +    if (mCurrentBucketIsSkipped) {          if (isFullBucketReached) {              // If the bucket is invalid, we ignore the full bucket since it contains invalid data.              mCurrentFullBucket.clear(); diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index 751fef2bf2b1..bb4a66164860 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -144,6 +144,10 @@ private:      void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);      void invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,                                                   const BucketDropReason reason); +    // Skips the current bucket without notifying StatsdStats of the skipped bucket. +    // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that +    // causes the bucket to be invalidated will not notify StatsdStats. 
+    void skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);      const int mWhatMatcherIndex; @@ -250,11 +254,9 @@ private:      // diff against.      bool mHasGlobalBase; -    // Invalid bucket. There was a problem in collecting data in the current bucket so we cannot -    // trust any of the data in this bucket. -    // -    // For instance, one pull failed. -    bool mCurrentBucketIsInvalid; +    // This is to track whether or not the bucket is skipped for any of the reasons listed in +    // BucketDropReason, many of which make the bucket potentially invalid. +    bool mCurrentBucketIsSkipped;      const int64_t mMaxPullDelayNs; diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto index 1121392f1db0..6bfa26761b2f 100644 --- a/cmds/statsd/src/stats_log.proto +++ b/cmds/statsd/src/stats_log.proto @@ -212,6 +212,8 @@ message StatsLogReport {        MULTIPLE_BUCKETS_SKIPPED = 7;        // Not an invalid bucket case, but the bucket is dropped.        BUCKET_TOO_SMALL = 8; +      // Not an invalid bucket case, but the bucket is skipped. +      NO_DATA = 9;    };    message DropEvent { diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp index 474aa2234837..14246cab0d96 100644 --- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp @@ -1115,13 +1115,21 @@ TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {      EXPECT_EQ(false, curInterval.hasValue);      assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {12}, {bucketSizeNs},                                      {bucket2StartTimeNs}, {bucket3StartTimeNs}); +    // The 1st bucket is dropped because of no data      // The 3rd bucket is dropped due to multiple buckets being skipped. 
-    ASSERT_EQ(1, valueProducer->mSkippedBuckets.size()); -    EXPECT_EQ(bucket3StartTimeNs, valueProducer->mSkippedBuckets[0].bucketStartTimeNs); -    EXPECT_EQ(bucket4StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs); +    ASSERT_EQ(2, valueProducer->mSkippedBuckets.size()); + +    EXPECT_EQ(bucketStartTimeNs, valueProducer->mSkippedBuckets[0].bucketStartTimeNs); +    EXPECT_EQ(bucket2StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs);      ASSERT_EQ(1, valueProducer->mSkippedBuckets[0].dropEvents.size()); -    EXPECT_EQ(MULTIPLE_BUCKETS_SKIPPED, valueProducer->mSkippedBuckets[0].dropEvents[0].reason); -    EXPECT_EQ(bucket6StartTimeNs, valueProducer->mSkippedBuckets[0].dropEvents[0].dropTimeNs); +    EXPECT_EQ(NO_DATA, valueProducer->mSkippedBuckets[0].dropEvents[0].reason); +    EXPECT_EQ(bucket2StartTimeNs, valueProducer->mSkippedBuckets[0].dropEvents[0].dropTimeNs); + +    EXPECT_EQ(bucket3StartTimeNs, valueProducer->mSkippedBuckets[1].bucketStartTimeNs); +    EXPECT_EQ(bucket6StartTimeNs, valueProducer->mSkippedBuckets[1].bucketEndTimeNs); +    ASSERT_EQ(1, valueProducer->mSkippedBuckets[1].dropEvents.size()); +    EXPECT_EQ(MULTIPLE_BUCKETS_SKIPPED, valueProducer->mSkippedBuckets[1].dropEvents[0].reason); +    EXPECT_EQ(bucket6StartTimeNs, valueProducer->mSkippedBuckets[1].dropEvents[0].dropTimeNs);  }  /* @@ -2214,7 +2222,7 @@ TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenGuardRailHit) {      valueProducer->mCondition = ConditionState::kFalse;      valueProducer->onConditionChanged(true, bucketStartTimeNs + 2); -    EXPECT_EQ(true, valueProducer->mCurrentBucketIsInvalid); +    EXPECT_EQ(true, valueProducer->mCurrentBucketIsSkipped);      ASSERT_EQ(0UL, valueProducer->mCurrentSlicedBucket.size());      ASSERT_EQ(0UL, valueProducer->mSkippedBuckets.size()); @@ -2629,13 +2637,17 @@ TEST_P(ValueMetricProducerTest_PartialBucket, TestFullBucketResetWhenLastBucketI      vector<shared_ptr<LogEvent>> allData;      
allData.push_back(CreateRepeatedValueLogEvent(tagId, bucket3StartTimeNs + 1, 4)); +    // Pull fails and arrives late.      valueProducer->onDataPulled(allData, /** fails */ false, bucket3StartTimeNs + 1);      assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {9},                                      {partialBucketSplitTimeNs - bucketStartTimeNs},                                      {bucketStartTimeNs}, {partialBucketSplitTimeNs});      ASSERT_EQ(1, valueProducer->mSkippedBuckets.size()); +    ASSERT_EQ(2, valueProducer->mSkippedBuckets[0].dropEvents.size()); +    EXPECT_EQ(PULL_FAILED, valueProducer->mSkippedBuckets[0].dropEvents[0].reason); +    EXPECT_EQ(MULTIPLE_BUCKETS_SKIPPED, valueProducer->mSkippedBuckets[0].dropEvents[1].reason);      EXPECT_EQ(partialBucketSplitTimeNs, valueProducer->mSkippedBuckets[0].bucketStartTimeNs); -    EXPECT_EQ(bucket2StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs); +    EXPECT_EQ(bucket3StartTimeNs, valueProducer->mSkippedBuckets[0].bucketEndTimeNs);      ASSERT_EQ(0UL, valueProducer->mCurrentFullBucket.size());  } @@ -3464,26 +3476,41 @@ TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenMultipleBucketsSki      // Condition change event that skips forward by three buckets.      valueProducer->onConditionChanged(false, bucket4StartTimeNs + 10); +    int64_t dumpTimeNs = bucket4StartTimeNs + 1000; +      // Check dump report.      
ProtoOutputStream output;      std::set<string> strSet; -    valueProducer->onDumpReport(bucket4StartTimeNs + 1000, true /* include recent buckets */, true, +    valueProducer->onDumpReport(dumpTimeNs, true /* include current buckets */, true,                                  NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output);      StatsLogReport report = outputStreamToProto(&output);      EXPECT_TRUE(report.has_value_metrics());      ASSERT_EQ(0, report.value_metrics().data_size()); -    ASSERT_EQ(1, report.value_metrics().skipped_size()); +    ASSERT_EQ(2, report.value_metrics().skipped_size());      EXPECT_EQ(NanoToMillis(bucketStartTimeNs),                report.value_metrics().skipped(0).start_bucket_elapsed_millis()); -    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), +    EXPECT_EQ(NanoToMillis(bucket4StartTimeNs),                report.value_metrics().skipped(0).end_bucket_elapsed_millis());      ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size());      auto dropEvent = report.value_metrics().skipped(0).drop_event(0);      EXPECT_EQ(BucketDropReason::MULTIPLE_BUCKETS_SKIPPED, dropEvent.drop_reason());      EXPECT_EQ(NanoToMillis(bucket4StartTimeNs + 10), dropEvent.drop_time_millis()); + +    // This bucket is skipped because a dumpReport with include current buckets is called. +    // This creates a new bucket from bucket4StartTimeNs to dumpTimeNs in which we have no data +    // since the condition is false for the entire bucket interval. 
+    EXPECT_EQ(NanoToMillis(bucket4StartTimeNs), +              report.value_metrics().skipped(1).start_bucket_elapsed_millis()); +    EXPECT_EQ(NanoToMillis(dumpTimeNs), +              report.value_metrics().skipped(1).end_bucket_elapsed_millis()); +    ASSERT_EQ(1, report.value_metrics().skipped(1).drop_event_size()); + +    dropEvent = report.value_metrics().skipped(1).drop_event(0); +    EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); +    EXPECT_EQ(NanoToMillis(dumpTimeNs), dropEvent.drop_time_millis());  }  /* @@ -3544,6 +3571,89 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenBucketTooSmall) {  }  /* + * Test that NO_DATA dump reason is logged when a flushed bucket contains no data. + */ +TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenDataUnavailable) { +    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); + +    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + +    sp<ValueMetricProducer> valueProducer = +            ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); + +    // Check dump report. 
+    ProtoOutputStream output; +    std::set<string> strSet; +    int64_t dumpReportTimeNs = bucketStartTimeNs + 10000000000; // 10 seconds +    valueProducer->onDumpReport(dumpReportTimeNs, true /* include current bucket */, true, +                                NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output); + +    StatsLogReport report = outputStreamToProto(&output); +    EXPECT_TRUE(report.has_value_metrics()); +    ASSERT_EQ(0, report.value_metrics().data_size()); +    ASSERT_EQ(1, report.value_metrics().skipped_size()); + +    EXPECT_EQ(NanoToMillis(bucketStartTimeNs), +              report.value_metrics().skipped(0).start_bucket_elapsed_millis()); +    EXPECT_EQ(NanoToMillis(dumpReportTimeNs), +              report.value_metrics().skipped(0).end_bucket_elapsed_millis()); +    ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size()); + +    auto dropEvent = report.value_metrics().skipped(0).drop_event(0); +    EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); +    EXPECT_EQ(NanoToMillis(dumpReportTimeNs), dropEvent.drop_time_millis()); +} + +/* + * Test that a skipped bucket is logged when a forced bucket split occurs when the previous bucket + * was not flushed in time. + */ +TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBeforeBucketFlush) { +    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); + +    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + +    sp<ValueMetricProducer> valueProducer = +            ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); + +    // App update event. +    int64_t appUpdateTimeNs = bucket2StartTimeNs + 1000; +    valueProducer->notifyAppUpgrade(appUpdateTimeNs); + +    // Check dump report. 
+    ProtoOutputStream output; +    std::set<string> strSet; +    int64_t dumpReportTimeNs = bucket2StartTimeNs + 10000000000; // 10 seconds +    valueProducer->onDumpReport(dumpReportTimeNs, false /* include current buckets */, true, +                                NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output); + +    StatsLogReport report = outputStreamToProto(&output); +    EXPECT_TRUE(report.has_value_metrics()); +    ASSERT_EQ(0, report.value_metrics().data_size()); +    ASSERT_EQ(2, report.value_metrics().skipped_size()); + +    EXPECT_EQ(NanoToMillis(bucketStartTimeNs), +              report.value_metrics().skipped(0).start_bucket_elapsed_millis()); +    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), +              report.value_metrics().skipped(0).end_bucket_elapsed_millis()); +    ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size()); + +    auto dropEvent = report.value_metrics().skipped(0).drop_event(0); +    EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); +    EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); + +    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), +              report.value_metrics().skipped(1).start_bucket_elapsed_millis()); +    EXPECT_EQ(NanoToMillis(appUpdateTimeNs), +              report.value_metrics().skipped(1).end_bucket_elapsed_millis()); +    ASSERT_EQ(1, report.value_metrics().skipped(1).drop_event_size()); + +    dropEvent = report.value_metrics().skipped(1).drop_event(0); +    EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); +    EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); +} + +/*   * Test multiple bucket drop events in the same bucket.   */  TEST(ValueMetricProducerTest_BucketDrop, TestMultipleBucketDropEvents) { |