From dd67b94ac8cf1333a5afebbd84f974d156111fee Mon Sep 17 00:00:00 2001 From: Steven Moreland Date: Fri, 23 Jul 2021 17:15:41 -0700 Subject: libbinder: fix RPC setup races When setting up connections, there are a few cases where we take the server lock and then we take the session lock. However, when a session is shutting down, there is one case where we took the session lock and then the server lock. This is a big no-no, and it was causing a deadlock in the 'Fds' test (this creates many threads - but it is very shortlived, the threads are still being setup on the server when the process shutsdown, hitting the deadlock occassionally). The solution to this involves keeping a little bit of extra state inside of RpcSession directly to understand when it's shutting down. Also, we now fully cleanup sessions before removing them during the shutdown process. From this point on, we should always take the server lock and then the session lock in order to avoid races (never the session and then the server). Bug: N/A Test: binderRpcTest Test: while $ANDROID_BUILD_TOP/out/host/linux-x86/nativetest/binderRpcTest/binderRpcTest --gtest_filter="*Fd*"; do : ; done Change-Id: I9144c43939c0640a2ec53f93f6e685ddce4b3e83 --- libs/binder/RpcServer.cpp | 2 +- libs/binder/RpcSession.cpp | 38 ++++++++++++++++++++++++++------- libs/binder/include/binder/RpcServer.h | 2 +- libs/binder/include/binder/RpcSession.h | 7 +++--- 4 files changed, 36 insertions(+), 13 deletions(-) diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index c6cf2c5ee8..200d923b6d 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -369,7 +369,7 @@ bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { return true; } -void RpcServer::onSessionLockedAllIncomingThreadsEnded(const sp& session) { +void RpcServer::onSessionAllIncomingThreadsEnded(const sp& session) { auto id = session->mId; LOG_ALWAYS_FATAL_IF(id == std::nullopt, "Server sessions must be initialized with ID"); LOG_RPC_DETAIL("Dropping session with address %s", id->toString().c_str()); diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index c01a03d4c6..5d1df83fe2 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -132,6 +132,7 @@ bool RpcSession::shutdownAndWait(bool wait) { if (wait) { LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed"); mShutdownListener->waitForShutdown(_l); + LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed"); } @@ -261,7 +262,7 @@ status_t RpcSession::readId() { return OK; } -void RpcSession::WaitForShutdownListener::onSessionLockedAllIncomingThreadsEnded( +void RpcSession::WaitForShutdownListener::onSessionAllIncomingThreadsEnded( const sp& session) { (void)session; mShutdown = true; @@ -293,7 +294,13 @@ RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) { // be able to do nested calls (we can't only read from it) sp connection = assignIncomingConnectionToThisThread(std::move(fd)); - status_t status = mState->readConnectionInit(connection, sp::fromExisting(this)); + status_t status; + + if (connection == nullptr) { + status = DEAD_OBJECT; + } else { + status = mState->readConnectionInit(connection, sp::fromExisting(this)); + } return PreJoinSetupResult{ .connection = std::move(connection), @@ -360,6 +367,7 @@ void RpcSession::join(sp&& session, PreJoinSetupResult&& setupResult sp& connection = setupResult.connection; if (setupResult.status == OK) { + LOG_ALWAYS_FATAL_IF(!connection, "must have connection if setup succeeded"); JavaThreadAttacher javaThreadAttacher; while (true) { status_t status = session->state()->getAndExecuteCommand(connection, session, @@ -375,9 +383,6 @@ void RpcSession::join(sp&& session, PreJoinSetupResult&& setupResult statusToString(setupResult.status).c_str()); } - LOG_ALWAYS_FATAL_IF(!session->removeIncomingConnection(connection), - "bad state: connection object guaranteed to be in list"); - sp listener; { std::lock_guard _l(session->mMutex); @@ -389,6 +394,12 @@ void RpcSession::join(sp&& session, PreJoinSetupResult&& setupResult listener = session->mEventListener.promote(); } + // done after all cleanup, since session shutdown progresses via callbacks here + if (connection != nullptr) { + LOG_ALWAYS_FATAL_IF(!session->removeIncomingConnection(connection), + "bad state: connection object guaranteed to be in list"); + } + session = nullptr; if (listener != nullptr) { @@ -579,24 +590,35 @@ bool RpcSession::setForServer(const wp& server, const wp RpcSession::assignIncomingConnectionToThisThread(unique_fd fd) { std::lock_guard _l(mMutex); + + // Don't accept any more connections, some have shutdown. Usually this + // happens when new connections are still being established as part of a + // very short-lived session which shuts down after it already started + // accepting new connections. + if (mIncomingConnections.size() < mMaxIncomingConnections) { + return nullptr; + } + sp session = sp::make(); session->fd = std::move(fd); session->exclusiveTid = gettid(); + mIncomingConnections.push_back(session); + mMaxIncomingConnections = mIncomingConnections.size(); return session; } bool RpcSession::removeIncomingConnection(const sp& connection) { - std::lock_guard _l(mMutex); + std::unique_lock _l(mMutex); if (auto it = std::find(mIncomingConnections.begin(), mIncomingConnections.end(), connection); it != mIncomingConnections.end()) { mIncomingConnections.erase(it); if (mIncomingConnections.size() == 0) { sp listener = mEventListener.promote(); if (listener) { - listener->onSessionLockedAllIncomingThreadsEnded( - sp::fromExisting(this)); + _l.unlock(); + listener->onSessionAllIncomingThreadsEnded(sp::fromExisting(this)); } } return true; diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h index c8d2857347..a8094dd081 100644 --- a/libs/binder/include/binder/RpcServer.h +++ b/libs/binder/include/binder/RpcServer.h @@ -156,7 +156,7 @@ private: friend sp; RpcServer(); - void onSessionLockedAllIncomingThreadsEnded(const sp& session) override; + void onSessionAllIncomingThreadsEnded(const sp& session) override; void onSessionIncomingThreadEnded() override; static void establishConnection(sp&& server, base::unique_fd clientFd); diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index fdca2a987c..2101df85c4 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -177,19 +177,19 @@ private: class EventListener : public virtual RefBase { public: - virtual void onSessionLockedAllIncomingThreadsEnded(const sp& session) = 0; + virtual void onSessionAllIncomingThreadsEnded(const sp& session) = 0; virtual void onSessionIncomingThreadEnded() = 0; }; class WaitForShutdownListener : public EventListener { public: - void onSessionLockedAllIncomingThreadsEnded(const sp& session) override; + void onSessionAllIncomingThreadsEnded(const sp& session) override; void onSessionIncomingThreadEnded() override; void waitForShutdown(std::unique_lock& lock); private: std::condition_variable mCv; - bool mShutdown = false; + volatile bool mShutdown = false; }; struct RpcConnection : public RefBase { @@ -297,6 +297,7 @@ private: // hint index into clients, ++ when sending an async transaction size_t mOutgoingConnectionsOffset = 0; std::vector> mOutgoingConnections; + size_t mMaxIncomingConnections = 0; std::vector> mIncomingConnections; std::map mThreads; }; -- cgit v1.2.3-59-g8ed1b