diff options
Diffstat (limited to 'libs/binder/RpcServer.cpp')
-rw-r--r-- | libs/binder/RpcServer.cpp | 68 |
1 files changed, 42 insertions, 26 deletions
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index f83bb5e335..1cd1fd3afd 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -32,6 +32,7 @@ #include <log/log.h> #include <utils/Compat.h> +#include "BuildFlags.h" #include "FdTrigger.h" #include "RpcSocketAddress.h" #include "RpcState.h" @@ -131,27 +132,27 @@ void RpcServer::setSupportedFileDescriptorTransportModes( } void RpcServer::setRootObject(const sp<IBinder>& binder) { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); mRootObjectFactory = nullptr; mRootObjectWeak = mRootObject = binder; } void RpcServer::setRootObjectWeak(const wp<IBinder>& binder) { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); mRootObject.clear(); mRootObjectFactory = nullptr; mRootObjectWeak = binder; } void RpcServer::setPerSessionRootObject( std::function<sp<IBinder>(const void*, size_t)>&& makeObject) { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); mRootObject.clear(); mRootObjectWeak.clear(); mRootObjectFactory = std::move(makeObject); } sp<IBinder> RpcServer::getRootObject() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); bool hasWeak = mRootObjectWeak.unsafe_get(); sp<IBinder> ret = mRootObjectWeak.promote(); ALOGW_IF(hasWeak && ret == nullptr, "RpcServer root object is freed, returning nullptr"); @@ -159,7 +160,7 @@ sp<IBinder> RpcServer::getRootObject() { } std::vector<uint8_t> RpcServer::getCertificate(RpcCertificateFormat format) { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); return mCtx->getCertificate(format); } @@ -168,15 +169,17 @@ static void joinRpcServer(sp<RpcServer>&& thiz) { } void RpcServer::start() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); LOG_ALWAYS_FATAL_IF(mJoinThread.get(), "Already started!"); - mJoinThread = std::make_unique<std::thread>(&joinRpcServer, sp<RpcServer>::fromExisting(this)); + mJoinThread = + std::make_unique<RpcMaybeThread>(&joinRpcServer, sp<RpcServer>::fromExisting(this)); + rpcJoinIfSingleThreaded(*mJoinThread); } void RpcServer::join() { { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join."); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined"); mJoinThreadRunning = true; @@ -204,24 +207,31 @@ void RpcServer::join() { LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); { - std::lock_guard<std::mutex> _l(mLock); - std::thread thread = - std::thread(&RpcServer::establishConnection, sp<RpcServer>::fromExisting(this), - std::move(clientFd), addr, addrLen); - mConnectingThreads[thread.get_id()] = std::move(thread); + RpcMutexLockGuard _l(mLock); + RpcMaybeThread thread = RpcMaybeThread(&RpcServer::establishConnection, + sp<RpcServer>::fromExisting(this), + std::move(clientFd), addr, addrLen); + + auto& threadRef = mConnectingThreads[thread.get_id()]; + threadRef = std::move(thread); + rpcJoinIfSingleThreaded(threadRef); } } LOG_RPC_DETAIL("RpcServer::join exiting with %s", statusToString(status).c_str()); - { - std::lock_guard<std::mutex> _l(mLock); + if constexpr (kEnableRpcThreads) { + RpcMutexLockGuard _l(mLock); mJoinThreadRunning = false; + } else { + // Multi-threaded builds clear this in shutdown(), but we need it valid + // so the loop above exits cleanly + mShutdownTrigger = nullptr; } mShutdownCv.notify_all(); } bool RpcServer::shutdown() { - std::unique_lock<std::mutex> _l(mLock); + RpcMutexUniqueLock _l(mLock); if (mShutdownTrigger == nullptr) { LOG_RPC_DETAIL("Cannot shutdown. No shutdown trigger installed (already shutdown?)"); return false; @@ -232,10 +242,16 @@ bool RpcServer::shutdown() { for (auto& [id, session] : mSessions) { (void)id; // server lock is a more general lock - std::lock_guard<std::mutex> _lSession(session->mMutex); + RpcMutexLockGuard _lSession(session->mMutex); session->mShutdownTrigger->trigger(); } + if constexpr (!kEnableRpcThreads) { + // In single-threaded mode we're done here, everything else that + // needs to happen should be at the end of RpcServer::join() + return true; + } + while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) { if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) { ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, " @@ -263,7 +279,7 @@ bool RpcServer::shutdown() { } std::vector<sp<RpcSession>> RpcServer::listSessions() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); std::vector<sp<RpcSession>> sessions; for (auto& [id, session] : mSessions) { (void)id; @@ -273,7 +289,7 @@ std::vector<sp<RpcSession>> RpcServer::listSessions() { } size_t RpcServer::numUninitializedSessions() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); return mConnectingThreads.size(); } @@ -354,12 +370,12 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie } } - std::thread thisThread; + RpcMaybeThread thisThread; sp<RpcSession> session; { - std::unique_lock<std::mutex> _l(server->mLock); + RpcMutexUniqueLock _l(server->mLock); - auto threadId = server->mConnectingThreads.find(std::this_thread::get_id()); + auto threadId = server->mConnectingThreads.find(rpc_this_thread::get_id()); LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(), "Must establish connection on owned thread"); thisThread = std::move(threadId->second); @@ -505,7 +521,7 @@ void RpcServer::onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) LOG_RPC_DETAIL("Dropping session with address %s", base::HexString(id.data(), id.size()).c_str()); - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); auto it = mSessions.find(id); LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %s", base::HexString(id.data(), id.size()).c_str()); @@ -519,17 +535,17 @@ void RpcServer::onSessionIncomingThreadEnded() { } bool RpcServer::hasServer() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); return mServer.ok(); } unique_fd RpcServer::releaseServer() { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); return std::move(mServer); } status_t RpcServer::setupExternalServer(base::unique_fd serverFd) { - std::lock_guard<std::mutex> _l(mLock); + RpcMutexLockGuard _l(mLock); if (mServer.ok()) { ALOGE("Each RpcServer can only have one server."); return INVALID_OPERATION; |