diff options
Diffstat (limited to 'libs/binder/RpcSession.cpp')
| -rw-r--r-- | libs/binder/RpcSession.cpp | 135 |
1 files changed, 78 insertions, 57 deletions
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index a759ae36c3..4f55eef2d1 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -51,7 +51,7 @@ RpcSession::~RpcSession() { LOG_RPC_DETAIL("RpcSession destroyed %p", this); std::lock_guard<std::mutex> _l(mMutex); - LOG_ALWAYS_FATAL_IF(mServerConnections.size() != 0, + LOG_ALWAYS_FATAL_IF(mIncomingConnections.size() != 0, "Should not be able to destroy a session with servers in use."); } @@ -61,10 +61,10 @@ sp<RpcSession> RpcSession::make() { void RpcSession::setMaxThreads(size_t threads) { std::lock_guard<std::mutex> _l(mMutex); - LOG_ALWAYS_FATAL_IF(!mClientConnections.empty() || !mServerConnections.empty(), + LOG_ALWAYS_FATAL_IF(!mOutgoingConnections.empty() || !mIncomingConnections.empty(), "Must set max threads before setting up connections, but has %zu client(s) " "and %zu server(s)", - mClientConnections.size(), mServerConnections.size()); + mOutgoingConnections.size(), mIncomingConnections.size()); mMaxThreads = threads; } @@ -100,7 +100,7 @@ bool RpcSession::addNullDebuggingClient() { return false; } - return addClientConnection(std::move(serverFd)); + return addOutgoingConnection(std::move(serverFd), false); } sp<IBinder> RpcSession::getRootObject() { @@ -108,7 +108,7 @@ sp<IBinder> RpcSession::getRootObject() { status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT, &connection); if (status != OK) return nullptr; - return state()->getRootObject(connection.fd(), sp<RpcSession>::fromExisting(this)); + return state()->getRootObject(connection.get(), sp<RpcSession>::fromExisting(this)); } status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { @@ -116,7 +116,7 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT, &connection); if (status != OK) return status; - return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads); + return state()->getMaxThreads(connection.get(), sp<RpcSession>::fromExisting(this), maxThreads); } bool RpcSession::shutdownAndWait(bool wait) { @@ -146,7 +146,7 @@ status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Pa : ConnectionUse::CLIENT, &connection); if (status != OK) return status; - return state()->transact(connection.fd(), binder, code, data, + return state()->transact(connection.get(), binder, code, data, sp<RpcSession>::fromExisting(this), reply, flags); } @@ -155,7 +155,7 @@ status_t RpcSession::sendDecStrong(const RpcAddress& address) { status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT_REFCOUNT, &connection); if (status != OK) return status; - return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address); + return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address); } std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() { @@ -218,28 +218,27 @@ status_t RpcSession::readId() { LOG_ALWAYS_FATAL_IF(mForServer != nullptr, "Can only update ID for client."); } - int32_t id; - ExclusiveConnection connection; status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT, &connection); if (status != OK) return status; - status = state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id); + mId = RpcAddress::zero(); + status = state()->getSessionId(connection.get(), sp<RpcSession>::fromExisting(this), + &mId.value()); if (status != OK) return status; - LOG_RPC_DETAIL("RpcSession %p has id %d", this, id); - mId = id; + LOG_RPC_DETAIL("RpcSession %p has id %s", this, mId->toString().c_str()); return OK; } -void RpcSession::WaitForShutdownListener::onSessionLockedAllServerThreadsEnded( +void RpcSession::WaitForShutdownListener::onSessionLockedAllIncomingThreadsEnded( const sp<RpcSession>& session) { (void)session; mShutdown = true; } -void RpcSession::WaitForShutdownListener::onSessionServerThreadEnded() { +void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() { mCv.notify_all(); } @@ -263,10 +262,9 @@ void RpcSession::preJoinThreadOwnership(std::thread thread) { RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) - sp<RpcConnection> connection = assignServerToThisThread(std::move(fd)); + sp<RpcConnection> connection = assignIncomingConnectionToThisThread(std::move(fd)); - status_t status = - mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this)); + status_t status = mState->readConnectionInit(connection, sp<RpcSession>::fromExisting(this)); return PreJoinSetupResult{ .connection = std::move(connection), @@ -279,7 +277,7 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult if (setupResult.status == OK) { while (true) { - status_t status = session->state()->getAndExecuteCommand(connection->fd, session, + status_t status = session->state()->getAndExecuteCommand(connection, session, RpcState::CommandType::ANY); if (status != OK) { LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", @@ -292,7 +290,7 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult statusToString(setupResult.status).c_str()); } - LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection), + LOG_ALWAYS_FATAL_IF(!session->removeIncomingConnection(connection), "bad state: connection object guaranteed to be in list"); sp<RpcSession::EventListener> listener; @@ -309,23 +307,28 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult session = nullptr; if (listener != nullptr) { - listener->onSessionServerThreadEnded(); + listener->onSessionIncomingThreadEnded(); } } -wp<RpcServer> RpcSession::server() { - return mForServer; +sp<RpcServer> RpcSession::server() { + RpcServer* unsafeServer = mForServer.unsafe_get(); + sp<RpcServer> server = mForServer.promote(); + + LOG_ALWAYS_FATAL_IF((unsafeServer == nullptr) != (server == nullptr), + "wp<> is to avoid strong cycle only"); + return server; } bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { { std::lock_guard<std::mutex> _l(mMutex); - LOG_ALWAYS_FATAL_IF(mClientConnections.size() != 0, + LOG_ALWAYS_FATAL_IF(mOutgoingConnections.size() != 0, "Must only setup session once, but already has %zu clients", - mClientConnections.size()); + mOutgoingConnections.size()); } - if (!setupOneSocketConnection(addr, RPC_SESSION_ID_NEW, false /*reverse*/)) return false; + if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*reverse*/)) return false; // TODO(b/189955605): we should add additional sessions dynamically // instead of all at once. @@ -362,7 +365,8 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { return true; } -bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t id, bool reverse) { +bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id, + bool reverse) { for (size_t tries = 0; tries < 5; tries++) { if (tries > 0) usleep(10000); @@ -386,9 +390,9 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t return false; } - RpcConnectionHeader header{ - .sessionId = id, - }; + RpcConnectionHeader header{.options = 0}; + memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress)); + if (reverse) header.options |= RPC_CONNECTION_OPTION_REVERSE; if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) { @@ -428,7 +432,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t LOG_ALWAYS_FATAL_IF(!ownershipTransferred); return true; } else { - return addClientConnection(std::move(serverFd)); + return addOutgoingConnection(std::move(serverFd), true); } } @@ -436,7 +440,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t return false; } -bool RpcSession::addClientConnection(unique_fd fd) { +bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) { sp<RpcConnection> connection = sp<RpcConnection>::make(); { std::lock_guard<std::mutex> _l(mMutex); @@ -451,11 +455,13 @@ bool RpcSession::addClientConnection(unique_fd fd) { connection->fd = std::move(fd); connection->exclusiveTid = gettid(); - mClientConnections.push_back(connection); + mOutgoingConnections.push_back(connection); } - status_t status = - mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this)); + status_t status = OK; + if (init) { + mState->sendConnectionInit(connection, sp<RpcSession>::fromExisting(this)); + } { std::lock_guard<std::mutex> _l(mMutex); @@ -466,7 +472,7 @@ bool RpcSession::addClientConnection(unique_fd fd) { } bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener, - int32_t sessionId) { + const RpcAddress& sessionId) { LOG_ALWAYS_FATAL_IF(mForServer != nullptr); LOG_ALWAYS_FATAL_IF(server == nullptr); LOG_ALWAYS_FATAL_IF(mEventListener != nullptr); @@ -482,25 +488,26 @@ bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListene return true; } -sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) { +sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(unique_fd fd) { std::lock_guard<std::mutex> _l(mMutex); sp<RpcConnection> session = sp<RpcConnection>::make(); session->fd = std::move(fd); session->exclusiveTid = gettid(); - mServerConnections.push_back(session); + mIncomingConnections.push_back(session); return session; } -bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) { +bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) { std::lock_guard<std::mutex> _l(mMutex); - if (auto it = std::find(mServerConnections.begin(), mServerConnections.end(), connection); - it != mServerConnections.end()) { - mServerConnections.erase(it); - if (mServerConnections.size() == 0) { + if (auto it = std::find(mIncomingConnections.begin(), mIncomingConnections.end(), connection); + it != mIncomingConnections.end()) { + mIncomingConnections.erase(it); + if (mIncomingConnections.size() == 0) { sp<EventListener> listener = mEventListener.promote(); if (listener) { - listener->onSessionLockedAllServerThreadsEnded(sp<RpcSession>::fromExisting(this)); + listener->onSessionLockedAllIncomingThreadsEnded( + sp<RpcSession>::fromExisting(this)); } } return true; @@ -525,11 +532,11 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co // CHECK FOR DEDICATED CLIENT SOCKET // // A server/looper should always use a dedicated connection if available - findConnection(tid, &exclusive, &available, session->mClientConnections, - session->mClientConnectionsOffset); + findConnection(tid, &exclusive, &available, session->mOutgoingConnections, + session->mOutgoingConnectionsOffset); // WARNING: this assumes a server cannot request its client to send - // a transaction, as mServerConnections is excluded below. + // a transaction, as mIncomingConnections is excluded below. // // Imagine we have more than one thread in play, and a single thread // sends a synchronous, then an asynchronous command. Imagine the @@ -539,17 +546,31 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co // command. So, we move to considering the second available thread // for subsequent calls. if (use == ConnectionUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) { - session->mClientConnectionsOffset = - (session->mClientConnectionsOffset + 1) % session->mClientConnections.size(); + session->mOutgoingConnectionsOffset = (session->mOutgoingConnectionsOffset + 1) % + session->mOutgoingConnections.size(); } - // USE SERVING SOCKET (for nested transaction) - // - // asynchronous calls cannot be nested + // USE SERVING SOCKET (e.g. nested transaction) if (use != ConnectionUse::CLIENT_ASYNC) { + sp<RpcConnection> exclusiveIncoming; // server connections are always assigned to a thread - findConnection(tid, &exclusive, nullptr /*available*/, session->mServerConnections, - 0 /* index hint */); + findConnection(tid, &exclusiveIncoming, nullptr /*available*/, + session->mIncomingConnections, 0 /* index hint */); + + // asynchronous calls cannot be nested, we currently allow ref count + // calls to be nested (so that you can use this without having extra + // threads). Note 'drainCommands' is used so that these ref counts can't + // build up. + if (exclusiveIncoming != nullptr) { + if (exclusiveIncoming->allowNested) { + // guaranteed to be processed as nested command + exclusive = exclusiveIncoming; + } else if (use == ConnectionUse::CLIENT_REFCOUNT && available == nullptr) { + // prefer available socket, but if we don't have one, don't + // wait for one + exclusive = exclusiveIncoming; + } + } } // if our thread is already using a connection, prioritize using that @@ -563,16 +584,16 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co break; } - if (session->mClientConnections.size() == 0) { + if (session->mOutgoingConnections.size() == 0) { ALOGE("Session has no client connections. This is required for an RPC server to make " "any non-nested (e.g. oneway or on another thread) calls. Use: %d. Server " "connections: %zu", - static_cast<int>(use), session->mServerConnections.size()); + static_cast<int>(use), session->mIncomingConnections.size()); return WOULD_BLOCK; } LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...", - session->mClientConnections.size(), session->mServerConnections.size()); + session->mOutgoingConnections.size(), session->mIncomingConnections.size()); session->mAvailableConnectionCv.wait(_l); } session->mWaitingThreads--; |