diff options
Diffstat (limited to 'libs/binder/RpcSession.cpp')
-rw-r--r-- | libs/binder/RpcSession.cpp | 170 |
1 files changed, 111 insertions, 59 deletions
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index de9aa220f8..a759ae36c3 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -104,12 +104,18 @@ bool RpcSession::addNullDebuggingClient() { } sp<IBinder> RpcSession::getRootObject() { - ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT); + ExclusiveConnection connection; + status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), + ConnectionUse::CLIENT, &connection); + if (status != OK) return nullptr; return state()->getRootObject(connection.fd(), sp<RpcSession>::fromExisting(this)); } status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { - ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT); + ExclusiveConnection connection; + status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), + ConnectionUse::CLIENT, &connection); + if (status != OK) return status; return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads); } @@ -133,16 +139,22 @@ bool RpcSession::shutdownAndWait(bool wait) { status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags) { - ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), - (flags & IBinder::FLAG_ONEWAY) ? ConnectionUse::CLIENT_ASYNC - : ConnectionUse::CLIENT); + ExclusiveConnection connection; + status_t status = + ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), + (flags & IBinder::FLAG_ONEWAY) ? ConnectionUse::CLIENT_ASYNC + : ConnectionUse::CLIENT, + &connection); + if (status != OK) return status; return state()->transact(connection.fd(), binder, code, data, sp<RpcSession>::fromExisting(this), reply, flags); } status_t RpcSession::sendDecStrong(const RpcAddress& address) { - ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), - ConnectionUse::CLIENT_REFCOUNT); + ExclusiveConnection connection; + status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), + ConnectionUse::CLIENT_REFCOUNT, &connection); + if (status != OK) return status; return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address); } @@ -208,9 +220,12 @@ status_t RpcSession::readId() { int32_t id; - ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT); - status_t status = - state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id); + ExclusiveConnection connection; + status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), + ConnectionUse::CLIENT, &connection); + if (status != OK) return status; + + status = state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id); if (status != OK) return status; LOG_RPC_DETAIL("RpcSession %p has id %d", this, id); @@ -236,7 +251,7 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std:: } } -void RpcSession::preJoin(std::thread thread) { +void RpcSession::preJoinThreadOwnership(std::thread thread) { LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); { @@ -245,20 +260,36 @@ void RpcSession::preJoin(std::thread thread) { } } -void RpcSession::join(sp<RpcSession>&& session, unique_fd client) { +RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) - sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client)); + sp<RpcConnection> connection = assignServerToThisThread(std::move(fd)); - while (true) { - status_t error = session->state()->getAndExecuteCommand(connection->fd, session, - RpcState::CommandType::ANY); + status_t status = + mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this)); - if (error != OK) { - LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", - statusToString(error).c_str()); - break; + return PreJoinSetupResult{ + .connection = std::move(connection), + .status = status, + }; +} + +void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult) { + sp<RpcConnection>& connection = setupResult.connection; + + if (setupResult.status == OK) { + while (true) { + status_t status = session->state()->getAndExecuteCommand(connection->fd, session, + RpcState::CommandType::ANY); + if (status != OK) { + LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", + statusToString(status).c_str()); + break; + } } + } else { + ALOGE("Connection failed to init, closing with status %s", + statusToString(setupResult.status).c_str()); } LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection), @@ -381,14 +412,17 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t unique_fd fd = std::move(serverFd); // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) sp<RpcSession> session = thiz; - session->preJoin(std::move(thread)); - ownershipTransferred = true; - joinCv.notify_one(); + session->preJoinThreadOwnership(std::move(thread)); + + // only continue once we have a response or the connection fails + auto setupResult = session->preJoinSetup(std::move(fd)); + ownershipTransferred = true; threadLock.unlock(); + joinCv.notify_one(); // do not use & vars below - RpcSession::join(std::move(session), std::move(fd)); + RpcSession::join(std::move(session), std::move(setupResult)); }); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); @@ -403,20 +437,32 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t } bool RpcSession::addClientConnection(unique_fd fd) { - std::lock_guard<std::mutex> _l(mMutex); + sp<RpcConnection> connection = sp<RpcConnection>::make(); + { + std::lock_guard<std::mutex> _l(mMutex); + + // first client connection added, but setForServer not called, so + // initializaing for a client. + if (mShutdownTrigger == nullptr) { + mShutdownTrigger = FdTrigger::make(); + mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); + if (mShutdownTrigger == nullptr) return false; + } - // first client connection added, but setForServer not called, so - // initializaing for a client. - if (mShutdownTrigger == nullptr) { - mShutdownTrigger = FdTrigger::make(); - mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); - if (mShutdownTrigger == nullptr) return false; + connection->fd = std::move(fd); + connection->exclusiveTid = gettid(); + mClientConnections.push_back(connection); } - sp<RpcConnection> session = sp<RpcConnection>::make(); - session->fd = std::move(fd); - mClientConnections.push_back(session); - return true; + status_t status = + mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this)); + + { + std::lock_guard<std::mutex> _l(mMutex); + connection->exclusiveTid = std::nullopt; + } + + return status == OK; } bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener, @@ -462,13 +508,16 @@ bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) { return false; } -RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& session, - ConnectionUse use) - : mSession(session) { +status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, ConnectionUse use, + ExclusiveConnection* connection) { + connection->mSession = session; + connection->mConnection = nullptr; + connection->mReentrant = false; + pid_t tid = gettid(); - std::unique_lock<std::mutex> _l(mSession->mMutex); + std::unique_lock<std::mutex> _l(session->mMutex); - mSession->mWaitingThreads++; + session->mWaitingThreads++; while (true) { sp<RpcConnection> exclusive; sp<RpcConnection> available; @@ -476,8 +525,8 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi // CHECK FOR DEDICATED CLIENT SOCKET // // A server/looper should always use a dedicated connection if available - findConnection(tid, &exclusive, &available, mSession->mClientConnections, - mSession->mClientConnectionsOffset); + findConnection(tid, &exclusive, &available, session->mClientConnections, + session->mClientConnectionsOffset); // WARNING: this assumes a server cannot request its client to send // a transaction, as mServerConnections is excluded below. @@ -490,8 +539,8 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi // command. So, we move to considering the second available thread // for subsequent calls. if (use == ConnectionUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) { - mSession->mClientConnectionsOffset = - (mSession->mClientConnectionsOffset + 1) % mSession->mClientConnections.size(); + session->mClientConnectionsOffset = + (session->mClientConnectionsOffset + 1) % session->mClientConnections.size(); } // USE SERVING SOCKET (for nested transaction) @@ -499,33 +548,36 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi // asynchronous calls cannot be nested if (use != ConnectionUse::CLIENT_ASYNC) { // server connections are always assigned to a thread - findConnection(tid, &exclusive, nullptr /*available*/, mSession->mServerConnections, + findConnection(tid, &exclusive, nullptr /*available*/, session->mServerConnections, 0 /* index hint */); } // if our thread is already using a connection, prioritize using that if (exclusive != nullptr) { - mConnection = exclusive; - mReentrant = true; + connection->mConnection = exclusive; + connection->mReentrant = true; break; } else if (available != nullptr) { - mConnection = available; - mConnection->exclusiveTid = tid; + connection->mConnection = available; + connection->mConnection->exclusiveTid = tid; break; } - // TODO(b/185167543): this should return an error, rather than crash a - // server - // in regular binder, this would usually be a deadlock :) - LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0, - "Session has no client connections. This is required for an RPC server " - "to make any non-nested (e.g. oneway or on another thread) calls."); + if (session->mClientConnections.size() == 0) { + ALOGE("Session has no client connections. This is required for an RPC server to make " + "any non-nested (e.g. oneway or on another thread) calls. Use: %d. Server " + "connections: %zu", + static_cast<int>(use), session->mServerConnections.size()); + return WOULD_BLOCK; + } LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...", - mSession->mClientConnections.size(), mSession->mServerConnections.size()); - mSession->mAvailableConnectionCv.wait(_l); + session->mClientConnections.size(), session->mServerConnections.size()); + session->mAvailableConnectionCv.wait(_l); } - mSession->mWaitingThreads--; + session->mWaitingThreads--; + + return OK; } void RpcSession::ExclusiveConnection::findConnection(pid_t tid, sp<RpcConnection>* exclusive, @@ -559,7 +611,7 @@ RpcSession::ExclusiveConnection::~ExclusiveConnection() { // reentrant use of a connection means something less deep in the call stack // is using this fd, and it retains the right to it. So, we don't give up // exclusive ownership, and no thread is freed. - if (!mReentrant) { + if (!mReentrant && mConnection != nullptr) { std::unique_lock<std::mutex> _l(mSession->mMutex); mConnection->exclusiveTid = std::nullopt; if (mSession->mWaitingThreads > 0) { |