| author | 2021-06-08 16:56:44 +0000 |
|---|---|
| committer | 2021-06-08 16:56:44 +0000 |
| commit | 2c1d00130ad709dfe5cf1bfc612225e755d18dcf (patch) |
| tree | 27aed362657a88e52b036225609fad56aded4f5f |
| parent | 5f0b1f59dd7257aca882fbf678ce3ec40e74d376 (diff) |
| parent | 1b48b713a208d35308b6f990334444fd9b7febe0 (diff) |
Merge changes Ibe1a854b,I8c281ad2,I9e290cd0,I0035be2d am: 1b48b713a2
Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1727277
Change-Id: Ia3bc2c46ed5a9ae50523a17855f1357229968fd8
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | libs/binder/RpcServer.cpp | 19 |
| -rw-r--r-- | libs/binder/RpcSession.cpp | 38 |
| -rw-r--r-- | libs/binder/RpcState.cpp | 142 |
| -rw-r--r-- | libs/binder/RpcState.h | 15 |
| -rw-r--r-- | libs/binder/include/binder/RpcServer.h | 2 |
| -rw-r--r-- | libs/binder/include/binder/RpcSession.h | 29 |
| -rw-r--r-- | libs/binder/tests/binderRpcTest.cpp | 36 |
7 files changed, 186 insertions, 95 deletions
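The core of this change is the rename of `RpcSession::shutdown()` to `RpcSession::shutdownAndWait(bool wait)`: each session now creates its own `FdTrigger` in `setForServer()`, `RpcState::terminate()` becomes `RpcState::clear()`, and protocol errors in `RpcState` shut down the offending session via `(void)session->shutdownAndWait(false)` instead of terminating shared state directly. Below is a minimal sketch of how a client might drive the renamed API, modeled on the `binderRpcTest.cpp` hunk further down; the helper function and log tag are illustrative, only `RpcSession::shutdownAndWait()` itself comes from this change.

```cpp
#define LOG_TAG "RpcShutdownExample"

#include <binder/RpcSession.h>
#include <log/log.h>

using android::RpcSession;
using android::sp;

// Hypothetical helper: tear down a client RpcSession once no more calls will
// be made on it. 'session' is assumed to have been connected elsewhere.
void shutdownClientSession(const sp<RpcSession>& session) {
    // wait=true blocks until the session's threads have exited. Per the new
    // doc comment in RpcSession.h, this must not be called from the session's
    // own threadpool (while processing received calls), or it will hang.
    if (!session->shutdownAndWait(/*wait=*/true)) {
        ALOGE("RpcSession did not shut down cleanly");
    }
}
```

Passing `wait=false` only triggers shutdown and returns immediately, which is how the new `RpcState` error paths use it.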
```diff
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 2f378dadce..2d2eed2671 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -187,6 +187,11 @@ bool RpcServer::shutdown() {
     }
 
     mShutdownTrigger->trigger();
+    for (auto& [id, session] : mSessions) {
+        (void)id;
+        session->mShutdownTrigger->trigger();
+    }
+
     while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
         if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
             ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
@@ -261,7 +266,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
     };
     server->mConnectingThreads.erase(threadId);
 
-    if (!idValid) {
+    if (!idValid || server->mShutdownTrigger->isTriggered()) {
         return;
     }
 
@@ -276,10 +281,14 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
 
         session = RpcSession::make();
         session->setMaxThreads(server->mMaxThreads);
-        session->setForServer(server,
-                              sp<RpcServer::EventListener>::fromExisting(
-                                      static_cast<RpcServer::EventListener*>(server.get())),
-                              server->mSessionIdCounter, server->mShutdownTrigger);
+        if (!session->setForServer(server,
+                                   sp<RpcServer::EventListener>::fromExisting(
+                                           static_cast<RpcServer::EventListener*>(
+                                                   server.get())),
+                                   server->mSessionIdCounter)) {
+            ALOGE("Failed to attach server to session");
+            return;
+        }
 
         server->mSessions[server->mSessionIdCounter] = session;
     } else {
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index c5633771d0..62118ffddc 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -113,17 +113,21 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
     return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
 }
 
-bool RpcSession::shutdown() {
+bool RpcSession::shutdownAndWait(bool wait) {
     std::unique_lock<std::mutex> _l(mMutex);
-    LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session");
     LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed");
-    LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
 
     mShutdownTrigger->trigger();
-    mShutdownListener->waitForShutdown(_l);
-    mState->terminate();
 
-    LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+    if (wait) {
+        LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
+        mShutdownListener->waitForShutdown(_l);
+        LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+    }
+
+    _l.unlock();
+    mState->clear();
+
     return true;
 }
 
@@ -139,12 +143,15 @@ status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Pa
 status_t RpcSession::sendDecStrong(const RpcAddress& address) {
     ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
                                    ConnectionUse::CLIENT_REFCOUNT);
-    return state()->sendDecStrong(connection.fd(), address);
+    return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
 }
 
 std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
     auto ret = std::make_unique<RpcSession::FdTrigger>();
-    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr;
+    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
+        ALOGE("Could not create pipe %s", strerror(errno));
+        return nullptr;
+    }
     return ret;
 }
 
@@ -152,6 +159,10 @@ void RpcSession::FdTrigger::trigger() {
     mWrite.reset();
 }
 
+bool RpcSession::FdTrigger::isTriggered() {
+    return mWrite == -1;
+}
+
 status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
     while (true) {
         pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
@@ -408,20 +419,21 @@ bool RpcSession::addClientConnection(unique_fd fd) {
     return true;
 }
 
-void RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
-                              int32_t sessionId,
-                              const std::shared_ptr<FdTrigger>& shutdownTrigger) {
+bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
+                              int32_t sessionId) {
     LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
     LOG_ALWAYS_FATAL_IF(server == nullptr);
     LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
     LOG_ALWAYS_FATAL_IF(eventListener == nullptr);
     LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
-    LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);
+
+    mShutdownTrigger = FdTrigger::make();
+    if (mShutdownTrigger == nullptr) return false;
 
     mId = sessionId;
     mForServer = server;
     mEventListener = eventListener;
-    mShutdownTrigger = shutdownTrigger;
+    return true;
 }
 
 sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 93f15294df..3113841d27 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -137,9 +137,39 @@ void RpcState::dump() {
     dumpLocked();
 }
 
-void RpcState::terminate() {
+void RpcState::clear() {
     std::unique_lock<std::mutex> _l(mNodeMutex);
-    terminate(_l);
+
+    if (mTerminated) {
+        LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(),
+                            "New state should be impossible after terminating!");
+        return;
+    }
+
+    if (SHOULD_LOG_RPC_DETAIL) {
+        ALOGE("RpcState::clear()");
+        dumpLocked();
+    }
+
+    // if the destructor of a binder object makes another RPC call, then calling
+    // decStrong could deadlock. So, we must hold onto these binders until
+    // mNodeMutex is no longer taken.
+    std::vector<sp<IBinder>> tempHoldBinder;
+
+    mTerminated = true;
+    for (auto& [address, node] : mNodeForAddress) {
+        sp<IBinder> binder = node.binder.promote();
+        LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
+
+        if (node.sentRef != nullptr) {
+            tempHoldBinder.push_back(node.sentRef);
+        }
+    }
+
+    mNodeForAddress.clear();
+
+    _l.unlock();
+    tempHoldBinder.clear(); // explicit
 }
 
 void RpcState::dumpLocked() {
@@ -170,32 +200,6 @@ void RpcState::dumpLocked() {
     ALOGE("END DUMP OF RpcState");
 }
 
-void RpcState::terminate(std::unique_lock<std::mutex>& lock) {
-    if (SHOULD_LOG_RPC_DETAIL) {
-        ALOGE("RpcState::terminate()");
-        dumpLocked();
-    }
-
-    // if the destructor of a binder object makes another RPC call, then calling
-    // decStrong could deadlock. So, we must hold onto these binders until
-    // mNodeMutex is no longer taken.
-    std::vector<sp<IBinder>> tempHoldBinder;
-
-    mTerminated = true;
-    for (auto& [address, node] : mNodeForAddress) {
-        sp<IBinder> binder = node.binder.promote();
-        LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
-
-        if (node.sentRef != nullptr) {
-            tempHoldBinder.push_back(node.sentRef);
-        }
-    }
-
-    mNodeForAddress.clear();
-
-    lock.unlock();
-    tempHoldBinder.clear(); // explicit
-}
 
 RpcState::CommandData::CommandData(size_t size) : mSize(size) {
     // The maximum size for regular binder is 1MB for all concurrent
@@ -218,13 +222,13 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) {
     mData.reset(new (std::nothrow) uint8_t[size]);
 }
 
-status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data,
-                           size_t size) {
+status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+                           const char* what, const void* data, size_t size) {
     LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
 
     if (size > std::numeric_limits<ssize_t>::max()) {
         ALOGE("Cannot send %s at size %zu (too big)", what, size);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
 
@@ -235,7 +239,7 @@ status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const vo
         LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
                        size, fd.get(), strerror(savedErrno));
 
-        terminate();
+        (void)session->shutdownAndWait(false);
         return -savedErrno;
     }
 
@@ -246,7 +250,7 @@ status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& sessi
                           const char* what, void* data, size_t size) {
     if (size > std::numeric_limits<ssize_t>::max()) {
         ALOGE("Cannot rec %s at size %zu (too big)", what, size);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
 
@@ -358,7 +362,11 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress&
 
         if (flags & IBinder::FLAG_ONEWAY) {
             asyncNumber = it->second.asyncNumber;
-            if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+            if (!nodeProgressAsyncNumber(&it->second)) {
+                _l.unlock();
+                (void)session->shutdownAndWait(false);
+                return DEAD_OBJECT;
+            }
         }
     }
 
@@ -390,7 +398,7 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress&
            data.dataSize());
 
     if (status_t status =
-                rpcSend(fd, "transaction", transactionData.data(), transactionData.size());
+                rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size());
         status != OK)
         return status;
 
@@ -442,7 +450,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
     if (command.bodySize < sizeof(RpcWireReply)) {
         ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireReply. Terminating!",
               sizeof(RpcWireReply), command.bodySize);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
     RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data.data());
@@ -457,7 +465,8 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
     return OK;
 }
 
-status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& addr) {
+status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                 const RpcAddress& addr) {
     {
         std::lock_guard<std::mutex> _l(mNodeMutex);
         if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
@@ -476,10 +485,10 @@ status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& ad
             .command = RPC_COMMAND_DEC_STRONG,
             .bodySize = sizeof(RpcWireAddress),
     };
-    if (status_t status = rpcSend(fd, "dec ref header", &cmd, sizeof(cmd)); status != OK)
+    if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK)
         return status;
-    if (status_t status =
-                rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress));
+    if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(),
+                                  sizeof(RpcWireAddress));
         status != OK)
         return status;
     return OK;
@@ -538,7 +547,7 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS
     // also can't consider it a fatal error because this would allow any client
     // to kill us, so ending the session for misbehaving client.
     ALOGE("Unknown RPC command %d - terminating session", command.command);
-    terminate();
+    (void)session->shutdownAndWait(false);
     return DEAD_OBJECT;
 }
 status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -571,7 +580,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
     if (transactionData.size() < sizeof(RpcWireTransaction)) {
         ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
               sizeof(RpcWireTransaction), transactionData.size());
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
     RpcWireTransaction* transaction = reinterpret_cast<RpcWireTransaction*>(transactionData.data());
@@ -600,15 +609,15 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
             // session.
             ALOGE("While transacting, binder has been deleted at address %s. Terminating!",
                   addr.toString().c_str());
-            terminate();
+            (void)session->shutdownAndWait(false);
             replyStatus = BAD_VALUE;
         } else if (target->localBinder() == nullptr) {
             ALOGE("Unknown binder address or non-local binder, not address %s. Terminating!",
                   addr.toString().c_str());
-            terminate();
+            (void)session->shutdownAndWait(false);
             replyStatus = BAD_VALUE;
         } else if (transaction->flags & IBinder::FLAG_ONEWAY) {
-            std::lock_guard<std::mutex> _l(mNodeMutex);
+            std::unique_lock<std::mutex> _l(mNodeMutex);
             auto it = mNodeForAddress.find(addr);
             if (it->second.binder.promote() != target) {
                 ALOGE("Binder became invalid during transaction. Bad client? %s",
@@ -617,16 +626,33 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
             } else if (transaction->asyncNumber != it->second.asyncNumber) {
                 // we need to process some other asynchronous transaction
                 // first
-                // TODO(b/183140903): limit enqueues/detect overfill for bad client
-                // TODO(b/183140903): detect when an object is deleted when it still has
-                //                    pending async transactions
                 it->second.asyncTodo.push(BinderNode::AsyncTodo{
                         .ref = target,
                         .data = std::move(transactionData),
                         .asyncNumber = transaction->asyncNumber,
                 });
-                LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s", transaction->asyncNumber,
-                               addr.toString().c_str());
+
+                size_t numPending = it->second.asyncTodo.size();
+                LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s (%zu pending)",
+                               transaction->asyncNumber, addr.toString().c_str(), numPending);
+
+                constexpr size_t kArbitraryOnewayCallTerminateLevel = 10000;
+                constexpr size_t kArbitraryOnewayCallWarnLevel = 1000;
+                constexpr size_t kArbitraryOnewayCallWarnPer = 1000;
+
+                if (numPending >= kArbitraryOnewayCallWarnLevel) {
+                    if (numPending >= kArbitraryOnewayCallTerminateLevel) {
+                        ALOGE("WARNING: %zu pending oneway transactions. Terminating!", numPending);
+                        _l.unlock();
+                        (void)session->shutdownAndWait(false);
+                        return FAILED_TRANSACTION;
+                    }
+
+                    if (numPending % kArbitraryOnewayCallWarnPer == 0) {
+                        ALOGW("Warning: many oneway transactions built up on %p (%zu)",
+                              target.get(), numPending);
+                    }
+                }
                 return OK;
             }
         }
@@ -707,7 +733,11 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
         // last refcount dropped after this transaction happened
         if (it == mNodeForAddress.end()) return OK;
 
-        if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+        if (!nodeProgressAsyncNumber(&it->second)) {
+            _l.unlock();
+            (void)session->shutdownAndWait(false);
+            return DEAD_OBJECT;
+        }
 
         if (it->second.asyncTodo.size() == 0) return OK;
         if (it->second.asyncTodo.top().asyncNumber == it->second.asyncNumber) {
@@ -753,7 +783,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
     memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
            reply.dataSize());
 
-    return rpcSend(fd, "reply", replyData.data(), replyData.size());
+    return rpcSend(fd, session, "reply", replyData.data(), replyData.size());
 }
 
 status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -772,7 +802,7 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi
     if (command.bodySize < sizeof(RpcWireAddress)) {
         ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireAddress. Terminating!",
               sizeof(RpcWireAddress), command.bodySize);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
     RpcWireAddress* address = reinterpret_cast<RpcWireAddress*>(commandData.data());
@@ -790,7 +820,8 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi
     if (target == nullptr) {
         ALOGE("While requesting dec strong, binder has been deleted at address %s. Terminating!",
               addr.toString().c_str());
-        terminate();
+        _l.unlock();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
 
@@ -826,12 +857,11 @@ sp<IBinder> RpcState::tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& i
     return ref;
 }
 
-bool RpcState::nodeProgressAsyncNumber(BinderNode* node, std::unique_lock<std::mutex>& lock) {
+bool RpcState::nodeProgressAsyncNumber(BinderNode* node) {
     // 2**64 =~ 10**19 =~ 1000 transactions per second for 585 million years to
    // a single binder
     if (node->asyncNumber >= std::numeric_limits<decltype(node->asyncNumber)>::max()) {
         ALOGE("Out of async transaction IDs. Terminating");
-        terminate(lock);
         return false;
     }
     node->asyncNumber++;
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index 81ff458ba7..13c31154eb 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -65,7 +65,8 @@ public:
                                       uint32_t code, const Parcel& data,
                                       const sp<RpcSession>& session, Parcel* reply,
                                       uint32_t flags);
-    [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address);
+    [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                         const RpcAddress& address);
 
     enum class CommandType {
         ANY,
@@ -110,11 +111,10 @@ public:
      * WARNING: RpcState is responsible for calling this when the session is
      * no longer recoverable.
      */
-    void terminate();
+    void clear();
 
 private:
     void dumpLocked();
-    void terminate(std::unique_lock<std::mutex>& lock);
 
     // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps
     // large allocations to avoid being requested from allocating too much data.
@@ -130,8 +130,8 @@ private:
         size_t mSize;
     };
 
-    [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const char* what, const void* data,
-                                   size_t size);
+    [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                   const char* what, const void* data, size_t size);
     [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
                                   const char* what, void* data, size_t size);
 
@@ -204,9 +204,8 @@ private:
     // dropped after any locks are removed.
     sp<IBinder> tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& it);
     // true - success
-    // false - state terminated, lock gone, halt
-    [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node,
-                                               std::unique_lock<std::mutex>& lock);
+    // false - session shutdown, halt
+    [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node);
 
     std::mutex mNodeMutex;
     bool mTerminated = false;
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 98db2212f2..b88bf5091b 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -173,7 +173,7 @@ private:
     wp<IBinder> mRootObjectWeak;
     std::map<int32_t, sp<RpcSession>> mSessions;
     int32_t mSessionIdCounter = 0;
-    std::shared_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
     std::condition_variable mShutdownCv;
 };
 
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index a6bc1a9cc7..7aa6d021e7 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -97,14 +97,20 @@ public:
     status_t getRemoteMaxThreads(size_t* maxThreads);
 
     /**
-     * Shuts down the service. Only works for client sessions (server-side
-     * sessions currently only support shutting down the entire server).
+     * Shuts down the service.
+     *
+     * For client sessions, wait can be true or false. For server sessions,
+     * waiting is not currently supported (will abort).
      *
      * Warning: this is currently not active/nice (the server isn't told we're
      * shutting down). Being nicer to the server could potentially make it
     * reclaim resources faster.
+     *
+     * If this is called w/ 'wait' true, then this will wait for shutdown to
+     * complete before returning. This will hang if it is called from the
+     * session threadpool (when processing received calls).
      */
-    [[nodiscard]] bool shutdown();
+    [[nodiscard]] bool shutdownAndWait(bool wait);
 
     [[nodiscard]] status_t transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
                                     Parcel* reply, uint32_t flags);
@@ -129,14 +135,15 @@ private:
         static std::unique_ptr<FdTrigger> make();
 
         /**
-         * poll() on this fd for POLLHUP to get notification when trigger is called
+         * Close the write end of the pipe so that the read end receives POLLHUP.
+         * Not threadsafe.
         */
-        base::borrowed_fd readFd() const { return mRead; }
+        void trigger();
 
         /**
-         * Close the write end of the pipe so that the read end receives POLLHUP.
+         * Whether this has been triggered.
         */
-        void trigger();
+        bool isTriggered();
 
         /**
         * Poll for a read event.
@@ -197,9 +204,9 @@ private:
     [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
                                                 bool server);
     [[nodiscard]] bool addClientConnection(base::unique_fd fd);
-    void setForServer(const wp<RpcServer>& server,
-                      const wp<RpcSession::EventListener>& eventListener, int32_t sessionId,
-                      const std::shared_ptr<FdTrigger>& shutdownTrigger);
+    [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
+                                    const wp<RpcSession::EventListener>& eventListener,
+                                    int32_t sessionId);
     sp<RpcConnection> assignServerToThisThread(base::unique_fd fd);
     [[nodiscard]] bool removeServerConnection(const sp<RpcConnection>& connection);
 
@@ -252,7 +259,7 @@ private:
     // TODO(b/183988761): this shouldn't be guessable
     std::optional<int32_t> mId;
 
-    std::shared_ptr<FdTrigger> mShutdownTrigger;
+    std::unique_ptr<FdTrigger> mShutdownTrigger;
 
     std::unique_ptr<RpcState> mState;
 
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 0a970fb5cf..82f8a3e273 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -941,6 +941,40 @@ TEST_P(BinderRpc, OnewayCallQueueing) {
     for (auto& t : threads) t.join();
 }
 
+TEST_P(BinderRpc, OnewayCallExhaustion) {
+    constexpr size_t kNumClients = 2;
+    constexpr size_t kTooLongMs = 1000;
+
+    auto proc = createRpcTestSocketServerProcess(kNumClients /*threads*/, 2 /*sessions*/);
+
+    // Build up oneway calls on the second session to make sure it terminates
+    // and shuts down. The first session should be unaffected (proc destructor
+    // checks the first session).
+    auto iface = interface_cast<IBinderRpcTest>(proc.proc.sessions.at(1).root);
+
+    std::vector<std::thread> threads;
+    for (size_t i = 0; i < kNumClients; i++) {
+        // one of these threads will get stuck queueing a transaction once the
+        // socket fills up, the other will be able to fill up transactions on
+        // this object
+        threads.push_back(std::thread([&] {
+            while (iface->sleepMsAsync(kTooLongMs).isOk()) {
+            }
+        }));
+    }
+    for (auto& t : threads) t.join();
+
+    Status status = iface->sleepMsAsync(kTooLongMs);
+    EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
+
+    // the second session should be shutdown in the other process by the time we
+    // are able to join above (it'll only be hung up once it finishes processing
+    // any pending commands). We need to erase this session from the record
+    // here, so that the destructor for our session won't check that this
+    // session is valid, but we still want it to test the other session.
+    proc.proc.sessions.erase(proc.proc.sessions.begin() + 1);
+}
+
 TEST_P(BinderRpc, Callbacks) {
     const static std::string kTestString = "good afternoon!";
 
@@ -966,7 +1000,7 @@ TEST_P(BinderRpc, Callbacks) {
 
             // since this session has a reverse connection w/ a threadpool, we
             // need to manually shut it down
-            EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdown());
+            EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
             proc.expectAlreadyShutdown = true;
         }
```
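The `FdTrigger` hunks above (`trigger()` resetting `mWrite`, the new `isTriggered()`, and `triggerablePollRead()`) rely on a pipe-based wakeup: closing the write end makes `poll()` on the read end report `POLLHUP`, so a blocked reader wakes up when shutdown is requested. A standalone sketch of that pattern, independent of libbinder (the `PipeTrigger` type below is illustrative, not part of this change):

```cpp
#include <poll.h>
#include <unistd.h>
#include <cerrno>
#include <cstdio>

// Minimal sketch of the pipe-based trigger pattern FdTrigger uses: closing
// the write end makes poll() on the read end report POLLHUP, which lets a
// blocked reader notice that shutdown was requested.
struct PipeTrigger {
    int readFd = -1;
    int writeFd = -1;

    ~PipeTrigger() {
        trigger();
        if (readFd != -1) close(readFd);
    }

    bool init() {
        int fds[2];
        if (pipe(fds) != 0) {
            perror("pipe");
            return false;
        }
        readFd = fds[0];
        writeFd = fds[1];
        return true;
    }

    // Analogous to FdTrigger::trigger(): drop the write end.
    void trigger() {
        if (writeFd != -1) {
            close(writeFd);
            writeFd = -1;
        }
    }

    // Analogous to FdTrigger::isTriggered().
    bool isTriggered() const { return writeFd == -1; }

    // Wait until either 'dataFd' is readable or the trigger fires, mirroring
    // triggerablePollRead(). Returns true if data is readable, false if the
    // trigger fired or poll() failed.
    bool waitReadable(int dataFd) const {
        pollfd pfd[2];
        pfd[0] = {dataFd, POLLIN | POLLHUP, 0};
        pfd[1] = {readFd, POLLHUP, 0};
        while (poll(pfd, 2, -1) < 0) {
            if (errno != EINTR) return false;
        }
        if (pfd[1].revents & POLLHUP) return false;  // triggered
        return pfd[0].revents & (POLLIN | POLLHUP);
    }
};
```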
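`RpcState::clear()` (and the call sites that now unlock `mNodeMutex` before calling `shutdownAndWait()`) follow the rule its inline comment describes: strong references collected under the lock are only dropped after the lock is released, because a binder destructor may issue another RPC and re-enter the same state. A generic sketch of that "destroy outside the lock" pattern, with names that are illustrative only:

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

// Generic sketch of the pattern RpcState::clear() uses: move owning
// references into a local container while the lock is held, clear the map,
// then release the lock *before* the local container is destroyed, so that
// destructors which call back into this class cannot deadlock on the mutex.
class Registry {
public:
    void add(const std::string& name, std::shared_ptr<int> obj) {
        std::lock_guard<std::mutex> lock(mMutex);
        mObjects[name] = std::move(obj);
    }

    void clear() {
        std::vector<std::shared_ptr<int>> tempHold;  // keeps objects alive
        {
            std::lock_guard<std::mutex> lock(mMutex);
            for (auto& [name, obj] : mObjects) {
                tempHold.push_back(obj);
            }
            mObjects.clear();
        }
        // Lock released; the held objects are destroyed here (when tempHold
        // goes out of scope) and may safely call back into add()/clear().
    }

private:
    std::mutex mMutex;
    std::map<std::string, std::shared_ptr<int>> mObjects;
};
```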