From 6c75ecdcef7ae0acb17c512f5bc086bba89ed123 Mon Sep 17 00:00:00 2001 From: Olivier Gaillard Date: Wed, 20 Feb 2019 09:57:33 +0000 Subject: Introduces an option to set a dump latency requirement. We are currently dumping invalid data for pulled metrics. Pulled metrics require a new pull when flushing a bucket. We should either do another pull or invalidate the previous bucket. There are cases where we cannot afford to do another pull, e.g. statsd being killed. If we do not have enough time, we'll just invalidte the bucket to make sure we have correct data. Bug: 123866830 Test: atest statsd_test Change-Id: I090127cace3b7265032ebb2c9bddae976c883771 --- cmds/statsd/src/StatsLogProcessor.cpp | 35 ++-- cmds/statsd/src/StatsLogProcessor.h | 18 +- cmds/statsd/src/StatsService.cpp | 20 ++- cmds/statsd/src/metrics/CountMetricProducer.cpp | 7 +- cmds/statsd/src/metrics/CountMetricProducer.h | 4 +- cmds/statsd/src/metrics/DurationMetricProducer.cpp | 4 +- cmds/statsd/src/metrics/DurationMetricProducer.h | 4 +- cmds/statsd/src/metrics/EventMetricProducer.cpp | 1 + cmds/statsd/src/metrics/EventMetricProducer.h | 1 + cmds/statsd/src/metrics/GaugeMetricProducer.cpp | 6 +- cmds/statsd/src/metrics/GaugeMetricProducer.h | 7 +- cmds/statsd/src/metrics/MetricProducer.h | 22 ++- cmds/statsd/src/metrics/MetricsManager.cpp | 5 +- cmds/statsd/src/metrics/MetricsManager.h | 1 + cmds/statsd/src/metrics/ValueMetricProducer.cpp | 36 ++-- cmds/statsd/src/metrics/ValueMetricProducer.h | 7 +- cmds/statsd/tests/StatsLogProcessor_test.cpp | 12 +- cmds/statsd/tests/e2e/Attribution_e2e_test.cpp | 4 +- ...onInCondition_e2e_combination_AND_cond_test.cpp | 6 +- ...ionInCondition_e2e_combination_OR_cond_test.cpp | 8 +- .../DimensionInCondition_e2e_simple_cond_test.cpp | 6 +- .../statsd/tests/e2e/GaugeMetric_e2e_pull_test.cpp | 6 +- .../statsd/tests/e2e/GaugeMetric_e2e_push_test.cpp | 2 +- .../statsd/tests/e2e/MetricActivation_e2e_test.cpp | 2 +- .../tests/e2e/MetricConditionLink_e2e_test.cpp | 4 +- cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp | 2 +- .../statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp | 4 +- .../statsd/tests/e2e/WakelockDuration_e2e_test.cpp | 12 +- .../tests/metrics/ValueMetricProducer_test.cpp | 182 ++++++++++++++++++++- 29 files changed, 337 insertions(+), 91 deletions(-) diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp index 653ef2ec9869..a6699e7e6152 100644 --- a/cmds/statsd/src/StatsLogProcessor.cpp +++ b/cmds/statsd/src/StatsLogProcessor.cpp @@ -298,7 +298,7 @@ void StatsLogProcessor::GetActiveConfigsLocked(const int uid, vector& o void StatsLogProcessor::OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key, const StatsdConfig& config) { std::lock_guard lock(mMetricsMutex); - WriteDataToDiskLocked(key, timestampNs, CONFIG_UPDATED); + WriteDataToDiskLocked(key, timestampNs, CONFIG_UPDATED, NO_TIME_CONSTRAINTS); OnConfigUpdatedLocked(timestampNs, key, config); } @@ -355,6 +355,7 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, const int64_t dumpTim const bool include_current_partial_bucket, const bool erase_data, const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency, ProtoOutputStream* proto) { std::lock_guard lock(mMetricsMutex); @@ -378,8 +379,10 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, const int64_t dumpTim // Start of ConfigMetricsReport (reports). uint64_t reportsToken = proto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_REPORTS); - onConfigMetricsReportLocked(key, dumpTimeStampNs, include_current_partial_bucket, - erase_data, dumpReportReason, proto); + onConfigMetricsReportLocked(key, dumpTimeStampNs, + include_current_partial_bucket, + erase_data, dumpReportReason, + dumpLatency, proto); proto->end(reportsToken); // End of ConfigMetricsReport (reports). } else { @@ -394,10 +397,11 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, const int64_t dumpTim const bool include_current_partial_bucket, const bool erase_data, const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency, vector* outData) { ProtoOutputStream proto; onDumpReport(key, dumpTimeStampNs, include_current_partial_bucket, erase_data, - dumpReportReason, &proto); + dumpReportReason, dumpLatency, &proto); if (outData != nullptr) { outData->clear(); @@ -423,6 +427,7 @@ void StatsLogProcessor::onConfigMetricsReportLocked(const ConfigKey& key, const bool include_current_partial_bucket, const bool erase_data, const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency, ProtoOutputStream* proto) { // We already checked whether key exists in mMetricsManagers in // WriteDataToDisk. @@ -438,7 +443,7 @@ void StatsLogProcessor::onConfigMetricsReportLocked(const ConfigKey& key, // First, fill in ConfigMetricsReport using current data on memory, which // starts from filling in StatsLogReport's. it->second->onDumpReport(dumpTimeStampNs, include_current_partial_bucket, - erase_data, &str_set, proto); + erase_data, dumpLatency, &str_set, proto); // Fill in UidMap if there is at least one metric to report. // This skips the uid map if it's an empty config. @@ -492,7 +497,7 @@ void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) } } if (configKeysTtlExpired.size() > 0) { - WriteDataToDiskLocked(CONFIG_RESET); + WriteDataToDiskLocked(CONFIG_RESET, NO_TIME_CONSTRAINTS); resetConfigsLocked(timestampNs, configKeysTtlExpired); } } @@ -501,7 +506,8 @@ void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) { std::lock_guard lock(mMetricsMutex); auto it = mMetricsManagers.find(key); if (it != mMetricsManagers.end()) { - WriteDataToDiskLocked(key, getElapsedRealtimeNs(), CONFIG_REMOVED); + WriteDataToDiskLocked(key, getElapsedRealtimeNs(), CONFIG_REMOVED, + NO_TIME_CONSTRAINTS); mMetricsManagers.erase(it); mUidMap->OnConfigRemoved(key); } @@ -572,14 +578,15 @@ void StatsLogProcessor::flushIfNecessaryLocked( void StatsLogProcessor::WriteDataToDiskLocked(const ConfigKey& key, const int64_t timestampNs, - const DumpReportReason dumpReportReason) { + const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency) { if (mMetricsManagers.find(key) == mMetricsManagers.end() || !mMetricsManagers.find(key)->second->shouldWriteToDisk()) { return; } ProtoOutputStream proto; onConfigMetricsReportLocked(key, timestampNs, true /* include_current_partial_bucket*/, - true /* erase_data */, dumpReportReason, &proto); + true /* erase_data */, dumpReportReason, dumpLatency, &proto); string file_name = StringPrintf("%s/%ld_%d_%lld", STATS_DATA_DIR, (long)getWallClockSec(), key.GetUid(), (long long)key.GetId()); android::base::unique_fd fd(open(file_name.c_str(), @@ -658,7 +665,8 @@ void StatsLogProcessor::LoadMetricsActivationFromDisk() { StorageManager::deleteFile(file_name.c_str()); } -void StatsLogProcessor::WriteDataToDiskLocked(const DumpReportReason dumpReportReason) { +void StatsLogProcessor::WriteDataToDiskLocked(const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency) { const int64_t timeNs = getElapsedRealtimeNs(); // Do not write to disk if we already have in the last few seconds. // This is to avoid overwriting files that would have the same name if we @@ -671,13 +679,14 @@ void StatsLogProcessor::WriteDataToDiskLocked(const DumpReportReason dumpReportR } mLastWriteTimeNs = timeNs; for (auto& pair : mMetricsManagers) { - WriteDataToDiskLocked(pair.first, timeNs, dumpReportReason); + WriteDataToDiskLocked(pair.first, timeNs, dumpReportReason, dumpLatency); } } -void StatsLogProcessor::WriteDataToDisk(const DumpReportReason dumpReportReason) { +void StatsLogProcessor::WriteDataToDisk(const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency) { std::lock_guard lock(mMetricsMutex); - WriteDataToDiskLocked(dumpReportReason); + WriteDataToDiskLocked(dumpReportReason, dumpLatency); } void StatsLogProcessor::informPullAlarmFired(const int64_t timestampNs) { diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h index ea9c6e704017..e92b8971fd48 100644 --- a/cmds/statsd/src/StatsLogProcessor.h +++ b/cmds/statsd/src/StatsLogProcessor.h @@ -66,10 +66,14 @@ public: void onDumpReport(const ConfigKey& key, const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, - const DumpReportReason dumpReportReason, vector* outData); + const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency, + vector* outData); void onDumpReport(const ConfigKey& key, const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, - const DumpReportReason dumpReportReason, ProtoOutputStream* proto); + const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency, + ProtoOutputStream* proto); /* Tells MetricsManager that the alarms in alarmSet have fired. Modifies anomaly alarmSet. */ void onAnomalyAlarmFired( @@ -82,7 +86,8 @@ public: unordered_set, SpHash> alarmSet); /* Flushes data to disk. Data on memory will be gone after written to disk. */ - void WriteDataToDisk(const DumpReportReason dumpReportReason); + void WriteDataToDisk(const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency); /* Persist metric activation status onto disk. */ void WriteMetricsActivationToDisk(int64_t currentTimeNs); @@ -153,14 +158,17 @@ private: void GetActiveConfigsLocked(const int uid, vector& outActiveConfigs); - void WriteDataToDiskLocked(const DumpReportReason dumpReportReason); + void WriteDataToDiskLocked(const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency); void WriteDataToDiskLocked(const ConfigKey& key, const int64_t timestampNs, - const DumpReportReason dumpReportReason); + const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency); void onConfigMetricsReportLocked(const ConfigKey& key, const int64_t dumpTimeStampNs, const bool include_current_partial_bucket, const bool erase_data, const DumpReportReason dumpReportReason, + const DumpLatency dumpLatency, util::ProtoOutputStream* proto); /* Check if we should send a broadcast if approaching memory limits and if we're over, we diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp index c542b6215c88..4deb8bd83a9f 100644 --- a/cmds/statsd/src/StatsService.cpp +++ b/cmds/statsd/src/StatsService.cpp @@ -313,7 +313,9 @@ void StatsService::dumpIncidentSection(int out) { proto.start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_REPORTS_LIST); mProcessor->onDumpReport(configKey, getElapsedRealtimeNs(), true /* includeCurrentBucket */, false /* erase_data */, - ADB_DUMP, &proto); + ADB_DUMP, + FAST, + &proto); proto.end(reportsListToken); proto.flush(out); proto.clear(); @@ -694,7 +696,9 @@ status_t StatsService::cmd_dump_report(int out, const Vector& args) { if (good) { vector data; mProcessor->onDumpReport(ConfigKey(uid, StrToInt64(name)), getElapsedRealtimeNs(), - includeCurrentBucket, eraseData, ADB_DUMP, &data); + includeCurrentBucket, eraseData, ADB_DUMP, + NO_TIME_CONSTRAINTS, + &data); if (proto) { for (size_t i = 0; i < data.size(); i ++) { dprintf(out, "%c", data[i]); @@ -758,7 +762,7 @@ status_t StatsService::cmd_print_uid_map(int out, const Vector& args) { status_t StatsService::cmd_write_data_to_disk(int out) { dprintf(out, "Writing data to disk\n"); - mProcessor->WriteDataToDisk(ADB_DUMP); + mProcessor->WriteDataToDisk(ADB_DUMP, NO_TIME_CONSTRAINTS); return NO_ERROR; } @@ -958,7 +962,7 @@ Status StatsService::systemRunning() { Status StatsService::informDeviceShutdown() { ENFORCE_UID(AID_SYSTEM); VLOG("StatsService::informDeviceShutdown"); - mProcessor->WriteDataToDisk(DEVICE_SHUTDOWN); + mProcessor->WriteDataToDisk(DEVICE_SHUTDOWN, FAST); mProcessor->WriteMetricsActivationToDisk(getElapsedRealtimeNs()); return Status::ok(); } @@ -1000,7 +1004,7 @@ void StatsService::Startup() { void StatsService::Terminate() { ALOGI("StatsService::Terminating"); if (mProcessor != nullptr) { - mProcessor->WriteDataToDisk(TERMINATION_SIGNAL_RECEIVED); + mProcessor->WriteDataToDisk(TERMINATION_SIGNAL_RECEIVED, FAST); } } @@ -1017,8 +1021,10 @@ Status StatsService::getData(int64_t key, const String16& packageName, vectorgetCallingPid(), ipc->getCallingUid()); ConfigKey configKey(ipc->getCallingUid(), key); + // The dump latency does not matter here since we do not include the current bucket, we do not + // need to pull any new data anyhow. mProcessor->onDumpReport(configKey, getElapsedRealtimeNs(), false /* include_current_bucket*/, - true /* erase_data */, GET_DATA_CALLED, output); + true /* erase_data */, GET_DATA_CALLED, FAST, output); return Status::ok(); } @@ -1312,7 +1318,7 @@ void StatsService::binderDied(const wp & who) { StatsdStats::getInstance().noteSystemServerRestart(getWallClockSec()); if (mProcessor != nullptr) { ALOGW("Reset statsd upon system server restarts."); - mProcessor->WriteDataToDisk(STATSCOMPANION_DIED); + mProcessor->WriteDataToDisk(STATSCOMPANION_DIED, FAST); mProcessor->resetConfigs(); } mAnomalyAlarmMonitor->setStatsCompanionService(nullptr); diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp index e84f88d407d3..96d144765081 100644 --- a/cmds/statsd/src/metrics/CountMetricProducer.cpp +++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp @@ -139,13 +139,13 @@ void CountMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition void CountMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) { - flushIfNeededLocked(dumpTimeNs); mPastBuckets.clear(); } void CountMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, ProtoOutputStream* protoOutput) { if (include_current_partial_bucket) { @@ -319,7 +319,7 @@ void CountMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { return; } - flushCurrentBucketLocked(eventTimeNs); + flushCurrentBucketLocked(eventTimeNs, eventTimeNs); // Setup the bucket start time and number. int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs; mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs; @@ -328,7 +328,8 @@ void CountMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { (long long)mCurrentBucketStartTimeNs); } -void CountMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { +void CountMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, + const int64_t& nextBucketStartTimeNs) { int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); CountBucket info; info.mBucketStartNs = mCurrentBucketStartTimeNs; diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h index 1ac4426468d8..b4a910c6f410 100644 --- a/cmds/statsd/src/metrics/CountMetricProducer.h +++ b/cmds/statsd/src/metrics/CountMetricProducer.h @@ -57,6 +57,7 @@ private: void onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, android::util::ProtoOutputStream* protoOutput) override; @@ -78,7 +79,8 @@ private: // Util function to flush the old packet. void flushIfNeededLocked(const int64_t& newEventTime) override; - void flushCurrentBucketLocked(const int64_t& eventTimeNs) override; + void flushCurrentBucketLocked(const int64_t& eventTimeNs, + const int64_t& nextBucketStartTimeNs) override; std::unordered_map> mPastBuckets; diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp index da6b97cc4e59..5e4594b4e00a 100644 --- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp +++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp @@ -456,6 +456,7 @@ void DurationMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) { void DurationMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, ProtoOutputStream* protoOutput) { if (include_current_partial_bucket) { @@ -581,7 +582,8 @@ void DurationMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { mCurrentBucketNum += numBucketsForward; } -void DurationMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { +void DurationMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, + const int64_t& nextBucketStartTimeNs) { for (auto whatIt = mCurrentSlicedDurationTrackerMap.begin(); whatIt != mCurrentSlicedDurationTrackerMap.end();) { for (auto it = whatIt->second.begin(); it != whatIt->second.end();) { diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h index ec561b527025..f711df240f5e 100644 --- a/cmds/statsd/src/metrics/DurationMetricProducer.h +++ b/cmds/statsd/src/metrics/DurationMetricProducer.h @@ -64,6 +64,7 @@ private: void onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, android::util::ProtoOutputStream* protoOutput) override; @@ -88,7 +89,8 @@ private: // Util function to flush the old packet. void flushIfNeededLocked(const int64_t& eventTime); - void flushCurrentBucketLocked(const int64_t& eventTimeNs) override; + void flushCurrentBucketLocked(const int64_t& eventTimeNs, + const int64_t& nextBucketStartTimeNs) override; const DurationMetric_AggregationType mAggregationType; diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp index 3b4af6533e34..5435c8420519 100644 --- a/cmds/statsd/src/metrics/EventMetricProducer.cpp +++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp @@ -108,6 +108,7 @@ void EventMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) { void EventMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, ProtoOutputStream* protoOutput) { protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); diff --git a/cmds/statsd/src/metrics/EventMetricProducer.h b/cmds/statsd/src/metrics/EventMetricProducer.h index 96adfdd48743..74e6bc845c04 100644 --- a/cmds/statsd/src/metrics/EventMetricProducer.h +++ b/cmds/statsd/src/metrics/EventMetricProducer.h @@ -48,6 +48,7 @@ private: void onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, android::util::ProtoOutputStream* protoOutput) override; void clearPastBucketsLocked(const int64_t dumpTimeNs) override; diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp index 63017936b1db..7b001b3b8bf4 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp @@ -184,6 +184,7 @@ void GaugeMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) { void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, ProtoOutputStream* protoOutput) { VLOG("Gauge metric %lld report now...", (long long)mMetricId); @@ -528,7 +529,7 @@ void GaugeMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { return; } - flushCurrentBucketLocked(eventTimeNs); + flushCurrentBucketLocked(eventTimeNs, eventTimeNs); // Adjusts the bucket start and end times. int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs; @@ -538,7 +539,8 @@ void GaugeMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { (long long)mCurrentBucketStartTimeNs); } -void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { +void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, + const int64_t& nextBucketStartTimeNs) { int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); GaugeBucket info; diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h index 64a18337481b..9b99fb1fee99 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.h +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h @@ -82,8 +82,7 @@ public: // Flush full buckets on the normal path up to the latest bucket boundary. flushIfNeededLocked(eventTimeNs); } - flushCurrentBucketLocked(eventTimeNs); - mCurrentBucketStartTimeNs = eventTimeNs; + flushCurrentBucketLocked(eventTimeNs, eventTimeNs); if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) { pullAndMatchEventsLocked(eventTimeNs); } @@ -99,6 +98,7 @@ private: void onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, android::util::ProtoOutputStream* protoOutput) override; void clearPastBucketsLocked(const int64_t dumpTimeNs) override; @@ -119,7 +119,8 @@ private: // Util function to flush the old packet. void flushIfNeededLocked(const int64_t& eventTime) override; - void flushCurrentBucketLocked(const int64_t& eventTimeNs) override; + void flushCurrentBucketLocked(const int64_t& eventTimeNs, + const int64_t& nextBucketStartTimeNs) override; void pullAndMatchEventsLocked(const int64_t timestampNs); diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h index 8ab3b0680276..99cb5d4389c7 100644 --- a/cmds/statsd/src/metrics/MetricProducer.h +++ b/cmds/statsd/src/metrics/MetricProducer.h @@ -46,6 +46,16 @@ enum ActivationState { kActiveOnBoot = 2, }; +enum DumpLatency { + // In some cases, we only have a short time range to do the dump, e.g. statsd is being killed. + // We might be able to return all the data in this mode. For instance, pull metrics might need + // to be pulled when the current bucket is requested. + FAST = 1, + // In other cases, it is fine for a dump to take more than a few milliseconds, e.g. config + // updates. + NO_TIME_CONSTRAINTS = 2 +}; + // A MetricProducer is responsible for compute one single metrics, creating stats log report, and // writing the report to dropbox. MetricProducers should respond to package changes as required in // PackageInfoListener, but if none of the metrics are slicing by package name, then the update can @@ -87,8 +97,7 @@ public: flushIfNeededLocked(eventTimeNs); } // Now flush a partial bucket. - flushCurrentBucketLocked(eventTimeNs); - mCurrentBucketStartTimeNs = eventTimeNs; + flushCurrentBucketLocked(eventTimeNs, eventTimeNs); // Don't update the current bucket number so that the anomaly tracker knows this bucket // is a partial bucket and can merge it with the previous bucket. }; @@ -135,11 +144,12 @@ public: void onDumpReport(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, android::util::ProtoOutputStream* protoOutput) { std::lock_guard lock(mMutex); return onDumpReportLocked(dumpTimeNs, include_current_partial_bucket, erase_data, - str_set, protoOutput); + dumpLatency, str_set, protoOutput); } void clearPastBuckets(const int64_t dumpTimeNs) { @@ -239,6 +249,7 @@ protected: virtual void onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, android::util::ProtoOutputStream* protoOutput) = 0; virtual void clearPastBucketsLocked(const int64_t dumpTimeNs) = 0; @@ -270,7 +281,7 @@ protected: */ virtual void flushLocked(const int64_t& eventTimeNs) { flushIfNeededLocked(eventTimeNs); - flushCurrentBucketLocked(eventTimeNs); + flushCurrentBucketLocked(eventTimeNs, eventTimeNs); }; /** @@ -283,7 +294,8 @@ protected: * flushIfNeededLocked or the app upgrade handler; the caller MUST update the bucket timestamp * and bucket number as needed. */ - virtual void flushCurrentBucketLocked(const int64_t& eventTimeNs){}; + virtual void flushCurrentBucketLocked(const int64_t& eventTimeNs, + const int64_t& nextBucketStartTimeNs) {}; // Convenience to compute the current bucket's end time, which is always aligned with the // start time of the metric. diff --git a/cmds/statsd/src/metrics/MetricsManager.cpp b/cmds/statsd/src/metrics/MetricsManager.cpp index 4851a8d40baa..4b3bfd3c8b86 100644 --- a/cmds/statsd/src/metrics/MetricsManager.cpp +++ b/cmds/statsd/src/metrics/MetricsManager.cpp @@ -217,6 +217,7 @@ void MetricsManager::dropData(const int64_t dropTimeNs) { void MetricsManager::onDumpReport(const int64_t dumpTimeStampNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, ProtoOutputStream* protoOutput) { VLOG("=========================Metric Reports Start=========================="); @@ -227,10 +228,10 @@ void MetricsManager::onDumpReport(const int64_t dumpTimeStampNs, FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_METRICS); if (mHashStringsInReport) { producer->onDumpReport(dumpTimeStampNs, include_current_partial_bucket, erase_data, - str_set, protoOutput); + dumpLatency, str_set, protoOutput); } else { producer->onDumpReport(dumpTimeStampNs, include_current_partial_bucket, erase_data, - nullptr, protoOutput); + dumpLatency, nullptr, protoOutput); } protoOutput->end(token); } else { diff --git a/cmds/statsd/src/metrics/MetricsManager.h b/cmds/statsd/src/metrics/MetricsManager.h index eab1f762b390..390446068eeb 100644 --- a/cmds/statsd/src/metrics/MetricsManager.h +++ b/cmds/statsd/src/metrics/MetricsManager.h @@ -121,6 +121,7 @@ public: virtual void onDumpReport(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, android::util::ProtoOutputStream* protoOutput); diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index 3cf378d7d7ce..e94b75c16fbe 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -188,13 +188,28 @@ void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) { void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, ProtoOutputStream* protoOutput) { VLOG("metric %lld dump report now...", (long long)mMetricId); + flushIfNeededLocked(dumpTimeNs); if (include_current_partial_bucket) { - flushLocked(dumpTimeNs); - } else { - flushIfNeededLocked(dumpTimeNs); + // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the + // current bucket will have incomplete data and the next will have the wrong snapshot to do + // a diff against. If the condition is false, we are fine since the base data is reset and + // we are not tracking anything. + bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue; + if (pullNeeded) { + switch (dumpLatency) { + case FAST: + invalidateCurrentBucket(); + break; + case NO_TIME_CONSTRAINTS: + pullAndMatchEventsLocked(dumpTimeNs); + break; + } + } + flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs); } protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked()); @@ -712,23 +727,21 @@ void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) { return; } - flushCurrentBucketLocked(eventTimeNs); - int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs; - mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs; - mCurrentBucketNum += numBucketsForward; + int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs; + flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs); + mCurrentBucketNum += numBucketsForward; if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId); // take base again in future good bucket. resetBase(); } - VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, - (long long)mCurrentBucketStartTimeNs); } -void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { +void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, + const int64_t& nextBucketStartTimeNs) { if (mCondition == ConditionState::kUnknown) { StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId); } @@ -758,6 +771,9 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { } initCurrentSlicedBucket(); mCurrentBucketIsInvalid = false; + mCurrentBucketStartTimeNs = nextBucketStartTimeNs; + VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId, + (long long)mCurrentBucketStartTimeNs); } ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime, diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index f26ad85acf05..696d4fa7ae45 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -65,8 +65,7 @@ public: if (mIsPulled && mCondition) { pullAndMatchEventsLocked(eventTimeNs - 1); } - flushCurrentBucketLocked(eventTimeNs); - mCurrentBucketStartTimeNs = eventTimeNs; + flushCurrentBucketLocked(eventTimeNs, eventTimeNs); }; protected: @@ -79,6 +78,7 @@ private: void onDumpReportLocked(const int64_t dumpTimeNs, const bool include_current_partial_bucket, const bool erase_data, + const DumpLatency dumpLatency, std::set *str_set, android::util::ProtoOutputStream* protoOutput) override; void clearPastBucketsLocked(const int64_t dumpTimeNs) override; @@ -97,7 +97,8 @@ private: // Util function to flush the old packet. void flushIfNeededLocked(const int64_t& eventTime) override; - void flushCurrentBucketLocked(const int64_t& eventTimeNs) override; + void flushCurrentBucketLocked(const int64_t& eventTimeNs, + const int64_t& nextBucketStartTimeNs) override; void dropDataLocked(const int64_t dropTimeNs) override; diff --git a/cmds/statsd/tests/StatsLogProcessor_test.cpp b/cmds/statsd/tests/StatsLogProcessor_test.cpp index 4579ca6008ef..88aa1800c75a 100644 --- a/cmds/statsd/tests/StatsLogProcessor_test.cpp +++ b/cmds/statsd/tests/StatsLogProcessor_test.cpp @@ -173,7 +173,7 @@ TEST(StatsLogProcessorTest, TestUidMapHasSnapshot) { // Expect to get no metrics, but snapshot specified above in uidmap. vector bytes; - p.onDumpReport(key, 1, false, true, ADB_DUMP, &bytes); + p.onDumpReport(key, 1, false, true, ADB_DUMP, FAST, &bytes); ConfigMetricsReportList output; output.ParseFromArray(bytes.data(), bytes.size()); @@ -204,7 +204,7 @@ TEST(StatsLogProcessorTest, TestEmptyConfigHasNoUidMap) { // Expect to get no metrics, but snapshot specified above in uidmap. vector bytes; - p.onDumpReport(key, 1, false, true, ADB_DUMP, &bytes); + p.onDumpReport(key, 1, false, true, ADB_DUMP, FAST, &bytes); ConfigMetricsReportList output; output.ParseFromArray(bytes.data(), bytes.size()); @@ -235,7 +235,7 @@ TEST(StatsLogProcessorTest, TestReportIncludesSubConfig) { // Expect to get no metrics, but snapshot specified above in uidmap. vector bytes; - p.onDumpReport(key, 1, false, true, ADB_DUMP, &bytes); + p.onDumpReport(key, 1, false, true, ADB_DUMP, FAST, &bytes); ConfigMetricsReportList output; output.ParseFromArray(bytes.data(), bytes.size()); @@ -269,21 +269,21 @@ TEST(StatsLogProcessorTest, TestOnDumpReportEraseData) { ConfigMetricsReportList output; // Dump report WITHOUT erasing data. - processor->onDumpReport(cfgKey, 3, true, false /* Do NOT erase data. */, ADB_DUMP, &bytes); + processor->onDumpReport(cfgKey, 3, true, false /* Do NOT erase data. */, ADB_DUMP, FAST, &bytes); output.ParseFromArray(bytes.data(), bytes.size()); EXPECT_EQ(output.reports_size(), 1); EXPECT_EQ(output.reports(0).metrics_size(), 1); EXPECT_EQ(output.reports(0).metrics(0).count_metrics().data_size(), 1); // Dump report WITH erasing data. There should be data since we didn't previously erase it. - processor->onDumpReport(cfgKey, 4, true, true /* DO erase data. */, ADB_DUMP, &bytes); + processor->onDumpReport(cfgKey, 4, true, true /* DO erase data. */, ADB_DUMP, FAST, &bytes); output.ParseFromArray(bytes.data(), bytes.size()); EXPECT_EQ(output.reports_size(), 1); EXPECT_EQ(output.reports(0).metrics_size(), 1); EXPECT_EQ(output.reports(0).metrics(0).count_metrics().data_size(), 1); // Dump report again. There should be no data since we erased it. - processor->onDumpReport(cfgKey, 5, true, true /* DO erase data. */, ADB_DUMP, &bytes); + processor->onDumpReport(cfgKey, 5, true, true /* DO erase data. */, ADB_DUMP, FAST, &bytes); output.ParseFromArray(bytes.data(), bytes.size()); // We don't care whether statsd has a report, as long as it has no count metrics in it. bool noData = output.reports_size() == 0 diff --git a/cmds/statsd/tests/e2e/Attribution_e2e_test.cpp b/cmds/statsd/tests/e2e/Attribution_e2e_test.cpp index a9841c91ada2..33825250446a 100644 --- a/cmds/statsd/tests/e2e/Attribution_e2e_test.cpp +++ b/cmds/statsd/tests/e2e/Attribution_e2e_test.cpp @@ -147,7 +147,7 @@ TEST(AttributionE2eTest, TestAttributionMatchAndSliceByFirstUid) { ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 4 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -295,7 +295,7 @@ TEST(AttributionE2eTest, TestAttributionMatchAndSliceByChain) { ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 4 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); diff --git a/cmds/statsd/tests/e2e/DimensionInCondition_e2e_combination_AND_cond_test.cpp b/cmds/statsd/tests/e2e/DimensionInCondition_e2e_combination_AND_cond_test.cpp index a8914da4b7fe..e4186b7200a0 100644 --- a/cmds/statsd/tests/e2e/DimensionInCondition_e2e_combination_AND_cond_test.cpp +++ b/cmds/statsd/tests/e2e/DimensionInCondition_e2e_combination_AND_cond_test.cpp @@ -212,7 +212,7 @@ TEST(DimensionInConditionE2eTest, TestDurationMetric_NoLink_AND_CombinationCondi ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, - true, ADB_DUMP, &buffer); + true, ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -548,7 +548,7 @@ TEST(DimensionInConditionE2eTest, TestDurationMetric_Link_AND_CombinationConditi ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, - true, ADB_DUMP, &buffer); + true, ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -798,7 +798,7 @@ TEST(DimensionInConditionE2eTest, TestDurationMetric_PartialLink_AND_Combination ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, - true, ADB_DUMP, &buffer); + true, ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); diff --git a/cmds/statsd/tests/e2e/DimensionInCondition_e2e_combination_OR_cond_test.cpp b/cmds/statsd/tests/e2e/DimensionInCondition_e2e_combination_OR_cond_test.cpp index 621b6ed6ddbf..f3ecd56dd946 100644 --- a/cmds/statsd/tests/e2e/DimensionInCondition_e2e_combination_OR_cond_test.cpp +++ b/cmds/statsd/tests/e2e/DimensionInCondition_e2e_combination_OR_cond_test.cpp @@ -131,7 +131,7 @@ TEST(DimensionInConditionE2eTest, TestCreateCountMetric_NoLink_OR_CombinationCon ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -347,7 +347,7 @@ TEST(DimensionInConditionE2eTest, TestCreateCountMetric_Link_OR_CombinationCondi ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -531,7 +531,7 @@ TEST(DimensionInConditionE2eTest, TestDurationMetric_NoLink_OR_CombinationCondit ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -733,7 +733,7 @@ TEST(DimensionInConditionE2eTest, TestDurationMetric_Link_OR_CombinationConditio ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); diff --git a/cmds/statsd/tests/e2e/DimensionInCondition_e2e_simple_cond_test.cpp b/cmds/statsd/tests/e2e/DimensionInCondition_e2e_simple_cond_test.cpp index 9f8acaf18422..489bb0b21a2e 100644 --- a/cmds/statsd/tests/e2e/DimensionInCondition_e2e_simple_cond_test.cpp +++ b/cmds/statsd/tests/e2e/DimensionInCondition_e2e_simple_cond_test.cpp @@ -143,7 +143,7 @@ TEST(DimensionInConditionE2eTest, TestDurationMetric_NoLink_SimpleCondition) { ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, - true, ADB_DUMP, &buffer); + true, ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -438,7 +438,7 @@ TEST(DimensionInConditionE2eTest, TestDurationMetric_Link_SimpleCondition) { ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, - true, ADB_DUMP, &buffer); + true, ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -659,7 +659,7 @@ TEST(DimensionInConditionE2eTest, TestDurationMetric_PartialLink_SimpleCondition ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); diff --git a/cmds/statsd/tests/e2e/GaugeMetric_e2e_pull_test.cpp b/cmds/statsd/tests/e2e/GaugeMetric_e2e_pull_test.cpp index 24a9980487ed..946eccfc3943 100644 --- a/cmds/statsd/tests/e2e/GaugeMetric_e2e_pull_test.cpp +++ b/cmds/statsd/tests/e2e/GaugeMetric_e2e_pull_test.cpp @@ -125,7 +125,7 @@ TEST(GaugeMetricE2eTest, TestRandomSamplePulledEvents) { ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, configAddedTimeNs + 7 * bucketSizeNs + 10, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -248,7 +248,7 @@ TEST(GaugeMetricE2eTest, TestConditionChangeToTrueSamplePulledEvents) { ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, configAddedTimeNs + 8 * bucketSizeNs + 10, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -352,7 +352,7 @@ TEST(GaugeMetricE2eTest, TestRandomSamplePulledEvent_LateAlarm) { ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, configAddedTimeNs + 7 * bucketSizeNs + 10, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); diff --git a/cmds/statsd/tests/e2e/GaugeMetric_e2e_push_test.cpp b/cmds/statsd/tests/e2e/GaugeMetric_e2e_push_test.cpp index 3af8212087a1..cd80310c4e72 100644 --- a/cmds/statsd/tests/e2e/GaugeMetric_e2e_push_test.cpp +++ b/cmds/statsd/tests/e2e/GaugeMetric_e2e_push_test.cpp @@ -150,7 +150,7 @@ TEST(GaugeMetricE2eTest, TestMultipleFieldsForPushedEvent) { ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 3 * bucketSizeNs, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); diff --git a/cmds/statsd/tests/e2e/MetricActivation_e2e_test.cpp b/cmds/statsd/tests/e2e/MetricActivation_e2e_test.cpp index 85d8a5613a8b..7fb43f8aa60e 100644 --- a/cmds/statsd/tests/e2e/MetricActivation_e2e_test.cpp +++ b/cmds/statsd/tests/e2e/MetricActivation_e2e_test.cpp @@ -211,7 +211,7 @@ TEST(MetricActivationE2eTest, TestCountMetric) { ConfigMetricsReportList reports; vector buffer; processor.onDumpReport(cfgKey, bucketStartTimeNs + NS_PER_SEC * 60 * 15 + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); diff --git a/cmds/statsd/tests/e2e/MetricConditionLink_e2e_test.cpp b/cmds/statsd/tests/e2e/MetricConditionLink_e2e_test.cpp index 9349c857c5b3..78fb391978b3 100644 --- a/cmds/statsd/tests/e2e/MetricConditionLink_e2e_test.cpp +++ b/cmds/statsd/tests/e2e/MetricConditionLink_e2e_test.cpp @@ -200,7 +200,7 @@ TEST(MetricConditionLinkE2eTest, TestMultiplePredicatesAndLinks1) { ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs - 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -319,7 +319,7 @@ TEST(MetricConditionLinkE2eTest, TestMultiplePredicatesAndLinks2) { vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); diff --git a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp index aee0c1f8592b..ef3643f32dc6 100644 --- a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp +++ b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp @@ -46,7 +46,7 @@ ConfigMetricsReport GetReports(sp processor, int64_t timestam IPCThreadState* ipc = IPCThreadState::self(); ConfigKey configKey(ipc->getCallingUid(), kConfigKey); processor->onDumpReport(configKey, timestamp, include_current /* include_current_bucket*/, - true /* erase_data */, ADB_DUMP, &output); + true /* erase_data */, ADB_DUMP, FAST, &output); ConfigMetricsReportList reports; reports.ParseFromArray(output.data(), output.size()); EXPECT_EQ(1, reports.reports_size()); diff --git a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp index f3c4e12d27c7..cdb5a78aedbf 100644 --- a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp +++ b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp @@ -121,7 +121,7 @@ TEST(ValueMetricE2eTest, TestPulledEvents) { ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, configAddedTimeNs + 7 * bucketSizeNs + 10, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -228,7 +228,7 @@ TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm) { ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, configAddedTimeNs + 9 * bucketSizeNs + 10, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); diff --git a/cmds/statsd/tests/e2e/WakelockDuration_e2e_test.cpp b/cmds/statsd/tests/e2e/WakelockDuration_e2e_test.cpp index 16be3d78f69b..e13bf1409989 100644 --- a/cmds/statsd/tests/e2e/WakelockDuration_e2e_test.cpp +++ b/cmds/statsd/tests/e2e/WakelockDuration_e2e_test.cpp @@ -128,7 +128,7 @@ TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensionsForSumDuration1) vector buffer; ConfigMetricsReportList reports; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs - 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -165,7 +165,7 @@ TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensionsForSumDuration2) vector buffer; ConfigMetricsReportList reports; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -216,7 +216,7 @@ TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensionsForSumDuration3) } processor->onDumpReport(cfgKey, bucketStartTimeNs + 6 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -249,7 +249,7 @@ TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensionsForMaxDuration1) ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs - 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); @@ -279,7 +279,7 @@ TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensionsForMaxDuration2) ConfigMetricsReportList reports; vector buffer; processor->onDumpReport(cfgKey, bucketStartTimeNs + 2 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); @@ -325,7 +325,7 @@ TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensionsForMaxDuration3) } processor->onDumpReport(cfgKey, bucketStartTimeNs + 6 * bucketSizeNs + 1, false, true, - ADB_DUMP, &buffer); + ADB_DUMP, FAST, &buffer); EXPECT_TRUE(buffer.size() > 0); EXPECT_TRUE(reports.ParseFromArray(&buffer[0], buffer.size())); backfillDimensionPath(&reports); diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp index 7e7ffeda8053..a9d2c8810adc 100644 --- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp @@ -2797,7 +2797,6 @@ TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary) { EXPECT_EQ(1, valueProducer.mPastBuckets.begin()->second[0].values[0].long_value); } - TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries) { ValueMetric metric; metric.set_id(metricId); @@ -2864,6 +2863,187 @@ TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries) { EXPECT_EQ(true, valueProducer.mHasGlobalBase); } +static StatsLogReport outputStreamToProto(ProtoOutputStream* proto) { + vector bytes; + bytes.resize(proto->size()); + size_t pos = 0; + auto iter = proto->data(); + while (iter.readBuffer() != NULL) { + size_t toRead = iter.currentToRead(); + std::memcpy(&((bytes)[pos]), iter.readBuffer(), toRead); + pos += toRead; + iter.rp()->move(toRead); + } + + StatsLogReport report; + report.ParseFromArray(bytes.data(), bytes.size()); + return report; +} + +TEST(ValueMetricProducerTest, TestPullNeededFastDump) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.set_max_pull_delay_sec(INT_MAX); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp wizard = new NaggyMock(); + sp pullerManager = new StrictMock(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); + + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + // Initial pull. + .WillOnce(Invoke([](int tagId, vector>* data) { + data->clear(); + shared_ptr event = make_shared(tagId, bucketStartTimeNs); + event->write(tagId); + event->write(1); + event->write(1); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex, + eventMatcherWizard, tagId, bucketStartTimeNs, + bucketStartTimeNs, pullerManager); + + ProtoOutputStream output; + std::set strSet; + valueProducer.onDumpReport(bucketStartTimeNs + 10, + true /* include recent buckets */, true, + FAST, &strSet, &output); + + StatsLogReport report = outputStreamToProto(&output); + // Bucket is invalid since we did not pull when dump report was called. + EXPECT_EQ(0, report.value_metrics().data_size()); +} + +TEST(ValueMetricProducerTest, TestFastDumpWithoutCurrentBucket) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.set_max_pull_delay_sec(INT_MAX); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp wizard = new NaggyMock(); + sp pullerManager = new StrictMock(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); + + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + // Initial pull. + .WillOnce(Invoke([](int tagId, vector>* data) { + data->clear(); + shared_ptr event = make_shared(tagId, bucketStartTimeNs); + event->write(tagId); + event->write(1); + event->write(1); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex, + eventMatcherWizard, tagId, bucketStartTimeNs, + bucketStartTimeNs, pullerManager); + + vector> allData; + allData.clear(); + shared_ptr event = make_shared(tagId, bucket2StartTimeNs + 1); + event->write(tagId); + event->write(2); + event->write(2); + event->init(); + allData.push_back(event); + valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs); + + ProtoOutputStream output; + std::set strSet; + valueProducer.onDumpReport(bucket4StartTimeNs, + false /* include recent buckets */, true, + FAST, &strSet, &output); + + StatsLogReport report = outputStreamToProto(&output); + // Previous bucket is part of the report. + EXPECT_EQ(1, report.value_metrics().data_size()); + EXPECT_EQ(0, report.value_metrics().data(0).bucket_info(0).bucket_num()); +} + +TEST(ValueMetricProducerTest, TestPullNeededNoTimeConstraints) { + ValueMetric metric; + metric.set_id(metricId); + metric.set_bucket(ONE_MINUTE); + metric.mutable_value_field()->set_field(tagId); + metric.mutable_value_field()->add_child()->set_field(2); + metric.set_max_pull_delay_sec(INT_MAX); + + UidMap uidMap; + SimpleAtomMatcher atomMatcher; + atomMatcher.set_atom_id(tagId); + sp eventMatcherWizard = + new EventMatcherWizard({new SimpleLogMatchingTracker( + atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)}); + sp wizard = new NaggyMock(); + sp pullerManager = new StrictMock(); + EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return()); + EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return()); + + EXPECT_CALL(*pullerManager, Pull(tagId, _)) + // Initial pull. + .WillOnce(Invoke([](int tagId, vector>* data) { + data->clear(); + shared_ptr event = make_shared(tagId, bucketStartTimeNs); + event->write(tagId); + event->write(1); + event->write(1); + event->init(); + data->push_back(event); + return true; + })) + .WillOnce(Invoke([](int tagId, vector>* data) { + data->clear(); + shared_ptr event = make_shared(tagId, bucketStartTimeNs + 10); + event->write(tagId); + event->write(3); + event->write(3); + event->init(); + data->push_back(event); + return true; + })); + + ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, logEventMatcherIndex, + eventMatcherWizard, tagId, bucketStartTimeNs, + bucketStartTimeNs, pullerManager); + + ProtoOutputStream output; + std::set strSet; + valueProducer.onDumpReport(bucketStartTimeNs + 10, + true /* include recent buckets */, true, + NO_TIME_CONSTRAINTS, &strSet, &output); + + StatsLogReport report = outputStreamToProto(&output); + EXPECT_EQ(1, report.value_metrics().data_size()); + EXPECT_EQ(1, report.value_metrics().data(0).bucket_info_size()); + EXPECT_EQ(2, report.value_metrics().data(0).bucket_info(0).values(0).value_long()); +} + + } // namespace statsd } // namespace os } // namespace android -- cgit v1.2.3-59-g8ed1b