-rw-r--r-- cmds/installd/dexopt.cpp                      |  37
-rw-r--r-- cmds/installd/tests/installd_dexopt_test.cpp  |   6
-rw-r--r-- libs/binder/Parcel.cpp                        |   1
-rw-r--r-- libs/binder/RpcServer.cpp                     |  19
-rw-r--r-- libs/binder/RpcSession.cpp                    |  44
-rw-r--r-- libs/binder/RpcState.cpp                      | 144
-rw-r--r-- libs/binder/RpcState.h                        |  15
-rw-r--r-- libs/binder/include/binder/RpcServer.h        |   4
-rw-r--r-- libs/binder/include/binder/RpcSession.h       |  34
-rw-r--r-- libs/binder/rust/src/lib.rs                   |   4
-rw-r--r-- libs/binder/rust/src/proxy.rs                 |  22
-rw-r--r-- libs/binder/rust/tests/integration.rs         |  23
-rw-r--r-- libs/binder/tests/binderRpcTest.cpp           |  36
13 files changed, 272 insertions, 117 deletions
diff --git a/cmds/installd/dexopt.cpp b/cmds/installd/dexopt.cpp
index 204953cd07..cc0434d9e4 100644
--- a/cmds/installd/dexopt.cpp
+++ b/cmds/installd/dexopt.cpp
@@ -292,8 +292,8 @@ static void SetDex2OatScheduling(bool set_to_bg) {
     }
 }
 
-static unique_fd create_profile(uid_t uid, const std::string& profile, int32_t flags) {
-    unique_fd fd(TEMP_FAILURE_RETRY(open(profile.c_str(), flags, 0600)));
+static unique_fd create_profile(uid_t uid, const std::string& profile, int32_t flags, mode_t mode) {
+    unique_fd fd(TEMP_FAILURE_RETRY(open(profile.c_str(), flags, mode)));
     if (fd.get() < 0) {
         if (errno != EEXIST) {
             PLOG(ERROR) << "Failed to create profile " << profile;
@@ -310,7 +310,7 @@ static unique_fd create_profile(uid_t uid, const std::string& profile, int32_t f
     return fd;
 }
 
-static unique_fd open_profile(uid_t uid, const std::string& profile, int32_t flags) {
+static unique_fd open_profile(uid_t uid, const std::string& profile, int32_t flags, mode_t mode) {
     // Do not follow symlinks when opening a profile:
     //   - primary profiles should not contain symlinks in their paths
     //   - secondary dex paths should have been already resolved and validated
@@ -320,7 +320,7 @@ static unique_fd open_profile(uid_t uid, const std::string& profile, int32_t fla
     // Reference profiles and snapshots are created on the fly; so they might not exist beforehand.
     unique_fd fd;
     if ((flags & O_CREAT) != 0) {
-        fd = create_profile(uid, profile, flags);
+        fd = create_profile(uid, profile, flags, mode);
     } else {
         fd.reset(TEMP_FAILURE_RETRY(open(profile.c_str(), flags)));
     }
@@ -336,6 +336,16 @@ static unique_fd open_profile(uid_t uid, const std::string& profile, int32_t fla
             PLOG(ERROR) << "Failed to open profile " << profile;
         }
         return invalid_unique_fd();
+    } else {
+        // If we just create the file we need to set its mode because on Android
+        // open has a mask that only allows owner access.
+        if ((flags & O_CREAT) != 0) {
+            if (fchmod(fd.get(), mode) != 0) {
+                PLOG(ERROR) << "Could not set mode " << std::hex << mode << std::dec
+                            << " on profile" << profile;
+                // Not a terminal failure.
+            }
+        }
     }
 
     return fd;
@@ -345,20 +355,29 @@ static unique_fd open_current_profile(uid_t uid, userid_t user, const std::strin
                                       const std::string& location, bool is_secondary_dex) {
     std::string profile = create_current_profile_path(user, package_name, location, is_secondary_dex);
-    return open_profile(uid, profile, O_RDONLY);
+    return open_profile(uid, profile, O_RDONLY, /*mode=*/ 0);
 }
 
 static unique_fd open_reference_profile(uid_t uid, const std::string& package_name,
                                         const std::string& location, bool read_write,
                                         bool is_secondary_dex) {
     std::string profile = create_reference_profile_path(package_name, location, is_secondary_dex);
-    return open_profile(uid, profile, read_write ? (O_CREAT | O_RDWR) : O_RDONLY);
+    return open_profile(
+            uid,
+            profile,
+            read_write ? (O_CREAT | O_RDWR) : O_RDONLY,
+            S_IRUSR | S_IWUSR | S_IRGRP);  // so that ART can also read it when apps run.
 }
 
 static UniqueFile open_reference_profile_as_unique_file(uid_t uid, const std::string& package_name,
                                                         const std::string& location,
                                                         bool read_write, bool is_secondary_dex) {
     std::string profile_path = create_reference_profile_path(package_name, location,
                                                              is_secondary_dex);
-    unique_fd ufd = open_profile(uid, profile_path, read_write ? (O_CREAT | O_RDWR) : O_RDONLY);
+    unique_fd ufd = open_profile(
+            uid,
+            profile_path,
+            read_write ? (O_CREAT | O_RDWR) : O_RDONLY,
+            S_IRUSR | S_IWUSR | S_IRGRP);  // so that ART can also read it when apps run.
+
     return UniqueFile(ufd.release(), profile_path,
                       [](const std::string& path) { clear_profile(path); });
@@ -367,7 +386,7 @@ static UniqueFile open_reference_profile_as_unique_file(uid_t uid, const std::st
 static unique_fd open_spnashot_profile(uid_t uid, const std::string& package_name,
                                        const std::string& location) {
     std::string profile = create_snapshot_profile_path(package_name, location);
-    return open_profile(uid, profile, O_CREAT | O_RDWR | O_TRUNC);
+    return open_profile(uid, profile, O_CREAT | O_RDWR | O_TRUNC, S_IRUSR | S_IWUSR);
 }
 
 static void open_profile_files(uid_t uid, const std::string& package_name,
@@ -2484,7 +2503,7 @@ static bool create_boot_image_profile_snapshot(const std::string& package_name,
     for (size_t i = 0; i < profiles.size(); ) {
         std::vector<unique_fd> profiles_fd;
         for (size_t k = 0; k < kAggregationBatchSize && i < profiles.size(); k++, i++) {
-            unique_fd fd = open_profile(AID_SYSTEM, profiles[i], O_RDONLY);
+            unique_fd fd = open_profile(AID_SYSTEM, profiles[i], O_RDONLY, /*mode=*/ 0);
             if (fd.get() >= 0) {
                 profiles_fd.push_back(std::move(fd));
             }
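The fchmod() added to open_profile() above is needed because open(2) filters the requested mode through the process umask; per the patch's comment, installd's environment masks the mode down to owner-only access, so a profile requested with 0640 would otherwise land as 0600. A self-contained sketch of the same create-then-fchmod pattern (the helper name and error handling here are illustrative, not installd code):

    #include <fcntl.h>
    #include <sys/stat.h>
    #include <unistd.h>
    #include <cstdio>

    // Hypothetical helper: create a file whose final mode ignores the umask.
    int create_with_mode(const char* path, mode_t mode) {
        int fd = open(path, O_CREAT | O_RDWR, mode); // mode is masked by the umask here
        if (fd < 0) return -1;
        if (fchmod(fd, mode) != 0) {  // re-apply the intended mode on the open fd
            perror("fchmod");         // like the patch, not treated as fatal
        }
        return fd;
    }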
diff --git a/cmds/installd/tests/installd_dexopt_test.cpp b/cmds/installd/tests/installd_dexopt_test.cpp
index e27202597c..216347e616 100644
--- a/cmds/installd/tests/installd_dexopt_test.cpp
+++ b/cmds/installd/tests/installd_dexopt_test.cpp
@@ -919,7 +919,7 @@ class ProfileTest : public DexoptTest {
             return;
         }
 
-        // Check that the snapshot was created witht he expected acess flags.
+        // Check that the snapshot was created with the expected access flags.
         CheckFileAccess(snap_profile_, kSystemUid, kSystemGid, 0600 | S_IFREG);
 
         // The snapshot should be equivalent to the merge of profiles.
@@ -962,8 +962,8 @@ class ProfileTest : public DexoptTest {
             return;
         }
 
-        // Check that the snapshot was created witht he expected acess flags.
-        CheckFileAccess(ref_profile_, kTestAppUid, kTestAppUid, 0600 | S_IFREG);
+        // Check that the snapshot was created with the expected access flags.
+        CheckFileAccess(ref_profile_, kTestAppUid, kTestAppUid, 0640 | S_IFREG);
 
         // The snapshot should be equivalent to the merge of profiles.
         std::string ref_profile_content = ref_profile_ + ".expected";
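The expected mode for the reference profile changes from 0600 to 0640, matching the group-readable S_IRGRP bit now passed to open_profile(). CheckFileAccess() is the test suite's own helper; a stat()-based equivalent of that assertion, for illustration only:

    #include <sys/stat.h>

    // Sketch: true if 'path' has exactly the expected type and permission bits,
    // e.g. expected = 0640 | S_IFREG for the reference profile above.
    bool file_mode_is(const char* path, mode_t expected) {
        struct stat st;
        if (stat(path, &st) != 0) return false;
        return st.st_mode == expected;
    }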
diff --git a/libs/binder/Parcel.cpp b/libs/binder/Parcel.cpp
index 232a70c894..ebba375a79 100644
--- a/libs/binder/Parcel.cpp
+++ b/libs/binder/Parcel.cpp
@@ -206,6 +206,7 @@ status_t Parcel::flattenBinder(const sp<IBinder>& binder) {
         status_t status = writeInt32(1); // non-null
         if (status != OK) return status;
         RpcAddress address = RpcAddress::zero();
+        // TODO(b/167966510): need to undo this if the Parcel is not sent
         status = mSession->state()->onBinderLeaving(mSession, binder, &address);
         if (status != OK) return status;
         status = address.writeToParcel(this);
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 2f378dadce..2d2eed2671 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -187,6 +187,11 @@ bool RpcServer::shutdown() {
     }
 
     mShutdownTrigger->trigger();
+    for (auto& [id, session] : mSessions) {
+        (void)id;
+        session->mShutdownTrigger->trigger();
+    }
+
     while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
         if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
            ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
@@ -261,7 +266,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
     };
 
     server->mConnectingThreads.erase(threadId);
-    if (!idValid) {
+    if (!idValid || server->mShutdownTrigger->isTriggered()) {
         return;
     }
@@ -276,10 +281,14 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
 
         session = RpcSession::make();
         session->setMaxThreads(server->mMaxThreads);
-        session->setForServer(server,
-                              sp<RpcServer::EventListener>::fromExisting(
-                                      static_cast<RpcServer::EventListener*>(server.get())),
-                              server->mSessionIdCounter, server->mShutdownTrigger);
+        if (!session->setForServer(server,
+                                   sp<RpcServer::EventListener>::fromExisting(
+                                           static_cast<RpcServer::EventListener*>(
+                                                   server.get())),
+                                   server->mSessionIdCounter)) {
+            ALOGE("Failed to attach server to session");
+            return;
+        }
 
         server->mSessions[server->mSessionIdCounter] = session;
     } else {
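In the shutdown loop added above, '(void)id;' is the usual way to mark one name of a structured binding as deliberately unused, since individual bindings cannot be skipped or annotated. A minimal illustration of the idiom:

    #include <map>

    int countLive(const std::map<int, bool>& sessions) {
        int n = 0;
        for (const auto& [id, alive] : sessions) {
            (void)id; // only 'alive' is used; silences -Wunused-variable
            if (alive) n++;
        }
        return n;
    }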
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index c5633771d0..de9aa220f8 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -113,17 +113,21 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
     return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
 }
 
-bool RpcSession::shutdown() {
+bool RpcSession::shutdownAndWait(bool wait) {
     std::unique_lock<std::mutex> _l(mMutex);
-    LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session");
     LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed");
-    LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
 
     mShutdownTrigger->trigger();
-    mShutdownListener->waitForShutdown(_l);
-    mState->terminate();
 
-    LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+    if (wait) {
+        LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
+        mShutdownListener->waitForShutdown(_l);
+        LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+    }
+
+    _l.unlock();
+    mState->clear();
+
     return true;
 }
@@ -139,12 +143,15 @@ status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Pa
 status_t RpcSession::sendDecStrong(const RpcAddress& address) {
     ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
                                    ConnectionUse::CLIENT_REFCOUNT);
-    return state()->sendDecStrong(connection.fd(), address);
+    return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
 }
 
 std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
     auto ret = std::make_unique<RpcSession::FdTrigger>();
-    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr;
+    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
+        ALOGE("Could not create pipe %s", strerror(errno));
+        return nullptr;
+    }
     return ret;
 }
@@ -152,6 +159,10 @@ void RpcSession::FdTrigger::trigger() {
     mWrite.reset();
 }
 
+bool RpcSession::FdTrigger::isTriggered() {
+    return mWrite == -1;
+}
+
 status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
     while (true) {
         pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
@@ -285,7 +296,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
 
     if (!setupOneSocketConnection(addr, RPC_SESSION_ID_NEW, false /*reverse*/)) return false;
 
-    // TODO(b/185167543): we should add additional sessions dynamically
+    // TODO(b/189955605): we should add additional sessions dynamically
    // instead of all at once.
     // TODO(b/186470974): first risk of blocking
     size_t numThreadsAvailable;
@@ -303,11 +314,11 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
 
     // we've already setup one client
     for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
-        // TODO(b/185167543): shutdown existing connections?
+        // TODO(b/189955605): shutdown existing connections?
         if (!setupOneSocketConnection(addr, mId.value(), false /*reverse*/)) return false;
     }
 
-    // TODO(b/185167543): we should add additional sessions dynamically
+    // TODO(b/189955605): we should add additional sessions dynamically
     // instead of all at once - the other side should be responsible for setting
     // up additional connections. We need to create at least one (unless 0 are
     // requested to be set) in order to allow the other side to reliably make
@@ -408,20 +419,21 @@ bool RpcSession::addClientConnection(unique_fd fd) {
     return true;
 }
 
-void RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
-                              int32_t sessionId,
-                              const std::shared_ptr<FdTrigger>& shutdownTrigger) {
+bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
+                              int32_t sessionId) {
     LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
     LOG_ALWAYS_FATAL_IF(server == nullptr);
     LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
     LOG_ALWAYS_FATAL_IF(eventListener == nullptr);
     LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
-    LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);
+
+    mShutdownTrigger = FdTrigger::make();
+    if (mShutdownTrigger == nullptr) return false;
 
     mId = sessionId;
     mForServer = server;
     mEventListener = eventListener;
-    mShutdownTrigger = shutdownTrigger;
+    return true;
 }
 
 sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
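FdTrigger's shutdown signal is a plain pipe: trigger() closes the write end, poll() on the read end then reports POLLHUP, and the new isTriggered() just checks whether the write end is gone. A freestanding sketch of that mechanism (this mirrors the shape of FdTrigger but is not the RpcSession code itself):

    #include <poll.h>
    #include <unistd.h>

    struct PipeTrigger {
        int r = -1, w = -1;
        bool init() {
            int p[2];
            if (pipe(p) != 0) return false;
            r = p[0]; w = p[1];
            return true;
        }
        void trigger() { close(w); w = -1; }          // cf. FdTrigger::trigger()
        bool isTriggered() const { return w == -1; }  // cf. FdTrigger::isTriggered()

        // Block until 'fd' is readable or the trigger fires; cf. triggerablePollRead().
        bool waitReadable(int fd) const {
            pollfd pfd[2] = {{fd, POLLIN | POLLHUP, 0}, {r, POLLHUP, 0}};
            if (poll(pfd, 2, -1) < 0) return false;
            return (pfd[0].revents & POLLIN) != 0;    // false: triggered or hung up
        }
    };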
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 93f15294df..6899981e83 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -137,9 +137,39 @@ void RpcState::dump() {
     dumpLocked();
 }
 
-void RpcState::terminate() {
+void RpcState::clear() {
     std::unique_lock<std::mutex> _l(mNodeMutex);
-    terminate(_l);
+
+    if (mTerminated) {
+        LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(),
+                            "New state should be impossible after terminating!");
+        return;
+    }
+
+    if (SHOULD_LOG_RPC_DETAIL) {
+        ALOGE("RpcState::clear()");
+        dumpLocked();
+    }
+
+    // if the destructor of a binder object makes another RPC call, then calling
+    // decStrong could deadlock. So, we must hold onto these binders until
+    // mNodeMutex is no longer taken.
+    std::vector<sp<IBinder>> tempHoldBinder;
+
+    mTerminated = true;
+    for (auto& [address, node] : mNodeForAddress) {
+        sp<IBinder> binder = node.binder.promote();
+        LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
+
+        if (node.sentRef != nullptr) {
+            tempHoldBinder.push_back(node.sentRef);
+        }
+    }
+
+    mNodeForAddress.clear();
+
+    _l.unlock();
+    tempHoldBinder.clear(); // explicit
 }
 
 void RpcState::dumpLocked() {
@@ -170,32 +200,6 @@ void RpcState::dumpLocked() {
     ALOGE("END DUMP OF RpcState");
 }
 
-void RpcState::terminate(std::unique_lock<std::mutex>& lock) {
-    if (SHOULD_LOG_RPC_DETAIL) {
-        ALOGE("RpcState::terminate()");
-        dumpLocked();
-    }
-
-    // if the destructor of a binder object makes another RPC call, then calling
-    // decStrong could deadlock. So, we must hold onto these binders until
-    // mNodeMutex is no longer taken.
-    std::vector<sp<IBinder>> tempHoldBinder;
-
-    mTerminated = true;
-    for (auto& [address, node] : mNodeForAddress) {
-        sp<IBinder> binder = node.binder.promote();
-        LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
-
-        if (node.sentRef != nullptr) {
-            tempHoldBinder.push_back(node.sentRef);
-        }
-    }
-
-    mNodeForAddress.clear();
-
-    lock.unlock();
-    tempHoldBinder.clear(); // explicit
-}
 
 RpcState::CommandData::CommandData(size_t size) : mSize(size) {
     // The maximum size for regular binder is 1MB for all concurrent
@@ -218,13 +222,13 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) {
     mData.reset(new (std::nothrow) uint8_t[size]);
 }
 
-status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data,
-                           size_t size) {
+status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+                           const char* what, const void* data, size_t size) {
     LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
 
     if (size > std::numeric_limits<ssize_t>::max()) {
         ALOGE("Cannot send %s at size %zu (too big)", what, size);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
@@ -235,7 +239,7 @@ status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const vo
         LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what,
                        sent, size, fd.get(), strerror(savedErrno));
 
-        terminate();
+        (void)session->shutdownAndWait(false);
         return -savedErrno;
     }
@@ -246,7 +250,7 @@ status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& sessi
                           const char* what, void* data, size_t size) {
     if (size > std::numeric_limits<ssize_t>::max()) {
         ALOGE("Cannot rec %s at size %zu (too big)", what, size);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
Terminating!", sizeof(RpcWireReply), command.bodySize); - terminate(); + (void)session->shutdownAndWait(false); return BAD_VALUE; } RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data.data()); @@ -457,7 +467,8 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& return OK; } -status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& addr) { +status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session, + const RpcAddress& addr) { { std::lock_guard<std::mutex> _l(mNodeMutex); if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races @@ -476,10 +487,10 @@ status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& ad .command = RPC_COMMAND_DEC_STRONG, .bodySize = sizeof(RpcWireAddress), }; - if (status_t status = rpcSend(fd, "dec ref header", &cmd, sizeof(cmd)); status != OK) + if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK) return status; - if (status_t status = - rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress)); + if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(), + sizeof(RpcWireAddress)); status != OK) return status; return OK; @@ -538,7 +549,7 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS // also can't consider it a fatal error because this would allow any client // to kill us, so ending the session for misbehaving client. ALOGE("Unknown RPC command %d - terminating session", command.command); - terminate(); + (void)session->shutdownAndWait(false); return DEAD_OBJECT; } status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session, @@ -571,7 +582,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R if (transactionData.size() < sizeof(RpcWireTransaction)) { ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!", sizeof(RpcWireTransaction), transactionData.size()); - terminate(); + (void)session->shutdownAndWait(false); return BAD_VALUE; } RpcWireTransaction* transaction = reinterpret_cast<RpcWireTransaction*>(transactionData.data()); @@ -600,15 +611,15 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R // session. ALOGE("While transacting, binder has been deleted at address %s. Terminating!", addr.toString().c_str()); - terminate(); + (void)session->shutdownAndWait(false); replyStatus = BAD_VALUE; } else if (target->localBinder() == nullptr) { ALOGE("Unknown binder address or non-local binder, not address %s. Terminating!", addr.toString().c_str()); - terminate(); + (void)session->shutdownAndWait(false); replyStatus = BAD_VALUE; } else if (transaction->flags & IBinder::FLAG_ONEWAY) { - std::lock_guard<std::mutex> _l(mNodeMutex); + std::unique_lock<std::mutex> _l(mNodeMutex); auto it = mNodeForAddress.find(addr); if (it->second.binder.promote() != target) { ALOGE("Binder became invalid during transaction. Bad client? 
%s", @@ -617,16 +628,33 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R } else if (transaction->asyncNumber != it->second.asyncNumber) { // we need to process some other asynchronous transaction // first - // TODO(b/183140903): limit enqueues/detect overfill for bad client - // TODO(b/183140903): detect when an object is deleted when it still has - // pending async transactions it->second.asyncTodo.push(BinderNode::AsyncTodo{ .ref = target, .data = std::move(transactionData), .asyncNumber = transaction->asyncNumber, }); - LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s", transaction->asyncNumber, - addr.toString().c_str()); + + size_t numPending = it->second.asyncTodo.size(); + LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s (%zu pending)", + transaction->asyncNumber, addr.toString().c_str(), numPending); + + constexpr size_t kArbitraryOnewayCallTerminateLevel = 10000; + constexpr size_t kArbitraryOnewayCallWarnLevel = 1000; + constexpr size_t kArbitraryOnewayCallWarnPer = 1000; + + if (numPending >= kArbitraryOnewayCallWarnLevel) { + if (numPending >= kArbitraryOnewayCallTerminateLevel) { + ALOGE("WARNING: %zu pending oneway transactions. Terminating!", numPending); + _l.unlock(); + (void)session->shutdownAndWait(false); + return FAILED_TRANSACTION; + } + + if (numPending % kArbitraryOnewayCallWarnPer == 0) { + ALOGW("Warning: many oneway transactions built up on %p (%zu)", + target.get(), numPending); + } + } return OK; } } @@ -707,7 +735,11 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R // last refcount dropped after this transaction happened if (it == mNodeForAddress.end()) return OK; - if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT; + if (!nodeProgressAsyncNumber(&it->second)) { + _l.unlock(); + (void)session->shutdownAndWait(false); + return DEAD_OBJECT; + } if (it->second.asyncTodo.size() == 0) return OK; if (it->second.asyncTodo.top().asyncNumber == it->second.asyncNumber) { @@ -753,7 +785,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(), reply.dataSize()); - return rpcSend(fd, "reply", replyData.data(), replyData.size()); + return rpcSend(fd, session, "reply", replyData.data(), replyData.size()); } status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session, @@ -772,7 +804,7 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi if (command.bodySize < sizeof(RpcWireAddress)) { ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireAddress. Terminating!", sizeof(RpcWireAddress), command.bodySize); - terminate(); + (void)session->shutdownAndWait(false); return BAD_VALUE; } RpcWireAddress* address = reinterpret_cast<RpcWireAddress*>(commandData.data()); @@ -790,7 +822,8 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi if (target == nullptr) { ALOGE("While requesting dec strong, binder has been deleted at address %s. 
Terminating!", addr.toString().c_str()); - terminate(); + _l.unlock(); + (void)session->shutdownAndWait(false); return BAD_VALUE; } @@ -826,12 +859,11 @@ sp<IBinder> RpcState::tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& i return ref; } -bool RpcState::nodeProgressAsyncNumber(BinderNode* node, std::unique_lock<std::mutex>& lock) { +bool RpcState::nodeProgressAsyncNumber(BinderNode* node) { // 2**64 =~ 10**19 =~ 1000 transactions per second for 585 million years to // a single binder if (node->asyncNumber >= std::numeric_limits<decltype(node->asyncNumber)>::max()) { ALOGE("Out of async transaction IDs. Terminating"); - terminate(lock); return false; } node->asyncNumber++; diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h index 81ff458ba7..13c31154eb 100644 --- a/libs/binder/RpcState.h +++ b/libs/binder/RpcState.h @@ -65,7 +65,8 @@ public: uint32_t code, const Parcel& data, const sp<RpcSession>& session, Parcel* reply, uint32_t flags); - [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address); + [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session, + const RpcAddress& address); enum class CommandType { ANY, @@ -110,11 +111,10 @@ public: * WARNING: RpcState is responsible for calling this when the session is * no longer recoverable. */ - void terminate(); + void clear(); private: void dumpLocked(); - void terminate(std::unique_lock<std::mutex>& lock); // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps // large allocations to avoid being requested from allocating too much data. @@ -130,8 +130,8 @@ private: size_t mSize; }; - [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const char* what, const void* data, - size_t size); + [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session, + const char* what, const void* data, size_t size); [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session, const char* what, void* data, size_t size); @@ -204,9 +204,8 @@ private: // dropped after any locks are removed. sp<IBinder> tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& it); // true - success - // false - state terminated, lock gone, halt - [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node, - std::unique_lock<std::mutex>& lock); + // false - session shutdown, halt + [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node); std::mutex mNodeMutex; bool mTerminated = false; diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h index 98db2212f2..4e6934b276 100644 --- a/libs/binder/include/binder/RpcServer.h +++ b/libs/binder/include/binder/RpcServer.h @@ -97,7 +97,7 @@ public: * * If this is not specified, this will be a single-threaded server. * - * TODO(b/185167543): these are currently created per client, but these + * TODO(b/167966510): these are currently created per client, but these * should be shared. 
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 98db2212f2..4e6934b276 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -97,7 +97,7 @@ public:
      *
      * If this is not specified, this will be a single-threaded server.
      *
-     * TODO(b/185167543): these are currently created per client, but these
+     * TODO(b/167966510): these are currently created per client, but these
      * should be shared.
      */
     void setMaxThreads(size_t threads);
@@ -173,7 +173,7 @@ private:
     wp<IBinder> mRootObjectWeak;
     std::map<int32_t, sp<RpcSession>> mSessions;
     int32_t mSessionIdCounter = 0;
-    std::shared_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
     std::condition_variable mShutdownCv;
 };
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index a6bc1a9cc7..4650cf21ca 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -54,7 +54,7 @@ public:
      * If this is called, 'shutdown' on this session must also be called.
      * Otherwise, a threadpool will leak.
      *
-     * TODO(b/185167543): start these dynamically
+     * TODO(b/189955605): start these dynamically
      */
     void setMaxThreads(size_t threads);
     size_t getMaxThreads();
@@ -97,14 +97,20 @@ public:
     status_t getRemoteMaxThreads(size_t* maxThreads);
 
     /**
-     * Shuts down the service. Only works for client sessions (server-side
-     * sessions currently only support shutting down the entire server).
+     * Shuts down the service.
+     *
+     * For client sessions, wait can be true or false. For server sessions,
+     * waiting is not currently supported (will abort).
      *
      * Warning: this is currently not active/nice (the server isn't told we're
      * shutting down). Being nicer to the server could potentially make it
      * reclaim resources faster.
+     *
+     * If this is called w/ 'wait' true, then this will wait for shutdown to
+     * complete before returning. This will hang if it is called from the
+     * session threadpool (when processing received calls).
      */
-    [[nodiscard]] bool shutdown();
+    [[nodiscard]] bool shutdownAndWait(bool wait);
 
     [[nodiscard]] status_t transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
                                     Parcel* reply, uint32_t flags);
@@ -129,14 +135,15 @@ private:
         static std::unique_ptr<FdTrigger> make();
 
         /**
-         * poll() on this fd for POLLHUP to get notification when trigger is called
+         * Close the write end of the pipe so that the read end receives POLLHUP.
+         * Not threadsafe.
          */
-        base::borrowed_fd readFd() const { return mRead; }
+        void trigger();
 
         /**
-         * Close the write end of the pipe so that the read end receives POLLHUP.
+         * Whether this has been triggered.
          */
-        void trigger();
+        bool isTriggered();
 
         /**
         * Poll for a read event.
@@ -197,9 +204,9 @@ private:
     [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
                                                 bool server);
     [[nodiscard]] bool addClientConnection(base::unique_fd fd);
-    void setForServer(const wp<RpcServer>& server,
-                      const wp<RpcSession::EventListener>& eventListener, int32_t sessionId,
-                      const std::shared_ptr<FdTrigger>& shutdownTrigger);
+    [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
+                                    const wp<RpcSession::EventListener>& eventListener,
+                                    int32_t sessionId);
     sp<RpcConnection> assignServerToThisThread(base::unique_fd fd);
     [[nodiscard]] bool removeServerConnection(const sp<RpcConnection>& connection);
@@ -252,7 +259,7 @@ private:
     // TODO(b/183988761): this shouldn't be guessable
     std::optional<int32_t> mId;
 
-    std::shared_ptr<FdTrigger> mShutdownTrigger;
+    std::unique_ptr<FdTrigger> mShutdownTrigger;
 
     std::unique_ptr<RpcState> mState;
@@ -266,9 +273,6 @@ private:
     size_t mClientConnectionsOffset = 0;
     std::vector<sp<RpcConnection>> mClientConnections;
     std::vector<sp<RpcConnection>> mServerConnections;
-
-    // TODO(b/185167543): allow sharing between different sessions in a
-    // process? (or combine with mServerConnections)
     std::map<std::thread::id, std::thread> mThreads;
 };
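A hypothetical caller of the renamed API, matching the documented constraints above (wait=true must come from outside the session's own threadpool, or it will hang):

    #include <binder/RpcSession.h>

    using android::RpcSession;
    using android::sp;

    void teardown(const sp<RpcSession>& session) {
        // Blocks until the session's threads exit; pass false to only signal.
        if (!session->shutdownAndWait(/*wait=*/true)) {
            // e.g. log; the shutdown trigger may not have been installed yet
        }
    }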
diff --git a/libs/binder/rust/src/lib.rs b/libs/binder/rust/src/lib.rs
index 2694cba870..7c0584bade 100644
--- a/libs/binder/rust/src/lib.rs
+++ b/libs/binder/rust/src/lib.rs
@@ -115,14 +115,14 @@ pub use error::{status_t, ExceptionCode, Result, Status, StatusCode};
 pub use native::add_service;
 pub use native::Binder;
 pub use parcel::Parcel;
-pub use proxy::{get_interface, get_service};
+pub use proxy::{get_interface, get_service, wait_for_interface, wait_for_service};
 pub use proxy::{AssociateClass, DeathRecipient, Proxy, SpIBinder, WpIBinder};
 pub use state::{ProcessState, ThreadState};
 
 /// The public API usable outside AIDL-generated interface crates.
 pub mod public_api {
     pub use super::parcel::ParcelFileDescriptor;
-    pub use super::{add_service, get_interface};
+    pub use super::{add_service, get_interface, wait_for_interface};
     pub use super::{
         BinderFeatures, DeathRecipient, ExceptionCode, IBinder, Interface, ProcessState,
         SpIBinder, Status, StatusCode, Strong, ThreadState, Weak, WpIBinder,
diff --git a/libs/binder/rust/src/proxy.rs b/libs/binder/rust/src/proxy.rs
index 52036f5312..4a6d118f07 100644
--- a/libs/binder/rust/src/proxy.rs
+++ b/libs/binder/rust/src/proxy.rs
@@ -653,6 +653,18 @@ pub fn get_service(name: &str) -> Option<SpIBinder> {
     }
 }
 
+/// Retrieve an existing service, or start it if it is configured as a dynamic
+/// service and isn't yet started.
+pub fn wait_for_service(name: &str) -> Option<SpIBinder> {
+    let name = CString::new(name).ok()?;
+    unsafe {
+        // Safety: `AServiceManager_waitforService` returns either a null
+        // pointer or a valid pointer to an owned `AIBinder`. Either of these
+        // values is safe to pass to `SpIBinder::from_raw`.
+        SpIBinder::from_raw(sys::AServiceManager_waitForService(name.as_ptr()))
+    }
+}
+
 /// Retrieve an existing service for a particular interface, blocking for a few
 /// seconds if it doesn't yet exist.
 pub fn get_interface<T: FromIBinder + ?Sized>(name: &str) -> Result<Strong<T>> {
@@ -663,6 +675,16 @@ pub fn get_interface<T: FromIBinder + ?Sized>(name: &str) -> Result<Strong<T>> {
     }
 }
 
+/// Retrieve an existing service for a particular interface, or start it if it
+/// is configured as a dynamic service and isn't yet started.
+pub fn wait_for_interface<T: FromIBinder + ?Sized>(name: &str) -> Result<Strong<T>> {
+    let service = wait_for_service(name);
+    match service {
+        Some(service) => FromIBinder::try_from(service),
+        None => Err(StatusCode::NAME_NOT_FOUND),
+    }
+}
+
 /// # Safety
 ///
 /// `SpIBinder` guarantees that `binder` always contains a valid pointer to an
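The Rust wait_for_service()/wait_for_interface() added above are thin wrappers over libbinder_ndk's AServiceManager_waitForService(). For comparison, a minimal NDK C++ client doing the same wait (the "manager" name mirrors the test below; error handling trimmed):

    #include <android/binder_ibinder.h>
    #include <android/binder_manager.h>
    #include <android/binder_status.h>

    bool pingManager() {
        // Blocks until the service is registered, starting it first if it is
        // a declared (dynamic) service.
        AIBinder* binder = AServiceManager_waitForService("manager");
        if (binder == nullptr) return false;
        bool ok = AIBinder_ping(binder) == STATUS_OK;
        AIBinder_decStrong(binder); // drop the strong reference we were given
        return ok;
    }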
diff --git a/libs/binder/rust/tests/integration.rs b/libs/binder/rust/tests/integration.rs
index 03320076cb..10b77f4840 100644
--- a/libs/binder/rust/tests/integration.rs
+++ b/libs/binder/rust/tests/integration.rs
@@ -274,6 +274,20 @@ mod tests {
     }
 
     #[test]
+    fn check_wait_for_service() {
+        let mut sm =
+            binder::wait_for_service("manager").expect("Did not get manager binder service");
+        assert!(sm.is_binder_alive());
+        assert!(sm.ping_binder().is_ok());
+
+        // The service manager service isn't an ITest, so this must fail.
+        assert_eq!(
+            binder::wait_for_interface::<dyn ITest>("manager").err(),
+            Some(StatusCode::BAD_TYPE)
+        );
+    }
+
+    #[test]
     fn trivial_client() {
         let service_name = "trivial_client_test";
         let _process = ScopedServiceProcess::new(service_name);
@@ -283,6 +297,15 @@ mod tests {
     }
 
     #[test]
+    fn wait_for_trivial_client() {
+        let service_name = "wait_for_trivial_client_test";
+        let _process = ScopedServiceProcess::new(service_name);
+        let test_client: Strong<dyn ITest> =
+            binder::wait_for_interface(service_name).expect("Did not get manager binder service");
+        assert_eq!(test_client.test().unwrap(), "wait_for_trivial_client_test");
+    }
+
+    #[test]
     fn get_selinux_context() {
         let service_name = "get_selinux_context";
         let _process = ScopedServiceProcess::new(service_name);
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 0a970fb5cf..82f8a3e273 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -941,6 +941,40 @@ TEST_P(BinderRpc, OnewayCallQueueing) {
     for (auto& t : threads) t.join();
 }
 
+TEST_P(BinderRpc, OnewayCallExhaustion) {
+    constexpr size_t kNumClients = 2;
+    constexpr size_t kTooLongMs = 1000;
+
+    auto proc = createRpcTestSocketServerProcess(kNumClients /*threads*/, 2 /*sessions*/);
+
+    // Build up oneway calls on the second session to make sure it terminates
+    // and shuts down. The first session should be unaffected (proc destructor
+    // checks the first session).
+    auto iface = interface_cast<IBinderRpcTest>(proc.proc.sessions.at(1).root);
+
+    std::vector<std::thread> threads;
+    for (size_t i = 0; i < kNumClients; i++) {
+        // one of these threads will get stuck queueing a transaction once the
+        // socket fills up, the other will be able to fill up transactions on
+        // this object
+        threads.push_back(std::thread([&] {
+            while (iface->sleepMsAsync(kTooLongMs).isOk()) {
+            }
+        }));
+    }
+    for (auto& t : threads) t.join();
+
+    Status status = iface->sleepMsAsync(kTooLongMs);
+    EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
+
+    // the second session should be shutdown in the other process by the time we
+    // are able to join above (it'll only be hung up once it finishes processing
+    // any pending commands). We need to erase this session from the record
+    // here, so that the destructor for our session won't check that this
+    // session is valid, but we still want it to test the other session.
+    proc.proc.sessions.erase(proc.proc.sessions.begin() + 1);
+}
+
 TEST_P(BinderRpc, Callbacks) {
     const static std::string kTestString = "good afternoon!";
@@ -966,7 +1000,7 @@ TEST_P(BinderRpc, Callbacks) {
 
             // since this session has a reverse connection w/ a threadpool, we
             // need to manually shut it down
-            EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdown());
+            EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
 
             proc.expectAlreadyShutdown = true;
         }