diff options
-rw-r--r-- | libs/binder/RpcServer.cpp | 6 | ||||
-rw-r--r-- | libs/binder/RpcSession.cpp | 85 | ||||
-rw-r--r-- | libs/binder/RpcState.cpp | 21 | ||||
-rw-r--r-- | libs/binder/RpcState.h | 3 | ||||
-rw-r--r-- | libs/binder/RpcWireFormat.h | 16 | ||||
-rw-r--r-- | libs/binder/include/binder/RpcSession.h | 28 |
6 files changed, 124 insertions, 35 deletions
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index 2d2eed2671..60be406f6f 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -307,13 +307,15 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie } detachGuard.Disable(); - session->preJoin(std::move(thisThread)); + session->preJoinThreadOwnership(std::move(thisThread)); } + auto setupResult = session->preJoinSetup(std::move(clientFd)); + // avoid strong cycle server = nullptr; - RpcSession::join(std::move(session), std::move(clientFd)); + RpcSession::join(std::move(session), std::move(setupResult)); } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index de9aa220f8..2a230d290d 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -236,7 +236,7 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std:: } } -void RpcSession::preJoin(std::thread thread) { +void RpcSession::preJoinThreadOwnership(std::thread thread) { LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); { @@ -245,20 +245,36 @@ void RpcSession::preJoin(std::thread thread) { } } -void RpcSession::join(sp<RpcSession>&& session, unique_fd client) { +RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) - sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client)); + sp<RpcConnection> connection = assignServerToThisThread(std::move(fd)); - while (true) { - status_t error = session->state()->getAndExecuteCommand(connection->fd, session, - RpcState::CommandType::ANY); + status_t status = + mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this)); - if (error != OK) { - LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", - statusToString(error).c_str()); - break; + return PreJoinSetupResult{ + .connection = std::move(connection), + .status = status, + }; +} + +void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult) { + sp<RpcConnection>& connection = setupResult.connection; + + if (setupResult.status == OK) { + while (true) { + status_t status = session->state()->getAndExecuteCommand(connection->fd, session, + RpcState::CommandType::ANY); + if (status != OK) { + LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", + statusToString(status).c_str()); + break; + } } + } else { + ALOGE("Connection failed to init, closing with status %s", + statusToString(setupResult.status).c_str()); } LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection), @@ -381,14 +397,17 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t unique_fd fd = std::move(serverFd); // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) sp<RpcSession> session = thiz; - session->preJoin(std::move(thread)); - ownershipTransferred = true; - joinCv.notify_one(); + session->preJoinThreadOwnership(std::move(thread)); + + // only continue once we have a response or the connection fails + auto setupResult = session->preJoinSetup(std::move(fd)); + ownershipTransferred = true; threadLock.unlock(); + joinCv.notify_one(); // do not use & vars below - RpcSession::join(std::move(session), std::move(fd)); + RpcSession::join(std::move(session), std::move(setupResult)); }); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); @@ -403,20 +422,32 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t } bool RpcSession::addClientConnection(unique_fd fd) { - std::lock_guard<std::mutex> _l(mMutex); + sp<RpcConnection> connection = sp<RpcConnection>::make(); + { + std::lock_guard<std::mutex> _l(mMutex); + + // first client connection added, but setForServer not called, so + // initializaing for a client. + if (mShutdownTrigger == nullptr) { + mShutdownTrigger = FdTrigger::make(); + mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); + if (mShutdownTrigger == nullptr) return false; + } - // first client connection added, but setForServer not called, so - // initializaing for a client. - if (mShutdownTrigger == nullptr) { - mShutdownTrigger = FdTrigger::make(); - mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); - if (mShutdownTrigger == nullptr) return false; + connection->fd = std::move(fd); + connection->exclusiveTid = gettid(); + mClientConnections.push_back(connection); } - sp<RpcConnection> session = sp<RpcConnection>::make(); - session->fd = std::move(fd); - mClientConnections.push_back(session); - return true; + status_t status = + mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this)); + + { + std::lock_guard<std::mutex> _l(mMutex); + connection->exclusiveTid = std::nullopt; + } + + return status == OK; } bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener, @@ -519,7 +550,9 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi // in regular binder, this would usually be a deadlock :) LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0, "Session has no client connections. This is required for an RPC server " - "to make any non-nested (e.g. oneway or on another thread) calls."); + "to make any non-nested (e.g. oneway or on another thread) calls. " + "Use: %d. Server connections: %zu", + static_cast<int>(use), mSession->mServerConnections.size()); LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...", mSession->mClientConnections.size(), mSession->mServerConnections.size()); diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp index 62eb58adba..53eba5aea6 100644 --- a/libs/binder/RpcState.cpp +++ b/libs/binder/RpcState.cpp @@ -265,6 +265,27 @@ status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& sessi return OK; } +status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) { + RpcClientConnectionInit init{ + .msg = RPC_CONNECTION_INIT_OKAY, + }; + return rpcSend(fd, session, "connection init", &init, sizeof(init)); +} + +status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) { + RpcClientConnectionInit init; + if (status_t status = rpcRec(fd, session, "connection init", &init, sizeof(init)); status != OK) + return status; + + static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY)); + if (0 != strncmp(init.msg, RPC_CONNECTION_INIT_OKAY, sizeof(init.msg))) { + ALOGE("Connection init message unrecognized %.*s", static_cast<int>(sizeof(init.msg)), + init.msg); + return BAD_VALUE; + } + return OK; +} + sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) { Parcel data; data.markForRpc(session); diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h index db142a11f8..5bfef69c32 100644 --- a/libs/binder/RpcState.h +++ b/libs/binder/RpcState.h @@ -51,6 +51,9 @@ public: RpcState(); ~RpcState(); + status_t sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session); + status_t readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session); + // TODO(b/182940634): combine some special transactions into one "getServerInfo" call? sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session); status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session, diff --git a/libs/binder/RpcWireFormat.h b/libs/binder/RpcWireFormat.h index 649c1eeb8e..b5e5bc1e0f 100644 --- a/libs/binder/RpcWireFormat.h +++ b/libs/binder/RpcWireFormat.h @@ -26,12 +26,28 @@ enum : uint8_t { RPC_CONNECTION_OPTION_REVERSE = 0x1, }; +/** + * This is sent to an RpcServer in order to request a new connection is created, + * either as part of a new session or an existing session + */ struct RpcConnectionHeader { int32_t sessionId; uint8_t options; uint8_t reserved[3]; }; +#define RPC_CONNECTION_INIT_OKAY "cci" + +/** + * Whenever a client connection is setup, this is sent as the initial + * transaction. The main use of this is in order to control the timing for when + * a reverse connection is setup. + */ +struct RpcClientConnectionInit { + char msg[4]; + uint8_t reserved[4]; +}; + enum : uint32_t { /** * follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index 4650cf21ca..6ad15f2cd4 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -185,13 +185,6 @@ private: bool mShutdown = false; }; - status_t readId(); - - // transfer ownership of thread - void preJoin(std::thread thread); - // join on thread passed to preJoin - static void join(sp<RpcSession>&& session, base::unique_fd client); - struct RpcConnection : public RefBase { base::unique_fd fd; @@ -200,6 +193,27 @@ private: std::optional<pid_t> exclusiveTid; }; + status_t readId(); + + // A thread joining a server must always call these functions in order, and + // cleanup is only programmed once into join. These are in separate + // functions in order to allow for different locks to be taken during + // different parts of setup. + // + // transfer ownership of thread (usually done while a lock is taken on the + // structure which originally owns the thread) + void preJoinThreadOwnership(std::thread thread); + // pass FD to thread and read initial connection information + struct PreJoinSetupResult { + // Server connection object associated with this + sp<RpcConnection> connection; + // Status of setup + status_t status; + }; + PreJoinSetupResult preJoinSetup(base::unique_fd fd); + // join on thread passed to preJoinThreadOwnership + static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result); + [[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address); [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId, bool server); |