author     2021-10-08 22:04:26 +0000
committer  2021-10-08 22:04:26 +0000
commit     a25b01205608f993dfdc5f9589a9eef4f159baa4 (patch)
tree       219f8fe9217c6b7d103df870c5d2c98778bbfc3c
parent     14c95f0218fe027e81ae8d780c77c9c18d89de54 (diff)
parent     a59937e177f2feffbadc4ab093ba634113edfa3f (diff)
Merge "libbinder: RPC mThreadState -> mConnections"
-rw-r--r--   libs/binder/RpcSession.cpp                | 77
-rw-r--r--   libs/binder/include/binder/RpcSession.h   | 18
2 files changed, 46 insertions, 49 deletions
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index 4465b8ef87..37f6c7ff45 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -61,7 +61,7 @@ RpcSession::~RpcSession() {
     LOG_RPC_DETAIL("RpcSession destroyed %p", this);
 
     std::lock_guard<std::mutex> _l(mMutex);
-    LOG_ALWAYS_FATAL_IF(mThreadState.mIncomingConnections.size() != 0,
+    LOG_ALWAYS_FATAL_IF(mConnections.mIncoming.size() != 0,
                         "Should not be able to destroy a session with servers in use.");
 }
 
@@ -78,12 +78,10 @@ sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTrans
 
 void RpcSession::setMaxThreads(size_t threads) {
     std::lock_guard<std::mutex> _l(mMutex);
-    LOG_ALWAYS_FATAL_IF(!mThreadState.mOutgoingConnections.empty() ||
-                        !mThreadState.mIncomingConnections.empty(),
+    LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(),
                         "Must set max threads before setting up connections, but has %zu client(s) "
                         "and %zu server(s)",
-                        mThreadState.mOutgoingConnections.size(),
-                        mThreadState.mIncomingConnections.size());
+                        mConnections.mOutgoing.size(), mConnections.mIncoming.size());
     mMaxThreads = threads;
 }
 
@@ -197,7 +195,7 @@ bool RpcSession::shutdownAndWait(bool wait) {
         LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
         mShutdownListener->waitForShutdown(_l, sp<RpcSession>::fromExisting(this));
 
-        LOG_ALWAYS_FATAL_IF(!mThreadState.mThreads.empty(), "Shutdown failed");
+        LOG_ALWAYS_FATAL_IF(!mConnections.mThreads.empty(), "Shutdown failed");
     }
 
     _l.unlock();
@@ -263,11 +261,11 @@ void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() {
 
 void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::mutex>& lock,
                                                           const sp<RpcSession>& session) {
-    while (session->mThreadState.mIncomingConnections.size() > 0) {
+    while (session->mConnections.mIncoming.size() > 0) {
        if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) {
            ALOGE("Waiting for RpcSession to shut down (1s w/o progress): %zu incoming connections "
                  "still.",
-                  session->mThreadState.mIncomingConnections.size());
+                  session->mConnections.mIncoming.size());
        }
     }
 }
@@ -277,7 +275,7 @@ void RpcSession::preJoinThreadOwnership(std::thread thread) {
 
     {
        std::lock_guard<std::mutex> _l(mMutex);
-        mThreadState.mThreads[thread.get_id()] = std::move(thread);
+        mConnections.mThreads[thread.get_id()] = std::move(thread);
     }
 }
 
@@ -380,10 +378,10 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult
     sp<RpcSession::EventListener> listener;
     {
        std::lock_guard<std::mutex> _l(session->mMutex);
-        auto it = session->mThreadState.mThreads.find(std::this_thread::get_id());
-        LOG_ALWAYS_FATAL_IF(it == session->mThreadState.mThreads.end());
+        auto it = session->mConnections.mThreads.find(std::this_thread::get_id());
+        LOG_ALWAYS_FATAL_IF(it == session->mConnections.mThreads.end());
        it->second.detach();
-        session->mThreadState.mThreads.erase(it);
+        session->mConnections.mThreads.erase(it);
        listener = session->mEventListener.promote();
     }
 
@@ -414,9 +412,9 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector<
                                          bool incoming)>& connectAndInit) {
     {
        std::lock_guard<std::mutex> _l(mMutex);
-        LOG_ALWAYS_FATAL_IF(mThreadState.mOutgoingConnections.size() != 0,
+        LOG_ALWAYS_FATAL_IF(mConnections.mOutgoing.size() != 0,
                            "Must only setup session once, but already has %zu clients",
-                            mThreadState.mOutgoingConnections.size());
+                            mConnections.mOutgoing.size());
     }
 
     if (auto status = initShutdownTrigger(); status != OK) return status;
@@ -439,7 +437,7 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector<
        // downgrade again
        mProtocolVersion = oldProtocolVersion;
 
-        mThreadState = {};
+        mConnections = {};
     });
 
     if (status_t status = connectAndInit({}, false /*incoming*/); status != OK) return status;
@@ -662,7 +660,7 @@ status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTran
        std::lock_guard<std::mutex> _l(mMutex);
        connection->rpcTransport = std::move(rpcTransport);
        connection->exclusiveTid = gettid();
-        mThreadState.mOutgoingConnections.push_back(connection);
+        mConnections.mOutgoing.push_back(connection);
     }
 
     status_t status = OK;
@@ -699,9 +697,9 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
        std::unique_ptr<RpcTransport> rpcTransport) {
     std::lock_guard<std::mutex> _l(mMutex);
 
-    if (mThreadState.mIncomingConnections.size() >= mMaxThreads) {
+    if (mConnections.mIncoming.size() >= mMaxThreads) {
        ALOGE("Cannot add thread to session with %zu threads (max is set to %zu)",
-              mThreadState.mIncomingConnections.size(), mMaxThreads);
+              mConnections.mIncoming.size(), mMaxThreads);
        return nullptr;
     }
 
@@ -709,7 +707,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
     // happens when new connections are still being established as part of a
     // very short-lived session which shuts down after it already started
     // accepting new connections.
-    if (mThreadState.mIncomingConnections.size() < mThreadState.mMaxIncomingConnections) {
+    if (mConnections.mIncoming.size() < mConnections.mMaxIncoming) {
        return nullptr;
     }
 
@@ -717,19 +715,19 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
     session->rpcTransport = std::move(rpcTransport);
     session->exclusiveTid = gettid();
 
-    mThreadState.mIncomingConnections.push_back(session);
-    mThreadState.mMaxIncomingConnections = mThreadState.mIncomingConnections.size();
+    mConnections.mIncoming.push_back(session);
+    mConnections.mMaxIncoming = mConnections.mIncoming.size();
 
     return session;
 }
 
 bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) {
     std::unique_lock<std::mutex> _l(mMutex);
-    if (auto it = std::find(mThreadState.mIncomingConnections.begin(),
-                            mThreadState.mIncomingConnections.end(), connection);
-        it != mThreadState.mIncomingConnections.end()) {
-        mThreadState.mIncomingConnections.erase(it);
-        if (mThreadState.mIncomingConnections.size() == 0) {
+    if (auto it =
+                std::find(mConnections.mIncoming.begin(), mConnections.mIncoming.end(), connection);
+        it != mConnections.mIncoming.end()) {
+        mConnections.mIncoming.erase(it);
+        if (mConnections.mIncoming.size() == 0) {
            sp<EventListener> listener = mEventListener.promote();
            if (listener) {
                _l.unlock();
@@ -754,7 +752,7 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
     pid_t tid = gettid();
     std::unique_lock<std::mutex> _l(session->mMutex);
 
-    session->mThreadState.mWaitingThreads++;
+    session->mConnections.mWaitingThreads++;
     while (true) {
        sp<RpcConnection> exclusive;
        sp<RpcConnection> available;
@@ -762,11 +760,11 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
        // CHECK FOR DEDICATED CLIENT SOCKET
        //
        // A server/looper should always use a dedicated connection if available
-        findConnection(tid, &exclusive, &available, session->mThreadState.mOutgoingConnections,
-                       session->mThreadState.mOutgoingConnectionsOffset);
+        findConnection(tid, &exclusive, &available, session->mConnections.mOutgoing,
+                       session->mConnections.mOutgoingOffset);
 
        // WARNING: this assumes a server cannot request its client to send
-        // a transaction, as mIncomingConnections is excluded below.
+        // a transaction, as mIncoming is excluded below.
        //
        // Imagine we have more than one thread in play, and a single thread
        // sends a synchronous, then an asynchronous command. Imagine the
@@ -776,9 +774,8 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
        // command. So, we move to considering the second available thread
        // for subsequent calls.
        if (use == ConnectionUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) {
-            session->mThreadState.mOutgoingConnectionsOffset =
-                    (session->mThreadState.mOutgoingConnectionsOffset + 1) %
-                    session->mThreadState.mOutgoingConnections.size();
+            session->mConnections.mOutgoingOffset = (session->mConnections.mOutgoingOffset + 1) %
+                                                    session->mConnections.mOutgoing.size();
        }
 
        // USE SERVING SOCKET (e.g. nested transaction)
@@ -786,7 +783,7 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
        sp<RpcConnection> exclusiveIncoming;
        // server connections are always assigned to a thread
        findConnection(tid, &exclusiveIncoming, nullptr /*available*/,
-                       session->mThreadState.mIncomingConnections, 0 /* index hint */);
+                       session->mConnections.mIncoming, 0 /* index hint */);
 
        // asynchronous calls cannot be nested, we currently allow ref count
        // calls to be nested (so that you can use this without having extra
@@ -815,20 +812,20 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
            break;
        }
 
-        if (session->mThreadState.mOutgoingConnections.size() == 0) {
+        if (session->mConnections.mOutgoing.size() == 0) {
            ALOGE("Session has no client connections. This is required for an RPC server to make "
                  "any non-nested (e.g. oneway or on another thread) calls. Use: %d. Server "
                  "connections: %zu",
-                  static_cast<int>(use), session->mThreadState.mIncomingConnections.size());
+                  static_cast<int>(use), session->mConnections.mIncoming.size());
            return WOULD_BLOCK;
        }
 
        LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...",
-                       session->mThreadState.mOutgoingConnections.size(),
-                       session->mThreadState.mIncomingConnections.size());
+                       session->mConnections.mOutgoing.size(),
+                       session->mConnections.mIncoming.size());
        session->mAvailableConnectionCv.wait(_l);
     }
-    session->mThreadState.mWaitingThreads--;
+    session->mConnections.mWaitingThreads--;
     return OK;
 }
@@ -867,7 +864,7 @@ RpcSession::ExclusiveConnection::~ExclusiveConnection() {
     if (!mReentrant && mConnection != nullptr) {
        std::unique_lock<std::mutex> _l(mSession->mMutex);
        mConnection->exclusiveTid = std::nullopt;
-        if (mSession->mThreadState.mWaitingThreads > 0) {
+        if (mSession->mConnections.mWaitingThreads > 0) {
            _l.unlock();
            mSession->mAvailableConnectionCv.notify_one();
        }
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 19888b7bf7..0fcee90ed6 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -281,13 +281,13 @@ private:
 
     const std::unique_ptr<RpcTransportCtx> mCtx;
 
-    // On the other side of a session, for each of mOutgoingConnections here, there should
-    // be one of mIncomingConnections on the other side (and vice versa).
+    // On the other side of a session, for each of mOutgoing here, there should
+    // be one of mIncoming on the other side (and vice versa).
     //
     // For the simplest session, a single server with one client, you would
     // have:
-    //     - the server has a single 'mIncomingConnections' and a thread listening on this
-    //     - the client has a single 'mOutgoingConnections' and makes calls to this
+    //     - the server has a single 'mIncoming' and a thread listening on this
+    //     - the client has a single 'mOutgoing' and makes calls to this
     //     - here, when the client makes a call, the server can call back into it
     //       (nested calls), but outside of this, the client will only ever read
     //       calls from the server when it makes a call itself.
@@ -315,12 +315,12 @@ private:
     struct ThreadState {
        size_t mWaitingThreads = 0;
        // hint index into clients, ++ when sending an async transaction
-        size_t mOutgoingConnectionsOffset = 0;
-        std::vector<sp<RpcConnection>> mOutgoingConnections;
-        size_t mMaxIncomingConnections = 0;
-        std::vector<sp<RpcConnection>> mIncomingConnections;
+        size_t mOutgoingOffset = 0;
+        std::vector<sp<RpcConnection>> mOutgoing;
+        size_t mMaxIncoming = 0;
+        std::vector<sp<RpcConnection>> mIncoming;
        std::map<std::thread::id, std::thread> mThreads;
-    } mThreadState;
+    } mConnections;
 };
 
 } // namespace android
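
For illustration only (not part of the change above): a minimal, self-contained C++ sketch of the renamed bookkeeping from the RpcSession.h hunk. The sp<> smart pointer is swapped for std::shared_ptr and RpcConnection is stubbed out as a placeholder type so the sketch compiles on its own; SessionSketch and RpcConnectionStub are made-up names, not libbinder identifiers.

// Sketch, not the real libbinder code: the per-session connection state keeps
// its ThreadState struct type, but the instance is now named mConnections
// (was mThreadState) and the members drop the "Connections" suffix.
#include <cstddef>
#include <map>
#include <memory>
#include <thread>
#include <vector>

struct RpcConnectionStub {};  // placeholder for RpcSession::RpcConnection

class SessionSketch {
private:
    struct ThreadState {
        size_t mWaitingThreads = 0;
        // hint index into clients, ++ when sending an async transaction
        size_t mOutgoingOffset = 0;                                 // was mOutgoingConnectionsOffset
        std::vector<std::shared_ptr<RpcConnectionStub>> mOutgoing;  // was mOutgoingConnections
        size_t mMaxIncoming = 0;                                    // was mMaxIncomingConnections
        std::vector<std::shared_ptr<RpcConnectionStub>> mIncoming;  // was mIncomingConnections
        std::map<std::thread::id, std::thread> mThreads;
    } mConnections;                                                 // was mThreadState
};

Call sites then read, for example, mConnections.mIncoming.size() instead of mThreadState.mIncomingConnections.size(), as the hunks above show.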