diff options
| author | 2018-10-16 21:46:49 +0000 | |
|---|---|---|
| committer | 2018-10-16 21:46:49 +0000 | |
| commit | dfa4c24aafd18b07077427b36d302414bceff5c1 (patch) | |
| tree | 7ec49036ee1e5b654b99207bc31276ed51d91451 | |
| parent | 4eb4f745f88b6515ff2090e1fd941abdf8c22b21 (diff) | |
| parent | 41e606c1fce2def938bf093896c7bedcf642be1a (diff) | |
Merge "Add pulled atom subscription for shell."
| -rw-r--r-- | cmds/statsd/Android.mk | 1 | ||||
| -rw-r--r-- | cmds/statsd/src/StatsService.cpp | 2 | ||||
| -rw-r--r-- | cmds/statsd/src/shell/ShellSubscriber.cpp | 108 | ||||
| -rw-r--r-- | cmds/statsd/src/shell/ShellSubscriber.h | 25 | ||||
| -rw-r--r-- | cmds/statsd/src/shell/shell_config.proto | 2 | ||||
| -rw-r--r-- | cmds/statsd/src/shell/shell_data.proto | 29 | ||||
| -rw-r--r-- | cmds/statsd/tests/shell/ShellSubscriber_test.cpp | 134 |
7 files changed, 259 insertions, 42 deletions
diff --git a/cmds/statsd/Android.mk b/cmds/statsd/Android.mk index f6b0db80f3ad..c396cd130f93 100644 --- a/cmds/statsd/Android.mk +++ b/cmds/statsd/Android.mk @@ -189,6 +189,7 @@ LOCAL_SRC_FILES := \ src/atom_field_options.proto \ src/atoms.proto \ src/stats_log.proto \ + src/shell/shell_data.proto \ tests/AlarmMonitor_test.cpp \ tests/anomaly/AlarmTracker_test.cpp \ tests/anomaly/AnomalyTracker_test.cpp \ diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp index fb6f8c8d4590..ce2877731882 100644 --- a/cmds/statsd/src/StatsService.cpp +++ b/cmds/statsd/src/StatsService.cpp @@ -319,7 +319,7 @@ status_t StatsService::command(int in, int out, int err, Vector<String8>& args, } if (!args[0].compare(String8("data-subscribe"))) { if (mShellSubscriber == nullptr) { - mShellSubscriber = new ShellSubscriber(mUidMap); + mShellSubscriber = new ShellSubscriber(mUidMap, mPullerManager); } mShellSubscriber->startNewSubscription(in, out, resultReceiver); return NO_ERROR; diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp index 1306a467e5c4..dffff7a96269 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.cpp +++ b/cmds/statsd/src/shell/ShellSubscriber.cpp @@ -18,9 +18,9 @@ #include "ShellSubscriber.h" -#include "matchers/matcher_util.h" - #include <android-base/file.h> +#include "matchers/matcher_util.h" +#include "stats_log_util.h" using android::util::ProtoOutputStream; @@ -28,6 +28,8 @@ namespace android { namespace os { namespace statsd { +const static int FIELD_ID_ATOM = 1; + void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver) { VLOG("start new shell subscription"); { @@ -42,25 +44,106 @@ void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> IInterface::asBinder(mResultReceiver)->linkToDeath(this); } - // Spawn another thread to read the config updates from the input file descriptor - std::thread reader([in, this] { readConfig(in); }); - reader.detach(); + // Note that the following is blocking, and it's intended as we cannot return until the shell + // cmd exits, otherwise all resources & FDs will be automatically closed. - std::unique_lock<std::mutex> lk(mMutex); + // Read config forever until EOF is reached. Clients may send multiple configs -- each new + // config replace the previous one. + readConfig(in); + // Now we have read an EOF we now wait for the semaphore until the client exits. + VLOG("Now wait for client to exit"); + std::unique_lock<std::mutex> lk(mMutex); mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; }); - if (reader.joinable()) { - reader.join(); - } } void ShellSubscriber::updateConfig(const ShellSubscription& config) { std::lock_guard<std::mutex> lock(mMutex); mPushedMatchers.clear(); + mPulledInfo.clear(); + for (const auto& pushed : config.pushed()) { mPushedMatchers.push_back(pushed); VLOG("adding matcher for atom %d", pushed.atom_id()); } + + int64_t token = getElapsedRealtimeNs(); + mPullToken = token; + + int64_t minInterval = -1; + for (const auto& pulled : config.pulled()) { + // All intervals need to be multiples of the min interval. + if (minInterval < 0 || pulled.freq_millis() < minInterval) { + minInterval = pulled.freq_millis(); + } + + mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); + VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); + } + + if (mPulledInfo.size() > 0 && minInterval > 0) { + // This thread is guaranteed to terminate after it detects the token is different or + // cleaned up. + std::thread puller([token, minInterval, this] { startPull(token, minInterval); }); + puller.detach(); + } +} + +void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, + const SimpleAtomMatcher& matcher) { + if (mOutput == 0) return; + int count = 0; + mProto.clear(); + for (const auto& event : data) { + VLOG("%s", event->ToString().c_str()); + if (matchesSimple(*mUidMap, matcher, *event)) { + VLOG("matched"); + count++; + uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | + util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); + event->ToProto(mProto); + mProto.end(atomToken); + } + } + + if (count > 0) { + // First write the payload size. + size_t bufferSize = mProto.size(); + write(mOutput, &bufferSize, sizeof(bufferSize)); + VLOG("%d atoms, proto size: %zu", count, bufferSize); + // Then write the payload. + mProto.flush(mOutput); + } + mProto.clear(); +} + +void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) { + while (1) { + int64_t nowMillis = getElapsedRealtimeMillis(); + { + std::lock_guard<std::mutex> lock(mMutex); + if (mPulledInfo.size() == 0 || mPullToken != token) { + VLOG("Pulling thread %lld done!", (long long)token); + return; + } + for (auto& pullInfo : mPulledInfo) { + if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { + VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id()); + + vector<std::shared_ptr<LogEvent>> data; + mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), nowMillis * 1000000L, + &data); + VLOG("pulled %zu atoms", data.size()); + if (data.size() > 0) { + writeToOutputLocked(data, pullInfo.mPullerMatcher); + } + pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; + } + } + } + VLOG("Pulling thread %lld sleep....", (long long)token); + std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis)); + } } void ShellSubscriber::readConfig(int in) { @@ -101,6 +184,8 @@ void ShellSubscriber::cleanUpLocked() { mOutput = 0; mResultReceiver = nullptr; mPushedMatchers.clear(); + mPulledInfo.clear(); + mPullToken = 0; VLOG("done clean up"); } @@ -110,10 +195,13 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) { if (mOutput <= 0) { return; } - for (const auto& matcher : mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { + VLOG("%s", event.ToString().c_str()); + uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | + util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); + mProto.end(atomToken); // First write the payload size. size_t bufferSize = mProto.size(); write(mOutput, &bufferSize, sizeof(bufferSize)); diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h index 0ace35fab850..5401f31ce68c 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.h +++ b/cmds/statsd/src/shell/ShellSubscriber.h @@ -24,6 +24,7 @@ #include <mutex> #include <string> #include <thread> +#include "external/StatsPullerManager.h" #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h" #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h" #include "packages/UidMap.h" @@ -51,14 +52,15 @@ namespace statsd { * with sizeof(size_t) bytes indicating the size of the proto message payload. * * The stream would be in the following format: - * |size_t|atom1 proto|size_t|atom2 proto|.... + * |size_t|shellData proto|size_t|shellData proto|.... * * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread * until it exits. */ class ShellSubscriber : public virtual IBinder::DeathRecipient { public: - ShellSubscriber(sp<UidMap> uidMap) : mUidMap(uidMap){}; + ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr) + : mUidMap(uidMap), mPullerMgr(pullerMgr){}; /** * Start a new subscription. @@ -70,15 +72,28 @@ public: void onLogEvent(const LogEvent& event); private: + struct PullInfo { + PullInfo(const SimpleAtomMatcher& matcher, int64_t interval) + : mPullerMatcher(matcher), mInterval(interval), mPrevPullElapsedRealtimeMs(0) { + } + SimpleAtomMatcher mPullerMatcher; + int64_t mInterval; + int64_t mPrevPullElapsedRealtimeMs; + }; void readConfig(int in); void updateConfig(const ShellSubscription& config); + void startPull(int64_t token, int64_t intervalMillis); + void cleanUpLocked(); + void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, + const SimpleAtomMatcher& matcher); + sp<UidMap> mUidMap; - // bool mWritten = false; + sp<StatsPullerManager> mPullerMgr; android::util::ProtoOutputStream mProto; @@ -93,6 +108,10 @@ private: sp<IResultReceiver> mResultReceiver; std::vector<SimpleAtomMatcher> mPushedMatchers; + + std::vector<PullInfo> mPulledInfo; + + int64_t mPullToken = 0; // A unique token to identify a puller thread. }; } // namespace statsd diff --git a/cmds/statsd/src/shell/shell_config.proto b/cmds/statsd/src/shell/shell_config.proto index 516693d4e7f7..73cb49a61821 100644 --- a/cmds/statsd/src/shell/shell_config.proto +++ b/cmds/statsd/src/shell/shell_config.proto @@ -24,7 +24,7 @@ option java_outer_classname = "ShellConfig"; import "frameworks/base/cmds/statsd/src/statsd_config.proto"; message PulledAtomSubscription { - optional int32 atom_id = 1; + optional SimpleAtomMatcher matcher = 1; /* gap between two pulls in milliseconds */ optional int32 freq_millis = 2; diff --git a/cmds/statsd/src/shell/shell_data.proto b/cmds/statsd/src/shell/shell_data.proto new file mode 100644 index 000000000000..236bdbdd31f6 --- /dev/null +++ b/cmds/statsd/src/shell/shell_data.proto @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto2"; + +package android.os.statsd; + +option java_package = "com.android.os.statsd"; +option java_outer_classname = "ShellDataProto"; + +import "frameworks/base/cmds/statsd/src/atoms.proto"; + +// The output of shell subscription, including both pulled and pushed subscriptions. +message ShellData { + repeated Atom atom = 1; +} diff --git a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp index b380b03e28d0..dd00561854fb 100644 --- a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp +++ b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp @@ -17,6 +17,7 @@ #include <unistd.h> #include "frameworks/base/cmds/statsd/src/atoms.pb.h" #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h" +#include "frameworks/base/cmds/statsd/src/shell/shell_data.pb.h" #include "src/shell/ShellSubscriber.h" #include "tests/metrics/metrics_test_helper.h" @@ -26,7 +27,10 @@ using namespace android::os::statsd; using android::sp; using std::vector; +using testing::_; +using testing::Invoke; using testing::NaggyMock; +using testing::StrictMock; #ifdef __ANDROID__ @@ -51,7 +55,10 @@ public: } }; -TEST(ShellSubscriberTest, testPushedSubscription) { +void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap, + sp<MockStatsPullerManager> pullerManager, + const vector<std::shared_ptr<LogEvent>>& pushedEvents, + const ShellData& expectedData) { // set up 2 pipes for read/write config and data int fds_config[2]; ASSERT_EQ(0, pipe(fds_config)); @@ -59,10 +66,6 @@ TEST(ShellSubscriberTest, testPushedSubscription) { int fds_data[2]; ASSERT_EQ(0, pipe(fds_data)); - // create a simple config to get screen events - ShellSubscription config; - config.add_pushed()->set_atom_id(29); - size_t bufferSize = config.ByteSize(); // write the config to pipe, first write size of the config @@ -75,15 +78,9 @@ TEST(ShellSubscriberTest, testPushedSubscription) { write(fds_config[1], buffer.data(), bufferSize); close(fds_config[1]); - // create a shell subscriber. - sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>(); - sp<ShellSubscriber> shellClient = new ShellSubscriber(uidMap); + sp<ShellSubscriber> shellClient = new ShellSubscriber(uidMap, pullerManager); sp<MyResultReceiver> resultReceiver = new MyResultReceiver(); - LogEvent event1(29, 1000); - event1.write(2); - event1.init(); - // mimic a binder thread that a shell subscriber runs on. it would block. std::thread reader([&resultReceiver, &fds_config, &fds_data, &shellClient] { shellClient->startNewSubscription(fds_config[0], fds_data[1], resultReceiver); @@ -93,44 +90,127 @@ TEST(ShellSubscriberTest, testPushedSubscription) { // let the shell subscriber to receive the config from pipe. std::this_thread::sleep_for(100ms); - // send a log event that matches the config. - std::thread log_reader([&shellClient, &event1] { shellClient->onLogEvent(event1); }); - log_reader.detach(); + if (pushedEvents.size() > 0) { + // send a log event that matches the config. + std::thread log_reader([&shellClient, &pushedEvents] { + for (const auto& event : pushedEvents) { + shellClient->onLogEvent(*event); + } + }); + + log_reader.detach(); - if (log_reader.joinable()) { - log_reader.join(); + if (log_reader.joinable()) { + log_reader.join(); + } } // wait for the data to be written. std::this_thread::sleep_for(100ms); - // this is the expected screen event atom. - Atom atom; - atom.mutable_screen_state_changed()->set_state( - ::android::view::DisplayStateEnum::DISPLAY_STATE_ON); - - int atom_size = atom.ByteSize(); + int expected_data_size = expectedData.ByteSize(); // now read from the pipe. firstly read the atom size. size_t dataSize = 0; EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize))); - EXPECT_EQ(atom_size, (int)dataSize); + EXPECT_EQ(expected_data_size, (int)dataSize); // then read that much data which is the atom in proto binary format vector<uint8_t> dataBuffer(dataSize); EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize)); // make sure the received bytes can be parsed to an atom - Atom receivedAtom; + ShellData receivedAtom; EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0); // serialze the expected atom to bytes. and compare. to make sure they are the same. - vector<uint8_t> atomBuffer(atom_size); - atom.SerializeToArray(&atomBuffer[0], atom_size); + vector<uint8_t> atomBuffer(expected_data_size); + expectedData.SerializeToArray(&atomBuffer[0], expected_data_size); EXPECT_EQ(atomBuffer, dataBuffer); close(fds_data[0]); } +TEST(ShellSubscriberTest, testPushedSubscription) { + sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>(); + + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + vector<std::shared_ptr<LogEvent>> pushedList; + + std::shared_ptr<LogEvent> event1 = + std::make_shared<LogEvent>(29 /*screen_state_atom_id*/, 1000 /*timestamp*/); + event1->write(::android::view::DisplayStateEnum::DISPLAY_STATE_ON); + event1->init(); + pushedList.push_back(event1); + + // create a simple config to get screen events + ShellSubscription config; + config.add_pushed()->set_atom_id(29); + + // this is the expected screen event atom. + ShellData shellData; + shellData.add_atom()->mutable_screen_state_changed()->set_state( + ::android::view::DisplayStateEnum::DISPLAY_STATE_ON); + + runShellTest(config, uidMap, pullerManager, pushedList, shellData); +} + +namespace { + +int kUid1 = 1000; +int kUid2 = 2000; + +int kCpuTime1 = 100; +int kCpuTime2 = 200; + +ShellData getExpectedShellData() { + ShellData shellData; + auto* atom1 = shellData.add_atom()->mutable_cpu_active_time(); + atom1->set_uid(kUid1); + atom1->set_time_millis(kCpuTime1); + + auto* atom2 = shellData.add_atom()->mutable_cpu_active_time(); + atom2->set_uid(kUid2); + atom2->set_time_millis(kCpuTime2); + + return shellData; +} + +ShellSubscription getPulledConfig() { + ShellSubscription config; + auto* pull_config = config.add_pulled(); + pull_config->mutable_matcher()->set_atom_id(10016); + pull_config->set_freq_millis(2000); + return config; +} + +} // namespace + +TEST(ShellSubscriberTest, testPulledSubscription) { + sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>(); + + sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>(); + EXPECT_CALL(*pullerManager, Pull(10016, _, _)) + .WillRepeatedly( + Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) { + data->clear(); + shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, timeNs); + event->write(kUid1); + event->write(kCpuTime1); + event->init(); + data->push_back(event); + // another event + event = make_shared<LogEvent>(tagId, timeNs); + event->write(kUid2); + event->write(kCpuTime2); + event->init(); + data->push_back(event); + return true; + })); + + runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(), + getExpectedShellData()); +} + #else GTEST_LOG_(INFO) << "This test does nothing.\n"; #endif |