From ada72bd2674ed8e04f6d8aa06803b058bad88560 Mon Sep 17 00:00:00 2001 From: Steven Moreland Date: Wed, 9 Jun 2021 23:29:13 +0000 Subject: libbinder: RPC process oneway w/ 'tail call' When draining oneway commands (which must be serialized), we do a recursive call to process a transaction. However, this wouldn't even be considered to be a tailcall because of the complex destructors which need to run. So, instead we work around this w/ goto to the beginning of the function. The alternative here (to a 'goto') to consider is creating a more complex return type to processTransactInternal which would convince processTransact to re-issue the command. Though, this would be a somewhat larger refactor. Fixes: 190638569 Test: binderRpcTest (OnewayStressTest repeatedly on device doesn't fail for several minutes - failed without this) Change-Id: I9fbc75941452348e498849d5d59130487ef6cc44 --- libs/binder/RpcState.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) (limited to 'libs/binder/RpcState.cpp') diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp index 6899981e83..62eb58adba 100644 --- a/libs/binder/RpcState.cpp +++ b/libs/binder/RpcState.cpp @@ -565,7 +565,7 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp& session, - CommandData transactionData, sp&& targetRef) { + CommandData transactionData) { + // for 'recursive' calls to this, we have already read and processed the + // binder from the transaction data and taken reference counts into account, + // so it is cached here. + sp targetRef; +processTransactInternalTailCall: + if (transactionData.size() < sizeof(RpcWireTransaction)) { ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!", sizeof(RpcWireTransaction), transactionData.size()); @@ -751,13 +757,12 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp(it->second.asyncTodo.top()); - CommandData nextData = std::move(todo.data); - sp nextRef = std::move(todo.ref); + // reset up arguments + transactionData = std::move(todo.data); + targetRef = std::move(todo.ref); it->second.asyncTodo.pop(); - _l.unlock(); - return processTransactInternal(fd, session, std::move(nextData), - std::move(nextRef)); + goto processTransactInternalTailCall; } } return OK; -- cgit v1.2.3-59-g8ed1b From c88b7fccbbc449d4bd0e371ccf489df0eb401750 Mon Sep 17 00:00:00 2001 From: Steven Moreland Date: Thu, 10 Jun 2021 00:40:39 +0000 Subject: libbinder: RPC know when connections setup Previously, there was a race where: a. client creates connection to server b. client sends request for reverse connection to server (but this may still be traveling on the wire) c. client sends transaction to server d. server tries to make a callback e. server fails to make callback because no reverse connection is setup Now, when a new connection is setup, a header on this connection is setup. So, we can wait on this header to be received in (b). Note: currently, (e) results in an abort, this is tracked in b/167966510 with a TODO in the ExclusiveConnection code. This would make a less obvious flake (or perhaps the problem would be ignored), but this race still needs to be fixed for well-behaved clients to be able to function reliably. Fixes: 190639665 Test: binderRpcTest (callback test 10,000s of times) Change-Id: I13bc912692d63ea73d46c5441fa7d51121df2f58 --- libs/binder/RpcServer.cpp | 6 ++- libs/binder/RpcSession.cpp | 85 +++++++++++++++++++++++---------- libs/binder/RpcState.cpp | 21 ++++++++ libs/binder/RpcState.h | 3 ++ libs/binder/RpcWireFormat.h | 16 +++++++ libs/binder/include/binder/RpcSession.h | 28 ++++++++--- 6 files changed, 124 insertions(+), 35 deletions(-) (limited to 'libs/binder/RpcState.cpp') diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index 2d2eed2671..60be406f6f 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -307,13 +307,15 @@ void RpcServer::establishConnection(sp&& server, base::unique_fd clie } detachGuard.Disable(); - session->preJoin(std::move(thisThread)); + session->preJoinThreadOwnership(std::move(thisThread)); } + auto setupResult = session->preJoinSetup(std::move(clientFd)); + // avoid strong cycle server = nullptr; - RpcSession::join(std::move(session), std::move(clientFd)); + RpcSession::join(std::move(session), std::move(setupResult)); } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index de9aa220f8..2a230d290d 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -236,7 +236,7 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock&& session, unique_fd client) { +RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) - sp connection = session->assignServerToThisThread(std::move(client)); + sp connection = assignServerToThisThread(std::move(fd)); - while (true) { - status_t error = session->state()->getAndExecuteCommand(connection->fd, session, - RpcState::CommandType::ANY); + status_t status = + mState->readConnectionInit(connection->fd, sp::fromExisting(this)); - if (error != OK) { - LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", - statusToString(error).c_str()); - break; + return PreJoinSetupResult{ + .connection = std::move(connection), + .status = status, + }; +} + +void RpcSession::join(sp&& session, PreJoinSetupResult&& setupResult) { + sp& connection = setupResult.connection; + + if (setupResult.status == OK) { + while (true) { + status_t status = session->state()->getAndExecuteCommand(connection->fd, session, + RpcState::CommandType::ANY); + if (status != OK) { + LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", + statusToString(status).c_str()); + break; + } } + } else { + ALOGE("Connection failed to init, closing with status %s", + statusToString(setupResult.status).c_str()); } LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection), @@ -381,14 +397,17 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t unique_fd fd = std::move(serverFd); // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) sp session = thiz; - session->preJoin(std::move(thread)); - ownershipTransferred = true; - joinCv.notify_one(); + session->preJoinThreadOwnership(std::move(thread)); + + // only continue once we have a response or the connection fails + auto setupResult = session->preJoinSetup(std::move(fd)); + ownershipTransferred = true; threadLock.unlock(); + joinCv.notify_one(); // do not use & vars below - RpcSession::join(std::move(session), std::move(fd)); + RpcSession::join(std::move(session), std::move(setupResult)); }); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); @@ -403,20 +422,32 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t } bool RpcSession::addClientConnection(unique_fd fd) { - std::lock_guard _l(mMutex); + sp connection = sp::make(); + { + std::lock_guard _l(mMutex); + + // first client connection added, but setForServer not called, so + // initializaing for a client. + if (mShutdownTrigger == nullptr) { + mShutdownTrigger = FdTrigger::make(); + mEventListener = mShutdownListener = sp::make(); + if (mShutdownTrigger == nullptr) return false; + } - // first client connection added, but setForServer not called, so - // initializaing for a client. - if (mShutdownTrigger == nullptr) { - mShutdownTrigger = FdTrigger::make(); - mEventListener = mShutdownListener = sp::make(); - if (mShutdownTrigger == nullptr) return false; + connection->fd = std::move(fd); + connection->exclusiveTid = gettid(); + mClientConnections.push_back(connection); } - sp session = sp::make(); - session->fd = std::move(fd); - mClientConnections.push_back(session); - return true; + status_t status = + mState->sendConnectionInit(connection->fd, sp::fromExisting(this)); + + { + std::lock_guard _l(mMutex); + connection->exclusiveTid = std::nullopt; + } + + return status == OK; } bool RpcSession::setForServer(const wp& server, const wp& eventListener, @@ -519,7 +550,9 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp& sessi // in regular binder, this would usually be a deadlock :) LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0, "Session has no client connections. This is required for an RPC server " - "to make any non-nested (e.g. oneway or on another thread) calls."); + "to make any non-nested (e.g. oneway or on another thread) calls. " + "Use: %d. Server connections: %zu", + static_cast(use), mSession->mServerConnections.size()); LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...", mSession->mClientConnections.size(), mSession->mServerConnections.size()); diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp index 62eb58adba..53eba5aea6 100644 --- a/libs/binder/RpcState.cpp +++ b/libs/binder/RpcState.cpp @@ -265,6 +265,27 @@ status_t RpcState::rpcRec(const base::unique_fd& fd, const sp& sessi return OK; } +status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp& session) { + RpcClientConnectionInit init{ + .msg = RPC_CONNECTION_INIT_OKAY, + }; + return rpcSend(fd, session, "connection init", &init, sizeof(init)); +} + +status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp& session) { + RpcClientConnectionInit init; + if (status_t status = rpcRec(fd, session, "connection init", &init, sizeof(init)); status != OK) + return status; + + static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY)); + if (0 != strncmp(init.msg, RPC_CONNECTION_INIT_OKAY, sizeof(init.msg))) { + ALOGE("Connection init message unrecognized %.*s", static_cast(sizeof(init.msg)), + init.msg); + return BAD_VALUE; + } + return OK; +} + sp RpcState::getRootObject(const base::unique_fd& fd, const sp& session) { Parcel data; data.markForRpc(session); diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h index db142a11f8..5bfef69c32 100644 --- a/libs/binder/RpcState.h +++ b/libs/binder/RpcState.h @@ -51,6 +51,9 @@ public: RpcState(); ~RpcState(); + status_t sendConnectionInit(const base::unique_fd& fd, const sp& session); + status_t readConnectionInit(const base::unique_fd& fd, const sp& session); + // TODO(b/182940634): combine some special transactions into one "getServerInfo" call? sp getRootObject(const base::unique_fd& fd, const sp& session); status_t getMaxThreads(const base::unique_fd& fd, const sp& session, diff --git a/libs/binder/RpcWireFormat.h b/libs/binder/RpcWireFormat.h index 649c1eeb8e..b5e5bc1e0f 100644 --- a/libs/binder/RpcWireFormat.h +++ b/libs/binder/RpcWireFormat.h @@ -26,12 +26,28 @@ enum : uint8_t { RPC_CONNECTION_OPTION_REVERSE = 0x1, }; +/** + * This is sent to an RpcServer in order to request a new connection is created, + * either as part of a new session or an existing session + */ struct RpcConnectionHeader { int32_t sessionId; uint8_t options; uint8_t reserved[3]; }; +#define RPC_CONNECTION_INIT_OKAY "cci" + +/** + * Whenever a client connection is setup, this is sent as the initial + * transaction. The main use of this is in order to control the timing for when + * a reverse connection is setup. + */ +struct RpcClientConnectionInit { + char msg[4]; + uint8_t reserved[4]; +}; + enum : uint32_t { /** * follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index 4650cf21ca..6ad15f2cd4 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -185,13 +185,6 @@ private: bool mShutdown = false; }; - status_t readId(); - - // transfer ownership of thread - void preJoin(std::thread thread); - // join on thread passed to preJoin - static void join(sp&& session, base::unique_fd client); - struct RpcConnection : public RefBase { base::unique_fd fd; @@ -200,6 +193,27 @@ private: std::optional exclusiveTid; }; + status_t readId(); + + // A thread joining a server must always call these functions in order, and + // cleanup is only programmed once into join. These are in separate + // functions in order to allow for different locks to be taken during + // different parts of setup. + // + // transfer ownership of thread (usually done while a lock is taken on the + // structure which originally owns the thread) + void preJoinThreadOwnership(std::thread thread); + // pass FD to thread and read initial connection information + struct PreJoinSetupResult { + // Server connection object associated with this + sp connection; + // Status of setup + status_t status; + }; + PreJoinSetupResult preJoinSetup(base::unique_fd fd); + // join on thread passed to preJoinThreadOwnership + static void join(sp&& session, PreJoinSetupResult&& result); + [[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address); [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId, bool server); -- cgit v1.2.3-59-g8ed1b