Merge "Add pulled atom subscription for shell."
diff --git a/cmds/statsd/Android.mk b/cmds/statsd/Android.mk
index f6b0db8..c396cd1 100644
--- a/cmds/statsd/Android.mk
+++ b/cmds/statsd/Android.mk
@@ -189,6 +189,7 @@
src/atom_field_options.proto \
src/atoms.proto \
src/stats_log.proto \
+ src/shell/shell_data.proto \
tests/AlarmMonitor_test.cpp \
tests/anomaly/AlarmTracker_test.cpp \
tests/anomaly/AnomalyTracker_test.cpp \
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index fb6f8c8..ce28777 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -319,7 +319,7 @@
}
if (!args[0].compare(String8("data-subscribe"))) {
if (mShellSubscriber == nullptr) {
- mShellSubscriber = new ShellSubscriber(mUidMap);
+ mShellSubscriber = new ShellSubscriber(mUidMap, mPullerManager);
}
mShellSubscriber->startNewSubscription(in, out, resultReceiver);
return NO_ERROR;
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index 1306a46..dffff7a 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -18,9 +18,9 @@
#include "ShellSubscriber.h"
-#include "matchers/matcher_util.h"
-
#include <android-base/file.h>
+#include "matchers/matcher_util.h"
+#include "stats_log_util.h"
using android::util::ProtoOutputStream;
@@ -28,6 +28,8 @@
namespace os {
namespace statsd {
+const static int FIELD_ID_ATOM = 1;
+
void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver) {
VLOG("start new shell subscription");
{
@@ -42,25 +44,106 @@
IInterface::asBinder(mResultReceiver)->linkToDeath(this);
}
- // Spawn another thread to read the config updates from the input file descriptor
- std::thread reader([in, this] { readConfig(in); });
- reader.detach();
+ // Note that the following is blocking, and it's intended as we cannot return until the shell
+ // cmd exits, otherwise all resources & FDs will be automatically closed.
+ // Read config forever until EOF is reached. Clients may send multiple configs -- each new
+ // config replace the previous one.
+ readConfig(in);
+
+ // Now we have read an EOF we now wait for the semaphore until the client exits.
+ VLOG("Now wait for client to exit");
std::unique_lock<std::mutex> lk(mMutex);
-
mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
- if (reader.joinable()) {
- reader.join();
- }
}
void ShellSubscriber::updateConfig(const ShellSubscription& config) {
std::lock_guard<std::mutex> lock(mMutex);
mPushedMatchers.clear();
+ mPulledInfo.clear();
+
for (const auto& pushed : config.pushed()) {
mPushedMatchers.push_back(pushed);
VLOG("adding matcher for atom %d", pushed.atom_id());
}
+
+ int64_t token = getElapsedRealtimeNs();
+ mPullToken = token;
+
+ int64_t minInterval = -1;
+ for (const auto& pulled : config.pulled()) {
+ // All intervals need to be multiples of the min interval.
+ if (minInterval < 0 || pulled.freq_millis() < minInterval) {
+ minInterval = pulled.freq_millis();
+ }
+
+ mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
+ VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
+ }
+
+ if (mPulledInfo.size() > 0 && minInterval > 0) {
+ // This thread is guaranteed to terminate after it detects the token is different or
+ // cleaned up.
+ std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
+ puller.detach();
+ }
+}
+
+void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+ const SimpleAtomMatcher& matcher) {
+ if (mOutput == 0) return;
+ int count = 0;
+ mProto.clear();
+ for (const auto& event : data) {
+ VLOG("%s", event->ToString().c_str());
+ if (matchesSimple(*mUidMap, matcher, *event)) {
+ VLOG("matched");
+ count++;
+ uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+ util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
+ event->ToProto(mProto);
+ mProto.end(atomToken);
+ }
+ }
+
+ if (count > 0) {
+ // First write the payload size.
+ size_t bufferSize = mProto.size();
+ write(mOutput, &bufferSize, sizeof(bufferSize));
+ VLOG("%d atoms, proto size: %zu", count, bufferSize);
+ // Then write the payload.
+ mProto.flush(mOutput);
+ }
+ mProto.clear();
+}
+
+void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
+ while (1) {
+ int64_t nowMillis = getElapsedRealtimeMillis();
+ {
+ std::lock_guard<std::mutex> lock(mMutex);
+ if (mPulledInfo.size() == 0 || mPullToken != token) {
+ VLOG("Pulling thread %lld done!", (long long)token);
+ return;
+ }
+ for (auto& pullInfo : mPulledInfo) {
+ if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
+ VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
+
+ vector<std::shared_ptr<LogEvent>> data;
+ mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), nowMillis * 1000000L,
+ &data);
+ VLOG("pulled %zu atoms", data.size());
+ if (data.size() > 0) {
+ writeToOutputLocked(data, pullInfo.mPullerMatcher);
+ }
+ pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
+ }
+ }
+ }
+ VLOG("Pulling thread %lld sleep....", (long long)token);
+ std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
+ }
}
void ShellSubscriber::readConfig(int in) {
@@ -101,6 +184,8 @@
mOutput = 0;
mResultReceiver = nullptr;
mPushedMatchers.clear();
+ mPulledInfo.clear();
+ mPullToken = 0;
VLOG("done clean up");
}
@@ -110,10 +195,13 @@
if (mOutput <= 0) {
return;
}
-
for (const auto& matcher : mPushedMatchers) {
if (matchesSimple(*mUidMap, matcher, event)) {
+ VLOG("%s", event.ToString().c_str());
+ uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+ util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event.ToProto(mProto);
+ mProto.end(atomToken);
// First write the payload size.
size_t bufferSize = mProto.size();
write(mOutput, &bufferSize, sizeof(bufferSize));
diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h
index 0ace35f..5401f31 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.h
+++ b/cmds/statsd/src/shell/ShellSubscriber.h
@@ -24,6 +24,7 @@
#include <mutex>
#include <string>
#include <thread>
+#include "external/StatsPullerManager.h"
#include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
#include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
#include "packages/UidMap.h"
@@ -51,14 +52,15 @@
* with sizeof(size_t) bytes indicating the size of the proto message payload.
*
* The stream would be in the following format:
- * |size_t|atom1 proto|size_t|atom2 proto|....
+ * |size_t|shellData proto|size_t|shellData proto|....
*
* Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
* until it exits.
*/
class ShellSubscriber : public virtual IBinder::DeathRecipient {
public:
- ShellSubscriber(sp<UidMap> uidMap) : mUidMap(uidMap){};
+ ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr)
+ : mUidMap(uidMap), mPullerMgr(pullerMgr){};
/**
* Start a new subscription.
@@ -70,15 +72,28 @@
void onLogEvent(const LogEvent& event);
private:
+ struct PullInfo {
+ PullInfo(const SimpleAtomMatcher& matcher, int64_t interval)
+ : mPullerMatcher(matcher), mInterval(interval), mPrevPullElapsedRealtimeMs(0) {
+ }
+ SimpleAtomMatcher mPullerMatcher;
+ int64_t mInterval;
+ int64_t mPrevPullElapsedRealtimeMs;
+ };
void readConfig(int in);
void updateConfig(const ShellSubscription& config);
+ void startPull(int64_t token, int64_t intervalMillis);
+
void cleanUpLocked();
+ void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+ const SimpleAtomMatcher& matcher);
+
sp<UidMap> mUidMap;
- // bool mWritten = false;
+ sp<StatsPullerManager> mPullerMgr;
android::util::ProtoOutputStream mProto;
@@ -93,6 +108,10 @@
sp<IResultReceiver> mResultReceiver;
std::vector<SimpleAtomMatcher> mPushedMatchers;
+
+ std::vector<PullInfo> mPulledInfo;
+
+ int64_t mPullToken = 0; // A unique token to identify a puller thread.
};
} // namespace statsd
diff --git a/cmds/statsd/src/shell/shell_config.proto b/cmds/statsd/src/shell/shell_config.proto
index 516693d..73cb49a 100644
--- a/cmds/statsd/src/shell/shell_config.proto
+++ b/cmds/statsd/src/shell/shell_config.proto
@@ -24,7 +24,7 @@
import "frameworks/base/cmds/statsd/src/statsd_config.proto";
message PulledAtomSubscription {
- optional int32 atom_id = 1;
+ optional SimpleAtomMatcher matcher = 1;
/* gap between two pulls in milliseconds */
optional int32 freq_millis = 2;
diff --git a/cmds/statsd/src/shell/shell_data.proto b/cmds/statsd/src/shell/shell_data.proto
new file mode 100644
index 0000000..236bdbd
--- /dev/null
+++ b/cmds/statsd/src/shell/shell_data.proto
@@ -0,0 +1,29 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+package android.os.statsd;
+
+option java_package = "com.android.os.statsd";
+option java_outer_classname = "ShellDataProto";
+
+import "frameworks/base/cmds/statsd/src/atoms.proto";
+
+// The output of shell subscription, including both pulled and pushed subscriptions.
+message ShellData {
+ repeated Atom atom = 1;
+}
diff --git a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
index b380b03..dd00561 100644
--- a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
+++ b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
@@ -17,6 +17,7 @@
#include <unistd.h>
#include "frameworks/base/cmds/statsd/src/atoms.pb.h"
#include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
+#include "frameworks/base/cmds/statsd/src/shell/shell_data.pb.h"
#include "src/shell/ShellSubscriber.h"
#include "tests/metrics/metrics_test_helper.h"
@@ -26,7 +27,10 @@
using namespace android::os::statsd;
using android::sp;
using std::vector;
+using testing::_;
+using testing::Invoke;
using testing::NaggyMock;
+using testing::StrictMock;
#ifdef __ANDROID__
@@ -51,7 +55,10 @@
}
};
-TEST(ShellSubscriberTest, testPushedSubscription) {
+void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap,
+ sp<MockStatsPullerManager> pullerManager,
+ const vector<std::shared_ptr<LogEvent>>& pushedEvents,
+ const ShellData& expectedData) {
// set up 2 pipes for read/write config and data
int fds_config[2];
ASSERT_EQ(0, pipe(fds_config));
@@ -59,10 +66,6 @@
int fds_data[2];
ASSERT_EQ(0, pipe(fds_data));
- // create a simple config to get screen events
- ShellSubscription config;
- config.add_pushed()->set_atom_id(29);
-
size_t bufferSize = config.ByteSize();
// write the config to pipe, first write size of the config
@@ -75,15 +78,9 @@
write(fds_config[1], buffer.data(), bufferSize);
close(fds_config[1]);
- // create a shell subscriber.
- sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
- sp<ShellSubscriber> shellClient = new ShellSubscriber(uidMap);
+ sp<ShellSubscriber> shellClient = new ShellSubscriber(uidMap, pullerManager);
sp<MyResultReceiver> resultReceiver = new MyResultReceiver();
- LogEvent event1(29, 1000);
- event1.write(2);
- event1.init();
-
// mimic a binder thread that a shell subscriber runs on. it would block.
std::thread reader([&resultReceiver, &fds_config, &fds_data, &shellClient] {
shellClient->startNewSubscription(fds_config[0], fds_data[1], resultReceiver);
@@ -93,44 +90,127 @@
// let the shell subscriber to receive the config from pipe.
std::this_thread::sleep_for(100ms);
- // send a log event that matches the config.
- std::thread log_reader([&shellClient, &event1] { shellClient->onLogEvent(event1); });
- log_reader.detach();
+ if (pushedEvents.size() > 0) {
+ // send a log event that matches the config.
+ std::thread log_reader([&shellClient, &pushedEvents] {
+ for (const auto& event : pushedEvents) {
+ shellClient->onLogEvent(*event);
+ }
+ });
- if (log_reader.joinable()) {
- log_reader.join();
+ log_reader.detach();
+
+ if (log_reader.joinable()) {
+ log_reader.join();
+ }
}
// wait for the data to be written.
std::this_thread::sleep_for(100ms);
- // this is the expected screen event atom.
- Atom atom;
- atom.mutable_screen_state_changed()->set_state(
- ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
-
- int atom_size = atom.ByteSize();
+ int expected_data_size = expectedData.ByteSize();
// now read from the pipe. firstly read the atom size.
size_t dataSize = 0;
EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize)));
- EXPECT_EQ(atom_size, (int)dataSize);
+ EXPECT_EQ(expected_data_size, (int)dataSize);
// then read that much data which is the atom in proto binary format
vector<uint8_t> dataBuffer(dataSize);
EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize));
// make sure the received bytes can be parsed to an atom
- Atom receivedAtom;
+ ShellData receivedAtom;
EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0);
// serialze the expected atom to bytes. and compare. to make sure they are the same.
- vector<uint8_t> atomBuffer(atom_size);
- atom.SerializeToArray(&atomBuffer[0], atom_size);
+ vector<uint8_t> atomBuffer(expected_data_size);
+ expectedData.SerializeToArray(&atomBuffer[0], expected_data_size);
EXPECT_EQ(atomBuffer, dataBuffer);
close(fds_data[0]);
}
+TEST(ShellSubscriberTest, testPushedSubscription) {
+ sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
+
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ vector<std::shared_ptr<LogEvent>> pushedList;
+
+ std::shared_ptr<LogEvent> event1 =
+ std::make_shared<LogEvent>(29 /*screen_state_atom_id*/, 1000 /*timestamp*/);
+ event1->write(::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
+ event1->init();
+ pushedList.push_back(event1);
+
+ // create a simple config to get screen events
+ ShellSubscription config;
+ config.add_pushed()->set_atom_id(29);
+
+ // this is the expected screen event atom.
+ ShellData shellData;
+ shellData.add_atom()->mutable_screen_state_changed()->set_state(
+ ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
+
+ runShellTest(config, uidMap, pullerManager, pushedList, shellData);
+}
+
+namespace {
+
+int kUid1 = 1000;
+int kUid2 = 2000;
+
+int kCpuTime1 = 100;
+int kCpuTime2 = 200;
+
+ShellData getExpectedShellData() {
+ ShellData shellData;
+ auto* atom1 = shellData.add_atom()->mutable_cpu_active_time();
+ atom1->set_uid(kUid1);
+ atom1->set_time_millis(kCpuTime1);
+
+ auto* atom2 = shellData.add_atom()->mutable_cpu_active_time();
+ atom2->set_uid(kUid2);
+ atom2->set_time_millis(kCpuTime2);
+
+ return shellData;
+}
+
+ShellSubscription getPulledConfig() {
+ ShellSubscription config;
+ auto* pull_config = config.add_pulled();
+ pull_config->mutable_matcher()->set_atom_id(10016);
+ pull_config->set_freq_millis(2000);
+ return config;
+}
+
+} // namespace
+
+TEST(ShellSubscriberTest, testPulledSubscription) {
+ sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
+
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, Pull(10016, _, _))
+ .WillRepeatedly(
+ Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, timeNs);
+ event->write(kUid1);
+ event->write(kCpuTime1);
+ event->init();
+ data->push_back(event);
+ // another event
+ event = make_shared<LogEvent>(tagId, timeNs);
+ event->write(kUid2);
+ event->write(kCpuTime2);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+
+ runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(),
+ getExpectedShellData());
+}
+
#else
GTEST_LOG_(INFO) << "This test does nothing.\n";
#endif