summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Steven Moreland <smoreland@google.com> 2021-05-22 01:07:33 +0000
committer Steven Moreland <smoreland@google.com> 2021-05-22 02:26:03 +0000
commitee3f46696a906c2a06443e5eb4501c44ada9f9a6 (patch)
treef6b73b14e1179df7326450fb060bc5a01613945e
parent2b4f380c6548b8c146962eab8bc070e6c3647dc4 (diff)
libbinder: shutdown session threads
The last piece to completely shutting down servers (this is in preparation for adding threadpools to server callbacks, which actually need to be shut down during normal usage). Bug: 185167543 Test: binderRpcTest Change-Id: I20d6ac16c58fe6801545fa7be178518201fe075d
-rw-r--r--libs/binder/RpcServer.cpp14
-rw-r--r--libs/binder/RpcSession.cpp20
-rw-r--r--libs/binder/RpcState.cpp39
-rw-r--r--libs/binder/RpcState.h4
-rw-r--r--libs/binder/include/binder/RpcServer.h3
-rw-r--r--libs/binder/include/binder/RpcSession.h20
6 files changed, 56 insertions, 44 deletions
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index e3bf2a5e36..bff5543c9b 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -192,10 +192,10 @@ bool RpcServer::shutdown() {
}
mShutdownTrigger->trigger();
- while (mJoinThreadRunning || !mConnectingThreads.empty()) {
+ while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
ALOGI("Waiting for RpcServer to shut down. Join thread running: %d, Connecting threads: "
- "%zu",
- mJoinThreadRunning, mConnectingThreads.size());
+ "%zu, Sessions: %zu",
+ mJoinThreadRunning, mConnectingThreads.size(), mSessions.size());
mShutdownCv.wait(_l);
}
@@ -278,7 +278,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
server->mSessionIdCounter++;
session = RpcSession::make();
- session->setForServer(wp<RpcServer>(server), server->mSessionIdCounter);
+ session->setForServer(wp<RpcServer>(server), server->mSessionIdCounter,
+ server->mShutdownTrigger);
server->mSessions[server->mSessionIdCounter] = session;
} else {
@@ -344,6 +345,11 @@ void RpcServer::onSessionTerminating(const sp<RpcSession>& session) {
(void)mSessions.erase(it);
}
+void RpcServer::onSessionThreadEnding(const sp<RpcSession>& session) {
+ (void)session;
+ mShutdownCv.notify_all();
+}
+
bool RpcServer::hasServer() {
LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
std::lock_guard<std::mutex> _l(mLock);
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index 9f26a33335..7c458c123a 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -207,12 +207,19 @@ void RpcSession::join(unique_fd client) {
LOG_ALWAYS_FATAL_IF(!removeServerConnection(connection),
"bad state: connection object guaranteed to be in list");
+ sp<RpcServer> server;
{
std::lock_guard<std::mutex> _l(mMutex);
auto it = mThreads.find(std::this_thread::get_id());
LOG_ALWAYS_FATAL_IF(it == mThreads.end());
it->second.detach();
mThreads.erase(it);
+
+ server = mForServer.promote();
+ }
+
+ if (server != nullptr) {
+ server->onSessionThreadEnding(sp<RpcSession>::fromExisting(this));
}
}
@@ -314,14 +321,25 @@ bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id)
void RpcSession::addClientConnection(unique_fd fd) {
std::lock_guard<std::mutex> _l(mMutex);
+
+ if (mShutdownTrigger == nullptr) {
+ mShutdownTrigger = FdTrigger::make();
+ }
+
sp<RpcConnection> session = sp<RpcConnection>::make();
session->fd = std::move(fd);
mClientConnections.push_back(session);
}
-void RpcSession::setForServer(const wp<RpcServer>& server, int32_t sessionId) {
+void RpcSession::setForServer(const wp<RpcServer>& server, int32_t sessionId,
+ const std::shared_ptr<FdTrigger>& shutdownTrigger) {
+ LOG_ALWAYS_FATAL_IF(mForServer.unsafe_get() != nullptr);
+ LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
+ LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);
+
mId = sessionId;
mForServer = server;
+ mShutdownTrigger = shutdownTrigger;
}
sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 230de6f0ef..6483486340 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -229,30 +229,22 @@ bool RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void*
return true;
}
-bool RpcState::rpcRec(const base::unique_fd& fd, const char* what, void* data, size_t size) {
+bool RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session, const char* what,
+ void* data, size_t size) {
if (size > std::numeric_limits<ssize_t>::max()) {
ALOGE("Cannot rec %s at size %zu (too big)", what, size);
terminate();
return false;
}
- ssize_t recd = TEMP_FAILURE_RETRY(recv(fd.get(), data, size, MSG_WAITALL | MSG_NOSIGNAL));
-
- if (recd < 0 || recd != static_cast<ssize_t>(size)) {
- terminate();
-
- if (recd == 0 && errno == 0) {
- LOG_RPC_DETAIL("No more data when trying to read %s on fd %d", what, fd.get());
- return false;
- }
-
- ALOGE("Failed to read %s (received %zd of %zu bytes) on fd %d, error: %s", what, recd, size,
- fd.get(), strerror(errno));
+ if (status_t status = session->mShutdownTrigger->interruptableReadFully(fd.get(), data, size);
+ status != OK) {
+ ALOGE("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(),
+ statusToString(status).c_str());
return false;
- } else {
- LOG_RPC_DETAIL("Received %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
}
+ LOG_RPC_DETAIL("Received %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
return true;
}
@@ -398,7 +390,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
Parcel* reply) {
RpcWireHeader command;
while (true) {
- if (!rpcRec(fd, "command header", &command, sizeof(command))) {
+ if (!rpcRec(fd, session, "command header", &command, sizeof(command))) {
return DEAD_OBJECT;
}
@@ -413,7 +405,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
return NO_MEMORY;
}
- if (!rpcRec(fd, "reply body", data.data(), command.bodySize)) {
+ if (!rpcRec(fd, session, "reply body", data.data(), command.bodySize)) {
return DEAD_OBJECT;
}
@@ -465,7 +457,7 @@ status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcS
LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get());
RpcWireHeader command;
- if (!rpcRec(fd, "command header", &command, sizeof(command))) {
+ if (!rpcRec(fd, session, "command header", &command, sizeof(command))) {
return DEAD_OBJECT;
}
@@ -493,7 +485,7 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS
case RPC_COMMAND_TRANSACT:
return processTransact(fd, session, command);
case RPC_COMMAND_DEC_STRONG:
- return processDecStrong(fd, command);
+ return processDecStrong(fd, session, command);
}
// We should always know the version of the opposing side, and since the
@@ -513,7 +505,7 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSessio
if (!transactionData.valid()) {
return NO_MEMORY;
}
- if (!rpcRec(fd, "transaction body", transactionData.data(), transactionData.size())) {
+ if (!rpcRec(fd, session, "transaction body", transactionData.data(), transactionData.size())) {
return DEAD_OBJECT;
}
@@ -626,7 +618,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
//
// sessions associated with servers must have an ID
// (hence abort)
- int32_t id = session->getPrivateAccessorForId().get().value();
+ int32_t id = session->mId.value();
replyStatus = reply.writeInt32(id);
break;
}
@@ -721,14 +713,15 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
return OK;
}
-status_t RpcState::processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command) {
+status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const RpcWireHeader& command) {
LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command);
CommandData commandData(command.bodySize);
if (!commandData.valid()) {
return NO_MEMORY;
}
- if (!rpcRec(fd, "dec ref body", commandData.data(), commandData.size())) {
+ if (!rpcRec(fd, session, "dec ref body", commandData.data(), commandData.size())) {
return DEAD_OBJECT;
}
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index 31f8a22065..f913925adb 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -117,7 +117,8 @@ private:
[[nodiscard]] bool rpcSend(const base::unique_fd& fd, const char* what, const void* data,
size_t size);
- [[nodiscard]] bool rpcRec(const base::unique_fd& fd, const char* what, void* data, size_t size);
+ [[nodiscard]] bool rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const char* what, void* data, size_t size);
[[nodiscard]] status_t waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session,
Parcel* reply);
@@ -130,6 +131,7 @@ private:
const sp<RpcSession>& session,
CommandData transactionData);
[[nodiscard]] status_t processDecStrong(const base::unique_fd& fd,
+ const sp<RpcSession>& session,
const RpcWireHeader& command);
struct BinderNode {
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 50770f12d2..178459d2fe 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -150,6 +150,7 @@ public:
// internal use only
void onSessionTerminating(const sp<RpcSession>& session);
+ void onSessionThreadEnding(const sp<RpcSession>& session);
private:
friend sp<RpcServer>;
@@ -171,7 +172,7 @@ private:
wp<IBinder> mRootObjectWeak;
std::map<int32_t, sp<RpcSession>> mSessions;
int32_t mSessionIdCounter = 0;
- std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+ std::shared_ptr<RpcSession::FdTrigger> mShutdownTrigger;
std::condition_variable mShutdownCv;
};
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index d6b796f281..d46f27567c 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -94,27 +94,16 @@ public:
// internal only
const std::unique_ptr<RpcState>& state() { return mState; }
- class PrivateAccessorForId {
- private:
- friend class RpcSession;
- friend class RpcState;
- explicit PrivateAccessorForId(const RpcSession* session) : mSession(session) {}
-
- const std::optional<int32_t> get() { return mSession->mId; }
-
- const RpcSession* mSession;
- };
- PrivateAccessorForId getPrivateAccessorForId() const { return PrivateAccessorForId(this); }
-
private:
- friend PrivateAccessorForId;
friend sp<RpcSession>;
friend RpcServer;
+ friend RpcState;
RpcSession();
/** This is not a pipe. */
struct FdTrigger {
static std::unique_ptr<FdTrigger> make();
+
/**
* poll() on this fd for POLLHUP to get notification when trigger is called
*/
@@ -167,7 +156,8 @@ private:
bool setupSocketClient(const RpcSocketAddress& address);
bool setupOneSocketClient(const RpcSocketAddress& address, int32_t sessionId);
void addClientConnection(base::unique_fd fd);
- void setForServer(const wp<RpcServer>& server, int32_t sessionId);
+ void setForServer(const wp<RpcServer>& server, int32_t sessionId,
+ const std::shared_ptr<FdTrigger>& shutdownTrigger);
sp<RpcConnection> assignServerToThisThread(base::unique_fd fd);
bool removeServerConnection(const sp<RpcConnection>& connection);
@@ -218,6 +208,8 @@ private:
// TODO(b/183988761): this shouldn't be guessable
std::optional<int32_t> mId;
+ std::shared_ptr<FdTrigger> mShutdownTrigger;
+
std::unique_ptr<RpcState> mState;
std::mutex mMutex; // for all below