diff options
Diffstat (limited to 'libs/binder/RpcSession.cpp')
-rw-r--r-- | libs/binder/RpcSession.cpp | 67 |
1 files changed, 36 insertions, 31 deletions
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index 41842a7d84..2d9c93341f 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -21,7 +21,6 @@ #include <dlfcn.h> #include <inttypes.h> #include <poll.h> -#include <pthread.h> #include <unistd.h> #include <string_view> @@ -60,7 +59,7 @@ RpcSession::RpcSession(std::unique_ptr<RpcTransportCtx> ctx) : mCtx(std::move(ct RpcSession::~RpcSession() { LOG_RPC_DETAIL("RpcSession destroyed %p", this); - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mConnections.mIncoming.size() != 0, "Should not be able to destroy a session with servers in use."); } @@ -77,7 +76,7 @@ sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTrans } void RpcSession::setMaxIncomingThreads(size_t threads) { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(), "Must set max incoming threads before setting up connections, but has %zu " "client(s) and %zu server(s)", @@ -86,12 +85,12 @@ void RpcSession::setMaxIncomingThreads(size_t threads) { } size_t RpcSession::getMaxIncomingThreads() { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); return mMaxIncomingThreads; } void RpcSession::setMaxOutgoingThreads(size_t threads) { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(), "Must set max outgoing threads before setting up connections, but has %zu " "client(s) and %zu server(s)", @@ -100,7 +99,7 @@ void RpcSession::setMaxOutgoingThreads(size_t threads) { } size_t RpcSession::getMaxOutgoingThreads() { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); return mMaxOutgoingThreads; } @@ -113,7 +112,7 @@ bool RpcSession::setProtocolVersion(uint32_t version) { return false; } - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); if (mProtocolVersion && version > *mProtocolVersion) { ALOGE("Cannot upgrade explicitly capped protocol version %u to newer version %u", *mProtocolVersion, version); @@ -125,7 +124,7 @@ bool RpcSession::setProtocolVersion(uint32_t version) { } std::optional<uint32_t> RpcSession::getProtocolVersion() { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); return mProtocolVersion; } @@ -209,7 +208,7 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { } bool RpcSession::shutdownAndWait(bool wait) { - std::unique_lock<std::mutex> _l(mMutex); + RpcMutexUniqueLock _l(mMutex); LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed"); mShutdownTrigger->trigger(); @@ -222,6 +221,7 @@ bool RpcSession::shutdownAndWait(bool wait) { } _l.unlock(); + mRpcBinderState->clear(); return true; @@ -256,7 +256,7 @@ status_t RpcSession::sendDecStrongToTarget(uint64_t address, size_t target) { status_t RpcSession::readId() { { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mForServer != nullptr, "Can only update ID for client."); } @@ -282,7 +282,7 @@ void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() { mCv.notify_all(); } -void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::mutex>& lock, +void RpcSession::WaitForShutdownListener::waitForShutdown(RpcMutexUniqueLock& lock, const sp<RpcSession>& session) { while (session->mConnections.mIncoming.size() > 0) { if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) { @@ -293,11 +293,11 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std:: } } -void RpcSession::preJoinThreadOwnership(std::thread thread) { - LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); +void RpcSession::preJoinThreadOwnership(RpcMaybeThread thread) { + LOG_ALWAYS_FATAL_IF(thread.get_id() != rpc_this_thread::get_id(), "Must own this thread"); { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); mConnections.mThreads[thread.get_id()] = std::move(thread); } } @@ -404,8 +404,8 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult sp<RpcSession::EventListener> listener; { - std::lock_guard<std::mutex> _l(session->mMutex); - auto it = session->mConnections.mThreads.find(std::this_thread::get_id()); + RpcMutexLockGuard _l(session->mMutex); + auto it = session->mConnections.mThreads.find(rpc_this_thread::get_id()); LOG_ALWAYS_FATAL_IF(it == session->mConnections.mThreads.end()); it->second.detach(); session->mConnections.mThreads.erase(it); @@ -438,7 +438,7 @@ sp<RpcServer> RpcSession::server() { status_t RpcSession::setupClient(const std::function<status_t(const std::vector<uint8_t>& sessionId, bool incoming)>& connectAndInit) { { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); LOG_ALWAYS_FATAL_IF(mConnections.mOutgoing.size() != 0, "Must only setup session once, but already has %zu clients", mConnections.mOutgoing.size()); @@ -500,7 +500,11 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector< return status; } +#ifdef BINDER_RPC_SINGLE_THREADED + constexpr size_t outgoingThreads = 1; +#else // BINDER_RPC_SINGLE_THREADED size_t outgoingThreads = std::min(numThreadsAvailable, mMaxOutgoingThreads); +#endif // BINDER_RPC_SINGLE_THREADED ALOGI_IF(outgoingThreads != numThreadsAvailable, "Server hints client to start %zu outgoing threads, but client will only start %zu " "because it is preconfigured to start at most %zu outgoing threads.", @@ -655,14 +659,14 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_ } status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) { - std::mutex mutex; - std::condition_variable joinCv; - std::unique_lock<std::mutex> lock(mutex); - std::thread thread; + RpcMutex mutex; + RpcConditionVariable joinCv; + RpcMutexUniqueLock lock(mutex); + RpcMaybeThread thread; sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this); bool ownershipTransferred = false; - thread = std::thread([&]() { - std::unique_lock<std::mutex> threadLock(mutex); + thread = RpcMaybeThread([&]() { + RpcMutexUniqueLock threadLock(mutex); std::unique_ptr<RpcTransport> movedRpcTransport = std::move(rpcTransport); // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) sp<RpcSession> session = thiz; @@ -678,6 +682,7 @@ status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTran RpcSession::join(std::move(session), std::move(setupResult)); }); + rpcJoinIfSingleThreaded(thread); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); return OK; @@ -697,9 +702,9 @@ status_t RpcSession::initShutdownTrigger() { status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) { sp<RpcConnection> connection = sp<RpcConnection>::make(); { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); connection->rpcTransport = std::move(rpcTransport); - connection->exclusiveTid = base::GetThreadId(); + connection->exclusiveTid = rpcGetThreadId(); mConnections.mOutgoing.push_back(connection); } @@ -736,7 +741,7 @@ bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListene sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( std::unique_ptr<RpcTransport> rpcTransport) { - std::lock_guard<std::mutex> _l(mMutex); + RpcMutexLockGuard _l(mMutex); if (mConnections.mIncoming.size() >= mMaxIncomingThreads) { ALOGE("Cannot add thread to session with %zu threads (max is set to %zu)", @@ -754,7 +759,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( sp<RpcConnection> session = sp<RpcConnection>::make(); session->rpcTransport = std::move(rpcTransport); - session->exclusiveTid = base::GetThreadId(); + session->exclusiveTid = rpcGetThreadId(); mConnections.mIncoming.push_back(session); mConnections.mMaxIncoming = mConnections.mIncoming.size(); @@ -763,7 +768,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread( } bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) { - std::unique_lock<std::mutex> _l(mMutex); + RpcMutexUniqueLock _l(mMutex); if (auto it = std::find(mConnections.mIncoming.begin(), mConnections.mIncoming.end(), connection); it != mConnections.mIncoming.end()) { @@ -781,7 +786,7 @@ bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) { } void RpcSession::clearConnectionTid(const sp<RpcConnection>& connection) { - std::unique_lock<std::mutex> _l(mMutex); + RpcMutexUniqueLock _l(mMutex); connection->exclusiveTid = std::nullopt; if (mConnections.mWaitingThreads > 0) { _l.unlock(); @@ -799,8 +804,8 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co connection->mConnection = nullptr; connection->mReentrant = false; - uint64_t tid = base::GetThreadId(); - std::unique_lock<std::mutex> _l(session->mMutex); + uint64_t tid = rpcGetThreadId(); + RpcMutexUniqueLock _l(session->mMutex); session->mConnections.mWaitingThreads++; while (true) { |