| field | value |
|---|---|
| author | 2020-05-07 23:22:56 +0000 |
| committer | 2020-05-07 23:22:56 +0000 |
| commit | ca7fc3be0f1a841ab12e00a219025bcc63773fff (patch) |
| tree | 78603aa3def78abbadfeb16de80dedd6cd87555d |
| parent | f67a780c331e8776da059067d83b40a11584138d (diff) |
| parent | 1e2405160447116c8fd9b6f09e2245381c55f23e (diff) |
Merge "Fix ShellSubscriber concurrency issues" into rvc-dev
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | cmds/statsd/src/shell/ShellSubscriber.cpp | 190 |
| -rw-r--r-- | cmds/statsd/src/shell/ShellSubscriber.h | 36 |
| -rw-r--r-- | cmds/statsd/tests/shell/ShellSubscriber_test.cpp | 46 |

3 files changed, 164 insertions, 108 deletions
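The patch serializes all subscription state behind `mMutex` and uses a claim-token hand-off so that a new shell subscription cleanly replaces the previous one. Below is a simplified, self-contained sketch of that hand-off pattern only; it is not the statsd code, and names such as `TokenHandOff` and `endAll` are illustrative.

```cpp
// Simplified sketch of the token hand-off used by this change: every new
// subscription claims a fresh token and wakes the current subscriber, which
// exits once it sees that mToken no longer matches the token it claimed.
// Hypothetical class/function names; not part of statsd.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class TokenHandOff {
public:
    void startNewSubscription() {
        int myToken = claimToken();
        mSubscriptionShouldEnd.notify_all();  // wake the previous subscriber, if any

        std::unique_lock<std::mutex> lock(mMutex);
        if (myToken != mToken) return;  // an even newer subscription already came in

        // Block until a newer subscription (or shutdown) bumps the token.
        mSubscriptionShouldEnd.wait(lock, [this, myToken] { return mToken != myToken; });
        std::cout << "subscription " << myToken << " ended\n";
    }

    // Ends whatever subscription is currently active (illustrative shutdown hook).
    void endAll() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            ++mToken;
        }
        mSubscriptionShouldEnd.notify_all();
    }

private:
    int claimToken() {
        std::lock_guard<std::mutex> lock(mMutex);
        return ++mToken;
    }

    std::mutex mMutex;
    std::condition_variable mSubscriptionShouldEnd;
    int mToken = 0;
};

int main() {
    TokenHandOff handOff;
    std::thread first([&] { handOff.startNewSubscription(); });
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    std::thread second([&] { handOff.startNewSubscription(); });  // replaces the first
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    handOff.endAll();  // lets the second subscription finish so the demo exits
    first.join();
    second.join();
    return 0;
}
```

In the real ShellSubscriber, the wait predicate additionally checks `mClientAlive`, which the writer path clears when a write to the output fd fails, and the helper pull/heartbeat threads exit once they observe a token mismatch.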
```diff
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index bed836a1bd90..7b687210ce33 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -19,6 +19,7 @@
 #include "ShellSubscriber.h"
 
 #include <android-base/file.h>
+
 #include "matchers/matcher_util.h"
 #include "stats_log_util.h"
 
@@ -32,42 +33,53 @@ const static int FIELD_ID_ATOM = 1;
 
 void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
     int myToken = claimToken();
+    VLOG("ShellSubscriber: new subscription %d has come in", myToken);
     mSubscriptionShouldEnd.notify_one();
 
     shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
-    if (!readConfig(mySubscriptionInfo)) {
-        return;
-    }
+    if (!readConfig(mySubscriptionInfo)) return;
+
+    {
+        std::unique_lock<std::mutex> lock(mMutex);
+        if (myToken != mToken) {
+            // Some other subscription has already come in. Stop.
+            return;
+        }
+        mSubscriptionInfo = mySubscriptionInfo;
+
+        spawnHelperThreadsLocked(mySubscriptionInfo, myToken);
+        waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
+
+        if (mSubscriptionInfo == mySubscriptionInfo) {
+            mSubscriptionInfo = nullptr;
+        }
 
-    // critical-section
-    std::unique_lock<std::mutex> lock(mMutex);
-    if (myToken != mToken) {
-        // Some other subscription has already come in. Stop.
-        return;
     }
-    mSubscriptionInfo = mySubscriptionInfo;
+}
 
-    if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) {
-        // This thread terminates after it detects that mToken has changed.
+void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) {
+    if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) {
         std::thread puller([this, myToken] { startPull(myToken); });
         puller.detach();
     }
 
-    // Block until subscription has ended.
+    std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); });
+    heartbeatSender.detach();
+}
+
+void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
+                                                     int myToken,
+                                                     std::unique_lock<std::mutex>& lock,
+                                                     int timeoutSec) {
     if (timeoutSec > 0) {
-        mSubscriptionShouldEnd.wait_for(
-                lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] {
-                    return mToken != myToken || !mySubscriptionInfo->mClientAlive;
-                });
+        mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
+            return mToken != myToken || !myInfo->mClientAlive;
+        });
     } else {
-        mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] {
-            return mToken != myToken || !mySubscriptionInfo->mClientAlive;
+        mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
+            return mToken != myToken || !myInfo->mClientAlive;
         });
     }
-
-    if (mSubscriptionInfo == mySubscriptionInfo) {
-        mSubscriptionInfo = nullptr;
-    }
 }
 
 // Atomically claim the next token. Token numbers denote subscriber ordering.
@@ -129,51 +141,55 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo)
     return true;
 }
 
-void ShellSubscriber::startPull(int64_t myToken) {
+void ShellSubscriber::startPull(int myToken) {
+    VLOG("ShellSubscriber: pull thread %d starting", myToken);
     while (true) {
-        std::lock_guard<std::mutex> lock(mMutex);
-        if (!mSubscriptionInfo || mToken != myToken) {
-            VLOG("Pulling thread %lld done!", (long long)myToken);
-            return;
-        }
+        {
+            std::lock_guard<std::mutex> lock(mMutex);
+            if (!mSubscriptionInfo || mToken != myToken) {
+                VLOG("ShellSubscriber: pulling thread %d done!", myToken);
+                return;
+            }
 
-        int64_t nowMillis = getElapsedRealtimeMillis();
-        for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) {
-            if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
-                vector<std::shared_ptr<LogEvent>> data;
-                vector<int32_t> uids;
-                uids.insert(uids.end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
-                // This is slow. Consider storing the uids per app and listening to uidmap updates.
-                for (const string& pkg : pullInfo.mPullPackages) {
-                    set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
-                    uids.insert(uids.end(), uidsForPkg.begin(), uidsForPkg.end());
+            int64_t nowMillis = getElapsedRealtimeMillis();
+            for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
+                if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
+                    continue;
                 }
-                uids.push_back(DEFAULT_PULL_UID);
+
+                vector<int32_t> uids;
+                getUidsForPullAtom(&uids, pullInfo);
+
+                vector<std::shared_ptr<LogEvent>> data;
                 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data);
-                VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
+                VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
+                writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
 
-                if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) {
-                    mSubscriptionInfo->mClientAlive = false;
-                    mSubscriptionShouldEnd.notify_one();
-                    return;
-                }
                 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
             }
         }
 
-        VLOG("Pulling thread %lld sleep....", (long long)myToken);
+        VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken,
+             mSubscriptionInfo->mPullIntervalMin);
        std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
     }
 }
 
-// \return boolean indicating if writes were successful (will return false if
-// client dies)
-bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
+void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
+    uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
+    // This is slow. Consider storing the uids per app and listening to uidmap updates.
+    for (const string& pkg : pullInfo.mPullPackages) {
+        set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
+        uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
+    }
+    uids->push_back(DEFAULT_PULL_UID);
+}
+
+void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
                                              const SimpleAtomMatcher& matcher) {
     mProto.clear();
     int count = 0;
     for (const auto& event : data) {
-        VLOG("%s", event->ToString().c_str());
         if (matchesSimple(*mUidMap, matcher, *event)) {
             count++;
             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
@@ -183,55 +199,67 @@ bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve
         }
     }
 
-    if (count > 0) {
-        // First write the payload size.
-        size_t bufferSize = mProto.size();
-        if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
-                                       sizeof(bufferSize))) {
-            return false;
-        }
-
-        VLOG("%d atoms, proto size: %zu", count, bufferSize);
-        // Then write the payload.
-        if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
-            return false;
-        }
-    }
-
-    return true;
+    if (count > 0) attemptWriteToSocketLocked(mProto.size());
 }
 
 void ShellSubscriber::onLogEvent(const LogEvent& event) {
     std::lock_guard<std::mutex> lock(mMutex);
-    if (!mSubscriptionInfo) {
-        return;
-    }
+    if (!mSubscriptionInfo) return;
 
     mProto.clear();
     for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
         if (matchesSimple(*mUidMap, matcher, event)) {
-            VLOG("%s", event.ToString().c_str());
             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
             event.ToProto(mProto);
             mProto.end(atomToken);
+            attemptWriteToSocketLocked(mProto.size());
+        }
+    }
+}
 
-            // First write the payload size.
-            size_t bufferSize = mProto.size();
-            if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
-                                           sizeof(bufferSize))) {
-                mSubscriptionInfo->mClientAlive = false;
-                mSubscriptionShouldEnd.notify_one();
+// Tries to write the atom encoded in mProto to the socket. If the write fails
+// because the read end of the pipe has closed, signals to other threads that
+// the subscription should end.
+void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
+    // First write the payload size.
+    if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
+        mSubscriptionInfo->mClientAlive = false;
+        mSubscriptionShouldEnd.notify_one();
+        return;
+    }
+
+    if (dataSize == 0) return;
+
+    // Then, write the payload.
+    if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
+        mSubscriptionInfo->mClientAlive = false;
+        mSubscriptionShouldEnd.notify_one();
+        return;
+    }
+
+    mLastWriteMs = getElapsedRealtimeMillis();
+}
+
+// Send a heartbeat, consisting solely of a data size of 0, if perfd has not
+// recently received any writes from statsd. When it receives the data size of
+// 0, perfd will not expect any data and recheck whether the shell command is
+// still running.
+void ShellSubscriber::sendHeartbeats(int myToken) {
+    while (true) {
+        {
+            std::lock_guard<std::mutex> lock(mMutex);
+            if (!mSubscriptionInfo || myToken != mToken) {
+                VLOG("ShellSubscriber: heartbeat thread %d done!", myToken);
                 return;
             }
-            // Then write the payload.
-            if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
-                mSubscriptionInfo->mClientAlive = false;
-                mSubscriptionShouldEnd.notify_one();
-                return;
+            if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) {
+                VLOG("ShellSubscriber: sending a heartbeat to perfd");
+                attemptWriteToSocketLocked(/*dataSize=*/0);
             }
         }
+        std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats));
     }
 }
diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h
index 61457d89f224..26c8a2a0b683 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.h
+++ b/cmds/statsd/src/shell/ShellSubscriber.h
@@ -38,11 +38,11 @@ namespace statsd {
  *
  * A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
  * communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms.
- * The atoms are sent back to the client in real time, as opposed to
- * keeping the data in memory. Shell clients do not subscribe aggregated metrics, as they are
- * responsible for doing the aggregation after receiving the atom events.
+ * The atoms are sent back to the client in real time, as opposed to keeping the data in memory.
+ * Shell clients do not subscribe aggregated metrics, as they are responsible for doing the
+ * aggregation after receiving the atom events.
  *
- * Shell client pass ShellSubscription in the proto binary format. Client can update the
+ * Shell clients pass ShellSubscription in the proto binary format. Clients can update the
  * subscription by sending a new subscription. The new subscription would replace the old one.
  * Input data stream format is:
  *
@@ -54,7 +54,7 @@ namespace statsd {
  * The stream would be in the following format:
  * |size_t|shellData proto|size_t|shellData proto|....
  *
- * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
+ * Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread
  * until it exits.
  */
 class ShellSubscriber : public virtual RefBase {
@@ -100,11 +100,28 @@ private:
 
     bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);
 
-    void startPull(int64_t myToken);
+    void spawnHelperThreadsLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken);
 
-    bool writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
+    void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo,
+                                        int myToken,
+                                        std::unique_lock<std::mutex>& lock,
+                                        int timeoutSec);
+
+    void startPull(int myToken);
+
+    void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
                                 const SimpleAtomMatcher& matcher);
 
+    void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo);
+
+    void attemptWriteToSocketLocked(size_t dataSize);
+
+    // Send ocassional heartbeats for two reasons: (a) for statsd to detect when
+    // the read end of the pipe has closed and (b) for perfd to escape a
+    // blocking read call and recheck if the user has terminated the
+    // subscription.
+    void sendHeartbeats(int myToken);
+
     sp<UidMap> mUidMap;
 
     sp<StatsPullerManager> mPullerMgr;
 
@@ -120,6 +137,11 @@ private:
     int mToken = 0;
 
     const int32_t DEFAULT_PULL_UID = AID_SYSTEM;
+
+    // Tracks when we last send data to perfd. We need that time to determine
+    // when next to send a heartbeat.
+    int64_t mLastWriteMs = 0;
+    const int64_t kMsBetweenHeartbeats = 1000;
 };
 
 } // namespace statsd
diff --git a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
index 7b952d7a392e..363fcb4bf193 100644
--- a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
+++ b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
@@ -86,28 +86,34 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap,
     // wait for the data to be written.
     std::this_thread::sleep_for(100ms);
 
-    int expected_data_size = expectedData.ByteSize();
-
-    // now read from the pipe. firstly read the atom size.
-    size_t dataSize = 0;
-    EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize)));
-
-    EXPECT_EQ(expected_data_size, (int)dataSize);
-
-    // then read that much data which is the atom in proto binary format
-    vector<uint8_t> dataBuffer(dataSize);
-    EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize));
-
-    // make sure the received bytes can be parsed to an atom
-    ShellData receivedAtom;
-    EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0);
+    // Because we might receive heartbeats from statsd, consisting of data sizes
+    // of 0, encapsulate reads within a while loop.
+    bool readAtom = false;
+    while (!readAtom) {
+        // Read the atom size.
+        size_t dataSize = 0;
+        read(fds_data[0], &dataSize, sizeof(dataSize));
+        if (dataSize == 0) continue;
+        EXPECT_EQ(expectedData.ByteSize(), int(dataSize));
+
+        // Read that much data in proto binary format.
+        vector<uint8_t> dataBuffer(dataSize);
+        EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize));
+
+        // Make sure the received bytes can be parsed to an atom.
+        ShellData receivedAtom;
+        EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0);
+
+        // Serialize the expected atom to byte array and compare to make sure
+        // they are the same.
+        vector<uint8_t> expectedAtomBuffer(expectedData.ByteSize());
+        expectedData.SerializeToArray(expectedAtomBuffer.data(), expectedData.ByteSize());
+        EXPECT_EQ(expectedAtomBuffer, dataBuffer);
+
+        readAtom = true;
+    }
 
-    // serialze the expected atom to bytes. and compare. to make sure they are the same.
-    vector<uint8_t> atomBuffer(expected_data_size);
-    expectedData.SerializeToArray(&atomBuffer[0], expected_data_size);
-    EXPECT_EQ(atomBuffer, dataBuffer);
     close(fds_data[0]);
-
     if (reader.joinable()) {
         reader.join();
     }
```
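For reference, the header comment above describes the output stream as `|size_t|shellData proto|size_t|shellData proto|....`, and with this change a size of 0 is a heartbeat that carries no payload. The sketch below illustrates what that implies for a reader on the other end of the pipe (the updated test above, or a tool like perfd); it assumes `fd` is the read end of the pipe, and the ShellData proto parsing is only indicated in a comment. It is not part of the patch.

```cpp
// Sketch of a client-side read loop for the |size_t|shellData proto| stream
// described in ShellSubscriber.h. Assumes `fd` is the read end of the pipe that
// statsd writes to; illustrative only.
#include <unistd.h>

#include <cstddef>
#include <cstdint>
#include <vector>

// Returns the raw bytes of one ShellData payload, skipping zero-size
// heartbeats, or an empty vector if the writer closed the pipe.
std::vector<uint8_t> readOneShellDataPayload(int fd) {
    while (true) {
        // Each message is prefixed with its payload size, written as a raw size_t.
        size_t dataSize = 0;
        ssize_t n = read(fd, &dataSize, sizeof(dataSize));
        if (n != static_cast<ssize_t>(sizeof(dataSize))) {
            return {};  // pipe closed or short read
        }

        // A size of 0 is a heartbeat: no payload follows, keep waiting for data.
        if (dataSize == 0) {
            continue;
        }

        // Otherwise read exactly dataSize bytes of proto-encoded ShellData.
        std::vector<uint8_t> buffer(dataSize);
        size_t offset = 0;
        while (offset < dataSize) {
            ssize_t bytes = read(fd, buffer.data() + offset, dataSize - offset);
            if (bytes <= 0) return {};
            offset += static_cast<size_t>(bytes);
        }
        // A real client would now parse the buffer, e.g. with
        // ShellData::ParseFromArray(buffer.data(), buffer.size()).
        return buffer;
    }
}
```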