diff options
author | 2022-04-07 05:06:33 +0000 | |
---|---|---|
committer | 2022-07-01 05:50:42 +0000 | |
commit | ffa3aaac5513bc2ef6c7d3f88db9976f86275f88 (patch) | |
tree | 5f3b4577b71bb529d7954c28f9d0390a558afc63 | |
parent | 992a405fd8d4025f5c5e7423530f398d459fff6b (diff) |
libbinder: add build option for single-threaded RPC
Trusty does not support threading. This adds a build option
to disable mutexes and other threading code from RpcState,
RpcSession, and RpcServer.
Bug: 224644083
Test: build Trusty
Change-Id: Iaa78caca1ddee45be7c2def2755598decc0d4d15
-rw-r--r-- | libs/binder/BuildFlags.h | 25 | ||||
-rw-r--r-- | libs/binder/FdTrigger.cpp | 26 | ||||
-rw-r--r-- | libs/binder/FdTrigger.h | 4 | ||||
-rw-r--r-- | libs/binder/RpcServer.cpp | 68 | ||||
-rw-r--r-- | libs/binder/RpcSession.cpp | 67 | ||||
-rw-r--r-- | libs/binder/RpcState.cpp | 24 | ||||
-rw-r--r-- | libs/binder/RpcState.h | 3 | ||||
-rw-r--r-- | libs/binder/include/binder/RpcServer.h | 10 | ||||
-rw-r--r-- | libs/binder/include/binder/RpcSession.h | 14 | ||||
-rw-r--r-- | libs/binder/include/binder/RpcThreads.h | 145 |
10 files changed, 303 insertions, 83 deletions
diff --git a/libs/binder/BuildFlags.h b/libs/binder/BuildFlags.h new file mode 100644 index 0000000000..7657246212 --- /dev/null +++ b/libs/binder/BuildFlags.h @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace android { + +#ifdef BINDER_RPC_SINGLE_THREADED +constexpr bool kEnableRpcThreads = false; +#else +constexpr bool kEnableRpcThreads = true; +#endif + +} // namespace android diff --git a/libs/binder/FdTrigger.cpp b/libs/binder/FdTrigger.cpp index 5e22593f69..d123fd1f2b 100644 --- a/libs/binder/FdTrigger.cpp +++ b/libs/binder/FdTrigger.cpp @@ -28,25 +28,45 @@ namespace android { std::unique_ptr<FdTrigger> FdTrigger::make() { auto ret = std::make_unique<FdTrigger>(); +#ifndef BINDER_RPC_SINGLE_THREADED if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) { ALOGE("Could not create pipe %s", strerror(errno)); return nullptr; } +#endif return ret; } void FdTrigger::trigger() { +#ifdef BINDER_RPC_SINGLE_THREADED + mTriggered = true; +#else mWrite.reset(); +#endif } bool FdTrigger::isTriggered() { +#ifdef BINDER_RPC_SINGLE_THREADED + return mTriggered; +#else return mWrite == -1; +#endif } status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { +#ifdef BINDER_RPC_SINGLE_THREADED + if (mTriggered) { + return DEAD_OBJECT; + } +#endif + LOG_ALWAYS_FATAL_IF(event == 0, "triggerablePoll %d with event 0 is not allowed", fd.get()); - pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, - {.fd = mRead.get(), .events = 0, .revents = 0}}; + pollfd pfd[]{ + {.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, +#ifndef BINDER_RPC_SINGLE_THREADED + {.fd = mRead.get(), .events = 0, .revents = 0}, +#endif + }; int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1)); if (ret < 0) { return -errno; @@ -55,6 +75,7 @@ status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { // At least one FD has events. Check them. +#ifndef BINDER_RPC_SINGLE_THREADED // Detect explicit trigger(): DEAD_OBJECT if (pfd[1].revents & POLLHUP) { return DEAD_OBJECT; @@ -68,6 +89,7 @@ status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { // pfd[1].revents is 0, hence pfd[0].revents must be set, and only possible values are // a subset of event | POLLHUP | POLLERR | POLLNVAL. +#endif // POLLNVAL: invalid FD number, e.g. not opened. if (pfd[0].revents & POLLNVAL) { diff --git a/libs/binder/FdTrigger.h b/libs/binder/FdTrigger.h index a545d6cbea..5c7102e22e 100644 --- a/libs/binder/FdTrigger.h +++ b/libs/binder/FdTrigger.h @@ -55,7 +55,11 @@ public: [[nodiscard]] status_t triggerablePoll(base::borrowed_fd fd, int16_t event); private: +#ifdef BINDER_RPC_SINGLE_THREADED + bool mTriggered = false; +#else base::unique_fd mWrite; base::unique_fd mRead; +#endif }; } // namespace android diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index f83bb5e335..1cd1fd3afd 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -32,6 +32,7 @@ #include <log/log.h> #include <utils/Compat.h> +#include "BuildFlags.h" #include "FdTrigger.h" #include "RpcSocketAddress.h" #include "RpcState.h" @@ -131,27 +132,27 @@ void RpcServer::setSupportedFileDescriptorTransportModes( } void RpcServer::setRootObject(const sp<IBinder>& binder) { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); mRootObjectFactory = nullptr; mRootObjectWeak = mRootObject = binder; } void RpcServer::setRootObjectWeak(const wp<IBinder>& binder) { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); mRootObject.clear(); mRootObjectFactory = nullptr; mRootObjectWeak = binder; } void RpcServer::setPerSessionRootObject( std::function<sp<IBinder>(const void*, size_t)>&& makeObject) { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); mRootObject.clear(); mRootObjectWeak.clear(); mRootObjectFactory = std::move(makeObject); } sp<IBinder> RpcServer::getRootObject() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); bool hasWeak = mRootObjectWeak.unsafe_get(); sp<IBinder> ret = mRootObjectWeak.promote(); ALOGW_IF(hasWeak && ret == nullptr, "RpcServer root object is freed, returning nullptr"); @@ -159,7 +160,7 @@ sp<IBinder> RpcServer::getRootObject() { } std::vector<uint8_t> RpcServer::getCertificate(RpcCertificateFormat format) { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); return mCtx->getCertificate(format); } @@ -168,15 +169,17 @@ static void joinRpcServer(sp<RpcServer>&& thiz) { } void RpcServer::start() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); LOG_ALWAYS_FATAL_IF(mJoinThread.get(), "Already started!"); - mJoinThread = std::make_unique<std::thread>(&joinRpcServer, sp<RpcServer>::fromExisting(this)); + mJoinThread = + std::make_unique<RpcMaybeThread>(&joinRpcServer, sp<RpcServer>::fromExisting(this)); + rpcJoinIfSingleThreaded(*mJoinThread); } void RpcServer::join() { { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join."); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined"); mJoinThreadRunning = true; @@ -204,24 +207,31 @@ void RpcServer::join() { LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); { - std::lock_guard<std::mutex> _l(mLock); - std::thread thread = - std::thread(&RpcServer::establishConnection, sp<RpcServer>::fromExisting(this), - std::move(clientFd), addr, addrLen); - mConnectingThreads[thread.get_id()] = std::move(thread); + RpcMutexLockGuard _l(mLock); + RpcMaybeThread thread = RpcMaybeThread(&RpcServer::establishConnection, + sp<RpcServer>::fromExisting(this), + std::move(clientFd), addr, addrLen); + + auto& threadRef = mConnectingThreads[thread.get_id()]; + threadRef = std::move(thread); + rpcJoinIfSingleThreaded(threadRef); } } LOG_RPC_DETAIL("RpcServer::join exiting with %s", statusToString(status).c_str()); - { - std::lock_guard<std::mutex> _l(mLock); + if constexpr (kEnableRpcThreads) { + RpcMutexLockGuard _l(mLock); mJoinThreadRunning = false; + } else { + // Multi-threaded builds clear this in shutdown(), but we need it valid + // so the loop above exits cleanly + mShutdownTrigger = nullptr; } mShutdownCv.notify_all(); } bool RpcServer::shutdown() { - std::unique_lock<std::mutex> _l(mLock); + RpcMutexUniqueLock _l(mLock); if (mShutdownTrigger == nullptr) { LOG_RPC_DETAIL("Cannot shutdown. No shutdown trigger installed (already shutdown?)"); return false; @@ -232,10 +242,16 @@ bool RpcServer::shutdown() { for (auto& [id, session] : mSessions) { (void)id; // server lock is a more general lock - std::lock_guard<std::mutex> _lSession(session->mMutex); + RpcMutexLockGuard _lSession(session->mMutex); session->mShutdownTrigger->trigger(); } + if constexpr (!kEnableRpcThreads) { + // In single-threaded mode we're done here, everything else that + // needs to happen should be at the end of RpcServer::join() + return true; + } + while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) { if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) { ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, " @@ -263,7 +279,7 @@ bool RpcServer::shutdown() { } std::vector<sp<RpcSession>> RpcServer::listSessions() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); std::vector<sp<RpcSession>> sessions; for (auto& [id, session] : mSessions) { (void)id; @@ -273,7 +289,7 @@ std::vector<sp<RpcSession>> RpcServer::listSessions() { } size_t RpcServer::numUninitializedSessions() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); return mConnectingThreads.size(); } @@ -354,12 +370,12 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie } } - std::thread thisThread; + RpcMaybeThread thisThread; sp<RpcSession> session; { - std::unique_lock<std::mutex> _l(server->mLock); + RpcMutexUniqueLock _l(server->mLock); - auto threadId = server->mConnectingThreads.find(std::this_thread::get_id()); + auto threadId = server->mConnectingThreads.find(rpc_this_thread::get_id()); LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(), "Must establish connection on owned thread"); thisThread = std::move(threadId->second); @@ -505,7 +521,7 @@ void RpcServer::onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) LOG_RPC_DETAIL("Dropping session with address %s", base::HexString(id.data(), id.size()).c_str()); - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); auto it = mSessions.find(id); LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %s", base::HexString(id.data(), id.size()).c_str()); @@ -519,17 +535,17 @@ void RpcServer::onSessionIncomingThreadEnded() { } bool RpcServer::hasServer() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); return mServer.ok(); } unique_fd RpcServer::releaseServer() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); return std::move(mServer); } status_t RpcServer::setupExternalServer(base::unique_fd serverFd) { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); if (mServer.ok()) { ALOGE("Each RpcServer can only have one server."); return INVALID_OPERATION; diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index 41842a7d84..2d9c93341f 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -21,7 +21,6 @@ #include <dlfcn.h> #include <inttypes.h> #include <poll.h> -#include <pthread.h> #include <unistd.h> #include <string_view> @@ -60,7 +59,7 @@ RpcSession::RpcSession(std::unique_ptr<RpcTransportCtx> ctx) : mCtx(std::move(ct RpcSession::~RpcSession() { LOG_RPC_DETAIL("RpcSession destroyed %p", this); - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mConnections.mIncoming.size() != 0, "Should not be able to destroy a session with servers in use."); } @@ -77,7 +76,7 @@ sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTrans } void RpcSession::setMaxIncomingThreads(size_t threads) { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(), "Must set max incoming threads before setting up connections, but has %zu " "client(s) and %zu server(s)", @@ -86,12 +85,12 @@ void RpcSession::setMaxIncomingThreads(size_t threads) { } size_t RpcSession::getMaxIncomingThreads() { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); return mMaxIncomingThreads; } void RpcSession::setMaxOutgoingThreads(size_t threads) { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(), "Must set max outgoing threads before setting up connections, but has %zu " "client(s) and %zu server(s)", @@ -100,7 +99,7 @@ void RpcSession::setMaxOutgoingThreads(size_t threads) { } size_t RpcSession::getMaxOutgoingThreads() { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); return mMaxOutgoingThreads; } @@ -113,7 +112,7 @@ bool RpcSession::setProtocolVersion(uint32_t version) { return false; } - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); if (mProtocolVersion && version > *mProtocolVersion) { ALOGE("Cannot upgrade explicitly capped protocol version %u to newer version %u", *mProtocolVersion, version); @@ -125,7 +124,7 @@ bool RpcSession::setProtocolVersion(uint32_t version) { } std::optional<uint32_t> RpcSession::getProtocolVersion() { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); return mProtocolVersion; } @@ -209,7 +208,7 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { } bool RpcSession::shutdownAndWait(bool wait) { - std::unique_lock<std::mutex> _l(mMutex); + RpcMutexUniqueLock _l(mMutex); LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed"); mShutdownTrigger->trigger(); @@ -222,6 +221,7 @@ bool RpcSession::shutdownAndWait(bool wait) { } _l.unlock(); + mRpcBinderState->clear(); return true; @@ -256,7 +256,7 @@ status_t RpcSession::sendDecStrongToTarget(uint64_t address, size_t target) { status_t RpcSession::readId() { { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mForServer != nullptr, "Can only update ID for client."); } @@ -282,7 +282,7 @@ void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() { mCv.notify_all(); } -void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::mutex>& lock, +void RpcSession::WaitForShutdownListener::waitForShutdown(RpcMutexUniqueLock& lock, const sp<RpcSession>& session) { while (session->mConnections.mIncoming.size() > 0) { if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) { @@ -293,11 +293,11 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std:: } } -void RpcSession::preJoinThreadOwnership(std::thread thread) { - LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); +void RpcSession::preJoinThreadOwnership(RpcMaybeThread thread) { + LOG_ALWAYS_FATAL_IF(thread.get_id() != rpc_this_thread::get_id(), "Must own this thread"); { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); mConnections.mThreads[thread.get_id()] = std::move(thread); } } @@ -404,8 +404,8 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult sp<RpcSession::EventListener> listener; { - std::lock_guard<std::mutex> _l(session->mMutex); - auto it = session->mConnections.mThreads.find(std::this_thread::get_id()); + RpcMutexLockGuard _l(session->mMutex); + auto it = session->mConnections.mThreads.find(rpc_this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == session->mConnections.mThreads.end()); it->second.detach(); session->mConnections.mThreads.erase(it); @@ -438,7 +438,7 @@ sp<RpcServer> RpcSession::server() { status_t RpcSession::setupClient(const std::function<status_t(const std::vector<uint8_t>& sessionId, bool incoming)>& connectAndInit) { { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mConnections.mOutgoing.size() != 0, "Must only setup session once, but already has %zu clients", mConnections.mOutgoing.size()); @@ -500,7 +500,11 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector< return status; } +#ifdef BINDER_RPC_SINGLE_THREADED + constexpr size_t outgoingThreads = 1; +#else // BINDER_RPC_SINGLE_THREADED size_t outgoingThreads = std::min(numThreadsAvailable, mMaxOutgoingThreads); +#endif // BINDER_RPC_SINGLE_THREADED ALOGI_IF(outgoingThreads != numThreadsAvailable, "Server hints client to start %zu outgoing threads, but client will only start %zu " "because it is preconfigured to start at most %zu outgoing threads.", @@ -655,14 +659,14 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ } status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) { - std::mutex mutex; - std::condition_variable joinCv; - std::unique_lock<std::mutex> lock(mutex); - std::thread thread; + RpcMutex mutex; + RpcConditionVariable joinCv; + RpcMutexUniqueLock lock(mutex); + RpcMaybeThread thread; sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this); bool ownershipTransferred = false; - thread = std::thread([&]() { - std::unique_lock<std::mutex> threadLock(mutex); + thread = RpcMaybeThread([&]() { + RpcMutexUniqueLock threadLock(mutex); std::unique_ptr<RpcTransport> movedRpcTransport = std::move(rpcTransport); // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) sp<RpcSession> session = thiz; @@ -678,6 +682,7 @@ status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTran RpcSession::join(std::move(session), std::move(setupResult)); }); + rpcJoinIfSingleThreaded(thread); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); return OK; @@ -697,9 +702,9 @@ status_t RpcSession::initShutdownTrigger() { status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) { sp<RpcConnection> connection = sp<RpcConnection>::make(); { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); connection->rpcTransport = std::move(rpcTransport); - connection->exclusiveTid = base::GetThreadId(); + connection->exclusiveTid = rpcGetThreadId(); mConnections.mOutgoing.push_back(connection); } @@ -736,7 +741,7 @@ bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListene sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( std::unique_ptr<RpcTransport> rpcTransport) { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); if (mConnections.mIncoming.size() >= mMaxIncomingThreads) { ALOGE("Cannot add thread to session with %zu threads (max is set to %zu)", @@ -754,7 +759,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( sp<RpcConnection> session = sp<RpcConnection>::make(); session->rpcTransport = std::move(rpcTransport); - session->exclusiveTid = base::GetThreadId(); + session->exclusiveTid = rpcGetThreadId(); mConnections.mIncoming.push_back(session); mConnections.mMaxIncoming = mConnections.mIncoming.size(); @@ -763,7 +768,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( } bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) { - std::unique_lock<std::mutex> _l(mMutex); + RpcMutexUniqueLock _l(mMutex); if (auto it = std::find(mConnections.mIncoming.begin(), mConnections.mIncoming.end(), connection); it != mConnections.mIncoming.end()) { @@ -781,7 +786,7 @@ bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) { } void RpcSession::clearConnectionTid(const sp<RpcConnection>& connection) { - std::unique_lock<std::mutex> _l(mMutex); + RpcMutexUniqueLock _l(mMutex); connection->exclusiveTid = std::nullopt; if (mConnections.mWaitingThreads > 0) { _l.unlock(); @@ -799,8 +804,8 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co connection->mConnection = nullptr; connection->mReentrant = false; - uint64_t tid = base::GetThreadId(); - std::unique_lock<std::mutex> _l(session->mMutex); + uint64_t tid = rpcGetThreadId(); + RpcMutexUniqueLock _l(session->mMutex); session->mConnections.mWaitingThreads++; while (true) { diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp index 8f104bc542..01311b505f 100644 --- a/libs/binder/RpcState.cpp +++ b/libs/binder/RpcState.cpp @@ -45,7 +45,7 @@ void rpcMaybeWaitToFlake() { [[clang::no_destroy]] static std::mutex m; unsigned num; { - std::lock_guard<std::mutex> lock(m); + RpcMutexLockGuard lock(m); num = r(); } if (num % 10 == 0) usleep(num % 1000); @@ -89,7 +89,7 @@ status_t RpcState::onBinderLeaving(const sp<RpcSession>& session, const sp<IBind return INVALID_OPERATION; } - std::lock_guard<std::mutex> _l(mNodeMutex); + RpcMutexLockGuard _l(mNodeMutex); if (mTerminated) return DEAD_OBJECT; // TODO(b/182939933): maybe move address out of BpBinder, and keep binder->address map @@ -165,7 +165,7 @@ status_t RpcState::onBinderEntering(const sp<RpcSession>& session, uint64_t addr return BAD_VALUE; } - std::lock_guard<std::mutex> _l(mNodeMutex); + RpcMutexLockGuard _l(mNodeMutex); if (mTerminated) return DEAD_OBJECT; if (auto it = mNodeForAddress.find(address); it != mNodeForAddress.end()) { @@ -200,7 +200,7 @@ status_t RpcState::flushExcessBinderRefs(const sp<RpcSession>& session, uint64_t // extra reference counting packets now. if (binder->remoteBinder()) return OK; - std::unique_lock<std::mutex> _l(mNodeMutex); + RpcMutexUniqueLock _l(mNodeMutex); if (mTerminated) return DEAD_OBJECT; auto it = mNodeForAddress.find(address); @@ -228,17 +228,17 @@ status_t RpcState::flushExcessBinderRefs(const sp<RpcSession>& session, uint64_t } size_t RpcState::countBinders() { - std::lock_guard<std::mutex> _l(mNodeMutex); + RpcMutexLockGuard _l(mNodeMutex); return mNodeForAddress.size(); } void RpcState::dump() { - std::lock_guard<std::mutex> _l(mNodeMutex); + RpcMutexLockGuard _l(mNodeMutex); dumpLocked(); } void RpcState::clear() { - std::unique_lock<std::mutex> _l(mNodeMutex); + RpcMutexUniqueLock _l(mNodeMutex); if (mTerminated) { LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(), @@ -488,7 +488,7 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti uint64_t asyncNumber = 0; if (address != 0) { - std::unique_lock<std::mutex> _l(mNodeMutex); + RpcMutexUniqueLock _l(mNodeMutex); if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races auto it = mNodeForAddress.find(address); LOG_ALWAYS_FATAL_IF(it == mNodeForAddress.end(), @@ -671,7 +671,7 @@ status_t RpcState::sendDecStrongToTarget(const sp<RpcSession::RpcConnection>& co }; { - std::lock_guard<std::mutex> _l(mNodeMutex); + RpcMutexLockGuard _l(mNodeMutex); if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races auto it = mNodeForAddress.find(addr); LOG_ALWAYS_FATAL_IF(it == mNodeForAddress.end(), @@ -840,7 +840,7 @@ processTransactInternalTailCall: (void)session->shutdownAndWait(false); replyStatus = BAD_VALUE; } else if (oneway) { - std::unique_lock<std::mutex> _l(mNodeMutex); + RpcMutexUniqueLock _l(mNodeMutex); auto it = mNodeForAddress.find(addr); if (it->second.binder.promote() != target) { ALOGE("Binder became invalid during transaction. Bad client? %" PRIu64, addr); @@ -981,7 +981,7 @@ processTransactInternalTailCall: // downside: asynchronous transactions may drown out synchronous // transactions. { - std::unique_lock<std::mutex> _l(mNodeMutex); + RpcMutexUniqueLock _l(mNodeMutex); auto it = mNodeForAddress.find(addr); // last refcount dropped after this transaction happened if (it == mNodeForAddress.end()) return OK; @@ -1089,7 +1089,7 @@ status_t RpcState::processDecStrong(const sp<RpcSession::RpcConnection>& connect return status; uint64_t addr = RpcWireAddress::toRaw(body.address); - std::unique_lock<std::mutex> _l(mNodeMutex); + RpcMutexUniqueLock _l(mNodeMutex); auto it = mNodeForAddress.find(addr); if (it == mNodeForAddress.end()) { ALOGE("Unknown binder address %" PRIu64 " for dec strong.", addr); diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h index b452a99c79..6fb2e4a011 100644 --- a/libs/binder/RpcState.h +++ b/libs/binder/RpcState.h @@ -19,6 +19,7 @@ #include <binder/IBinder.h> #include <binder/Parcel.h> #include <binder/RpcSession.h> +#include <binder/RpcThreads.h> #include <map> #include <optional> @@ -268,7 +269,7 @@ private: // false - session shutdown, halt [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node); - std::mutex mNodeMutex; + RpcMutex mNodeMutex; bool mTerminated = false; uint32_t mNextId = 0; // binders known by both sides of a session diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h index 7fea76e051..9318c27cf0 100644 --- a/libs/binder/include/binder/RpcServer.h +++ b/libs/binder/include/binder/RpcServer.h @@ -18,6 +18,7 @@ #include <android-base/unique_fd.h> #include <binder/IBinder.h> #include <binder/RpcSession.h> +#include <binder/RpcThreads.h> #include <binder/RpcTransport.h> #include <utils/Errors.h> #include <utils/RefBase.h> @@ -207,16 +208,17 @@ private: static_cast<size_t>(RpcSession::FileDescriptorTransportMode::NONE)); base::unique_fd mServer; // socket we are accepting sessions on - std::mutex mLock; // for below - std::unique_ptr<std::thread> mJoinThread; + RpcMutex mLock; // for below + std::unique_ptr<RpcMaybeThread> mJoinThread; bool mJoinThreadRunning = false; - std::map<std::thread::id, std::thread> mConnectingThreads; + std::map<RpcMaybeThread::id, RpcMaybeThread> mConnectingThreads; + sp<IBinder> mRootObject; wp<IBinder> mRootObjectWeak; std::function<sp<IBinder>(const void*, size_t)> mRootObjectFactory; std::map<std::vector<uint8_t>, sp<RpcSession>> mSessions; std::unique_ptr<FdTrigger> mShutdownTrigger; - std::condition_variable mShutdownCv; + RpcConditionVariable mShutdownCv; }; } // namespace android diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index b98b0ebea9..a2b28db4a3 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -18,13 +18,13 @@ #include <android-base/threads.h> #include <android-base/unique_fd.h> #include <binder/IBinder.h> +#include <binder/RpcThreads.h> #include <binder/RpcTransport.h> #include <utils/Errors.h> #include <utils/RefBase.h> #include <map> #include <optional> -#include <thread> #include <vector> namespace android { @@ -218,10 +218,10 @@ private: public: void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) override; void onSessionIncomingThreadEnded() override; - void waitForShutdown(std::unique_lock<std::mutex>& lock, const sp<RpcSession>& session); + void waitForShutdown(RpcMutexUniqueLock& lock, const sp<RpcSession>& session); private: - std::condition_variable mCv; + RpcConditionVariable mCv; }; friend WaitForShutdownListener; @@ -244,7 +244,7 @@ private: // // transfer ownership of thread (usually done while a lock is taken on the // structure which originally owns the thread) - void preJoinThreadOwnership(std::thread thread); + void preJoinThreadOwnership(RpcMaybeThread thread); // pass FD to thread and read initial connection information struct PreJoinSetupResult { // Server connection object associated with this @@ -340,14 +340,14 @@ private: std::unique_ptr<RpcState> mRpcBinderState; - std::mutex mMutex; // for all below + RpcMutex mMutex; // for all below size_t mMaxIncomingThreads = 0; size_t mMaxOutgoingThreads = kDefaultMaxOutgoingThreads; std::optional<uint32_t> mProtocolVersion; FileDescriptorTransportMode mFileDescriptorTransportMode = FileDescriptorTransportMode::NONE; - std::condition_variable mAvailableConnectionCv; // for mWaitingThreads + RpcConditionVariable mAvailableConnectionCv; // for mWaitingThreads struct ThreadState { size_t mWaitingThreads = 0; @@ -356,7 +356,7 @@ private: std::vector<sp<RpcConnection>> mOutgoing; size_t mMaxIncoming = 0; std::vector<sp<RpcConnection>> mIncoming; - std::map<std::thread::id, std::thread> mThreads; + std::map<RpcMaybeThread::id, RpcMaybeThread> mThreads; } mConnections; }; diff --git a/libs/binder/include/binder/RpcThreads.h b/libs/binder/include/binder/RpcThreads.h new file mode 100644 index 0000000000..8abf04eaf0 --- /dev/null +++ b/libs/binder/include/binder/RpcThreads.h @@ -0,0 +1,145 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <pthread.h> + +#include <android-base/threads.h> + +#include <functional> +#include <memory> +#include <thread> + +namespace android { + +#ifdef BINDER_RPC_SINGLE_THREADED +class RpcMutex { +public: + void lock() {} + void unlock() {} +}; + +class RpcMutexUniqueLock { +public: + RpcMutexUniqueLock(RpcMutex&) {} + void unlock() {} +}; + +class RpcMutexLockGuard { +public: + RpcMutexLockGuard(RpcMutex&) {} +}; + +class RpcConditionVariable { +public: + void notify_one() {} + void notify_all() {} + + void wait(RpcMutexUniqueLock&) {} + + template <typename Predicate> + void wait(RpcMutexUniqueLock&, Predicate stop_waiting) { + LOG_ALWAYS_FATAL_IF(!stop_waiting(), "RpcConditionVariable::wait condition not met"); + } + + template <typename Duration> + std::cv_status wait_for(RpcMutexUniqueLock&, const Duration&) { + return std::cv_status::no_timeout; + } + + template <typename Duration, typename Predicate> + bool wait_for(RpcMutexUniqueLock&, const Duration&, Predicate stop_waiting) { + return stop_waiting(); + } +}; + +class RpcMaybeThread { +public: + RpcMaybeThread() = default; + + template <typename Function, typename... Args> + RpcMaybeThread(Function&& f, Args&&... args) { + // std::function requires a copy-constructible closure, + // so we need to wrap both the function and its arguments + // in a shared pointer that std::function can copy internally + struct Vars { + std::decay_t<Function> f; + std::tuple<std::decay_t<Args>...> args; + + explicit Vars(Function&& f, Args&&... args) + : f(std::move(f)), args(std::move(args)...) {} + }; + auto vars = std::make_shared<Vars>(std::forward<Function>(f), std::forward<Args>(args)...); + mFunc = [vars]() { std::apply(std::move(vars->f), std::move(vars->args)); }; + } + + void join() { + if (mFunc) { + // Move mFunc into a temporary so we can clear mFunc before + // executing the callback. This avoids infinite recursion if + // the callee then calls join() again directly or indirectly. + decltype(mFunc) func = nullptr; + mFunc.swap(func); + func(); + } + } + void detach() { join(); } + + class id { + public: + bool operator==(const id&) const { return true; } + bool operator!=(const id&) const { return false; } + bool operator<(const id&) const { return false; } + bool operator<=(const id&) const { return true; } + bool operator>(const id&) const { return false; } + bool operator>=(const id&) const { return true; } + }; + + id get_id() const { return id(); } + +private: + std::function<void(void)> mFunc; +}; + +namespace rpc_this_thread { +static inline RpcMaybeThread::id get_id() { + return RpcMaybeThread::id(); +} +} // namespace rpc_this_thread + +static inline uint64_t rpcGetThreadId() { + return 0; +} + +static inline void rpcJoinIfSingleThreaded(RpcMaybeThread& t) { + t.join(); +} +#else // BINDER_RPC_SINGLE_THREADED +using RpcMutex = std::mutex; +using RpcMutexUniqueLock = std::unique_lock<std::mutex>; +using RpcMutexLockGuard = std::lock_guard<std::mutex>; +using RpcConditionVariable = std::condition_variable; +using RpcMaybeThread = std::thread; +namespace rpc_this_thread = std::this_thread; + +static inline uint64_t rpcGetThreadId() { + return base::GetThreadId(); +} + +static inline void rpcJoinIfSingleThreaded(RpcMaybeThread&) {} +#endif // BINDER_RPC_SINGLE_THREADED + +} // namespace android |