diff options
| author | 2021-06-15 18:02:40 +0000 | |
|---|---|---|
| committer | 2021-06-15 18:02:40 +0000 | |
| commit | 58fb61efef5da8d60f2ce89152b7cc2ed00d5f12 (patch) | |
| tree | 64b06f1bb2d93dab0799077cf613044e67951741 | |
| parent | c6f1ee8027ebfe45e3e278ec5590e68a7541cd75 (diff) | |
| parent | 2ce034f75bf2262af4fc751f087adcfe0c6919e1 (diff) | |
Merge changes I881f63b8,I4c5edd2d,I8ff0d29c,I6578c3a4,Iea286ab0, ... am: 2ce034f75b
Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1735011
Change-Id: I29254913a0b0ac090b42839d007ad6142077eb9f
| -rw-r--r-- | libs/binder/RpcAddress.cpp | 27 | ||||
| -rw-r--r-- | libs/binder/RpcServer.cpp | 40 | ||||
| -rw-r--r-- | libs/binder/RpcSession.cpp | 135 | ||||
| -rw-r--r-- | libs/binder/RpcState.cpp | 237 | ||||
| -rw-r--r-- | libs/binder/RpcState.h | 70 | ||||
| -rw-r--r-- | libs/binder/RpcWireFormat.h | 20 | ||||
| -rw-r--r-- | libs/binder/include/binder/RpcAddress.h | 21 | ||||
| -rw-r--r-- | libs/binder/include/binder/RpcServer.h | 8 | ||||
| -rw-r--r-- | libs/binder/include/binder/RpcSession.h | 47 | ||||
| -rw-r--r-- | libs/binder/tests/IBinderRpcTest.aidl | 1 | ||||
| -rw-r--r-- | libs/binder/tests/binderRpcTest.cpp | 69 |
11 files changed, 409 insertions, 266 deletions
diff --git a/libs/binder/RpcAddress.cpp b/libs/binder/RpcAddress.cpp index 5c3232045e..98dee9a039 100644 --- a/libs/binder/RpcAddress.cpp +++ b/libs/binder/RpcAddress.cpp @@ -29,7 +29,7 @@ RpcAddress RpcAddress::zero() { } bool RpcAddress::isZero() const { - RpcWireAddress ZERO{0}; + RpcWireAddress ZERO{.options = 0}; return memcmp(mRawAddr.get(), &ZERO, sizeof(RpcWireAddress)) == 0; } @@ -51,13 +51,34 @@ static void ReadRandomBytes(uint8_t* buf, size_t len) { close(fd); } -RpcAddress RpcAddress::unique() { +RpcAddress RpcAddress::random(bool forServer) { + // The remainder of this header acts as reserved space for different kinds + // of binder objects. + uint64_t options = RPC_WIRE_ADDRESS_OPTION_CREATED; + + // servers and clients allocate addresses independently, so this bit can + // tell you where an address originates + if (forServer) options |= RPC_WIRE_ADDRESS_OPTION_FOR_SERVER; + RpcAddress ret; - ReadRandomBytes((uint8_t*)ret.mRawAddr.get(), sizeof(RpcWireAddress)); + RpcWireAddress* raw = ret.mRawAddr.get(); + + raw->options = options; + ReadRandomBytes(raw->address, sizeof(raw->address)); + LOG_RPC_DETAIL("Creating new address: %s", ret.toString().c_str()); return ret; } +bool RpcAddress::isForServer() const { + return mRawAddr.get()->options & RPC_WIRE_ADDRESS_OPTION_FOR_SERVER; +} + +bool RpcAddress::isRecognizedType() const { + uint64_t allKnownOptions = RPC_WIRE_ADDRESS_OPTION_CREATED | RPC_WIRE_ADDRESS_OPTION_FOR_SERVER; + return (mRawAddr.get()->options & ~allKnownOptions) == 0; +} + RpcAddress RpcAddress::fromRawEmbedded(const RpcWireAddress* raw) { RpcAddress addr; memcpy(addr.mRawAddr.get(), raw, sizeof(RpcWireAddress)); diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index 60be406f6f..a8f3fa8f6f 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -270,14 +270,25 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie return; } - if (header.sessionId == RPC_SESSION_ID_NEW) { + RpcAddress sessionId = RpcAddress::fromRawEmbedded(&header.sessionId); + + if (sessionId.isZero()) { if (reverse) { ALOGE("Cannot create a new session with a reverse connection, would leak"); return; } - LOG_ALWAYS_FATAL_IF(server->mSessionIdCounter >= INT32_MAX, "Out of session IDs"); - server->mSessionIdCounter++; + RpcAddress sessionId = RpcAddress::zero(); + size_t tries = 0; + do { + // don't block if there is some entropy issue + if (tries++ > 5) { + ALOGE("Cannot find new address: %s", sessionId.toString().c_str()); + return; + } + + sessionId = RpcAddress::random(true /*forServer*/); + } while (server->mSessions.end() != server->mSessions.find(sessionId)); session = RpcSession::make(); session->setMaxThreads(server->mMaxThreads); @@ -285,23 +296,24 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie sp<RpcServer::EventListener>::fromExisting( static_cast<RpcServer::EventListener*>( server.get())), - server->mSessionIdCounter)) { + sessionId)) { ALOGE("Failed to attach server to session"); return; } - server->mSessions[server->mSessionIdCounter] = session; + server->mSessions[sessionId] = session; } else { - auto it = server->mSessions.find(header.sessionId); + auto it = server->mSessions.find(sessionId); if (it == server->mSessions.end()) { - ALOGE("Cannot add thread, no record of session with ID %d", header.sessionId); + ALOGE("Cannot add thread, no record of session with ID %s", + sessionId.toString().c_str()); return; } session = it->second; } if (reverse) { - LOG_ALWAYS_FATAL_IF(!session->addClientConnection(std::move(clientFd)), + LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(clientFd), true), "server state must already be initialized"); return; } @@ -350,19 +362,21 @@ bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { return true; } -void RpcServer::onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) { +void RpcServer::onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) { auto id = session->mId; LOG_ALWAYS_FATAL_IF(id == std::nullopt, "Server sessions must be initialized with ID"); - LOG_RPC_DETAIL("Dropping session %d", *id); + LOG_RPC_DETAIL("Dropping session with address %s", id->toString().c_str()); std::lock_guard<std::mutex> _l(mLock); auto it = mSessions.find(*id); - LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %d", *id); - LOG_ALWAYS_FATAL_IF(it->second != session, "Bad state, session has id mismatch %d", *id); + LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %s", + id->toString().c_str()); + LOG_ALWAYS_FATAL_IF(it->second != session, "Bad state, session has id mismatch %s", + id->toString().c_str()); (void)mSessions.erase(it); } -void RpcServer::onSessionServerThreadEnded() { +void RpcServer::onSessionIncomingThreadEnded() { mShutdownCv.notify_all(); } diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index a759ae36c3..4f55eef2d1 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -51,7 +51,7 @@ RpcSession::~RpcSession() { LOG_RPC_DETAIL("RpcSession destroyed %p", this); std::lock_guard<std::mutex> _l(mMutex); - LOG_ALWAYS_FATAL_IF(mServerConnections.size() != 0, + LOG_ALWAYS_FATAL_IF(mIncomingConnections.size() != 0, "Should not be able to destroy a session with servers in use."); } @@ -61,10 +61,10 @@ sp<RpcSession> RpcSession::make() { void RpcSession::setMaxThreads(size_t threads) { std::lock_guard<std::mutex> _l(mMutex); - LOG_ALWAYS_FATAL_IF(!mClientConnections.empty() || !mServerConnections.empty(), + LOG_ALWAYS_FATAL_IF(!mOutgoingConnections.empty() || !mIncomingConnections.empty(), "Must set max threads before setting up connections, but has %zu client(s) " "and %zu server(s)", - mClientConnections.size(), mServerConnections.size()); + mOutgoingConnections.size(), mIncomingConnections.size()); mMaxThreads = threads; } @@ -100,7 +100,7 @@ bool RpcSession::addNullDebuggingClient() { return false; } - return addClientConnection(std::move(serverFd)); + return addOutgoingConnection(std::move(serverFd), false); } sp<IBinder> RpcSession::getRootObject() { @@ -108,7 +108,7 @@ sp<IBinder> RpcSession::getRootObject() { status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT, &connection); if (status != OK) return nullptr; - return state()->getRootObject(connection.fd(), sp<RpcSession>::fromExisting(this)); + return state()->getRootObject(connection.get(), sp<RpcSession>::fromExisting(this)); } status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { @@ -116,7 +116,7 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) { status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT, &connection); if (status != OK) return status; - return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads); + return state()->getMaxThreads(connection.get(), sp<RpcSession>::fromExisting(this), maxThreads); } bool RpcSession::shutdownAndWait(bool wait) { @@ -146,7 +146,7 @@ status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Pa : ConnectionUse::CLIENT, &connection); if (status != OK) return status; - return state()->transact(connection.fd(), binder, code, data, + return state()->transact(connection.get(), binder, code, data, sp<RpcSession>::fromExisting(this), reply, flags); } @@ -155,7 +155,7 @@ status_t RpcSession::sendDecStrong(const RpcAddress& address) { status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT_REFCOUNT, &connection); if (status != OK) return status; - return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address); + return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address); } std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() { @@ -218,28 +218,27 @@ status_t RpcSession::readId() { LOG_ALWAYS_FATAL_IF(mForServer != nullptr, "Can only update ID for client."); } - int32_t id; - ExclusiveConnection connection; status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT, &connection); if (status != OK) return status; - status = state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id); + mId = RpcAddress::zero(); + status = state()->getSessionId(connection.get(), sp<RpcSession>::fromExisting(this), + &mId.value()); if (status != OK) return status; - LOG_RPC_DETAIL("RpcSession %p has id %d", this, id); - mId = id; + LOG_RPC_DETAIL("RpcSession %p has id %s", this, mId->toString().c_str()); return OK; } -void RpcSession::WaitForShutdownListener::onSessionLockedAllServerThreadsEnded( +void RpcSession::WaitForShutdownListener::onSessionLockedAllIncomingThreadsEnded( const sp<RpcSession>& session) { (void)session; mShutdown = true; } -void RpcSession::WaitForShutdownListener::onSessionServerThreadEnded() { +void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() { mCv.notify_all(); } @@ -263,10 +262,9 @@ void RpcSession::preJoinThreadOwnership(std::thread thread) { RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) - sp<RpcConnection> connection = assignServerToThisThread(std::move(fd)); + sp<RpcConnection> connection = assignIncomingConnectionToThisThread(std::move(fd)); - status_t status = - mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this)); + status_t status = mState->readConnectionInit(connection, sp<RpcSession>::fromExisting(this)); return PreJoinSetupResult{ .connection = std::move(connection), @@ -279,7 +277,7 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult if (setupResult.status == OK) { while (true) { - status_t status = session->state()->getAndExecuteCommand(connection->fd, session, + status_t status = session->state()->getAndExecuteCommand(connection, session, RpcState::CommandType::ANY); if (status != OK) { LOG_RPC_DETAIL("Binder connection thread closing w/ status %s", @@ -292,7 +290,7 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult statusToString(setupResult.status).c_str()); } - LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection), + LOG_ALWAYS_FATAL_IF(!session->removeIncomingConnection(connection), "bad state: connection object guaranteed to be in list"); sp<RpcSession::EventListener> listener; @@ -309,23 +307,28 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult session = nullptr; if (listener != nullptr) { - listener->onSessionServerThreadEnded(); + listener->onSessionIncomingThreadEnded(); } } -wp<RpcServer> RpcSession::server() { - return mForServer; +sp<RpcServer> RpcSession::server() { + RpcServer* unsafeServer = mForServer.unsafe_get(); + sp<RpcServer> server = mForServer.promote(); + + LOG_ALWAYS_FATAL_IF((unsafeServer == nullptr) != (server == nullptr), + "wp<> is to avoid strong cycle only"); + return server; } bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { { std::lock_guard<std::mutex> _l(mMutex); - LOG_ALWAYS_FATAL_IF(mClientConnections.size() != 0, + LOG_ALWAYS_FATAL_IF(mOutgoingConnections.size() != 0, "Must only setup session once, but already has %zu clients", - mClientConnections.size()); + mOutgoingConnections.size()); } - if (!setupOneSocketConnection(addr, RPC_SESSION_ID_NEW, false /*reverse*/)) return false; + if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*reverse*/)) return false; // TODO(b/189955605): we should add additional sessions dynamically // instead of all at once. @@ -362,7 +365,8 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { return true; } -bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t id, bool reverse) { +bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id, + bool reverse) { for (size_t tries = 0; tries < 5; tries++) { if (tries > 0) usleep(10000); @@ -386,9 +390,9 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t return false; } - RpcConnectionHeader header{ - .sessionId = id, - }; + RpcConnectionHeader header{.options = 0}; + memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress)); + if (reverse) header.options |= RPC_CONNECTION_OPTION_REVERSE; if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) { @@ -428,7 +432,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t LOG_ALWAYS_FATAL_IF(!ownershipTransferred); return true; } else { - return addClientConnection(std::move(serverFd)); + return addOutgoingConnection(std::move(serverFd), true); } } @@ -436,7 +440,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t return false; } -bool RpcSession::addClientConnection(unique_fd fd) { +bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) { sp<RpcConnection> connection = sp<RpcConnection>::make(); { std::lock_guard<std::mutex> _l(mMutex); @@ -451,11 +455,13 @@ bool RpcSession::addClientConnection(unique_fd fd) { connection->fd = std::move(fd); connection->exclusiveTid = gettid(); - mClientConnections.push_back(connection); + mOutgoingConnections.push_back(connection); } - status_t status = - mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this)); + status_t status = OK; + if (init) { + mState->sendConnectionInit(connection, sp<RpcSession>::fromExisting(this)); + } { std::lock_guard<std::mutex> _l(mMutex); @@ -466,7 +472,7 @@ bool RpcSession::addClientConnection(unique_fd fd) { } bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener, - int32_t sessionId) { + const RpcAddress& sessionId) { LOG_ALWAYS_FATAL_IF(mForServer != nullptr); LOG_ALWAYS_FATAL_IF(server == nullptr); LOG_ALWAYS_FATAL_IF(mEventListener != nullptr); @@ -482,25 +488,26 @@ bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListene return true; } -sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) { +sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(unique_fd fd) { std::lock_guard<std::mutex> _l(mMutex); sp<RpcConnection> session = sp<RpcConnection>::make(); session->fd = std::move(fd); session->exclusiveTid = gettid(); - mServerConnections.push_back(session); + mIncomingConnections.push_back(session); return session; } -bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) { +bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) { std::lock_guard<std::mutex> _l(mMutex); - if (auto it = std::find(mServerConnections.begin(), mServerConnections.end(), connection); - it != mServerConnections.end()) { - mServerConnections.erase(it); - if (mServerConnections.size() == 0) { + if (auto it = std::find(mIncomingConnections.begin(), mIncomingConnections.end(), connection); + it != mIncomingConnections.end()) { + mIncomingConnections.erase(it); + if (mIncomingConnections.size() == 0) { sp<EventListener> listener = mEventListener.promote(); if (listener) { - listener->onSessionLockedAllServerThreadsEnded(sp<RpcSession>::fromExisting(this)); + listener->onSessionLockedAllIncomingThreadsEnded( + sp<RpcSession>::fromExisting(this)); } } return true; @@ -525,11 +532,11 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co // CHECK FOR DEDICATED CLIENT SOCKET // // A server/looper should always use a dedicated connection if available - findConnection(tid, &exclusive, &available, session->mClientConnections, - session->mClientConnectionsOffset); + findConnection(tid, &exclusive, &available, session->mOutgoingConnections, + session->mOutgoingConnectionsOffset); // WARNING: this assumes a server cannot request its client to send - // a transaction, as mServerConnections is excluded below. + // a transaction, as mIncomingConnections is excluded below. // // Imagine we have more than one thread in play, and a single thread // sends a synchronous, then an asynchronous command. Imagine the @@ -539,17 +546,31 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co // command. So, we move to considering the second available thread // for subsequent calls. if (use == ConnectionUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) { - session->mClientConnectionsOffset = - (session->mClientConnectionsOffset + 1) % session->mClientConnections.size(); + session->mOutgoingConnectionsOffset = (session->mOutgoingConnectionsOffset + 1) % + session->mOutgoingConnections.size(); } - // USE SERVING SOCKET (for nested transaction) - // - // asynchronous calls cannot be nested + // USE SERVING SOCKET (e.g. nested transaction) if (use != ConnectionUse::CLIENT_ASYNC) { + sp<RpcConnection> exclusiveIncoming; // server connections are always assigned to a thread - findConnection(tid, &exclusive, nullptr /*available*/, session->mServerConnections, - 0 /* index hint */); + findConnection(tid, &exclusiveIncoming, nullptr /*available*/, + session->mIncomingConnections, 0 /* index hint */); + + // asynchronous calls cannot be nested, we currently allow ref count + // calls to be nested (so that you can use this without having extra + // threads). Note 'drainCommands' is used so that these ref counts can't + // build up. + if (exclusiveIncoming != nullptr) { + if (exclusiveIncoming->allowNested) { + // guaranteed to be processed as nested command + exclusive = exclusiveIncoming; + } else if (use == ConnectionUse::CLIENT_REFCOUNT && available == nullptr) { + // prefer available socket, but if we don't have one, don't + // wait for one + exclusive = exclusiveIncoming; + } + } } // if our thread is already using a connection, prioritize using that @@ -563,16 +584,16 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co break; } - if (session->mClientConnections.size() == 0) { + if (session->mOutgoingConnections.size() == 0) { ALOGE("Session has no client connections. This is required for an RPC server to make " "any non-nested (e.g. oneway or on another thread) calls. Use: %d. Server " "connections: %zu", - static_cast<int>(use), session->mServerConnections.size()); + static_cast<int>(use), session->mIncomingConnections.size()); return WOULD_BLOCK; } LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...", - session->mClientConnections.size(), session->mServerConnections.size()); + session->mOutgoingConnections.size(), session->mIncomingConnections.size()); session->mAvailableConnectionCv.wait(_l); } session->mWaitingThreads--; diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp index 53eba5aea6..fd2eff6870 100644 --- a/libs/binder/RpcState.cpp +++ b/libs/binder/RpcState.cpp @@ -83,21 +83,45 @@ status_t RpcState::onBinderLeaving(const sp<RpcSession>& session, const sp<IBind } LOG_ALWAYS_FATAL_IF(isRpc, "RPC binder must have known address at this point"); - auto&& [it, inserted] = mNodeForAddress.insert({RpcAddress::unique(), - BinderNode{ - .binder = binder, - .timesSent = 1, - .sentRef = binder, - }}); - // TODO(b/182939933): better organization could avoid needing this log - LOG_ALWAYS_FATAL_IF(!inserted); - - *outAddress = it->first; - return OK; + bool forServer = session->server() != nullptr; + + for (size_t tries = 0; tries < 5; tries++) { + auto&& [it, inserted] = mNodeForAddress.insert({RpcAddress::random(forServer), + BinderNode{ + .binder = binder, + .timesSent = 1, + .sentRef = binder, + }}); + if (inserted) { + *outAddress = it->first; + return OK; + } + + // well, we don't have visibility into the header here, but still + static_assert(sizeof(RpcWireAddress) == 40, "this log needs updating"); + ALOGW("2**256 is 1e77. If you see this log, you probably have some entropy issue, or maybe " + "you witness something incredible!"); + } + + ALOGE("Unable to create an address in order to send out %p", binder.get()); + return WOULD_BLOCK; } status_t RpcState::onBinderEntering(const sp<RpcSession>& session, const RpcAddress& address, sp<IBinder>* out) { + // ensure that: if we want to use addresses for something else in the future (for + // instance, allowing transitive binder sends), that we don't accidentally + // send those addresses to old server. Accidentally ignoring this in that + // case and considering the binder to be recognized could cause this + // process to accidentally proxy transactions for that binder. Of course, + // if we communicate with a binder, it could always be proxying + // information. However, we want to make sure that isn't done on accident + // by a client. + if (!address.isRecognizedType()) { + ALOGE("Address is of an unknown type, rejecting: %s", address.toString().c_str()); + return BAD_VALUE; + } + std::unique_lock<std::mutex> _l(mNodeMutex); if (mTerminated) return DEAD_OBJECT; @@ -117,6 +141,14 @@ status_t RpcState::onBinderEntering(const sp<RpcSession>& session, const RpcAddr return OK; } + // we don't know about this binder, so the other side of the connection + // should have created it. + if (address.isForServer() == !!session->server()) { + ALOGE("Server received unrecognized address which we should own the creation of %s.", + address.toString().c_str()); + return BAD_VALUE; + } + auto&& [it, inserted] = mNodeForAddress.insert({address, BinderNode{}}); LOG_ALWAYS_FATAL_IF(!inserted, "Failed to insert binder when creating proxy"); @@ -222,9 +254,11 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) { mData.reset(new (std::nothrow) uint8_t[size]); } -status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session, - const char* what, const void* data, size_t size) { - LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str()); +status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, const char* what, const void* data, + size_t size) { + LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, connection->fd.get(), + hexString(data, size).c_str()); if (size > std::numeric_limits<ssize_t>::max()) { ALOGE("Cannot send %s at size %zu (too big)", what, size); @@ -232,12 +266,12 @@ status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& sess return BAD_VALUE; } - ssize_t sent = TEMP_FAILURE_RETRY(send(fd.get(), data, size, MSG_NOSIGNAL)); + ssize_t sent = TEMP_FAILURE_RETRY(send(connection->fd.get(), data, size, MSG_NOSIGNAL)); if (sent < 0 || sent != static_cast<ssize_t>(size)) { int savedErrno = errno; LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent, - size, fd.get(), strerror(savedErrno)); + size, connection->fd.get(), strerror(savedErrno)); (void)session->shutdownAndWait(false); return -savedErrno; @@ -246,35 +280,41 @@ status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& sess return OK; } -status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session, - const char* what, void* data, size_t size) { +status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, const char* what, void* data, + size_t size) { if (size > std::numeric_limits<ssize_t>::max()) { ALOGE("Cannot rec %s at size %zu (too big)", what, size); (void)session->shutdownAndWait(false); return BAD_VALUE; } - if (status_t status = session->mShutdownTrigger->interruptableReadFully(fd.get(), data, size); + if (status_t status = + session->mShutdownTrigger->interruptableReadFully(connection->fd.get(), data, size); status != OK) { - LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(), - statusToString(status).c_str()); + LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, + connection->fd.get(), statusToString(status).c_str()); return status; } - LOG_RPC_DETAIL("Received %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str()); + LOG_RPC_DETAIL("Received %s on fd %d: %s", what, connection->fd.get(), + hexString(data, size).c_str()); return OK; } -status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) { - RpcClientConnectionInit init{ +status_t RpcState::sendConnectionInit(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session) { + RpcOutgoingConnectionInit init{ .msg = RPC_CONNECTION_INIT_OKAY, }; - return rpcSend(fd, session, "connection init", &init, sizeof(init)); + return rpcSend(connection, session, "connection init", &init, sizeof(init)); } -status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) { - RpcClientConnectionInit init; - if (status_t status = rpcRec(fd, session, "connection init", &init, sizeof(init)); status != OK) +status_t RpcState::readConnectionInit(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session) { + RpcOutgoingConnectionInit init; + if (status_t status = rpcRec(connection, session, "connection init", &init, sizeof(init)); + status != OK) return status; static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY)); @@ -286,13 +326,14 @@ status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSes return OK; } -sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) { +sp<IBinder> RpcState::getRootObject(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session) { Parcel data; data.markForRpc(session); Parcel reply; - status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT, data, - session, &reply, 0); + status_t status = transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT, + data, session, &reply, 0); if (status != OK) { ALOGE("Error getting root object: %s", statusToString(status).c_str()); return nullptr; @@ -301,14 +342,15 @@ sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSessi return reply.readStrongBinder(); } -status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session, - size_t* maxThreadsOut) { +status_t RpcState::getMaxThreads(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, size_t* maxThreadsOut) { Parcel data; data.markForRpc(session); Parcel reply; - status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS, - data, session, &reply, 0); + status_t status = + transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS, + data, session, &reply, 0); if (status != OK) { ALOGE("Error getting max threads: %s", statusToString(status).c_str()); return status; @@ -326,30 +368,26 @@ status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcSession> return OK; } -status_t RpcState::getSessionId(const base::unique_fd& fd, const sp<RpcSession>& session, - int32_t* sessionIdOut) { +status_t RpcState::getSessionId(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, RpcAddress* sessionIdOut) { Parcel data; data.markForRpc(session); Parcel reply; - status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID, - data, session, &reply, 0); + status_t status = + transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID, + data, session, &reply, 0); if (status != OK) { ALOGE("Error getting session ID: %s", statusToString(status).c_str()); return status; } - int32_t sessionId; - status = reply.readInt32(&sessionId); - if (status != OK) return status; - - *sessionIdOut = sessionId; - return OK; + return sessionIdOut->readFromParcel(reply); } -status_t RpcState::transact(const base::unique_fd& fd, const sp<IBinder>& binder, uint32_t code, - const Parcel& data, const sp<RpcSession>& session, Parcel* reply, - uint32_t flags) { +status_t RpcState::transact(const sp<RpcSession::RpcConnection>& connection, + const sp<IBinder>& binder, uint32_t code, const Parcel& data, + const sp<RpcSession>& session, Parcel* reply, uint32_t flags) { if (!data.isForRpc()) { ALOGE("Refusing to send RPC with parcel not crafted for RPC"); return BAD_TYPE; @@ -363,12 +401,12 @@ status_t RpcState::transact(const base::unique_fd& fd, const sp<IBinder>& binder RpcAddress address = RpcAddress::zero(); if (status_t status = onBinderLeaving(session, binder, &address); status != OK) return status; - return transactAddress(fd, address, code, data, session, reply, flags); + return transactAddress(connection, address, code, data, session, reply, flags); } -status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& address, - uint32_t code, const Parcel& data, const sp<RpcSession>& session, - Parcel* reply, uint32_t flags) { +status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connection, + const RpcAddress& address, uint32_t code, const Parcel& data, + const sp<RpcSession>& session, Parcel* reply, uint32_t flags) { LOG_ALWAYS_FATAL_IF(!data.isForRpc()); LOG_ALWAYS_FATAL_IF(data.objectsCount() != 0); @@ -418,25 +456,25 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& memcpy(transactionData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireTransaction), data.data(), data.dataSize()); - if (status_t status = - rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size()); + if (status_t status = rpcSend(connection, session, "transaction", transactionData.data(), + transactionData.size()); status != OK) // TODO(b/167966510): need to undo onBinderLeaving - we know the // refcount isn't successfully transferred. return status; if (flags & IBinder::FLAG_ONEWAY) { - LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", fd.get()); + LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", connection->fd.get()); // Do not wait on result. // However, too many oneway calls may cause refcounts to build up and fill up the socket, // so process those. - return drainCommands(fd, session, CommandType::CONTROL_ONLY); + return drainCommands(connection, session, CommandType::CONTROL_ONLY); } LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction."); - return waitForReply(fd, session, reply); + return waitForReply(connection, session, reply); } static void cleanup_reply_data(Parcel* p, const uint8_t* data, size_t dataSize, @@ -448,17 +486,18 @@ static void cleanup_reply_data(Parcel* p, const uint8_t* data, size_t dataSize, LOG_ALWAYS_FATAL_IF(objectsCount, 0); } -status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session, - Parcel* reply) { +status_t RpcState::waitForReply(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, Parcel* reply) { RpcWireHeader command; while (true) { - if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command)); + if (status_t status = + rpcRec(connection, session, "command header", &command, sizeof(command)); status != OK) return status; if (command.command == RPC_COMMAND_REPLY) break; - if (status_t status = processServerCommand(fd, session, command, CommandType::ANY); + if (status_t status = processCommand(connection, session, command, CommandType::ANY); status != OK) return status; } @@ -466,7 +505,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& CommandData data(command.bodySize); if (!data.valid()) return NO_MEMORY; - if (status_t status = rpcRec(fd, session, "reply body", data.data(), command.bodySize); + if (status_t status = rpcRec(connection, session, "reply body", data.data(), command.bodySize); status != OK) return status; @@ -488,8 +527,8 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& return OK; } -status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session, - const RpcAddress& addr) { +status_t RpcState::sendDecStrong(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, const RpcAddress& addr) { { std::lock_guard<std::mutex> _l(mNodeMutex); if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races @@ -508,39 +547,42 @@ status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession> .command = RPC_COMMAND_DEC_STRONG, .bodySize = sizeof(RpcWireAddress), }; - if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK) + if (status_t status = rpcSend(connection, session, "dec ref header", &cmd, sizeof(cmd)); + status != OK) return status; - if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(), + if (status_t status = rpcSend(connection, session, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress)); status != OK) return status; return OK; } -status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session, - CommandType type) { - LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get()); +status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, CommandType type) { + LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", connection->fd.get()); RpcWireHeader command; - if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command)); + if (status_t status = rpcRec(connection, session, "command header", &command, sizeof(command)); status != OK) return status; - return processServerCommand(fd, session, command, type); + return processCommand(connection, session, command, type); } -status_t RpcState::drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session, - CommandType type) { +status_t RpcState::drainCommands(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, CommandType type) { uint8_t buf; - while (0 < TEMP_FAILURE_RETRY(recv(fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) { - status_t status = getAndExecuteCommand(fd, session, type); + while (0 < TEMP_FAILURE_RETRY( + recv(connection->fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) { + status_t status = getAndExecuteCommand(connection, session, type); if (status != OK) return status; } return OK; } -status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcSession>& session, - const RpcWireHeader& command, CommandType type) { +status_t RpcState::processCommand(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, const RpcWireHeader& command, + CommandType type) { IPCThreadState* kernelBinderState = IPCThreadState::selfOrNull(); IPCThreadState::SpGuard spGuard{ .address = __builtin_frame_address(0), @@ -559,9 +601,9 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS switch (command.command) { case RPC_COMMAND_TRANSACT: if (type != CommandType::ANY) return BAD_TYPE; - return processTransact(fd, session, command); + return processTransact(connection, session, command); case RPC_COMMAND_DEC_STRONG: - return processDecStrong(fd, session, command); + return processDecStrong(connection, session, command); } // We should always know the version of the opposing side, and since the @@ -573,20 +615,20 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS (void)session->shutdownAndWait(false); return DEAD_OBJECT; } -status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session, - const RpcWireHeader& command) { +status_t RpcState::processTransact(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, const RpcWireHeader& command) { LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_TRANSACT, "command: %d", command.command); CommandData transactionData(command.bodySize); if (!transactionData.valid()) { return NO_MEMORY; } - if (status_t status = rpcRec(fd, session, "transaction body", transactionData.data(), + if (status_t status = rpcRec(connection, session, "transaction body", transactionData.data(), transactionData.size()); status != OK) return status; - return processTransactInternal(fd, session, std::move(transactionData)); + return processTransactInternal(connection, session, std::move(transactionData)); } static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t dataSize, @@ -598,7 +640,8 @@ static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t d (void)objectsCount; } -status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session, +status_t RpcState::processTransactInternal(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, CommandData transactionData) { // for 'recursive' calls to this, we have already read and processed the // binder from the transaction data and taken reference counts into account, @@ -617,6 +660,7 @@ processTransactInternalTailCall: // TODO(b/182939933): heap allocation just for lookup in mNodeForAddress, // maybe add an RpcAddress 'view' if the type remains 'heavy' auto addr = RpcAddress::fromRawEmbedded(&transaction->address); + bool oneway = transaction->flags & IBinder::FLAG_ONEWAY; status_t replyStatus = OK; sp<IBinder> target; @@ -645,7 +689,7 @@ processTransactInternalTailCall: addr.toString().c_str()); (void)session->shutdownAndWait(false); replyStatus = BAD_VALUE; - } else if (transaction->flags & IBinder::FLAG_ONEWAY) { + } else if (oneway) { std::unique_lock<std::mutex> _l(mNodeMutex); auto it = mNodeForAddress.find(addr); if (it->second.binder.promote() != target) { @@ -702,7 +746,12 @@ processTransactInternalTailCall: data.markForRpc(session); if (target) { + bool origAllowNested = connection->allowNested; + connection->allowNested = !oneway; + replyStatus = target->transact(transaction->code, data, &reply, transaction->flags); + + connection->allowNested = origAllowNested; } else { LOG_RPC_DETAIL("Got special transaction %u", transaction->code); @@ -713,13 +762,13 @@ processTransactInternalTailCall: } case RPC_SPECIAL_TRANSACT_GET_SESSION_ID: { // for client connections, this should always report the value - // originally returned from the server - int32_t id = session->mId.value(); - replyStatus = reply.writeInt32(id); + // originally returned from the server, so this is asserting + // that it exists + replyStatus = session->mId.value().writeToParcel(&reply); break; } default: { - sp<RpcServer> server = session->server().promote(); + sp<RpcServer> server = session->server(); if (server) { switch (transaction->code) { case RPC_SPECIAL_TRANSACT_GET_ROOT: { @@ -738,7 +787,7 @@ processTransactInternalTailCall: } } - if (transaction->flags & IBinder::FLAG_ONEWAY) { + if (oneway) { if (replyStatus != OK) { ALOGW("Oneway call failed with error: %d", replyStatus); } @@ -811,11 +860,11 @@ processTransactInternalTailCall: memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(), reply.dataSize()); - return rpcSend(fd, session, "reply", replyData.data(), replyData.size()); + return rpcSend(connection, session, "reply", replyData.data(), replyData.size()); } -status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session, - const RpcWireHeader& command) { +status_t RpcState::processDecStrong(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, const RpcWireHeader& command) { LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command); CommandData commandData(command.bodySize); @@ -823,7 +872,7 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi return NO_MEMORY; } if (status_t status = - rpcRec(fd, session, "dec ref body", commandData.data(), commandData.size()); + rpcRec(connection, session, "dec ref body", commandData.data(), commandData.size()); status != OK) return status; diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h index 5bfef69c32..529dee534c 100644 --- a/libs/binder/RpcState.h +++ b/libs/binder/RpcState.h @@ -51,34 +51,37 @@ public: RpcState(); ~RpcState(); - status_t sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session); - status_t readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session); + status_t sendConnectionInit(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session); + status_t readConnectionInit(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session); // TODO(b/182940634): combine some special transactions into one "getServerInfo" call? - sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session); - status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session, - size_t* maxThreadsOut); - status_t getSessionId(const base::unique_fd& fd, const sp<RpcSession>& session, - int32_t* sessionIdOut); - - [[nodiscard]] status_t transact(const base::unique_fd& fd, const sp<IBinder>& address, - uint32_t code, const Parcel& data, + sp<IBinder> getRootObject(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session); + status_t getMaxThreads(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, size_t* maxThreadsOut); + status_t getSessionId(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, RpcAddress* sessionIdOut); + + [[nodiscard]] status_t transact(const sp<RpcSession::RpcConnection>& connection, + const sp<IBinder>& address, uint32_t code, const Parcel& data, const sp<RpcSession>& session, Parcel* reply, uint32_t flags); - [[nodiscard]] status_t transactAddress(const base::unique_fd& fd, const RpcAddress& address, - uint32_t code, const Parcel& data, - const sp<RpcSession>& session, Parcel* reply, - uint32_t flags); - [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session, - const RpcAddress& address); + [[nodiscard]] status_t transactAddress(const sp<RpcSession::RpcConnection>& connection, + const RpcAddress& address, uint32_t code, + const Parcel& data, const sp<RpcSession>& session, + Parcel* reply, uint32_t flags); + [[nodiscard]] status_t sendDecStrong(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, const RpcAddress& address); enum class CommandType { ANY, CONTROL_ONLY, }; - [[nodiscard]] status_t getAndExecuteCommand(const base::unique_fd& fd, + [[nodiscard]] status_t getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, CommandType type); - [[nodiscard]] status_t drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session, - CommandType type); + [[nodiscard]] status_t drainCommands(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, CommandType type); /** * Called by Parcel for outgoing binders. This implies one refcount of @@ -133,22 +136,25 @@ private: size_t mSize; }; - [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session, - const char* what, const void* data, size_t size); - [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session, - const char* what, void* data, size_t size); - - [[nodiscard]] status_t waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session, - Parcel* reply); - [[nodiscard]] status_t processServerCommand(const base::unique_fd& fd, - const sp<RpcSession>& session, - const RpcWireHeader& command, CommandType type); - [[nodiscard]] status_t processTransact(const base::unique_fd& fd, const sp<RpcSession>& session, + [[nodiscard]] status_t rpcSend(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, const char* what, + const void* data, size_t size); + [[nodiscard]] status_t rpcRec(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, const char* what, void* data, + size_t size); + + [[nodiscard]] status_t waitForReply(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, Parcel* reply); + [[nodiscard]] status_t processCommand(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, + const RpcWireHeader& command, CommandType type); + [[nodiscard]] status_t processTransact(const sp<RpcSession::RpcConnection>& connection, + const sp<RpcSession>& session, const RpcWireHeader& command); - [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd, + [[nodiscard]] status_t processTransactInternal(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, CommandData transactionData); - [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd, + [[nodiscard]] status_t processDecStrong(const sp<RpcSession::RpcConnection>& connection, const sp<RpcSession>& session, const RpcWireHeader& command); diff --git a/libs/binder/RpcWireFormat.h b/libs/binder/RpcWireFormat.h index b5e5bc1e0f..2016483138 100644 --- a/libs/binder/RpcWireFormat.h +++ b/libs/binder/RpcWireFormat.h @@ -20,20 +20,26 @@ namespace android { #pragma clang diagnostic push #pragma clang diagnostic error "-Wpadded" -constexpr int32_t RPC_SESSION_ID_NEW = -1; - enum : uint8_t { RPC_CONNECTION_OPTION_REVERSE = 0x1, }; +constexpr uint64_t RPC_WIRE_ADDRESS_OPTION_CREATED = 1 << 0; // distinguish from '0' address +constexpr uint64_t RPC_WIRE_ADDRESS_OPTION_FOR_SERVER = 1 << 1; + +struct RpcWireAddress { + uint64_t options; + uint8_t address[32]; +}; + /** * This is sent to an RpcServer in order to request a new connection is created, * either as part of a new session or an existing session */ struct RpcConnectionHeader { - int32_t sessionId; + RpcWireAddress sessionId; uint8_t options; - uint8_t reserved[3]; + uint8_t reserved[7]; }; #define RPC_CONNECTION_INIT_OKAY "cci" @@ -43,7 +49,7 @@ struct RpcConnectionHeader { * transaction. The main use of this is in order to control the timing for when * a reverse connection is setup. */ -struct RpcClientConnectionInit { +struct RpcOutgoingConnectionInit { char msg[4]; uint8_t reserved[4]; }; @@ -89,10 +95,6 @@ struct RpcWireHeader { uint32_t reserved[2]; }; -struct RpcWireAddress { - uint8_t address[32]; -}; - struct RpcWireTransaction { RpcWireAddress address; uint32_t code; diff --git a/libs/binder/include/binder/RpcAddress.h b/libs/binder/include/binder/RpcAddress.h index 5a3f3a6afa..e428908c88 100644 --- a/libs/binder/include/binder/RpcAddress.h +++ b/libs/binder/include/binder/RpcAddress.h @@ -29,11 +29,7 @@ class Parcel; struct RpcWireAddress; /** - * This class represents an identifier of a binder object. - * - * The purpose of this class it to hide the ABI of an RpcWireAddress, and - * potentially allow us to change the size of it in the future (RpcWireAddress - * is PIMPL, essentially - although the type that is used here is not exposed). + * This class represents an identifier across an RPC boundary. */ class RpcAddress { public: @@ -46,9 +42,20 @@ public: bool isZero() const; /** - * Create a new address which is unique + * Create a new random address. + */ + static RpcAddress random(bool forServer); + + /** + * Whether this address was created with 'bool forServer' true + */ + bool isForServer() const; + + /** + * Whether this address is one that could be created with this version of + * libbinder. */ - static RpcAddress unique(); + bool isRecognizedType() const; /** * Creates a new address as a copy of an embedded object. diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h index 4e6934b276..c8d2857347 100644 --- a/libs/binder/include/binder/RpcServer.h +++ b/libs/binder/include/binder/RpcServer.h @@ -17,6 +17,7 @@ #include <android-base/unique_fd.h> #include <binder/IBinder.h> +#include <binder/RpcAddress.h> #include <binder/RpcSession.h> #include <utils/Errors.h> #include <utils/RefBase.h> @@ -155,8 +156,8 @@ private: friend sp<RpcServer>; RpcServer(); - void onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) override; - void onSessionServerThreadEnded() override; + void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) override; + void onSessionIncomingThreadEnded() override; static void establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd); bool setupSocketServer(const RpcSocketAddress& address); @@ -171,8 +172,7 @@ private: std::map<std::thread::id, std::thread> mConnectingThreads; sp<IBinder> mRootObject; wp<IBinder> mRootObjectWeak; - std::map<int32_t, sp<RpcSession>> mSessions; - int32_t mSessionIdCounter = 0; + std::map<RpcAddress, sp<RpcSession>> mSessions; std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger; std::condition_variable mShutdownCv; }; diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index 4ddf422850..69c2a1a956 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -118,7 +118,11 @@ public: ~RpcSession(); - wp<RpcServer> server(); + /** + * Server if this session is created as part of a server (symmetrical to + * client servers). Otherwise, nullptr. + */ + sp<RpcServer> server(); // internal only const std::unique_ptr<RpcState>& state() { return mState; } @@ -170,14 +174,14 @@ private: class EventListener : public virtual RefBase { public: - virtual void onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) = 0; - virtual void onSessionServerThreadEnded() = 0; + virtual void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0; + virtual void onSessionIncomingThreadEnded() = 0; }; class WaitForShutdownListener : public EventListener { public: - void onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) override; - void onSessionServerThreadEnded() override; + void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) override; + void onSessionIncomingThreadEnded() override; void waitForShutdown(std::unique_lock<std::mutex>& lock); private: @@ -191,6 +195,8 @@ private: // whether this or another thread is currently using this fd to make // or receive transactions. std::optional<pid_t> exclusiveTid; + + bool allowNested = false; }; status_t readId(); @@ -215,14 +221,14 @@ private: static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result); [[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address); - [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId, - bool server); - [[nodiscard]] bool addClientConnection(base::unique_fd fd); + [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, + const RpcAddress& sessionId, bool server); + [[nodiscard]] bool addOutgoingConnection(base::unique_fd fd, bool init); [[nodiscard]] bool setForServer(const wp<RpcServer>& server, const wp<RpcSession::EventListener>& eventListener, - int32_t sessionId); - sp<RpcConnection> assignServerToThisThread(base::unique_fd fd); - [[nodiscard]] bool removeServerConnection(const sp<RpcConnection>& connection); + const RpcAddress& sessionId); + sp<RpcConnection> assignIncomingConnectionToThisThread(base::unique_fd fd); + [[nodiscard]] bool removeIncomingConnection(const sp<RpcConnection>& connection); enum class ConnectionUse { CLIENT, @@ -237,7 +243,7 @@ private: ExclusiveConnection* connection); ~ExclusiveConnection(); - const base::unique_fd& fd() { return mConnection->fd; } + const sp<RpcConnection>& get() { return mConnection; } private: static void findConnection(pid_t tid, sp<RpcConnection>* exclusive, @@ -254,13 +260,13 @@ private: bool mReentrant = false; }; - // On the other side of a session, for each of mClientConnections here, there should - // be one of mServerConnections on the other side (and vice versa). + // On the other side of a session, for each of mOutgoingConnections here, there should + // be one of mIncomingConnections on the other side (and vice versa). // // For the simplest session, a single server with one client, you would // have: - // - the server has a single 'mServerConnections' and a thread listening on this - // - the client has a single 'mClientConnections' and makes calls to this + // - the server has a single 'mIncomingConnections' and a thread listening on this + // - the client has a single 'mOutgoingConnections' and makes calls to this // - here, when the client makes a call, the server can call back into it // (nested calls), but outside of this, the client will only ever read // calls from the server when it makes a call itself. @@ -272,8 +278,7 @@ private: sp<WaitForShutdownListener> mShutdownListener; // used for client sessions wp<EventListener> mEventListener; // mForServer if server, mShutdownListener if client - // TODO(b/183988761): this shouldn't be guessable - std::optional<int32_t> mId; + std::optional<RpcAddress> mId; std::unique_ptr<FdTrigger> mShutdownTrigger; @@ -286,9 +291,9 @@ private: std::condition_variable mAvailableConnectionCv; // for mWaitingThreads size_t mWaitingThreads = 0; // hint index into clients, ++ when sending an async transaction - size_t mClientConnectionsOffset = 0; - std::vector<sp<RpcConnection>> mClientConnections; - std::vector<sp<RpcConnection>> mServerConnections; + size_t mOutgoingConnectionsOffset = 0; + std::vector<sp<RpcConnection>> mOutgoingConnections; + std::vector<sp<RpcConnection>> mIncomingConnections; std::map<std::thread::id, std::thread> mThreads; }; diff --git a/libs/binder/tests/IBinderRpcTest.aidl b/libs/binder/tests/IBinderRpcTest.aidl index b0c8b2d8b3..9e1078870c 100644 --- a/libs/binder/tests/IBinderRpcTest.aidl +++ b/libs/binder/tests/IBinderRpcTest.aidl @@ -55,6 +55,7 @@ interface IBinderRpcTest { oneway void sleepMsAsync(int ms); void doCallback(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value); + oneway void doCallbackAsync(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value); void die(boolean cleanup); void scheduleShutdown(); diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp index a79295a792..69682b0ae5 100644 --- a/libs/binder/tests/binderRpcTest.cpp +++ b/libs/binder/tests/binderRpcTest.cpp @@ -214,7 +214,8 @@ public: if (delayed) { std::thread([=]() { ALOGE("Executing delayed callback: '%s'", value.c_str()); - (void)doCallback(callback, oneway, false, value); + Status status = doCallback(callback, oneway, false, value); + ALOGE("Delayed callback status: '%s'", status.toString8().c_str()); }).detach(); return Status::ok(); } @@ -226,6 +227,11 @@ public: return callback->sendCallback(value); } + Status doCallbackAsync(const sp<IBinderRpcCallback>& callback, bool oneway, bool delayed, + const std::string& value) override { + return doCallback(callback, oneway, delayed, value); + } + Status die(bool cleanup) override { if (cleanup) { exit(1); @@ -978,31 +984,42 @@ TEST_P(BinderRpc, OnewayCallExhaustion) { TEST_P(BinderRpc, Callbacks) { const static std::string kTestString = "good afternoon!"; - for (bool oneway : {true, false}) { - for (bool delayed : {true, false}) { - auto proc = createRpcTestSocketServerProcess(1, 1, 1); - auto cb = sp<MyBinderRpcCallback>::make(); - - EXPECT_OK(proc.rootIface->doCallback(cb, oneway, delayed, kTestString)); - - using std::literals::chrono_literals::operator""s; - std::unique_lock<std::mutex> _l(cb->mMutex); - cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); }); - - EXPECT_EQ(cb->mValues.size(), 1) << "oneway: " << oneway << "delayed: " << delayed; - if (cb->mValues.empty()) continue; - EXPECT_EQ(cb->mValues.at(0), kTestString) - << "oneway: " << oneway << "delayed: " << delayed; - - // since we are severing the connection, we need to go ahead and - // tell the server to shutdown and exit so that waitpid won't hang - EXPECT_OK(proc.rootIface->scheduleShutdown()); - - // since this session has a reverse connection w/ a threadpool, we - // need to manually shut it down - EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true)); - - proc.expectAlreadyShutdown = true; + for (bool callIsOneway : {true, false}) { + for (bool callbackIsOneway : {true, false}) { + for (bool delayed : {true, false}) { + auto proc = createRpcTestSocketServerProcess(1, 1, 1); + auto cb = sp<MyBinderRpcCallback>::make(); + + if (callIsOneway) { + EXPECT_OK(proc.rootIface->doCallbackAsync(cb, callbackIsOneway, delayed, + kTestString)); + } else { + EXPECT_OK( + proc.rootIface->doCallback(cb, callbackIsOneway, delayed, kTestString)); + } + + using std::literals::chrono_literals::operator""s; + std::unique_lock<std::mutex> _l(cb->mMutex); + cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); }); + + EXPECT_EQ(cb->mValues.size(), 1) + << "callIsOneway: " << callIsOneway + << " callbackIsOneway: " << callbackIsOneway << " delayed: " << delayed; + if (cb->mValues.empty()) continue; + EXPECT_EQ(cb->mValues.at(0), kTestString) + << "callIsOneway: " << callIsOneway + << " callbackIsOneway: " << callbackIsOneway << " delayed: " << delayed; + + // since we are severing the connection, we need to go ahead and + // tell the server to shutdown and exit so that waitpid won't hang + EXPECT_OK(proc.rootIface->scheduleShutdown()); + + // since this session has a reverse connection w/ a threadpool, we + // need to manually shut it down + EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true)); + + proc.expectAlreadyShutdown = true; + } } } } |