author Steven Moreland <smoreland@google.com> 2021-07-23 17:15:41 -0700
committer Steven Moreland <smoreland@google.com> 2021-07-23 17:49:09 -0700
commit dd67b94ac8cf1333a5afebbd84f974d156111fee
tree 6893fa3e48f406865537453d2b922c024c43a894
parent c18818c651a5c99c3945d455fb1f9045ca79322c
libbinder: fix RPC setup races
When setting up connections, there are a few cases where we take the server lock and then the session lock. However, when a session is shutting down, there is one case where we took the session lock and then the server lock. This is a big no-no, and it was causing a deadlock in the 'Fds' test (this test creates many threads but is very short-lived; the threads are still being set up on the server when the process shuts down, occasionally hitting the deadlock).

The solution involves keeping a little bit of extra state inside RpcSession directly so it knows when it is shutting down. Also, we now fully clean up sessions before removing them during the shutdown process.

From this point on, we should always take the server lock and then the session lock in order to avoid races (never the session lock and then the server lock).

Bug: N/A
Test: binderRpcTest
Test: while $ANDROID_BUILD_TOP/out/host/linux-x86/nativetest/binderRpcTest/binderRpcTest --gtest_filter="*Fd*"; do : ; done
Change-Id: I9144c43939c0640a2ec53f93f6e685ddce4b3e83
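To make the ordering rule concrete, the following is a minimal sketch of the two acquisition orders the message contrasts. It is not the libbinder code; Server/Session and their members are simplified stand-ins for RpcServer/RpcSession, used only to show why a single fixed order (server lock, then session lock) cannot deadlock while the inverted order can:

#include <memory>
#include <mutex>
#include <vector>

struct Session;

struct Server {
    std::mutex mLock;  // by convention, always acquired first
    std::vector<std::shared_ptr<Session>> mSessions;
};

struct Session {
    std::mutex mMutex;  // by convention, always acquired second
    Server* mServer = nullptr;
};

// Correct order: server lock, then session lock.
void removeSession(Server& server, const std::shared_ptr<Session>& session) {
    std::lock_guard<std::mutex> serverLock(server.mLock);
    std::lock_guard<std::mutex> sessionLock(session->mMutex);
    // ... erase 'session' from server.mSessions and tear it down ...
}

// Inverted order: session lock, then server lock. If this runs while another
// thread is inside removeSession(), each thread ends up holding the lock the
// other one is waiting for, and both block forever.
void shutdownWithInvertedOrder(const std::shared_ptr<Session>& session) {
    std::lock_guard<std::mutex> sessionLock(session->mMutex);
    std::lock_guard<std::mutex> serverLock(session->mServer->mLock);
    // ... notify the server that this session is going away ...
}

The change removes the shutdown-time path that behaved like shutdownWithInvertedOrder(), so only the first ordering remains.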
-rw-r--r--  libs/binder/RpcServer.cpp                2
-rw-r--r--  libs/binder/RpcSession.cpp              38
-rw-r--r--  libs/binder/include/binder/RpcServer.h   2
-rw-r--r--  libs/binder/include/binder/RpcSession.h  7
4 files changed, 36 insertions, 13 deletions
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index c6cf2c5ee8..200d923b6d 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -369,7 +369,7 @@ bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
return true;
}
-void RpcServer::onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) {
+void RpcServer::onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) {
auto id = session->mId;
LOG_ALWAYS_FATAL_IF(id == std::nullopt, "Server sessions must be initialized with ID");
LOG_RPC_DETAIL("Dropping session with address %s", id->toString().c_str());
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index c01a03d4c6..5d1df83fe2 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -132,6 +132,7 @@ bool RpcSession::shutdownAndWait(bool wait) {
if (wait) {
LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
mShutdownListener->waitForShutdown(_l);
+
LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
}
@@ -261,7 +262,7 @@ status_t RpcSession::readId() {
return OK;
}
-void RpcSession::WaitForShutdownListener::onSessionLockedAllIncomingThreadsEnded(
+void RpcSession::WaitForShutdownListener::onSessionAllIncomingThreadsEnded(
const sp<RpcSession>& session) {
(void)session;
mShutdown = true;
@@ -293,7 +294,13 @@ RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
// be able to do nested calls (we can't only read from it)
sp<RpcConnection> connection = assignIncomingConnectionToThisThread(std::move(fd));
- status_t status = mState->readConnectionInit(connection, sp<RpcSession>::fromExisting(this));
+ status_t status;
+
+ if (connection == nullptr) {
+ status = DEAD_OBJECT;
+ } else {
+ status = mState->readConnectionInit(connection, sp<RpcSession>::fromExisting(this));
+ }
return PreJoinSetupResult{
.connection = std::move(connection),
@@ -360,6 +367,7 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult
sp<RpcConnection>& connection = setupResult.connection;
if (setupResult.status == OK) {
+ LOG_ALWAYS_FATAL_IF(!connection, "must have connection if setup succeeded");
JavaThreadAttacher javaThreadAttacher;
while (true) {
status_t status = session->state()->getAndExecuteCommand(connection, session,
@@ -375,9 +383,6 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult
statusToString(setupResult.status).c_str());
}
- LOG_ALWAYS_FATAL_IF(!session->removeIncomingConnection(connection),
- "bad state: connection object guaranteed to be in list");
-
sp<RpcSession::EventListener> listener;
{
std::lock_guard<std::mutex> _l(session->mMutex);
@@ -389,6 +394,12 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult
listener = session->mEventListener.promote();
}
+ // done after all cleanup, since session shutdown progresses via callbacks here
+ if (connection != nullptr) {
+ LOG_ALWAYS_FATAL_IF(!session->removeIncomingConnection(connection),
+ "bad state: connection object guaranteed to be in list");
+ }
+
session = nullptr;
if (listener != nullptr) {
@@ -579,24 +590,35 @@ bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListene
sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(unique_fd fd) {
std::lock_guard<std::mutex> _l(mMutex);
+
+ // Don't accept any more connections; some have shut down. Usually this
+ // happens when new connections are still being established as part of a
+ // very short-lived session which shuts down after it already started
+ // accepting new connections.
+ if (mIncomingConnections.size() < mMaxIncomingConnections) {
+ return nullptr;
+ }
+
sp<RpcConnection> session = sp<RpcConnection>::make();
session->fd = std::move(fd);
session->exclusiveTid = gettid();
+
mIncomingConnections.push_back(session);
+ mMaxIncomingConnections = mIncomingConnections.size();
return session;
}
bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) {
- std::lock_guard<std::mutex> _l(mMutex);
+ std::unique_lock<std::mutex> _l(mMutex);
if (auto it = std::find(mIncomingConnections.begin(), mIncomingConnections.end(), connection);
it != mIncomingConnections.end()) {
mIncomingConnections.erase(it);
if (mIncomingConnections.size() == 0) {
sp<EventListener> listener = mEventListener.promote();
if (listener) {
- listener->onSessionLockedAllIncomingThreadsEnded(
- sp<RpcSession>::fromExisting(this));
+ _l.unlock();
+ listener->onSessionAllIncomingThreadsEnded(sp<RpcSession>::fromExisting(this));
}
}
return true;
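The switch to std::unique_lock in removeIncomingConnection() above exists so the session lock can be dropped before the listener callback runs, since RpcServer::onSessionAllIncomingThreadsEnded() goes on to take the server lock. A rough sketch of that unlock-before-callback pattern, using hypothetical names (Registry, mOnEmpty) rather than the real types:

#include <algorithm>
#include <functional>
#include <mutex>
#include <vector>

struct Registry {
    std::mutex mMutex;
    std::vector<int> mItems;
    std::function<void()> mOnEmpty;  // callback that may take other locks

    bool remove(int item) {
        std::unique_lock<std::mutex> lock(mMutex);  // unique_lock so we can unlock early
        auto it = std::find(mItems.begin(), mItems.end(), item);
        if (it == mItems.end()) return false;
        mItems.erase(it);
        if (mItems.empty() && mOnEmpty) {
            // Release our lock before calling out, so the callback can take a
            // broader lock (the server lock, in the libbinder case) without
            // inverting the server-then-session ordering.
            lock.unlock();
            mOnEmpty();
        }
        return true;
    }
};

Dropping the lock before the callback appears tolerable here because the new mMaxIncomingConnections check prevents further incoming connections once any have shut down, so the "all incoming threads ended" condition stays true after the unlock.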
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index c8d2857347..a8094dd081 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -156,7 +156,7 @@ private:
friend sp<RpcServer>;
RpcServer();
- void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) override;
+ void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) override;
void onSessionIncomingThreadEnded() override;
static void establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd);
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index fdca2a987c..2101df85c4 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -177,19 +177,19 @@ private:
class EventListener : public virtual RefBase {
public:
- virtual void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0;
+ virtual void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0;
virtual void onSessionIncomingThreadEnded() = 0;
};
class WaitForShutdownListener : public EventListener {
public:
- void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) override;
+ void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) override;
void onSessionIncomingThreadEnded() override;
void waitForShutdown(std::unique_lock<std::mutex>& lock);
private:
std::condition_variable mCv;
- bool mShutdown = false;
+ volatile bool mShutdown = false;
};
struct RpcConnection : public RefBase {
@@ -297,6 +297,7 @@ private:
// hint index into clients, ++ when sending an async transaction
size_t mOutgoingConnectionsOffset = 0;
std::vector<sp<RpcConnection>> mOutgoingConnections;
+ size_t mMaxIncomingConnections = 0;
std::vector<sp<RpcConnection>> mIncomingConnections;
std::map<std::thread::id, std::thread> mThreads;
};
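For reference, the high-water-mark check added to assignIncomingConnectionToThisThread() can be read in isolation roughly as follows. This is a simplified sketch with placeholder types, not the real RpcSession:

#include <algorithm>
#include <cstddef>
#include <mutex>
#include <vector>

class IncomingPool {
public:
    // Returns false once the pool has started winding down.
    bool add(int connectionId) {
        std::lock_guard<std::mutex> lock(mMutex);
        // If the current count is below the highest count ever reached, some
        // connection has already been removed, i.e. shutdown has begun.
        if (mConnections.size() < mMax) return false;
        mConnections.push_back(connectionId);
        mMax = mConnections.size();  // record the new high-water mark
        return true;
    }

    void remove(int connectionId) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = std::find(mConnections.begin(), mConnections.end(), connectionId);
        if (it != mConnections.end()) mConnections.erase(it);
    }

private:
    std::mutex mMutex;
    std::vector<int> mConnections;
    size_t mMax = 0;  // highest number of live connections ever seen
};

Because mMax only grows while connections are being added, size() < mMax can only become true after a removal, which is exactly the "session is shutting down" signal the commit needs without introducing a separate flag.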