From 4e10feb3a1f3d2c4f61a4e81e664debbd8fb3551 Mon Sep 17 00:00:00 2001 From: Steven Moreland Date: Tue, 8 Jun 2021 01:17:01 +0000 Subject: libbinder: remove unused FdTrigger::readFd Every LOC is a burden on the mind - forgot to remove this when I stopped using it before. Bug: N/A Test: N/A Change-Id: I0035be2d339d30e0ad5d7fd879232b58bbf4fc3c --- libs/binder/include/binder/RpcSession.h | 5 ----- 1 file changed, 5 deletions(-) diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index a6bc1a9cc7..59613c5218 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -128,11 +128,6 @@ private: /** Returns nullptr for error case */ static std::unique_ptr make(); - /** - * poll() on this fd for POLLHUP to get notification when trigger is called - */ - base::borrowed_fd readFd() const { return mRead; } - /** * Close the write end of the pipe so that the read end receives POLLHUP. */ -- cgit v1.2.3-59-g8ed1b From a8b4429c398a79e4454015cbb309514b97484451 Mon Sep 17 00:00:00 2001 From: Steven Moreland Date: Tue, 8 Jun 2021 01:27:53 +0000 Subject: libbinder: server sessions shut down independently Before this CL, for a server, there is a single pipe which when hungup will end every session with that server. This, as it turned out in hindsight to be problematic, since we want to shutdown problematic sessions without interrupting other clients. Assuming we keep the pipe-based interrupt, there are essential two solutions to consider here (this CL is option B). a. instead of hanging up these pipes, write information to them, and then wake up all relevant threads which can figure out the right thing to do. - pro: only need to create one FD - con: very complicated because there may be multiple readers of the pipe, etc... - con: will mess with all clients b. have a separate pipe per session - pro: relatively simple logic - con: an extra FD per session So, for now, going with (b). Bug: 183140903 Test: binderRpcTest, run biner_rpc_fuzzer for 5 minutes (I checked locally, w/o the RpcServer check for isTriggered, this also detected that deadlock in less than 1 minute! :D) Change-Id: I9e290cd0a6df1d33435183fb16f508f38071ad62 --- libs/binder/RpcServer.cpp | 19 ++++++++++++++----- libs/binder/RpcSession.cpp | 20 ++++++++++++++------ libs/binder/include/binder/RpcServer.h | 2 +- libs/binder/include/binder/RpcSession.h | 14 ++++++++++---- 4 files changed, 39 insertions(+), 16 deletions(-) diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index 2f378dadce..2d2eed2671 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -187,6 +187,11 @@ bool RpcServer::shutdown() { } mShutdownTrigger->trigger(); + for (auto& [id, session] : mSessions) { + (void)id; + session->mShutdownTrigger->trigger(); + } + while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) { if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) { ALOGE("Waiting for RpcServer to shut down (1s w/o progress). 
Join thread running: %d, " @@ -261,7 +266,7 @@ void RpcServer::establishConnection(sp&& server, base::unique_fd clie }; server->mConnectingThreads.erase(threadId); - if (!idValid) { + if (!idValid || server->mShutdownTrigger->isTriggered()) { return; } @@ -276,10 +281,14 @@ void RpcServer::establishConnection(sp&& server, base::unique_fd clie session = RpcSession::make(); session->setMaxThreads(server->mMaxThreads); - session->setForServer(server, - sp::fromExisting( - static_cast(server.get())), - server->mSessionIdCounter, server->mShutdownTrigger); + if (!session->setForServer(server, + sp::fromExisting( + static_cast( + server.get())), + server->mSessionIdCounter)) { + ALOGE("Failed to attach server to session"); + return; + } server->mSessions[server->mSessionIdCounter] = session; } else { diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index c5633771d0..93e04f7558 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -144,7 +144,10 @@ status_t RpcSession::sendDecStrong(const RpcAddress& address) { std::unique_ptr RpcSession::FdTrigger::make() { auto ret = std::make_unique(); - if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr; + if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) { + ALOGE("Could not create pipe %s", strerror(errno)); + return nullptr; + } return ret; } @@ -152,6 +155,10 @@ void RpcSession::FdTrigger::trigger() { mWrite.reset(); } +bool RpcSession::FdTrigger::isTriggered() { + return mWrite == -1; +} + status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) { while (true) { pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0}, @@ -408,20 +415,21 @@ bool RpcSession::addClientConnection(unique_fd fd) { return true; } -void RpcSession::setForServer(const wp& server, const wp& eventListener, - int32_t sessionId, - const std::shared_ptr& shutdownTrigger) { +bool RpcSession::setForServer(const wp& server, const wp& eventListener, + int32_t sessionId) { LOG_ALWAYS_FATAL_IF(mForServer != nullptr); LOG_ALWAYS_FATAL_IF(server == nullptr); LOG_ALWAYS_FATAL_IF(mEventListener != nullptr); LOG_ALWAYS_FATAL_IF(eventListener == nullptr); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr); - LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr); + + mShutdownTrigger = FdTrigger::make(); + if (mShutdownTrigger == nullptr) return false; mId = sessionId; mForServer = server; mEventListener = eventListener; - mShutdownTrigger = shutdownTrigger; + return true; } sp RpcSession::assignServerToThisThread(unique_fd fd) { diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h index 98db2212f2..b88bf5091b 100644 --- a/libs/binder/include/binder/RpcServer.h +++ b/libs/binder/include/binder/RpcServer.h @@ -173,7 +173,7 @@ private: wp mRootObjectWeak; std::map> mSessions; int32_t mSessionIdCounter = 0; - std::shared_ptr mShutdownTrigger; + std::unique_ptr mShutdownTrigger; std::condition_variable mShutdownCv; }; diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index 59613c5218..2f2c77cbae 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -130,9 +130,15 @@ private: /** * Close the write end of the pipe so that the read end receives POLLHUP. + * Not threadsafe. */ void trigger(); + /** + * Whether this has been triggered. + */ + bool isTriggered(); + /** * Poll for a read event. 
* @@ -192,9 +198,9 @@ private: [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId, bool server); [[nodiscard]] bool addClientConnection(base::unique_fd fd); - void setForServer(const wp& server, - const wp& eventListener, int32_t sessionId, - const std::shared_ptr& shutdownTrigger); + [[nodiscard]] bool setForServer(const wp& server, + const wp& eventListener, + int32_t sessionId); sp assignServerToThisThread(base::unique_fd fd); [[nodiscard]] bool removeServerConnection(const sp& connection); @@ -247,7 +253,7 @@ private: // TODO(b/183988761): this shouldn't be guessable std::optional mId; - std::shared_ptr mShutdownTrigger; + std::unique_ptr mShutdownTrigger; std::unique_ptr mState; -- cgit v1.2.3-59-g8ed1b From c9d7b53c6b7e94592818d2552d57f18ce2722c9b Mon Sep 17 00:00:00 2001 From: Steven Moreland Date: Fri, 4 Jun 2021 20:57:41 +0000 Subject: libbinder: RPC state termination shutdown session Previously, when state was terminated due to an error condition, it may be the case that a client would still be trying to read data from the session, but the connection would not be hung up (it would just be ignored). So, now hanging up the session connections in this case. Bug: 183140903 Test: binderRpcTest Change-Id: I8c281ad2af3889cc3570a8d3b7bf3def8c51ec79 --- libs/binder/RpcSession.cpp | 18 +++-- libs/binder/RpcState.cpp | 113 ++++++++++++++++++-------------- libs/binder/RpcState.h | 15 ++--- libs/binder/include/binder/RpcSession.h | 12 +++- libs/binder/tests/binderRpcTest.cpp | 2 +- 5 files changed, 91 insertions(+), 69 deletions(-) diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index 93e04f7558..62118ffddc 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -113,17 +113,21 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { return state()->getMaxThreads(connection.fd(), sp::fromExisting(this), maxThreads); } -bool RpcSession::shutdown() { +bool RpcSession::shutdownAndWait(bool wait) { std::unique_lock _l(mMutex); - LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session"); LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed"); - LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed"); mShutdownTrigger->trigger(); - mShutdownListener->waitForShutdown(_l); - mState->terminate(); - LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed"); + if (wait) { + LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed"); + mShutdownListener->waitForShutdown(_l); + LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed"); + } + + _l.unlock(); + mState->clear(); + return true; } @@ -139,7 +143,7 @@ status_t RpcSession::transact(const sp& binder, uint32_t code, const Pa status_t RpcSession::sendDecStrong(const RpcAddress& address) { ExclusiveConnection connection(sp::fromExisting(this), ConnectionUse::CLIENT_REFCOUNT); - return state()->sendDecStrong(connection.fd(), address); + return state()->sendDecStrong(connection.fd(), sp::fromExisting(this), address); } std::unique_ptr RpcSession::FdTrigger::make() { diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp index 93f15294df..7e731f3426 100644 --- a/libs/binder/RpcState.cpp +++ b/libs/binder/RpcState.cpp @@ -137,9 +137,39 @@ void RpcState::dump() { dumpLocked(); } -void RpcState::terminate() { +void RpcState::clear() { std::unique_lock _l(mNodeMutex); - terminate(_l); + + if (mTerminated) { + 
LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(), + "New state should be impossible after terminating!"); + return; + } + + if (SHOULD_LOG_RPC_DETAIL) { + ALOGE("RpcState::clear()"); + dumpLocked(); + } + + // if the destructor of a binder object makes another RPC call, then calling + // decStrong could deadlock. So, we must hold onto these binders until + // mNodeMutex is no longer taken. + std::vector> tempHoldBinder; + + mTerminated = true; + for (auto& [address, node] : mNodeForAddress) { + sp binder = node.binder.promote(); + LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get()); + + if (node.sentRef != nullptr) { + tempHoldBinder.push_back(node.sentRef); + } + } + + mNodeForAddress.clear(); + + _l.unlock(); + tempHoldBinder.clear(); // explicit } void RpcState::dumpLocked() { @@ -170,32 +200,6 @@ void RpcState::dumpLocked() { ALOGE("END DUMP OF RpcState"); } -void RpcState::terminate(std::unique_lock& lock) { - if (SHOULD_LOG_RPC_DETAIL) { - ALOGE("RpcState::terminate()"); - dumpLocked(); - } - - // if the destructor of a binder object makes another RPC call, then calling - // decStrong could deadlock. So, we must hold onto these binders until - // mNodeMutex is no longer taken. - std::vector> tempHoldBinder; - - mTerminated = true; - for (auto& [address, node] : mNodeForAddress) { - sp binder = node.binder.promote(); - LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get()); - - if (node.sentRef != nullptr) { - tempHoldBinder.push_back(node.sentRef); - } - } - - mNodeForAddress.clear(); - - lock.unlock(); - tempHoldBinder.clear(); // explicit -} RpcState::CommandData::CommandData(size_t size) : mSize(size) { // The maximum size for regular binder is 1MB for all concurrent @@ -218,13 +222,13 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) { mData.reset(new (std::nothrow) uint8_t[size]); } -status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data, - size_t size) { +status_t RpcState::rpcSend(const base::unique_fd& fd, const sp& session, + const char* what, const void* data, size_t size) { LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str()); if (size > std::numeric_limits::max()) { ALOGE("Cannot send %s at size %zu (too big)", what, size); - terminate(); + (void)session->shutdownAndWait(false); return BAD_VALUE; } @@ -235,7 +239,7 @@ status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const vo LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent, size, fd.get(), strerror(savedErrno)); - terminate(); + (void)session->shutdownAndWait(false); return -savedErrno; } @@ -246,7 +250,7 @@ status_t RpcState::rpcRec(const base::unique_fd& fd, const sp& sessi const char* what, void* data, size_t size) { if (size > std::numeric_limits::max()) { ALOGE("Cannot rec %s at size %zu (too big)", what, size); - terminate(); + (void)session->shutdownAndWait(false); return BAD_VALUE; } @@ -358,7 +362,11 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& if (flags & IBinder::FLAG_ONEWAY) { asyncNumber = it->second.asyncNumber; - if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT; + if (!nodeProgressAsyncNumber(&it->second)) { + _l.unlock(); + (void)session->shutdownAndWait(false); + return DEAD_OBJECT; + } } } @@ -390,7 +398,7 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& data.dataSize()); if (status_t status = - 
rpcSend(fd, "transaction", transactionData.data(), transactionData.size()); + rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size()); status != OK) return status; @@ -442,7 +450,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp& if (command.bodySize < sizeof(RpcWireReply)) { ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireReply. Terminating!", sizeof(RpcWireReply), command.bodySize); - terminate(); + (void)session->shutdownAndWait(false); return BAD_VALUE; } RpcWireReply* rpcReply = reinterpret_cast(data.data()); @@ -457,7 +465,8 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp& return OK; } -status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& addr) { +status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp& session, + const RpcAddress& addr) { { std::lock_guard _l(mNodeMutex); if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races @@ -476,10 +485,10 @@ status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& ad .command = RPC_COMMAND_DEC_STRONG, .bodySize = sizeof(RpcWireAddress), }; - if (status_t status = rpcSend(fd, "dec ref header", &cmd, sizeof(cmd)); status != OK) + if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK) return status; - if (status_t status = - rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress)); + if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(), + sizeof(RpcWireAddress)); status != OK) return status; return OK; @@ -538,7 +547,7 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const spshutdownAndWait(false); return DEAD_OBJECT; } status_t RpcState::processTransact(const base::unique_fd& fd, const sp& session, @@ -571,7 +580,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const spshutdownAndWait(false); return BAD_VALUE; } RpcWireTransaction* transaction = reinterpret_cast(transactionData.data()); @@ -600,12 +609,12 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const spshutdownAndWait(false); replyStatus = BAD_VALUE; } else if (target->localBinder() == nullptr) { ALOGE("Unknown binder address or non-local binder, not address %s. 
Terminating!", addr.toString().c_str()); - terminate(); + (void)session->shutdownAndWait(false); replyStatus = BAD_VALUE; } else if (transaction->flags & IBinder::FLAG_ONEWAY) { std::lock_guard _l(mNodeMutex); @@ -707,7 +716,11 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const spsecond, _l)) return DEAD_OBJECT; + if (!nodeProgressAsyncNumber(&it->second)) { + _l.unlock(); + (void)session->shutdownAndWait(false); + return DEAD_OBJECT; + } if (it->second.asyncTodo.size() == 0) return OK; if (it->second.asyncTodo.top().asyncNumber == it->second.asyncNumber) { @@ -753,7 +766,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp& session, @@ -772,7 +785,7 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const spshutdownAndWait(false); return BAD_VALUE; } RpcWireAddress* address = reinterpret_cast(commandData.data()); @@ -790,7 +803,8 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const spshutdownAndWait(false); return BAD_VALUE; } @@ -826,12 +840,11 @@ sp RpcState::tryEraseNode(std::map::iterator& i return ref; } -bool RpcState::nodeProgressAsyncNumber(BinderNode* node, std::unique_lock& lock) { +bool RpcState::nodeProgressAsyncNumber(BinderNode* node) { // 2**64 =~ 10**19 =~ 1000 transactions per second for 585 million years to // a single binder if (node->asyncNumber >= std::numeric_limitsasyncNumber)>::max()) { ALOGE("Out of async transaction IDs. Terminating"); - terminate(lock); return false; } node->asyncNumber++; diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h index 81ff458ba7..13c31154eb 100644 --- a/libs/binder/RpcState.h +++ b/libs/binder/RpcState.h @@ -65,7 +65,8 @@ public: uint32_t code, const Parcel& data, const sp& session, Parcel* reply, uint32_t flags); - [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address); + [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp& session, + const RpcAddress& address); enum class CommandType { ANY, @@ -110,11 +111,10 @@ public: * WARNING: RpcState is responsible for calling this when the session is * no longer recoverable. */ - void terminate(); + void clear(); private: void dumpLocked(); - void terminate(std::unique_lock& lock); // Alternative to std::vector that doesn't abort on allocation failure and caps // large allocations to avoid being requested from allocating too much data. @@ -130,8 +130,8 @@ private: size_t mSize; }; - [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const char* what, const void* data, - size_t size); + [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp& session, + const char* what, const void* data, size_t size); [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp& session, const char* what, void* data, size_t size); @@ -204,9 +204,8 @@ private: // dropped after any locks are removed. 
sp tryEraseNode(std::map::iterator& it); // true - success - // false - state terminated, lock gone, halt - [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node, - std::unique_lock& lock); + // false - session shutdown, halt + [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node); std::mutex mNodeMutex; bool mTerminated = false; diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index 2f2c77cbae..7aa6d021e7 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -97,14 +97,20 @@ public: status_t getRemoteMaxThreads(size_t* maxThreads); /** - * Shuts down the service. Only works for client sessions (server-side - * sessions currently only support shutting down the entire server). + * Shuts down the service. + * + * For client sessions, wait can be true or false. For server sessions, + * waiting is not currently supported (will abort). * * Warning: this is currently not active/nice (the server isn't told we're * shutting down). Being nicer to the server could potentially make it * reclaim resources faster. + * + * If this is called w/ 'wait' true, then this will wait for shutdown to + * complete before returning. This will hang if it is called from the + * session threadpool (when processing received calls). */ - [[nodiscard]] bool shutdown(); + [[nodiscard]] bool shutdownAndWait(bool wait); [[nodiscard]] status_t transact(const sp& binder, uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags); diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp index 0a970fb5cf..c84666047d 100644 --- a/libs/binder/tests/binderRpcTest.cpp +++ b/libs/binder/tests/binderRpcTest.cpp @@ -966,7 +966,7 @@ TEST_P(BinderRpc, Callbacks) { // since this session has a reverse connection w/ a threadpool, we // need to manually shut it down - EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdown()); + EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true)); proc.expectAlreadyShutdown = true; } -- cgit v1.2.3-59-g8ed1b From d45be62a2bdd204d5d728aaa56653ea5e072cd9c Mon Sep 17 00:00:00 2001 From: Steven Moreland Date: Fri, 4 Jun 2021 02:19:37 +0000 Subject: libbinder: RPC limit on oneway transactions Artificial large limits on oneway transactions. Warn after 1000 pending transactions and invalidate the connection after 10000. This is a pretty arbitrary limit, approximating the order of magnitude of average sized binder transactions you can have before filling up the binder buffer (I originally did this analysis for HIDL passthrough mode, which uses 3000). However, I've raised the limit because in this case, we must actually terminate the RPC connection. If this happens, it's already quite a bad situation. Best we can hope for is to reset everything up. Unlike kernel binder, we can't return an error when the queue fills up, since we're processing this on the other side of the transaction. We must also do this type of analysis on the server side, regardless of (potentially better) logic in the client side, so that this code also guards against broken clients. 
Fixes: 183140903 Test: binderRpcTest Change-Id: Ibe1a854b526a7f862776df36d555cb346f01fde7 --- libs/binder/RpcState.cpp | 29 +++++++++++++++++++++++------ libs/binder/tests/binderRpcTest.cpp | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 6 deletions(-) diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp index 7e731f3426..3113841d27 100644 --- a/libs/binder/RpcState.cpp +++ b/libs/binder/RpcState.cpp @@ -617,7 +617,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const spshutdownAndWait(false); replyStatus = BAD_VALUE; } else if (transaction->flags & IBinder::FLAG_ONEWAY) { - std::lock_guard _l(mNodeMutex); + std::unique_lock _l(mNodeMutex); auto it = mNodeForAddress.find(addr); if (it->second.binder.promote() != target) { ALOGE("Binder became invalid during transaction. Bad client? %s", @@ -626,16 +626,33 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const spasyncNumber != it->second.asyncNumber) { // we need to process some other asynchronous transaction // first - // TODO(b/183140903): limit enqueues/detect overfill for bad client - // TODO(b/183140903): detect when an object is deleted when it still has - // pending async transactions it->second.asyncTodo.push(BinderNode::AsyncTodo{ .ref = target, .data = std::move(transactionData), .asyncNumber = transaction->asyncNumber, }); - LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s", transaction->asyncNumber, - addr.toString().c_str()); + + size_t numPending = it->second.asyncTodo.size(); + LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s (%zu pending)", + transaction->asyncNumber, addr.toString().c_str(), numPending); + + constexpr size_t kArbitraryOnewayCallTerminateLevel = 10000; + constexpr size_t kArbitraryOnewayCallWarnLevel = 1000; + constexpr size_t kArbitraryOnewayCallWarnPer = 1000; + + if (numPending >= kArbitraryOnewayCallWarnLevel) { + if (numPending >= kArbitraryOnewayCallTerminateLevel) { + ALOGE("WARNING: %zu pending oneway transactions. Terminating!", numPending); + _l.unlock(); + (void)session->shutdownAndWait(false); + return FAILED_TRANSACTION; + } + + if (numPending % kArbitraryOnewayCallWarnPer == 0) { + ALOGW("Warning: many oneway transactions built up on %p (%zu)", + target.get(), numPending); + } + } return OK; } } diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp index c84666047d..82f8a3e273 100644 --- a/libs/binder/tests/binderRpcTest.cpp +++ b/libs/binder/tests/binderRpcTest.cpp @@ -941,6 +941,40 @@ TEST_P(BinderRpc, OnewayCallQueueing) { for (auto& t : threads) t.join(); } +TEST_P(BinderRpc, OnewayCallExhaustion) { + constexpr size_t kNumClients = 2; + constexpr size_t kTooLongMs = 1000; + + auto proc = createRpcTestSocketServerProcess(kNumClients /*threads*/, 2 /*sessions*/); + + // Build up oneway calls on the second session to make sure it terminates + // and shuts down. The first session should be unaffected (proc destructor + // checks the first session). 
+ auto iface = interface_cast(proc.proc.sessions.at(1).root); + + std::vector threads; + for (size_t i = 0; i < kNumClients; i++) { + // one of these threads will get stuck queueing a transaction once the + // socket fills up, the other will be able to fill up transactions on + // this object + threads.push_back(std::thread([&] { + while (iface->sleepMsAsync(kTooLongMs).isOk()) { + } + })); + } + for (auto& t : threads) t.join(); + + Status status = iface->sleepMsAsync(kTooLongMs); + EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status; + + // the second session should be shutdown in the other process by the time we + // are able to join above (it'll only be hung up once it finishes processing + // any pending commands). We need to erase this session from the record + // here, so that the destructor for our session won't check that this + // session is valid, but we still want it to test the other session. + proc.proc.sessions.erase(proc.proc.sessions.begin() + 1); +} + TEST_P(BinderRpc, Callbacks) { const static std::string kTestString = "good afternoon!"; -- cgit v1.2.3-59-g8ed1b
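
The mechanism the patches above keep returning to is RpcSession::FdTrigger: a pipe whose write end is closed by trigger(), so that anything poll()ing the read end observes POLLHUP. That is how per-session shutdown (and RpcServer-wide shutdown) interrupts threads blocked reading a socket. Below is a minimal standalone sketch of that pattern, assuming plain POSIX pipe()/poll() rather than android::base::unique_fd/borrowed_fd, with error handling reduced to return codes; the class and method names mirror the patch for readability, but this is an illustration, not the actual libbinder implementation.

    #include <errno.h>
    #include <poll.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>

    #include <memory>

    // Pipe-based shutdown trigger: trigger() closes the write end, so any
    // thread polling the read end sees POLLHUP and can unwind.
    class FdTriggerSketch {
    public:
        // Returns nullptr on error, mirroring FdTrigger::make() in the patch.
        static std::unique_ptr<FdTriggerSketch> make() {
            int fds[2];
            if (pipe(fds) != 0) {
                fprintf(stderr, "Could not create pipe: %s\n", strerror(errno));
                return nullptr;
            }
            std::unique_ptr<FdTriggerSketch> t(new FdTriggerSketch);
            t->mRead = fds[0];
            t->mWrite = fds[1];
            return t;
        }

        ~FdTriggerSketch() {
            if (mRead >= 0) close(mRead);
            if (mWrite >= 0) close(mWrite);
        }

        // Close the write end so the read end receives POLLHUP.
        // Not threadsafe, matching the comment added in the patch.
        void trigger() {
            if (mWrite >= 0) {
                close(mWrite);
                mWrite = -1;
            }
        }

        bool isTriggered() const { return mWrite == -1; }

        // Block until 'fd' is readable or the trigger fires. Returns 0 when
        // data is ready, -ECANCELED when triggered, -errno on a poll error.
        int triggerablePollRead(int fd) {
            while (true) {
                pollfd pfd[2] = {{fd, POLLIN | POLLHUP, 0}, {mRead, POLLHUP, 0}};
                if (poll(pfd, 2, -1) < 0) {
                    if (errno == EINTR) continue;
                    return -errno;
                }
                if (pfd[1].revents & POLLHUP) return -ECANCELED;   // shutdown requested
                if (pfd[0].revents & (POLLIN | POLLHUP)) return 0; // fd has data or hung up
            }
        }

    private:
        FdTriggerSketch() = default;
        int mRead = -1;
        int mWrite = -1;
    };

    int main() {
        auto trigger = FdTriggerSketch::make();
        if (trigger == nullptr) return 1;

        // Stand-in for a session socket: a pipe with nothing written to it,
        // so the only way the poll below returns is via the trigger.
        int session[2];
        if (pipe(session) != 0) return 1;

        trigger->trigger(); // e.g. RpcServer::shutdown() triggering each session
        int res = trigger->triggerablePollRead(session[0]);
        printf("poll result after trigger: %d (-ECANCELED is %d)\n", res, -ECANCELED);

        close(session[0]);
        close(session[1]);
        return 0;
    }

A per-session instance of this trigger, created in RpcSession::setForServer(), is what lets RpcServer::shutdown() (or an error path in RpcState) hang up one session's connections without disturbing other clients — the trade-off that option (b) in the second commit message accepts in exchange for one extra fd per session.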