diff options
-rw-r--r-- | libs/binder/RpcServer.cpp | 6 | ||||
-rw-r--r-- | libs/binder/RpcSession.cpp | 170 | ||||
-rw-r--r-- | libs/binder/RpcState.cpp | 40 | ||||
-rw-r--r-- | libs/binder/RpcState.h | 6 | ||||
-rw-r--r-- | libs/binder/RpcWireFormat.h | 16 | ||||
-rw-r--r-- | libs/binder/include/binder/RpcSession.h | 34 | ||||
-rw-r--r-- | libs/binder/tests/binderRpcTest.cpp | 8 |
7 files changed, 201 insertions, 79 deletions
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index 2d2eed2671..60be406f6f 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -307,13 +307,15 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie } detachGuard.Disable(); - session->preJoin(std::move(thisThread)); + session->preJoinThreadOwnership(std::move(thisThread)); } + auto setupResult = session->preJoinSetup(std::move(clientFd)); + // avoid strong cycle server = nullptr; - RpcSession::join(std::move(session), std::move(clientFd)); + RpcSession::join(std::move(session), std::move(setupResult)); } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index de9aa220f8..a759ae36c3 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -104,12 +104,18 @@ bool RpcSession::addNullDebuggingClient() { } sp<IBinder> RpcSession::getRootObject() { - ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT); + ExclusiveConnection connection; + status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), + ConnectionUse::CLIENT, &connection); + if (status != OK) return nullptr; return state()->getRootObject(connection.fd(), sp<RpcSession>::fromExisting(this)); } status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { - ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT); + ExclusiveConnection connection; + status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), + ConnectionUse::CLIENT, &connection); + if (status != OK) return status; return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads); } @@ -133,16 +139,22 @@ bool RpcSession::shutdownAndWait(bool wait) { status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags) { - ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), - (flags & IBinder::FLAG_ONEWAY) ? ConnectionUse::CLIENT_ASYNC - : ConnectionUse::CLIENT); + ExclusiveConnection connection; + status_t status = + ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), + (flags & IBinder::FLAG_ONEWAY) ? ConnectionUse::CLIENT_ASYNC + : ConnectionUse::CLIENT, + &connection); + if (status != OK) return status; return state()->transact(connection.fd(), binder, code, data, sp<RpcSession>::fromExisting(this), reply, flags); } status_t RpcSession::sendDecStrong(const RpcAddress& address) { - ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), - ConnectionUse::CLIENT_REFCOUNT); + ExclusiveConnection connection; + status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), + ConnectionUse::CLIENT_REFCOUNT, &connection); + if (status != OK) return status; return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address); } @@ -208,9 +220,12 @@ status_t RpcSession::readId() { int32_t id; - ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT); - status_t status = - state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id); + ExclusiveConnection connection; + status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), + ConnectionUse::CLIENT, &connection); + if (status != OK) return status; + + status = state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id); if (status != OK) return status; LOG_RPC_DETAIL("RpcSession %p has id %d", this, id); @@ -236,7 +251,7 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std:: } } -void RpcSession::preJoin(std::thread thread) { +void RpcSession::preJoinThreadOwnership(std::thread thread) { LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread"); { @@ -245,20 +260,36 @@ void RpcSession::preJoin(std::thread thread) { } } -void RpcSession::join(sp<RpcSession>&& session, unique_fd client) { +RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) - sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client)); + sp<RpcConnection> connection = assignServerToThisThread(std::move(fd)); - while (true) { - status_t error = session->state()->getAndExecuteCommand(connection->fd, session, - RpcState::CommandType::ANY); + status_t status = + mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this)); - if (error != OK) { - LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", - statusToString(error).c_str()); - break; + return PreJoinSetupResult{ + .connection = std::move(connection), + .status = status, + }; +} + +void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult) { + sp<RpcConnection>& connection = setupResult.connection; + + if (setupResult.status == OK) { + while (true) { + status_t status = session->state()->getAndExecuteCommand(connection->fd, session, + RpcState::CommandType::ANY); + if (status != OK) { + LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", + statusToString(status).c_str()); + break; + } } + } else { + ALOGE("Connection failed to init, closing with status %s", + statusToString(setupResult.status).c_str()); } LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection), @@ -381,14 +412,17 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t unique_fd fd = std::move(serverFd); // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) sp<RpcSession> session = thiz; - session->preJoin(std::move(thread)); - ownershipTransferred = true; - joinCv.notify_one(); + session->preJoinThreadOwnership(std::move(thread)); + + // only continue once we have a response or the connection fails + auto setupResult = session->preJoinSetup(std::move(fd)); + ownershipTransferred = true; threadLock.unlock(); + joinCv.notify_one(); // do not use & vars below - RpcSession::join(std::move(session), std::move(fd)); + RpcSession::join(std::move(session), std::move(setupResult)); }); joinCv.wait(lock, [&] { return ownershipTransferred; }); LOG_ALWAYS_FATAL_IF(!ownershipTransferred); @@ -403,20 +437,32 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t } bool RpcSession::addClientConnection(unique_fd fd) { - std::lock_guard<std::mutex> _l(mMutex); + sp<RpcConnection> connection = sp<RpcConnection>::make(); + { + std::lock_guard<std::mutex> _l(mMutex); + + // first client connection added, but setForServer not called, so + // initializaing for a client. + if (mShutdownTrigger == nullptr) { + mShutdownTrigger = FdTrigger::make(); + mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); + if (mShutdownTrigger == nullptr) return false; + } - // first client connection added, but setForServer not called, so - // initializaing for a client. - if (mShutdownTrigger == nullptr) { - mShutdownTrigger = FdTrigger::make(); - mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); - if (mShutdownTrigger == nullptr) return false; + connection->fd = std::move(fd); + connection->exclusiveTid = gettid(); + mClientConnections.push_back(connection); } - sp<RpcConnection> session = sp<RpcConnection>::make(); - session->fd = std::move(fd); - mClientConnections.push_back(session); - return true; + status_t status = + mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this)); + + { + std::lock_guard<std::mutex> _l(mMutex); + connection->exclusiveTid = std::nullopt; + } + + return status == OK; } bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener, @@ -462,13 +508,16 @@ bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) { return false; } -RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& session, - ConnectionUse use) - : mSession(session) { +status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, ConnectionUse use, + ExclusiveConnection* connection) { + connection->mSession = session; + connection->mConnection = nullptr; + connection->mReentrant = false; + pid_t tid = gettid(); - std::unique_lock<std::mutex> _l(mSession->mMutex); + std::unique_lock<std::mutex> _l(session->mMutex); - mSession->mWaitingThreads++; + session->mWaitingThreads++; while (true) { sp<RpcConnection> exclusive; sp<RpcConnection> available; @@ -476,8 +525,8 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi // CHECK FOR DEDICATED CLIENT SOCKET // // A server/looper should always use a dedicated connection if available - findConnection(tid, &exclusive, &available, mSession->mClientConnections, - mSession->mClientConnectionsOffset); + findConnection(tid, &exclusive, &available, session->mClientConnections, + session->mClientConnectionsOffset); // WARNING: this assumes a server cannot request its client to send // a transaction, as mServerConnections is excluded below. @@ -490,8 +539,8 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi // command. So, we move to considering the second available thread // for subsequent calls. if (use == ConnectionUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) { - mSession->mClientConnectionsOffset = - (mSession->mClientConnectionsOffset + 1) % mSession->mClientConnections.size(); + session->mClientConnectionsOffset = + (session->mClientConnectionsOffset + 1) % session->mClientConnections.size(); } // USE SERVING SOCKET (for nested transaction) @@ -499,33 +548,36 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi // asynchronous calls cannot be nested if (use != ConnectionUse::CLIENT_ASYNC) { // server connections are always assigned to a thread - findConnection(tid, &exclusive, nullptr /*available*/, mSession->mServerConnections, + findConnection(tid, &exclusive, nullptr /*available*/, session->mServerConnections, 0 /* index hint */); } // if our thread is already using a connection, prioritize using that if (exclusive != nullptr) { - mConnection = exclusive; - mReentrant = true; + connection->mConnection = exclusive; + connection->mReentrant = true; break; } else if (available != nullptr) { - mConnection = available; - mConnection->exclusiveTid = tid; + connection->mConnection = available; + connection->mConnection->exclusiveTid = tid; break; } - // TODO(b/185167543): this should return an error, rather than crash a - // server - // in regular binder, this would usually be a deadlock :) - LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0, - "Session has no client connections. This is required for an RPC server " - "to make any non-nested (e.g. oneway or on another thread) calls."); + if (session->mClientConnections.size() == 0) { + ALOGE("Session has no client connections. This is required for an RPC server to make " + "any non-nested (e.g. oneway or on another thread) calls. Use: %d. Server " + "connections: %zu", + static_cast<int>(use), session->mServerConnections.size()); + return WOULD_BLOCK; + } LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...", - mSession->mClientConnections.size(), mSession->mServerConnections.size()); - mSession->mAvailableConnectionCv.wait(_l); + session->mClientConnections.size(), session->mServerConnections.size()); + session->mAvailableConnectionCv.wait(_l); } - mSession->mWaitingThreads--; + session->mWaitingThreads--; + + return OK; } void RpcSession::ExclusiveConnection::findConnection(pid_t tid, sp<RpcConnection>* exclusive, @@ -559,7 +611,7 @@ RpcSession::ExclusiveConnection::~ExclusiveConnection() { // reentrant use of a connection means something less deep in the call stack // is using this fd, and it retains the right to it. So, we don't give up // exclusive ownership, and no thread is freed. - if (!mReentrant) { + if (!mReentrant && mConnection != nullptr) { std::unique_lock<std::mutex> _l(mSession->mMutex); mConnection->exclusiveTid = std::nullopt; if (mSession->mWaitingThreads > 0) { diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp index 6899981e83..53eba5aea6 100644 --- a/libs/binder/RpcState.cpp +++ b/libs/binder/RpcState.cpp @@ -265,6 +265,27 @@ status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& sessi return OK; } +status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) { + RpcClientConnectionInit init{ + .msg = RPC_CONNECTION_INIT_OKAY, + }; + return rpcSend(fd, session, "connection init", &init, sizeof(init)); +} + +status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) { + RpcClientConnectionInit init; + if (status_t status = rpcRec(fd, session, "connection init", &init, sizeof(init)); status != OK) + return status; + + static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY)); + if (0 != strncmp(init.msg, RPC_CONNECTION_INIT_OKAY, sizeof(init.msg))) { + ALOGE("Connection init message unrecognized %.*s", static_cast<int>(sizeof(init.msg)), + init.msg); + return BAD_VALUE; + } + return OK; +} + sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) { Parcel data; data.markForRpc(session); @@ -565,7 +586,7 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSessio status != OK) return status; - return processTransactInternal(fd, session, std::move(transactionData), nullptr /*targetRef*/); + return processTransactInternal(fd, session, std::move(transactionData)); } static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t dataSize, @@ -578,7 +599,13 @@ static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t d } status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session, - CommandData transactionData, sp<IBinder>&& targetRef) { + CommandData transactionData) { + // for 'recursive' calls to this, we have already read and processed the + // binder from the transaction data and taken reference counts into account, + // so it is cached here. + sp<IBinder> targetRef; +processTransactInternalTailCall: + if (transactionData.size() < sizeof(RpcWireTransaction)) { ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!", sizeof(RpcWireTransaction), transactionData.size()); @@ -751,13 +778,12 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R // - gotta go fast auto& todo = const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top()); - CommandData nextData = std::move(todo.data); - sp<IBinder> nextRef = std::move(todo.ref); + // reset up arguments + transactionData = std::move(todo.data); + targetRef = std::move(todo.ref); it->second.asyncTodo.pop(); - _l.unlock(); - return processTransactInternal(fd, session, std::move(nextData), - std::move(nextRef)); + goto processTransactInternalTailCall; } } return OK; diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h index 13c31154eb..5bfef69c32 100644 --- a/libs/binder/RpcState.h +++ b/libs/binder/RpcState.h @@ -51,6 +51,9 @@ public: RpcState(); ~RpcState(); + status_t sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session); + status_t readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session); + // TODO(b/182940634): combine some special transactions into one "getServerInfo" call? sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session); status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session, @@ -144,8 +147,7 @@ private: const RpcWireHeader& command); [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session, - CommandData transactionData, - sp<IBinder>&& targetRef); + CommandData transactionData); [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session, const RpcWireHeader& command); diff --git a/libs/binder/RpcWireFormat.h b/libs/binder/RpcWireFormat.h index 649c1eeb8e..b5e5bc1e0f 100644 --- a/libs/binder/RpcWireFormat.h +++ b/libs/binder/RpcWireFormat.h @@ -26,12 +26,28 @@ enum : uint8_t { RPC_CONNECTION_OPTION_REVERSE = 0x1, }; +/** + * This is sent to an RpcServer in order to request a new connection is created, + * either as part of a new session or an existing session + */ struct RpcConnectionHeader { int32_t sessionId; uint8_t options; uint8_t reserved[3]; }; +#define RPC_CONNECTION_INIT_OKAY "cci" + +/** + * Whenever a client connection is setup, this is sent as the initial + * transaction. The main use of this is in order to control the timing for when + * a reverse connection is setup. + */ +struct RpcClientConnectionInit { + char msg[4]; + uint8_t reserved[4]; +}; + enum : uint32_t { /** * follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index 4650cf21ca..4ddf422850 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -185,13 +185,6 @@ private: bool mShutdown = false; }; - status_t readId(); - - // transfer ownership of thread - void preJoin(std::thread thread); - // join on thread passed to preJoin - static void join(sp<RpcSession>&& session, base::unique_fd client); - struct RpcConnection : public RefBase { base::unique_fd fd; @@ -200,6 +193,27 @@ private: std::optional<pid_t> exclusiveTid; }; + status_t readId(); + + // A thread joining a server must always call these functions in order, and + // cleanup is only programmed once into join. These are in separate + // functions in order to allow for different locks to be taken during + // different parts of setup. + // + // transfer ownership of thread (usually done while a lock is taken on the + // structure which originally owns the thread) + void preJoinThreadOwnership(std::thread thread); + // pass FD to thread and read initial connection information + struct PreJoinSetupResult { + // Server connection object associated with this + sp<RpcConnection> connection; + // Status of setup + status_t status; + }; + PreJoinSetupResult preJoinSetup(base::unique_fd fd); + // join on thread passed to preJoinThreadOwnership + static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result); + [[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address); [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId, bool server); @@ -216,10 +230,12 @@ private: CLIENT_REFCOUNT, }; - // RAII object for session connection + // Object representing exclusive access to a connection. class ExclusiveConnection { public: - explicit ExclusiveConnection(const sp<RpcSession>& session, ConnectionUse use); + static status_t find(const sp<RpcSession>& session, ConnectionUse use, + ExclusiveConnection* connection); + ~ExclusiveConnection(); const base::unique_fd& fd() { return mConnection->fd; } diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp index 82f8a3e273..a79295a792 100644 --- a/libs/binder/tests/binderRpcTest.cpp +++ b/libs/binder/tests/binderRpcTest.cpp @@ -1007,6 +1007,14 @@ TEST_P(BinderRpc, Callbacks) { } } +TEST_P(BinderRpc, OnewayCallbackWithNoThread) { + auto proc = createRpcTestSocketServerProcess(1); + auto cb = sp<MyBinderRpcCallback>::make(); + + Status status = proc.rootIface->doCallback(cb, true /*oneway*/, false /*delayed*/, "anything"); + EXPECT_EQ(WOULD_BLOCK, status.transactionError()); +} + TEST_P(BinderRpc, Die) { for (bool doDeathCleanup : {true, false}) { auto proc = createRpcTestSocketServerProcess(1); |