diff options
| author | 2018-01-09 10:03:01 +0000 | |
|---|---|---|
| committer | 2018-01-09 10:03:01 +0000 | |
| commit | c131a9297b4acadce2127c950e3ffec5a7e40973 (patch) | |
| tree | ec91a3fbe18d1abcf1851e992f0eb9167819ed04 | |
| parent | 0b921eacb59d5683f08c7759049c4fb235ce9a9f (diff) | |
| parent | b0d0628a2915449db2c4ec071bea0cdeff3af210 (diff) | |
Merge changes Ibe8c8d3c,I4a475d6f
* changes:
Thread-safety at log processor level.
Handle null string in jni and c++ stats-log-api interfaces.
| -rw-r--r-- | cmds/statsd/src/StatsLogProcessor.cpp | 23 | ||||
| -rw-r--r-- | cmds/statsd/src/StatsLogProcessor.h | 6 | ||||
| -rw-r--r-- | cmds/statsd/tests/StatsLogProcessor_test.cpp | 12 | ||||
| -rw-r--r-- | tools/stats_log_api_gen/main.cpp | 23 |
4 files changed, 39 insertions, 25 deletions
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp index 991badcdddac..a9e0f233d0dd 100644 --- a/cmds/statsd/src/StatsLogProcessor.cpp +++ b/cmds/statsd/src/StatsLogProcessor.cpp @@ -82,9 +82,7 @@ StatsLogProcessor::~StatsLogProcessor() { void StatsLogProcessor::onAnomalyAlarmFired( const uint64_t timestampNs, unordered_set<sp<const AnomalyAlarm>, SpHash<AnomalyAlarm>> anomalySet) { - // TODO: This is a thread-safety issue. mMetricsManagers could change under our feet. - // TODO: Solution? Lock everything! :( - // TODO: Question: Can we replace the other lock (broadcast), or do we need to supplement it? + std::lock_guard<std::mutex> lock(mMetricsMutex); for (const auto& itr : mMetricsManagers) { itr.second->onAnomalyAlarmFired(timestampNs, anomalySet); } @@ -92,11 +90,13 @@ void StatsLogProcessor::onAnomalyAlarmFired( // TODO: what if statsd service restarts? How do we know what logs are already processed before? void StatsLogProcessor::OnLogEvent(const LogEvent& msg) { + std::lock_guard<std::mutex> lock(mMetricsMutex); + StatsdStats::getInstance().noteAtomLogged(msg.GetTagId(), msg.GetTimestampNs() / NS_PER_SEC); // pass the event to metrics managers. for (auto& pair : mMetricsManagers) { pair.second->onLogEvent(msg); - flushIfNecessary(msg.GetTimestampNs(), pair.first, *(pair.second)); + flushIfNecessaryLocked(msg.GetTimestampNs(), pair.first, *(pair.second)); } // Hard-coded logic to update the isolated uid's in the uid-map. // The field numbers need to be currently updated by hand with atoms.proto @@ -116,6 +116,7 @@ void StatsLogProcessor::OnLogEvent(const LogEvent& msg) { } void StatsLogProcessor::OnConfigUpdated(const ConfigKey& key, const StatsdConfig& config) { + std::lock_guard<std::mutex> lock(mMetricsMutex); ALOGD("Updated configuration for key %s", key.ToString().c_str()); sp<MetricsManager> newMetricsManager = new MetricsManager(key, config, mTimeBaseSec, mUidMap); auto it = mMetricsManagers.find(key); @@ -142,6 +143,7 @@ void StatsLogProcessor::OnConfigUpdated(const ConfigKey& key, const StatsdConfig } size_t StatsLogProcessor::GetMetricsSize(const ConfigKey& key) const { + std::lock_guard<std::mutex> lock(mMetricsMutex); auto it = mMetricsManagers.find(key); if (it == mMetricsManagers.end()) { ALOGW("Config source %s does not exist", key.ToString().c_str()); @@ -152,6 +154,7 @@ size_t StatsLogProcessor::GetMetricsSize(const ConfigKey& key) const { void StatsLogProcessor::onDumpReport(const ConfigKey& key, const uint64_t& dumpTimeStampNs, ConfigMetricsReportList* report) { + std::lock_guard<std::mutex> lock(mMetricsMutex); auto it = mMetricsManagers.find(key); if (it == mMetricsManagers.end()) { ALOGW("Config source %s does not exist", key.ToString().c_str()); @@ -165,6 +168,7 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, const uint64_t& dumpT } void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outData) { + std::lock_guard<std::mutex> lock(mMetricsMutex); auto it = mMetricsManagers.find(key); if (it == mMetricsManagers.end()) { ALOGW("Config source %s does not exist", key.ToString().c_str()); @@ -173,9 +177,7 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outD // This allows another broadcast to be sent within the rate-limit period if we get close to // filling the buffer again soon. - mBroadcastTimesMutex.lock(); mLastBroadcastTimes.erase(key); - mBroadcastTimesMutex.unlock(); ProtoOutputStream proto; @@ -224,6 +226,7 @@ void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outD } void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) { + std::lock_guard<std::mutex> lock(mMetricsMutex); auto it = mMetricsManagers.find(key); if (it != mMetricsManagers.end()) { mMetricsManagers.erase(it); @@ -231,14 +234,11 @@ void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) { } StatsdStats::getInstance().noteConfigRemoved(key); - std::lock_guard<std::mutex> lock(mBroadcastTimesMutex); mLastBroadcastTimes.erase(key); } -void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs, const ConfigKey& key, - MetricsManager& metricsManager) { - std::lock_guard<std::mutex> lock(mBroadcastTimesMutex); - +void StatsLogProcessor::flushIfNecessaryLocked( + uint64_t timestampNs, const ConfigKey& key, MetricsManager& metricsManager) { auto lastCheckTime = mLastByteSizeTimes.find(key); if (lastCheckTime != mLastByteSizeTimes.end()) { if (timestampNs - lastCheckTime->second < StatsdStats::kMinByteSizeCheckPeriodNs) { @@ -274,6 +274,7 @@ void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs, const ConfigKey& void StatsLogProcessor::WriteDataToDisk() { mkdir(STATS_DATA_DIR, S_IRWXU); + std::lock_guard<std::mutex> lock(mMetricsMutex); for (auto& pair : mMetricsManagers) { const ConfigKey& key = pair.first; vector<uint8_t> data; diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h index f62fc4e31c0a..b527e2790a0c 100644 --- a/cmds/statsd/src/StatsLogProcessor.h +++ b/cmds/statsd/src/StatsLogProcessor.h @@ -57,7 +57,7 @@ public: void WriteDataToDisk(); private: - mutable mutex mBroadcastTimesMutex; + mutable mutex mMetricsMutex; std::unordered_map<ConfigKey, sp<MetricsManager>> mMetricsManagers; @@ -72,8 +72,8 @@ private: /* Check if we should send a broadcast if approaching memory limits and if we're over, we * actually delete the data. */ - void flushIfNecessary(uint64_t timestampNs, const ConfigKey& key, - MetricsManager& metricsManager); + void flushIfNecessaryLocked(uint64_t timestampNs, const ConfigKey& key, + MetricsManager& metricsManager); // Function used to send a broadcast so that receiver for the config key can call getData // to retrieve the stored data. diff --git a/cmds/statsd/tests/StatsLogProcessor_test.cpp b/cmds/statsd/tests/StatsLogProcessor_test.cpp index 5d053e25003d..aab5bedb3cbe 100644 --- a/cmds/statsd/tests/StatsLogProcessor_test.cpp +++ b/cmds/statsd/tests/StatsLogProcessor_test.cpp @@ -60,9 +60,9 @@ TEST(StatsLogProcessorTest, TestRateLimitByteSize) { // Expect only the first flush to trigger a check for byte size since the last two are // rate-limited. EXPECT_CALL(mockMetricsManager, byteSize()).Times(1); - p.flushIfNecessary(99, key, mockMetricsManager); - p.flushIfNecessary(100, key, mockMetricsManager); - p.flushIfNecessary(101, key, mockMetricsManager); + p.flushIfNecessaryLocked(99, key, mockMetricsManager); + p.flushIfNecessaryLocked(100, key, mockMetricsManager); + p.flushIfNecessaryLocked(101, key, mockMetricsManager); } TEST(StatsLogProcessorTest, TestRateLimitBroadcast) { @@ -80,12 +80,12 @@ TEST(StatsLogProcessorTest, TestRateLimitBroadcast) { .WillRepeatedly(Return(int(StatsdStats::kMaxMetricsBytesPerConfig * .95))); // Expect only one broadcast despite always returning a size that should trigger broadcast. - p.flushIfNecessary(1, key, mockMetricsManager); + p.flushIfNecessaryLocked(1, key, mockMetricsManager); EXPECT_EQ(1, broadcastCount); // This next call to flush should not trigger a broadcast. p.mLastByteSizeTimes.clear(); // Force another check for byte size. - p.flushIfNecessary(2, key, mockMetricsManager); + p.flushIfNecessaryLocked(2, key, mockMetricsManager); EXPECT_EQ(1, broadcastCount); } @@ -106,7 +106,7 @@ TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge) { EXPECT_CALL(mockMetricsManager, onDumpReport(_)).Times(1); // Expect to call the onDumpReport and skip the broadcast. - p.flushIfNecessary(1, key, mockMetricsManager); + p.flushIfNecessaryLocked(1, key, mockMetricsManager); EXPECT_EQ(0, broadcastCount); } diff --git a/tools/stats_log_api_gen/main.cpp b/tools/stats_log_api_gen/main.cpp index 89749fb52bb4..bbe6d63073c1 100644 --- a/tools/stats_log_api_gen/main.cpp +++ b/tools/stats_log_api_gen/main.cpp @@ -166,7 +166,15 @@ static int write_stats_log_cpp(FILE *out, const Atoms &atoms, attributionDecl.fields.front().name.c_str()); fprintf(out, " event.begin();\n"); for (const auto &chainField : attributionDecl.fields) { - fprintf(out, " event << %s[i];\n", chainField.name.c_str()); + if (chainField.javaType == JAVA_TYPE_STRING) { + fprintf(out, " if (%s[i] != NULL) {\n", chainField.name.c_str()); + fprintf(out, " event << %s[i];\n", chainField.name.c_str()); + fprintf(out, " } else {\n"); + fprintf(out, " event << \"\";\n"); + fprintf(out, " }\n"); + } else { + fprintf(out, " event << %s[i];\n", chainField.name.c_str()); + } } fprintf(out, " event.end();\n"); fprintf(out, " }\n"); @@ -589,13 +597,18 @@ write_stats_log_jni(FILE* out, const Atoms& atoms, const AtomDecl &attributionDe fprintf(out, " jstring jstr = " "(jstring)env->GetObjectArrayElement(%s, i);\n", chainField.name.c_str()); - fprintf(out, " ScopedUtfChars* scoped_%s = " + fprintf(out, " if (jstr == NULL) {\n"); + fprintf(out, " %s_vec.push_back(NULL);\n", + chainField.name.c_str()); + fprintf(out, " } else {\n"); + fprintf(out, " ScopedUtfChars* scoped_%s = " "new ScopedUtfChars(env, jstr);\n", chainField.name.c_str()); - fprintf(out, " %s_vec.push_back(scoped_%s->c_str());\n", + fprintf(out, " %s_vec.push_back(scoped_%s->c_str());\n", chainField.name.c_str(), chainField.name.c_str()); - fprintf(out, " scoped_%s_vec.push_back(scoped_%s);\n", + fprintf(out, " scoped_%s_vec.push_back(scoped_%s);\n", chainField.name.c_str(), chainField.name.c_str()); + fprintf(out, " }\n"); fprintf(out, " }\n"); } fprintf(out, "\n"); @@ -648,7 +661,7 @@ write_stats_log_jni(FILE* out, const Atoms& atoms, const AtomDecl &attributionDe fprintf(out, " env->ReleaseIntArrayElements(%s, %s_array, 0);\n", chainField.name.c_str(), chainField.name.c_str()); } else if (chainField.javaType == JAVA_TYPE_STRING) { - fprintf(out, " for (size_t i = 0; i < %s_length; ++i) {\n", + fprintf(out, " for (size_t i = 0; i < scoped_%s_vec.size(); ++i) {\n", chainField.name.c_str()); fprintf(out, " delete scoped_%s_vec[i];\n", chainField.name.c_str()); fprintf(out, " }\n"); |