summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libs/binder/RpcSession.cpp18
-rw-r--r--libs/binder/RpcState.cpp113
-rw-r--r--libs/binder/RpcState.h15
-rw-r--r--libs/binder/include/binder/RpcSession.h12
-rw-r--r--libs/binder/tests/binderRpcTest.cpp2
5 files changed, 91 insertions, 69 deletions
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index 93e04f7558..62118ffddc 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -113,17 +113,21 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
}
-bool RpcSession::shutdown() {
+bool RpcSession::shutdownAndWait(bool wait) {
std::unique_lock<std::mutex> _l(mMutex);
- LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session");
LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed");
- LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
mShutdownTrigger->trigger();
- mShutdownListener->waitForShutdown(_l);
- mState->terminate();
- LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+ if (wait) {
+ LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
+ mShutdownListener->waitForShutdown(_l);
+ LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+ }
+
+ _l.unlock();
+ mState->clear();
+
return true;
}
@@ -139,7 +143,7 @@ status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Pa
status_t RpcSession::sendDecStrong(const RpcAddress& address) {
ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
ConnectionUse::CLIENT_REFCOUNT);
- return state()->sendDecStrong(connection.fd(), address);
+ return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
}
std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 93f15294df..7e731f3426 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -137,9 +137,39 @@ void RpcState::dump() {
dumpLocked();
}
-void RpcState::terminate() {
+void RpcState::clear() {
std::unique_lock<std::mutex> _l(mNodeMutex);
- terminate(_l);
+
+ if (mTerminated) {
+ LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(),
+ "New state should be impossible after terminating!");
+ return;
+ }
+
+ if (SHOULD_LOG_RPC_DETAIL) {
+ ALOGE("RpcState::clear()");
+ dumpLocked();
+ }
+
+ // if the destructor of a binder object makes another RPC call, then calling
+ // decStrong could deadlock. So, we must hold onto these binders until
+ // mNodeMutex is no longer taken.
+ std::vector<sp<IBinder>> tempHoldBinder;
+
+ mTerminated = true;
+ for (auto& [address, node] : mNodeForAddress) {
+ sp<IBinder> binder = node.binder.promote();
+ LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
+
+ if (node.sentRef != nullptr) {
+ tempHoldBinder.push_back(node.sentRef);
+ }
+ }
+
+ mNodeForAddress.clear();
+
+ _l.unlock();
+ tempHoldBinder.clear(); // explicit
}
void RpcState::dumpLocked() {
@@ -170,32 +200,6 @@ void RpcState::dumpLocked() {
ALOGE("END DUMP OF RpcState");
}
-void RpcState::terminate(std::unique_lock<std::mutex>& lock) {
- if (SHOULD_LOG_RPC_DETAIL) {
- ALOGE("RpcState::terminate()");
- dumpLocked();
- }
-
- // if the destructor of a binder object makes another RPC call, then calling
- // decStrong could deadlock. So, we must hold onto these binders until
- // mNodeMutex is no longer taken.
- std::vector<sp<IBinder>> tempHoldBinder;
-
- mTerminated = true;
- for (auto& [address, node] : mNodeForAddress) {
- sp<IBinder> binder = node.binder.promote();
- LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
-
- if (node.sentRef != nullptr) {
- tempHoldBinder.push_back(node.sentRef);
- }
- }
-
- mNodeForAddress.clear();
-
- lock.unlock();
- tempHoldBinder.clear(); // explicit
-}
RpcState::CommandData::CommandData(size_t size) : mSize(size) {
// The maximum size for regular binder is 1MB for all concurrent
@@ -218,13 +222,13 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) {
mData.reset(new (std::nothrow) uint8_t[size]);
}
-status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data,
- size_t size) {
+status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const char* what, const void* data, size_t size) {
LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
if (size > std::numeric_limits<ssize_t>::max()) {
ALOGE("Cannot send %s at size %zu (too big)", what, size);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
@@ -235,7 +239,7 @@ status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const vo
LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
size, fd.get(), strerror(savedErrno));
- terminate();
+ (void)session->shutdownAndWait(false);
return -savedErrno;
}
@@ -246,7 +250,7 @@ status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& sessi
const char* what, void* data, size_t size) {
if (size > std::numeric_limits<ssize_t>::max()) {
ALOGE("Cannot rec %s at size %zu (too big)", what, size);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
@@ -358,7 +362,11 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress&
if (flags & IBinder::FLAG_ONEWAY) {
asyncNumber = it->second.asyncNumber;
- if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+ if (!nodeProgressAsyncNumber(&it->second)) {
+ _l.unlock();
+ (void)session->shutdownAndWait(false);
+ return DEAD_OBJECT;
+ }
}
}
@@ -390,7 +398,7 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress&
data.dataSize());
if (status_t status =
- rpcSend(fd, "transaction", transactionData.data(), transactionData.size());
+ rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size());
status != OK)
return status;
@@ -442,7 +450,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
if (command.bodySize < sizeof(RpcWireReply)) {
ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireReply. Terminating!",
sizeof(RpcWireReply), command.bodySize);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data.data());
@@ -457,7 +465,8 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
return OK;
}
-status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& addr) {
+status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const RpcAddress& addr) {
{
std::lock_guard<std::mutex> _l(mNodeMutex);
if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
@@ -476,10 +485,10 @@ status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& ad
.command = RPC_COMMAND_DEC_STRONG,
.bodySize = sizeof(RpcWireAddress),
};
- if (status_t status = rpcSend(fd, "dec ref header", &cmd, sizeof(cmd)); status != OK)
+ if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK)
return status;
- if (status_t status =
- rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress));
+ if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(),
+ sizeof(RpcWireAddress));
status != OK)
return status;
return OK;
@@ -538,7 +547,7 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS
// also can't consider it a fatal error because this would allow any client
// to kill us, so ending the session for misbehaving client.
ALOGE("Unknown RPC command %d - terminating session", command.command);
- terminate();
+ (void)session->shutdownAndWait(false);
return DEAD_OBJECT;
}
status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -571,7 +580,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
if (transactionData.size() < sizeof(RpcWireTransaction)) {
ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
sizeof(RpcWireTransaction), transactionData.size());
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
RpcWireTransaction* transaction = reinterpret_cast<RpcWireTransaction*>(transactionData.data());
@@ -600,12 +609,12 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
// session.
ALOGE("While transacting, binder has been deleted at address %s. Terminating!",
addr.toString().c_str());
- terminate();
+ (void)session->shutdownAndWait(false);
replyStatus = BAD_VALUE;
} else if (target->localBinder() == nullptr) {
ALOGE("Unknown binder address or non-local binder, not address %s. Terminating!",
addr.toString().c_str());
- terminate();
+ (void)session->shutdownAndWait(false);
replyStatus = BAD_VALUE;
} else if (transaction->flags & IBinder::FLAG_ONEWAY) {
std::lock_guard<std::mutex> _l(mNodeMutex);
@@ -707,7 +716,11 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
// last refcount dropped after this transaction happened
if (it == mNodeForAddress.end()) return OK;
- if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+ if (!nodeProgressAsyncNumber(&it->second)) {
+ _l.unlock();
+ (void)session->shutdownAndWait(false);
+ return DEAD_OBJECT;
+ }
if (it->second.asyncTodo.size() == 0) return OK;
if (it->second.asyncTodo.top().asyncNumber == it->second.asyncNumber) {
@@ -753,7 +766,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
reply.dataSize());
- return rpcSend(fd, "reply", replyData.data(), replyData.size());
+ return rpcSend(fd, session, "reply", replyData.data(), replyData.size());
}
status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -772,7 +785,7 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi
if (command.bodySize < sizeof(RpcWireAddress)) {
ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireAddress. Terminating!",
sizeof(RpcWireAddress), command.bodySize);
- terminate();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
RpcWireAddress* address = reinterpret_cast<RpcWireAddress*>(commandData.data());
@@ -790,7 +803,8 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi
if (target == nullptr) {
ALOGE("While requesting dec strong, binder has been deleted at address %s. Terminating!",
addr.toString().c_str());
- terminate();
+ _l.unlock();
+ (void)session->shutdownAndWait(false);
return BAD_VALUE;
}
@@ -826,12 +840,11 @@ sp<IBinder> RpcState::tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& i
return ref;
}
-bool RpcState::nodeProgressAsyncNumber(BinderNode* node, std::unique_lock<std::mutex>& lock) {
+bool RpcState::nodeProgressAsyncNumber(BinderNode* node) {
// 2**64 =~ 10**19 =~ 1000 transactions per second for 585 million years to
// a single binder
if (node->asyncNumber >= std::numeric_limits<decltype(node->asyncNumber)>::max()) {
ALOGE("Out of async transaction IDs. Terminating");
- terminate(lock);
return false;
}
node->asyncNumber++;
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index 81ff458ba7..13c31154eb 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -65,7 +65,8 @@ public:
uint32_t code, const Parcel& data,
const sp<RpcSession>& session, Parcel* reply,
uint32_t flags);
- [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address);
+ [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const RpcAddress& address);
enum class CommandType {
ANY,
@@ -110,11 +111,10 @@ public:
* WARNING: RpcState is responsible for calling this when the session is
* no longer recoverable.
*/
- void terminate();
+ void clear();
private:
void dumpLocked();
- void terminate(std::unique_lock<std::mutex>& lock);
// Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps
// large allocations to avoid being requested from allocating too much data.
@@ -130,8 +130,8 @@ private:
size_t mSize;
};
- [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const char* what, const void* data,
- size_t size);
+ [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+ const char* what, const void* data, size_t size);
[[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
const char* what, void* data, size_t size);
@@ -204,9 +204,8 @@ private:
// dropped after any locks are removed.
sp<IBinder> tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& it);
// true - success
- // false - state terminated, lock gone, halt
- [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node,
- std::unique_lock<std::mutex>& lock);
+ // false - session shutdown, halt
+ [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node);
std::mutex mNodeMutex;
bool mTerminated = false;
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 2f2c77cbae..7aa6d021e7 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -97,14 +97,20 @@ public:
status_t getRemoteMaxThreads(size_t* maxThreads);
/**
- * Shuts down the service. Only works for client sessions (server-side
- * sessions currently only support shutting down the entire server).
+ * Shuts down the service.
+ *
+ * For client sessions, wait can be true or false. For server sessions,
+ * waiting is not currently supported (will abort).
*
* Warning: this is currently not active/nice (the server isn't told we're
* shutting down). Being nicer to the server could potentially make it
* reclaim resources faster.
+ *
+ * If this is called w/ 'wait' true, then this will wait for shutdown to
+ * complete before returning. This will hang if it is called from the
+ * session threadpool (when processing received calls).
*/
- [[nodiscard]] bool shutdown();
+ [[nodiscard]] bool shutdownAndWait(bool wait);
[[nodiscard]] status_t transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
Parcel* reply, uint32_t flags);
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 0a970fb5cf..c84666047d 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -966,7 +966,7 @@ TEST_P(BinderRpc, Callbacks) {
// since this session has a reverse connection w/ a threadpool, we
// need to manually shut it down
- EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdown());
+ EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
proc.expectAlreadyShutdown = true;
}