Merge "Add pulled atom subscription for shell."
diff --git a/cmds/statsd/Android.mk b/cmds/statsd/Android.mk
index f6b0db8..c396cd1 100644
--- a/cmds/statsd/Android.mk
+++ b/cmds/statsd/Android.mk
@@ -189,6 +189,7 @@
     src/atom_field_options.proto \
     src/atoms.proto \
     src/stats_log.proto \
+    src/shell/shell_data.proto \
     tests/AlarmMonitor_test.cpp \
     tests/anomaly/AlarmTracker_test.cpp \
     tests/anomaly/AnomalyTracker_test.cpp \
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index fb6f8c8..ce28777 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -319,7 +319,7 @@
         }
         if (!args[0].compare(String8("data-subscribe"))) {
             if (mShellSubscriber == nullptr) {
-                mShellSubscriber = new ShellSubscriber(mUidMap);
+                mShellSubscriber = new ShellSubscriber(mUidMap, mPullerManager);
             }
             mShellSubscriber->startNewSubscription(in, out, resultReceiver);
             return NO_ERROR;
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index 1306a46..dffff7a 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -18,9 +18,9 @@
 
 #include "ShellSubscriber.h"
 
-#include "matchers/matcher_util.h"
-
 #include <android-base/file.h>
+#include "matchers/matcher_util.h"
+#include "stats_log_util.h"
 
 using android::util::ProtoOutputStream;
 
@@ -28,6 +28,8 @@
 namespace os {
 namespace statsd {
 
+const static int FIELD_ID_ATOM = 1;
+
 void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver) {
     VLOG("start new shell subscription");
     {
@@ -42,25 +44,106 @@
         IInterface::asBinder(mResultReceiver)->linkToDeath(this);
     }
 
-    // Spawn another thread to read the config updates from the input file descriptor
-    std::thread reader([in, this] { readConfig(in); });
-    reader.detach();
+    // Note that the following is blocking, and it's intended as we cannot return until the shell
+    // cmd exits, otherwise all resources & FDs will be automatically closed.
 
+    // Read config forever until EOF is reached. Clients may send multiple configs -- each new
+    // config replace the previous one.
+    readConfig(in);
+
+    // Now we have read an EOF we now wait for the semaphore until the client exits.
+    VLOG("Now wait for client to exit");
     std::unique_lock<std::mutex> lk(mMutex);
-
     mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
-    if (reader.joinable()) {
-        reader.join();
-    }
 }
 
 void ShellSubscriber::updateConfig(const ShellSubscription& config) {
     std::lock_guard<std::mutex> lock(mMutex);
     mPushedMatchers.clear();
+    mPulledInfo.clear();
+
     for (const auto& pushed : config.pushed()) {
         mPushedMatchers.push_back(pushed);
         VLOG("adding matcher for atom %d", pushed.atom_id());
     }
+
+    int64_t token = getElapsedRealtimeNs();
+    mPullToken = token;
+
+    int64_t minInterval = -1;
+    for (const auto& pulled : config.pulled()) {
+        // All intervals need to be multiples of the min interval.
+        if (minInterval < 0 || pulled.freq_millis() < minInterval) {
+            minInterval = pulled.freq_millis();
+        }
+
+        mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
+        VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
+    }
+
+    if (mPulledInfo.size() > 0 && minInterval > 0) {
+        // This thread is guaranteed to terminate after it detects the token is different or
+        // cleaned up.
+        std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
+        puller.detach();
+    }
+}
+
+void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+                                          const SimpleAtomMatcher& matcher) {
+    if (mOutput == 0) return;
+    int count = 0;
+    mProto.clear();
+    for (const auto& event : data) {
+        VLOG("%s", event->ToString().c_str());
+        if (matchesSimple(*mUidMap, matcher, *event)) {
+            VLOG("matched");
+            count++;
+            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
+            event->ToProto(mProto);
+            mProto.end(atomToken);
+        }
+    }
+
+    if (count > 0) {
+        // First write the payload size.
+        size_t bufferSize = mProto.size();
+        write(mOutput, &bufferSize, sizeof(bufferSize));
+        VLOG("%d atoms, proto size: %zu", count, bufferSize);
+        // Then write the payload.
+        mProto.flush(mOutput);
+    }
+    mProto.clear();
+}
+
+void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
+    while (1) {
+        int64_t nowMillis = getElapsedRealtimeMillis();
+        {
+            std::lock_guard<std::mutex> lock(mMutex);
+            if (mPulledInfo.size() == 0 || mPullToken != token) {
+                VLOG("Pulling thread %lld done!", (long long)token);
+                return;
+            }
+            for (auto& pullInfo : mPulledInfo) {
+                if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
+                    VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
+
+                    vector<std::shared_ptr<LogEvent>> data;
+                    mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), nowMillis * 1000000L,
+                                     &data);
+                    VLOG("pulled %zu atoms", data.size());
+                    if (data.size() > 0) {
+                        writeToOutputLocked(data, pullInfo.mPullerMatcher);
+                    }
+                    pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
+                }
+            }
+        }
+        VLOG("Pulling thread %lld sleep....", (long long)token);
+        std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
+    }
 }
 
 void ShellSubscriber::readConfig(int in) {
@@ -101,6 +184,8 @@
     mOutput = 0;
     mResultReceiver = nullptr;
     mPushedMatchers.clear();
+    mPulledInfo.clear();
+    mPullToken = 0;
     VLOG("done clean up");
 }
 
@@ -110,10 +195,13 @@
     if (mOutput <= 0) {
         return;
     }
-
     for (const auto& matcher : mPushedMatchers) {
         if (matchesSimple(*mUidMap, matcher, event)) {
+            VLOG("%s", event.ToString().c_str());
+            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
             event.ToProto(mProto);
+            mProto.end(atomToken);
             // First write the payload size.
             size_t bufferSize = mProto.size();
             write(mOutput, &bufferSize, sizeof(bufferSize));
diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h
index 0ace35f..5401f31 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.h
+++ b/cmds/statsd/src/shell/ShellSubscriber.h
@@ -24,6 +24,7 @@
 #include <mutex>
 #include <string>
 #include <thread>
+#include "external/StatsPullerManager.h"
 #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
 #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
 #include "packages/UidMap.h"
@@ -51,14 +52,15 @@
  * with sizeof(size_t) bytes indicating the size of the proto message payload.
  *
  * The stream would be in the following format:
- * |size_t|atom1 proto|size_t|atom2 proto|....
+ * |size_t|shellData proto|size_t|shellData proto|....
  *
  * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
  * until it exits.
  */
 class ShellSubscriber : public virtual IBinder::DeathRecipient {
 public:
-    ShellSubscriber(sp<UidMap> uidMap) : mUidMap(uidMap){};
+    ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr)
+        : mUidMap(uidMap), mPullerMgr(pullerMgr){};
 
     /**
      * Start a new subscription.
@@ -70,15 +72,28 @@
     void onLogEvent(const LogEvent& event);
 
 private:
+    struct PullInfo {
+        PullInfo(const SimpleAtomMatcher& matcher, int64_t interval)
+            : mPullerMatcher(matcher), mInterval(interval), mPrevPullElapsedRealtimeMs(0) {
+        }
+        SimpleAtomMatcher mPullerMatcher;
+        int64_t mInterval;
+        int64_t mPrevPullElapsedRealtimeMs;
+    };
     void readConfig(int in);
 
     void updateConfig(const ShellSubscription& config);
 
+    void startPull(int64_t token, int64_t intervalMillis);
+
     void cleanUpLocked();
 
+    void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+                             const SimpleAtomMatcher& matcher);
+
     sp<UidMap> mUidMap;
 
-    // bool mWritten = false;
+    sp<StatsPullerManager> mPullerMgr;
 
     android::util::ProtoOutputStream mProto;
 
@@ -93,6 +108,10 @@
     sp<IResultReceiver> mResultReceiver;
 
     std::vector<SimpleAtomMatcher> mPushedMatchers;
+
+    std::vector<PullInfo> mPulledInfo;
+
+    int64_t mPullToken = 0;  // A unique token to identify a puller thread.
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/shell/shell_config.proto b/cmds/statsd/src/shell/shell_config.proto
index 516693d..73cb49a 100644
--- a/cmds/statsd/src/shell/shell_config.proto
+++ b/cmds/statsd/src/shell/shell_config.proto
@@ -24,7 +24,7 @@
 import "frameworks/base/cmds/statsd/src/statsd_config.proto";
 
 message PulledAtomSubscription {
-    optional int32 atom_id = 1;
+    optional SimpleAtomMatcher matcher = 1;
 
     /* gap between two pulls in milliseconds */
     optional int32 freq_millis = 2;
diff --git a/cmds/statsd/src/shell/shell_data.proto b/cmds/statsd/src/shell/shell_data.proto
new file mode 100644
index 0000000..236bdbd
--- /dev/null
+++ b/cmds/statsd/src/shell/shell_data.proto
@@ -0,0 +1,29 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+package android.os.statsd;
+
+option java_package = "com.android.os.statsd";
+option java_outer_classname = "ShellDataProto";
+
+import "frameworks/base/cmds/statsd/src/atoms.proto";
+
+// The output of shell subscription, including both pulled and pushed subscriptions.
+message ShellData {
+    repeated Atom atom = 1;
+}
diff --git a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
index b380b03..dd00561 100644
--- a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
+++ b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
@@ -17,6 +17,7 @@
 #include <unistd.h>
 #include "frameworks/base/cmds/statsd/src/atoms.pb.h"
 #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
+#include "frameworks/base/cmds/statsd/src/shell/shell_data.pb.h"
 #include "src/shell/ShellSubscriber.h"
 #include "tests/metrics/metrics_test_helper.h"
 
@@ -26,7 +27,10 @@
 using namespace android::os::statsd;
 using android::sp;
 using std::vector;
+using testing::_;
+using testing::Invoke;
 using testing::NaggyMock;
+using testing::StrictMock;
 
 #ifdef __ANDROID__
 
@@ -51,7 +55,10 @@
     }
 };
 
-TEST(ShellSubscriberTest, testPushedSubscription) {
+void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap,
+                  sp<MockStatsPullerManager> pullerManager,
+                  const vector<std::shared_ptr<LogEvent>>& pushedEvents,
+                  const ShellData& expectedData) {
     // set up 2 pipes for read/write config and data
     int fds_config[2];
     ASSERT_EQ(0, pipe(fds_config));
@@ -59,10 +66,6 @@
     int fds_data[2];
     ASSERT_EQ(0, pipe(fds_data));
 
-    // create a simple config to get screen events
-    ShellSubscription config;
-    config.add_pushed()->set_atom_id(29);
-
     size_t bufferSize = config.ByteSize();
 
     // write the config to pipe, first write size of the config
@@ -75,15 +78,9 @@
     write(fds_config[1], buffer.data(), bufferSize);
     close(fds_config[1]);
 
-    // create a shell subscriber.
-    sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
-    sp<ShellSubscriber> shellClient = new ShellSubscriber(uidMap);
+    sp<ShellSubscriber> shellClient = new ShellSubscriber(uidMap, pullerManager);
     sp<MyResultReceiver> resultReceiver = new MyResultReceiver();
 
-    LogEvent event1(29, 1000);
-    event1.write(2);
-    event1.init();
-
     // mimic a binder thread that a shell subscriber runs on. it would block.
     std::thread reader([&resultReceiver, &fds_config, &fds_data, &shellClient] {
         shellClient->startNewSubscription(fds_config[0], fds_data[1], resultReceiver);
@@ -93,44 +90,127 @@
     // let the shell subscriber to receive the config from pipe.
     std::this_thread::sleep_for(100ms);
 
-    // send a log event that matches the config.
-    std::thread log_reader([&shellClient, &event1] { shellClient->onLogEvent(event1); });
-    log_reader.detach();
+    if (pushedEvents.size() > 0) {
+        // send a log event that matches the config.
+        std::thread log_reader([&shellClient, &pushedEvents] {
+            for (const auto& event : pushedEvents) {
+                shellClient->onLogEvent(*event);
+            }
+        });
 
-    if (log_reader.joinable()) {
-        log_reader.join();
+        log_reader.detach();
+
+        if (log_reader.joinable()) {
+            log_reader.join();
+        }
     }
 
     // wait for the data to be written.
     std::this_thread::sleep_for(100ms);
 
-    // this is the expected screen event atom.
-    Atom atom;
-    atom.mutable_screen_state_changed()->set_state(
-            ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
-
-    int atom_size = atom.ByteSize();
+    int expected_data_size = expectedData.ByteSize();
 
     // now read from the pipe. firstly read the atom size.
     size_t dataSize = 0;
     EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize)));
-    EXPECT_EQ(atom_size, (int)dataSize);
+    EXPECT_EQ(expected_data_size, (int)dataSize);
 
     // then read that much data which is the atom in proto binary format
     vector<uint8_t> dataBuffer(dataSize);
     EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize));
 
     // make sure the received bytes can be parsed to an atom
-    Atom receivedAtom;
+    ShellData receivedAtom;
     EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0);
 
     // serialze the expected atom to bytes. and compare. to make sure they are the same.
-    vector<uint8_t> atomBuffer(atom_size);
-    atom.SerializeToArray(&atomBuffer[0], atom_size);
+    vector<uint8_t> atomBuffer(expected_data_size);
+    expectedData.SerializeToArray(&atomBuffer[0], expected_data_size);
     EXPECT_EQ(atomBuffer, dataBuffer);
     close(fds_data[0]);
 }
 
+TEST(ShellSubscriberTest, testPushedSubscription) {
+    sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    vector<std::shared_ptr<LogEvent>> pushedList;
+
+    std::shared_ptr<LogEvent> event1 =
+            std::make_shared<LogEvent>(29 /*screen_state_atom_id*/, 1000 /*timestamp*/);
+    event1->write(::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
+    event1->init();
+    pushedList.push_back(event1);
+
+    // create a simple config to get screen events
+    ShellSubscription config;
+    config.add_pushed()->set_atom_id(29);
+
+    // this is the expected screen event atom.
+    ShellData shellData;
+    shellData.add_atom()->mutable_screen_state_changed()->set_state(
+            ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
+
+    runShellTest(config, uidMap, pullerManager, pushedList, shellData);
+}
+
+namespace {
+
+int kUid1 = 1000;
+int kUid2 = 2000;
+
+int kCpuTime1 = 100;
+int kCpuTime2 = 200;
+
+ShellData getExpectedShellData() {
+    ShellData shellData;
+    auto* atom1 = shellData.add_atom()->mutable_cpu_active_time();
+    atom1->set_uid(kUid1);
+    atom1->set_time_millis(kCpuTime1);
+
+    auto* atom2 = shellData.add_atom()->mutable_cpu_active_time();
+    atom2->set_uid(kUid2);
+    atom2->set_time_millis(kCpuTime2);
+
+    return shellData;
+}
+
+ShellSubscription getPulledConfig() {
+    ShellSubscription config;
+    auto* pull_config = config.add_pulled();
+    pull_config->mutable_matcher()->set_atom_id(10016);
+    pull_config->set_freq_millis(2000);
+    return config;
+}
+
+}  // namespace
+
+TEST(ShellSubscriberTest, testPulledSubscription) {
+    sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, Pull(10016, _, _))
+            .WillRepeatedly(
+                    Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) {
+                        data->clear();
+                        shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, timeNs);
+                        event->write(kUid1);
+                        event->write(kCpuTime1);
+                        event->init();
+                        data->push_back(event);
+                        // another event
+                        event = make_shared<LogEvent>(tagId, timeNs);
+                        event->write(kUid2);
+                        event->write(kCpuTime2);
+                        event->init();
+                        data->push_back(event);
+                        return true;
+                    }));
+
+    runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(),
+                 getExpectedShellData());
+}
+
 #else
 GTEST_LOG_(INFO) << "This test does nothing.\n";
 #endif