author Steven Moreland <smoreland@google.com> 2021-06-15 18:02:40 +0000
committer Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> 2021-06-15 18:02:40 +0000
commit 58fb61efef5da8d60f2ce89152b7cc2ed00d5f12 (patch)
tree 64b06f1bb2d93dab0799077cf613044e67951741
parent c6f1ee8027ebfe45e3e278ec5590e68a7541cd75 (diff)
parent 2ce034f75bf2262af4fc751f087adcfe0c6919e1 (diff)
Merge changes I881f63b8,I4c5edd2d,I8ff0d29c,I6578c3a4,Iea286ab0, ... am: 2ce034f75b
Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1735011

Change-Id: I29254913a0b0ac090b42839d007ad6142077eb9f
-rw-r--r--  libs/binder/RpcAddress.cpp                 27
-rw-r--r--  libs/binder/RpcServer.cpp                  40
-rw-r--r--  libs/binder/RpcSession.cpp                135
-rw-r--r--  libs/binder/RpcState.cpp                  237
-rw-r--r--  libs/binder/RpcState.h                     70
-rw-r--r--  libs/binder/RpcWireFormat.h                20
-rw-r--r--  libs/binder/include/binder/RpcAddress.h    21
-rw-r--r--  libs/binder/include/binder/RpcServer.h      8
-rw-r--r--  libs/binder/include/binder/RpcSession.h    47
-rw-r--r--  libs/binder/tests/IBinderRpcTest.aidl       1
-rw-r--r--  libs/binder/tests/binderRpcTest.cpp        69
11 files changed, 409 insertions, 266 deletions
diff --git a/libs/binder/RpcAddress.cpp b/libs/binder/RpcAddress.cpp
index 5c3232045e..98dee9a039 100644
--- a/libs/binder/RpcAddress.cpp
+++ b/libs/binder/RpcAddress.cpp
@@ -29,7 +29,7 @@ RpcAddress RpcAddress::zero() {
}
bool RpcAddress::isZero() const {
- RpcWireAddress ZERO{0};
+ RpcWireAddress ZERO{.options = 0};
return memcmp(mRawAddr.get(), &ZERO, sizeof(RpcWireAddress)) == 0;
}
@@ -51,13 +51,34 @@ static void ReadRandomBytes(uint8_t* buf, size_t len) {
close(fd);
}
-RpcAddress RpcAddress::unique() {
+RpcAddress RpcAddress::random(bool forServer) {
+ // The remainder of this header acts as reserved space for different kinds
+ // of binder objects.
+ uint64_t options = RPC_WIRE_ADDRESS_OPTION_CREATED;
+
+ // servers and clients allocate addresses independently, so this bit can
+ // tell you where an address originates
+ if (forServer) options |= RPC_WIRE_ADDRESS_OPTION_FOR_SERVER;
+
RpcAddress ret;
- ReadRandomBytes((uint8_t*)ret.mRawAddr.get(), sizeof(RpcWireAddress));
+ RpcWireAddress* raw = ret.mRawAddr.get();
+
+ raw->options = options;
+ ReadRandomBytes(raw->address, sizeof(raw->address));
+
LOG_RPC_DETAIL("Creating new address: %s", ret.toString().c_str());
return ret;
}
+bool RpcAddress::isForServer() const {
+ return mRawAddr.get()->options & RPC_WIRE_ADDRESS_OPTION_FOR_SERVER;
+}
+
+bool RpcAddress::isRecognizedType() const {
+ uint64_t allKnownOptions = RPC_WIRE_ADDRESS_OPTION_CREATED | RPC_WIRE_ADDRESS_OPTION_FOR_SERVER;
+ return (mRawAddr.get()->options & ~allKnownOptions) == 0;
+}
+
RpcAddress RpcAddress::fromRawEmbedded(const RpcWireAddress* raw) {
RpcAddress addr;
memcpy(addr.mRawAddr.get(), raw, sizeof(RpcWireAddress));
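
The RpcAddress.cpp change above replaces RpcAddress::unique(), which filled all 32 bytes with random data, with tagged random addresses: a 64-bit options word followed by 32 random bytes. A minimal standalone sketch of the scheme, under illustrative names (WireAddress, makeRandomAddress) rather than libbinder's:

// Standalone sketch of the tagged-address scheme (illustrative names, not
// libbinder's). Bit 0 distinguishes real addresses from the all-zero
// address; bit 1 records which side allocated the address; all other bits
// stay reserved, so a peer can reject addresses from a newer protocol.
#include <cstdint>
#include <random>

constexpr uint64_t kOptionCreated = 1 << 0;   // distinguish from '0' address
constexpr uint64_t kOptionForServer = 1 << 1; // allocated by the server side

struct WireAddress {
    uint64_t options;
    uint8_t address[32];
};

WireAddress makeRandomAddress(bool forServer) {
    WireAddress a{};
    a.options = kOptionCreated;
    if (forServer) a.options |= kOptionForServer;
    std::random_device rd; // stand-in for ReadRandomBytes() on /dev/urandom
    for (uint8_t& b : a.address) b = static_cast<uint8_t>(rd());
    return a;
}

bool isRecognizedType(const WireAddress& a) {
    constexpr uint64_t kAllKnown = kOptionCreated | kOptionForServer;
    return (a.options & ~kAllKnown) == 0; // unknown bits => reject
}

Because the two sides set the 'for server' bit differently, clients and servers can allocate addresses independently without coordination, and each side can detect a supposedly-new address that it should have created itself (used by onBinderEntering in RpcState.cpp below).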
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 60be406f6f..a8f3fa8f6f 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -270,14 +270,25 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
return;
}
- if (header.sessionId == RPC_SESSION_ID_NEW) {
+ RpcAddress sessionId = RpcAddress::fromRawEmbedded(&header.sessionId);
+
+ if (sessionId.isZero()) {
if (reverse) {
ALOGE("Cannot create a new session with a reverse connection, would leak");
return;
}
- LOG_ALWAYS_FATAL_IF(server->mSessionIdCounter >= INT32_MAX, "Out of session IDs");
- server->mSessionIdCounter++;
+ RpcAddress sessionId = RpcAddress::zero();
+ size_t tries = 0;
+ do {
+ // don't block if there is some entropy issue
+ if (tries++ > 5) {
+ ALOGE("Cannot find new address: %s", sessionId.toString().c_str());
+ return;
+ }
+
+ sessionId = RpcAddress::random(true /*forServer*/);
+ } while (server->mSessions.end() != server->mSessions.find(sessionId));
session = RpcSession::make();
session->setMaxThreads(server->mMaxThreads);
@@ -285,23 +296,24 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
sp<RpcServer::EventListener>::fromExisting(
static_cast<RpcServer::EventListener*>(
server.get())),
- server->mSessionIdCounter)) {
+ sessionId)) {
ALOGE("Failed to attach server to session");
return;
}
- server->mSessions[server->mSessionIdCounter] = session;
+ server->mSessions[sessionId] = session;
} else {
- auto it = server->mSessions.find(header.sessionId);
+ auto it = server->mSessions.find(sessionId);
if (it == server->mSessions.end()) {
- ALOGE("Cannot add thread, no record of session with ID %d", header.sessionId);
+ ALOGE("Cannot add thread, no record of session with ID %s",
+ sessionId.toString().c_str());
return;
}
session = it->second;
}
if (reverse) {
- LOG_ALWAYS_FATAL_IF(!session->addClientConnection(std::move(clientFd)),
+ LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(clientFd), true),
"server state must already be initialized");
return;
}
@@ -350,19 +362,21 @@ bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
return true;
}
-void RpcServer::onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) {
+void RpcServer::onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) {
auto id = session->mId;
LOG_ALWAYS_FATAL_IF(id == std::nullopt, "Server sessions must be initialized with ID");
- LOG_RPC_DETAIL("Dropping session %d", *id);
+ LOG_RPC_DETAIL("Dropping session with address %s", id->toString().c_str());
std::lock_guard<std::mutex> _l(mLock);
auto it = mSessions.find(*id);
- LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %d", *id);
- LOG_ALWAYS_FATAL_IF(it->second != session, "Bad state, session has id mismatch %d", *id);
+ LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %s",
+ id->toString().c_str());
+ LOG_ALWAYS_FATAL_IF(it->second != session, "Bad state, session has id mismatch %s",
+ id->toString().c_str());
(void)mSessions.erase(it);
}
-void RpcServer::onSessionServerThreadEnded() {
+void RpcServer::onSessionIncomingThreadEnded() {
mShutdownCv.notify_all();
}
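
Since session IDs are now random rather than a counter, the server has to check for collisions; the hunk above retries a handful of times and then gives up, so an entropy failure cannot block the accept path. The same pattern in isolation, as a sketch (pickUnusedKey is a hypothetical helper, not part of this change):

// Sketch of the bounded retry used when allocating a fresh session ID.
// With 256 random bits a collision is vanishingly unlikely, so repeated
// failures indicate an entropy problem; fail instead of spinning forever.
#include <cstddef>
#include <map>
#include <optional>

template <typename Key, typename Value, typename Gen>
std::optional<Key> pickUnusedKey(const std::map<Key, Value>& used, Gen gen) {
    for (size_t tries = 0; tries < 5; ++tries) {
        Key candidate = gen();
        if (used.find(candidate) == used.end()) return candidate;
    }
    return std::nullopt; // caller logs the failure and drops the connection
}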
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index a759ae36c3..4f55eef2d1 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -51,7 +51,7 @@ RpcSession::~RpcSession() {
LOG_RPC_DETAIL("RpcSession destroyed %p", this);
std::lock_guard<std::mutex> _l(mMutex);
- LOG_ALWAYS_FATAL_IF(mServerConnections.size() != 0,
+ LOG_ALWAYS_FATAL_IF(mIncomingConnections.size() != 0,
"Should not be able to destroy a session with servers in use.");
}
@@ -61,10 +61,10 @@ sp<RpcSession> RpcSession::make() {
void RpcSession::setMaxThreads(size_t threads) {
std::lock_guard<std::mutex> _l(mMutex);
- LOG_ALWAYS_FATAL_IF(!mClientConnections.empty() || !mServerConnections.empty(),
+ LOG_ALWAYS_FATAL_IF(!mOutgoingConnections.empty() || !mIncomingConnections.empty(),
"Must set max threads before setting up connections, but has %zu client(s) "
"and %zu server(s)",
- mClientConnections.size(), mServerConnections.size());
+ mOutgoingConnections.size(), mIncomingConnections.size());
mMaxThreads = threads;
}
@@ -100,7 +100,7 @@ bool RpcSession::addNullDebuggingClient() {
return false;
}
- return addClientConnection(std::move(serverFd));
+ return addOutgoingConnection(std::move(serverFd), false);
}
sp<IBinder> RpcSession::getRootObject() {
@@ -108,7 +108,7 @@ sp<IBinder> RpcSession::getRootObject() {
status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
ConnectionUse::CLIENT, &connection);
if (status != OK) return nullptr;
- return state()->getRootObject(connection.fd(), sp<RpcSession>::fromExisting(this));
+ return state()->getRootObject(connection.get(), sp<RpcSession>::fromExisting(this));
}
status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
@@ -116,7 +116,7 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
ConnectionUse::CLIENT, &connection);
if (status != OK) return status;
- return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
+ return state()->getMaxThreads(connection.get(), sp<RpcSession>::fromExisting(this), maxThreads);
}
bool RpcSession::shutdownAndWait(bool wait) {
@@ -146,7 +146,7 @@ status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Pa
: ConnectionUse::CLIENT,
&connection);
if (status != OK) return status;
- return state()->transact(connection.fd(), binder, code, data,
+ return state()->transact(connection.get(), binder, code, data,
sp<RpcSession>::fromExisting(this), reply, flags);
}
@@ -155,7 +155,7 @@ status_t RpcSession::sendDecStrong(const RpcAddress& address) {
status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
ConnectionUse::CLIENT_REFCOUNT, &connection);
if (status != OK) return status;
- return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
+ return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address);
}
std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
@@ -218,28 +218,27 @@ status_t RpcSession::readId() {
LOG_ALWAYS_FATAL_IF(mForServer != nullptr, "Can only update ID for client.");
}
- int32_t id;
-
ExclusiveConnection connection;
status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
ConnectionUse::CLIENT, &connection);
if (status != OK) return status;
- status = state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id);
+ mId = RpcAddress::zero();
+ status = state()->getSessionId(connection.get(), sp<RpcSession>::fromExisting(this),
+ &mId.value());
if (status != OK) return status;
- LOG_RPC_DETAIL("RpcSession %p has id %d", this, id);
- mId = id;
+ LOG_RPC_DETAIL("RpcSession %p has id %s", this, mId->toString().c_str());
return OK;
}
-void RpcSession::WaitForShutdownListener::onSessionLockedAllServerThreadsEnded(
+void RpcSession::WaitForShutdownListener::onSessionLockedAllIncomingThreadsEnded(
const sp<RpcSession>& session) {
(void)session;
mShutdown = true;
}
-void RpcSession::WaitForShutdownListener::onSessionServerThreadEnded() {
+void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() {
mCv.notify_all();
}
@@ -263,10 +262,9 @@ void RpcSession::preJoinThreadOwnership(std::thread thread) {
RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
// must be registered to allow arbitrary client code executing commands to
// be able to do nested calls (we can't only read from it)
- sp<RpcConnection> connection = assignServerToThisThread(std::move(fd));
+ sp<RpcConnection> connection = assignIncomingConnectionToThisThread(std::move(fd));
- status_t status =
- mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
+ status_t status = mState->readConnectionInit(connection, sp<RpcSession>::fromExisting(this));
return PreJoinSetupResult{
.connection = std::move(connection),
@@ -279,7 +277,7 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult
if (setupResult.status == OK) {
while (true) {
- status_t status = session->state()->getAndExecuteCommand(connection->fd, session,
+ status_t status = session->state()->getAndExecuteCommand(connection, session,
RpcState::CommandType::ANY);
if (status != OK) {
LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
@@ -292,7 +290,7 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult
statusToString(setupResult.status).c_str());
}
- LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection),
+ LOG_ALWAYS_FATAL_IF(!session->removeIncomingConnection(connection),
"bad state: connection object guaranteed to be in list");
sp<RpcSession::EventListener> listener;
@@ -309,23 +307,28 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult
session = nullptr;
if (listener != nullptr) {
- listener->onSessionServerThreadEnded();
+ listener->onSessionIncomingThreadEnded();
}
}
-wp<RpcServer> RpcSession::server() {
- return mForServer;
+sp<RpcServer> RpcSession::server() {
+ RpcServer* unsafeServer = mForServer.unsafe_get();
+ sp<RpcServer> server = mForServer.promote();
+
+ LOG_ALWAYS_FATAL_IF((unsafeServer == nullptr) != (server == nullptr),
+ "wp<> is to avoid strong cycle only");
+ return server;
}
bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
{
std::lock_guard<std::mutex> _l(mMutex);
- LOG_ALWAYS_FATAL_IF(mClientConnections.size() != 0,
+ LOG_ALWAYS_FATAL_IF(mOutgoingConnections.size() != 0,
"Must only setup session once, but already has %zu clients",
- mClientConnections.size());
+ mOutgoingConnections.size());
}
- if (!setupOneSocketConnection(addr, RPC_SESSION_ID_NEW, false /*reverse*/)) return false;
+ if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*reverse*/)) return false;
// TODO(b/189955605): we should add additional sessions dynamically
// instead of all at once.
@@ -362,7 +365,8 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
return true;
}
-bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t id, bool reverse) {
+bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id,
+ bool reverse) {
for (size_t tries = 0; tries < 5; tries++) {
if (tries > 0) usleep(10000);
@@ -386,9 +390,9 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t
return false;
}
- RpcConnectionHeader header{
- .sessionId = id,
- };
+ RpcConnectionHeader header{.options = 0};
+ memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress));
+
if (reverse) header.options |= RPC_CONNECTION_OPTION_REVERSE;
if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) {
@@ -428,7 +432,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t
LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
return true;
} else {
- return addClientConnection(std::move(serverFd));
+ return addOutgoingConnection(std::move(serverFd), true);
}
}
@@ -436,7 +440,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t
return false;
}
-bool RpcSession::addClientConnection(unique_fd fd) {
+bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) {
sp<RpcConnection> connection = sp<RpcConnection>::make();
{
std::lock_guard<std::mutex> _l(mMutex);
@@ -451,11 +455,13 @@ bool RpcSession::addClientConnection(unique_fd fd) {
connection->fd = std::move(fd);
connection->exclusiveTid = gettid();
- mClientConnections.push_back(connection);
+ mOutgoingConnections.push_back(connection);
}
- status_t status =
- mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
+ status_t status = OK;
+ if (init) {
+        status = mState->sendConnectionInit(connection, sp<RpcSession>::fromExisting(this));
+ }
{
std::lock_guard<std::mutex> _l(mMutex);
@@ -466,7 +472,7 @@ bool RpcSession::addClientConnection(unique_fd fd) {
}
bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
- int32_t sessionId) {
+ const RpcAddress& sessionId) {
LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
LOG_ALWAYS_FATAL_IF(server == nullptr);
LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
@@ -482,25 +488,26 @@ bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListene
return true;
}
-sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
+sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(unique_fd fd) {
std::lock_guard<std::mutex> _l(mMutex);
sp<RpcConnection> session = sp<RpcConnection>::make();
session->fd = std::move(fd);
session->exclusiveTid = gettid();
- mServerConnections.push_back(session);
+ mIncomingConnections.push_back(session);
return session;
}
-bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) {
+bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) {
std::lock_guard<std::mutex> _l(mMutex);
- if (auto it = std::find(mServerConnections.begin(), mServerConnections.end(), connection);
- it != mServerConnections.end()) {
- mServerConnections.erase(it);
- if (mServerConnections.size() == 0) {
+ if (auto it = std::find(mIncomingConnections.begin(), mIncomingConnections.end(), connection);
+ it != mIncomingConnections.end()) {
+ mIncomingConnections.erase(it);
+ if (mIncomingConnections.size() == 0) {
sp<EventListener> listener = mEventListener.promote();
if (listener) {
- listener->onSessionLockedAllServerThreadsEnded(sp<RpcSession>::fromExisting(this));
+ listener->onSessionLockedAllIncomingThreadsEnded(
+ sp<RpcSession>::fromExisting(this));
}
}
return true;
@@ -525,11 +532,11 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
// CHECK FOR DEDICATED CLIENT SOCKET
//
// A server/looper should always use a dedicated connection if available
- findConnection(tid, &exclusive, &available, session->mClientConnections,
- session->mClientConnectionsOffset);
+ findConnection(tid, &exclusive, &available, session->mOutgoingConnections,
+ session->mOutgoingConnectionsOffset);
// WARNING: this assumes a server cannot request its client to send
- // a transaction, as mServerConnections is excluded below.
+ // a transaction, as mIncomingConnections is excluded below.
//
// Imagine we have more than one thread in play, and a single thread
// sends a synchronous, then an asynchronous command. Imagine the
@@ -539,17 +546,31 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
// command. So, we move to considering the second available thread
// for subsequent calls.
if (use == ConnectionUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) {
- session->mClientConnectionsOffset =
- (session->mClientConnectionsOffset + 1) % session->mClientConnections.size();
+ session->mOutgoingConnectionsOffset = (session->mOutgoingConnectionsOffset + 1) %
+ session->mOutgoingConnections.size();
}
- // USE SERVING SOCKET (for nested transaction)
- //
- // asynchronous calls cannot be nested
+ // USE SERVING SOCKET (e.g. nested transaction)
if (use != ConnectionUse::CLIENT_ASYNC) {
+ sp<RpcConnection> exclusiveIncoming;
// server connections are always assigned to a thread
- findConnection(tid, &exclusive, nullptr /*available*/, session->mServerConnections,
- 0 /* index hint */);
+ findConnection(tid, &exclusiveIncoming, nullptr /*available*/,
+ session->mIncomingConnections, 0 /* index hint */);
+
+ // asynchronous calls cannot be nested, we currently allow ref count
+ // calls to be nested (so that you can use this without having extra
+ // threads). Note 'drainCommands' is used so that these ref counts can't
+ // build up.
+ if (exclusiveIncoming != nullptr) {
+ if (exclusiveIncoming->allowNested) {
+ // guaranteed to be processed as nested command
+ exclusive = exclusiveIncoming;
+ } else if (use == ConnectionUse::CLIENT_REFCOUNT && available == nullptr) {
+ // prefer available socket, but if we don't have one, don't
+ // wait for one
+ exclusive = exclusiveIncoming;
+ }
+ }
}
// if our thread is already using a connection, prioritize using that
@@ -563,16 +584,16 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
break;
}
- if (session->mClientConnections.size() == 0) {
+ if (session->mOutgoingConnections.size() == 0) {
ALOGE("Session has no client connections. This is required for an RPC server to make "
"any non-nested (e.g. oneway or on another thread) calls. Use: %d. Server "
"connections: %zu",
- static_cast<int>(use), session->mServerConnections.size());
+ static_cast<int>(use), session->mIncomingConnections.size());
return WOULD_BLOCK;
}
LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...",
- session->mClientConnections.size(), session->mServerConnections.size());
+ session->mOutgoingConnections.size(), session->mIncomingConnections.size());
session->mAvailableConnectionCv.wait(_l);
}
session->mWaitingThreads--;
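
The ExclusiveConnection::find() changes above encode when an incoming (serving) connection may carry an outgoing call. A condensed sketch of that rule, with hypothetical types rather than the real RpcConnection API:

// Sketch of the connection-choice rule (hypothetical types, not the real
// RpcConnection API). An incoming connection assigned to this thread may
// carry an outgoing call only if the peer is blocked waiting on our reply
// (allowNested), or for a refcount command when no outgoing connection is
// available, since dropping a reference should never have to block.
enum class Use { CLIENT, CLIENT_ASYNC, CLIENT_REFCOUNT };

struct Conn {
    bool allowNested = false; // true while serving a synchronous transaction
};

Conn* chooseIncoming(Use use, Conn* incoming, Conn* availableOutgoing) {
    if (use == Use::CLIENT_ASYNC || incoming == nullptr) return nullptr;
    if (incoming->allowNested) return incoming; // guaranteed nested handling
    if (use == Use::CLIENT_REFCOUNT && availableOutgoing == nullptr)
        return incoming; // don't block just to drop a reference
    return nullptr;
}

allowNested is set around the synchronous target->transact() call in RpcState.cpp (below), so while it is true the peer is blocked in waitForReply and is guaranteed to process a nested command sent on that connection.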
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 53eba5aea6..fd2eff6870 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -83,21 +83,45 @@ status_t RpcState::onBinderLeaving(const sp<RpcSession>& session, const sp<IBind
}
LOG_ALWAYS_FATAL_IF(isRpc, "RPC binder must have known address at this point");
- auto&& [it, inserted] = mNodeForAddress.insert({RpcAddress::unique(),
- BinderNode{
- .binder = binder,
- .timesSent = 1,
- .sentRef = binder,
- }});
- // TODO(b/182939933): better organization could avoid needing this log
- LOG_ALWAYS_FATAL_IF(!inserted);
-
- *outAddress = it->first;
- return OK;
+ bool forServer = session->server() != nullptr;
+
+ for (size_t tries = 0; tries < 5; tries++) {
+ auto&& [it, inserted] = mNodeForAddress.insert({RpcAddress::random(forServer),
+ BinderNode{
+ .binder = binder,
+ .timesSent = 1,
+ .sentRef = binder,
+ }});
+ if (inserted) {
+ *outAddress = it->first;
+ return OK;
+ }
+
+ // well, we don't have visibility into the header here, but still
+ static_assert(sizeof(RpcWireAddress) == 40, "this log needs updating");
+ ALOGW("2**256 is 1e77. If you see this log, you probably have some entropy issue, or maybe "
+ "you witness something incredible!");
+ }
+
+ ALOGE("Unable to create an address in order to send out %p", binder.get());
+ return WOULD_BLOCK;
}
status_t RpcState::onBinderEntering(const sp<RpcSession>& session, const RpcAddress& address,
sp<IBinder>* out) {
+    // Ensure that if addresses are used for something else in the future (for
+    // instance, allowing transitive binder sends), they are not accidentally
+    // accepted by an old server. If an old server ignored unknown option bits
+    // and considered the binder to be recognized, this process could
+    // accidentally proxy transactions for that binder. Of course, any binder
+    // we communicate with could always be proxying information. However, we
+    // want to make sure a client never does so by accident.
+ if (!address.isRecognizedType()) {
+ ALOGE("Address is of an unknown type, rejecting: %s", address.toString().c_str());
+ return BAD_VALUE;
+ }
+
std::unique_lock<std::mutex> _l(mNodeMutex);
if (mTerminated) return DEAD_OBJECT;
@@ -117,6 +141,14 @@ status_t RpcState::onBinderEntering(const sp<RpcSession>& session, const RpcAddr
return OK;
}
+ // we don't know about this binder, so the other side of the connection
+ // should have created it.
+ if (address.isForServer() == !!session->server()) {
+ ALOGE("Server received unrecognized address which we should own the creation of %s.",
+ address.toString().c_str());
+ return BAD_VALUE;
+ }
+
auto&& [it, inserted] = mNodeForAddress.insert({address, BinderNode{}});
LOG_ALWAYS_FATAL_IF(!inserted, "Failed to insert binder when creating proxy");
@@ -222,9 +254,11 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) {
mData.reset(new (std::nothrow) uint8_t[size]);
}
-status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
- const char* what, const void* data, size_t size) {
- LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
+status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, const char* what, const void* data,
+ size_t size) {
+ LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, connection->fd.get(),
+ hexString(data, size).c_str());
if (size > std::numeric_limits<ssize_t>::max()) {
ALOGE("Cannot send %s at size %zu (too big)", what, size);
@@ -232,12 +266,12 @@ status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& sess
return BAD_VALUE;
}
- ssize_t sent = TEMP_FAILURE_RETRY(send(fd.get(), data, size, MSG_NOSIGNAL));
+ ssize_t sent = TEMP_FAILURE_RETRY(send(connection->fd.get(), data, size, MSG_NOSIGNAL));
if (sent < 0 || sent != static_cast<ssize_t>(size)) {
int savedErrno = errno;
LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
- size, fd.get(), strerror(savedErrno));
+ size, connection->fd.get(), strerror(savedErrno));
(void)session->shutdownAndWait(false);
return -savedErrno;
@@ -246,35 +280,41 @@ status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& sess
return OK;
}
-status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
- const char* what, void* data, size_t size) {
+status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, const char* what, void* data,
+ size_t size) {
if (size > std::numeric_limits<ssize_t>::max()) {
ALOGE("Cannot rec %s at size %zu (too big)", what, size);
(void)session->shutdownAndWait(false);
return BAD_VALUE;
}
- if (status_t status = session->mShutdownTrigger->interruptableReadFully(fd.get(), data, size);
+ if (status_t status =
+ session->mShutdownTrigger->interruptableReadFully(connection->fd.get(), data, size);
status != OK) {
- LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(),
- statusToString(status).c_str());
+ LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size,
+ connection->fd.get(), statusToString(status).c_str());
return status;
}
- LOG_RPC_DETAIL("Received %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
+ LOG_RPC_DETAIL("Received %s on fd %d: %s", what, connection->fd.get(),
+ hexString(data, size).c_str());
return OK;
}
-status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
- RpcClientConnectionInit init{
+status_t RpcState::sendConnectionInit(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session) {
+ RpcOutgoingConnectionInit init{
.msg = RPC_CONNECTION_INIT_OKAY,
};
- return rpcSend(fd, session, "connection init", &init, sizeof(init));
+ return rpcSend(connection, session, "connection init", &init, sizeof(init));
}
-status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
- RpcClientConnectionInit init;
- if (status_t status = rpcRec(fd, session, "connection init", &init, sizeof(init)); status != OK)
+status_t RpcState::readConnectionInit(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session) {
+ RpcOutgoingConnectionInit init;
+ if (status_t status = rpcRec(connection, session, "connection init", &init, sizeof(init));
+ status != OK)
return status;
static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY));
@@ -286,13 +326,14 @@ status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSes
return OK;
}
-sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) {
+sp<IBinder> RpcState::getRootObject(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session) {
Parcel data;
data.markForRpc(session);
Parcel reply;
- status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT, data,
- session, &reply, 0);
+ status_t status = transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT,
+ data, session, &reply, 0);
if (status != OK) {
ALOGE("Error getting root object: %s", statusToString(status).c_str());
return nullptr;
@@ -301,14 +342,15 @@ sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSessi
return reply.readStrongBinder();
}
-status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session,
- size_t* maxThreadsOut) {
+status_t RpcState::getMaxThreads(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, size_t* maxThreadsOut) {
Parcel data;
data.markForRpc(session);
Parcel reply;
- status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS,
- data, session, &reply, 0);
+ status_t status =
+ transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS,
+ data, session, &reply, 0);
if (status != OK) {
ALOGE("Error getting max threads: %s", statusToString(status).c_str());
return status;
@@ -326,30 +368,26 @@ status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>
return OK;
}
-status_t RpcState::getSessionId(const base::unique_fd& fd, const sp<RpcSession>& session,
- int32_t* sessionIdOut) {
+status_t RpcState::getSessionId(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, RpcAddress* sessionIdOut) {
Parcel data;
data.markForRpc(session);
Parcel reply;
- status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID,
- data, session, &reply, 0);
+ status_t status =
+ transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID,
+ data, session, &reply, 0);
if (status != OK) {
ALOGE("Error getting session ID: %s", statusToString(status).c_str());
return status;
}
- int32_t sessionId;
- status = reply.readInt32(&sessionId);
- if (status != OK) return status;
-
- *sessionIdOut = sessionId;
- return OK;
+ return sessionIdOut->readFromParcel(reply);
}
-status_t RpcState::transact(const base::unique_fd& fd, const sp<IBinder>& binder, uint32_t code,
- const Parcel& data, const sp<RpcSession>& session, Parcel* reply,
- uint32_t flags) {
+status_t RpcState::transact(const sp<RpcSession::RpcConnection>& connection,
+ const sp<IBinder>& binder, uint32_t code, const Parcel& data,
+ const sp<RpcSession>& session, Parcel* reply, uint32_t flags) {
if (!data.isForRpc()) {
ALOGE("Refusing to send RPC with parcel not crafted for RPC");
return BAD_TYPE;
@@ -363,12 +401,12 @@ status_t RpcState::transact(const base::unique_fd& fd, const sp<IBinder>& binder
RpcAddress address = RpcAddress::zero();
if (status_t status = onBinderLeaving(session, binder, &address); status != OK) return status;
- return transactAddress(fd, address, code, data, session, reply, flags);
+ return transactAddress(connection, address, code, data, session, reply, flags);
}
-status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& address,
- uint32_t code, const Parcel& data, const sp<RpcSession>& session,
- Parcel* reply, uint32_t flags) {
+status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connection,
+ const RpcAddress& address, uint32_t code, const Parcel& data,
+ const sp<RpcSession>& session, Parcel* reply, uint32_t flags) {
LOG_ALWAYS_FATAL_IF(!data.isForRpc());
LOG_ALWAYS_FATAL_IF(data.objectsCount() != 0);
@@ -418,25 +456,25 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress&
memcpy(transactionData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireTransaction), data.data(),
data.dataSize());
- if (status_t status =
- rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size());
+ if (status_t status = rpcSend(connection, session, "transaction", transactionData.data(),
+ transactionData.size());
status != OK)
// TODO(b/167966510): need to undo onBinderLeaving - we know the
// refcount isn't successfully transferred.
return status;
if (flags & IBinder::FLAG_ONEWAY) {
- LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", fd.get());
+ LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", connection->fd.get());
// Do not wait on result.
// However, too many oneway calls may cause refcounts to build up and fill up the socket,
// so process those.
- return drainCommands(fd, session, CommandType::CONTROL_ONLY);
+ return drainCommands(connection, session, CommandType::CONTROL_ONLY);
}
LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction.");
- return waitForReply(fd, session, reply);
+ return waitForReply(connection, session, reply);
}
static void cleanup_reply_data(Parcel* p, const uint8_t* data, size_t dataSize,
@@ -448,17 +486,18 @@ static void cleanup_reply_data(Parcel* p, const uint8_t* data, size_t dataSize,
LOG_ALWAYS_FATAL_IF(objectsCount, 0);
}
-status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session,
- Parcel* reply) {
+status_t RpcState::waitForReply(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, Parcel* reply) {
RpcWireHeader command;
while (true) {
- if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command));
+ if (status_t status =
+ rpcRec(connection, session, "command header", &command, sizeof(command));
status != OK)
return status;
if (command.command == RPC_COMMAND_REPLY) break;
- if (status_t status = processServerCommand(fd, session, command, CommandType::ANY);
+ if (status_t status = processCommand(connection, session, command, CommandType::ANY);
status != OK)
return status;
}
@@ -466,7 +505,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
CommandData data(command.bodySize);
if (!data.valid()) return NO_MEMORY;
- if (status_t status = rpcRec(fd, session, "reply body", data.data(), command.bodySize);
+ if (status_t status = rpcRec(connection, session, "reply body", data.data(), command.bodySize);
status != OK)
return status;
@@ -488,8 +527,8 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
return OK;
}
-status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
- const RpcAddress& addr) {
+status_t RpcState::sendDecStrong(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, const RpcAddress& addr) {
{
std::lock_guard<std::mutex> _l(mNodeMutex);
if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
@@ -508,39 +547,42 @@ status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>
.command = RPC_COMMAND_DEC_STRONG,
.bodySize = sizeof(RpcWireAddress),
};
- if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK)
+ if (status_t status = rpcSend(connection, session, "dec ref header", &cmd, sizeof(cmd));
+ status != OK)
return status;
- if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(),
+ if (status_t status = rpcSend(connection, session, "dec ref body", &addr.viewRawEmbedded(),
sizeof(RpcWireAddress));
status != OK)
return status;
return OK;
}
-status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session,
- CommandType type) {
- LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get());
+status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, CommandType type) {
+ LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", connection->fd.get());
RpcWireHeader command;
- if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command));
+ if (status_t status = rpcRec(connection, session, "command header", &command, sizeof(command));
status != OK)
return status;
- return processServerCommand(fd, session, command, type);
+ return processCommand(connection, session, command, type);
}
-status_t RpcState::drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session,
- CommandType type) {
+status_t RpcState::drainCommands(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, CommandType type) {
uint8_t buf;
- while (0 < TEMP_FAILURE_RETRY(recv(fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
- status_t status = getAndExecuteCommand(fd, session, type);
+ while (0 < TEMP_FAILURE_RETRY(
+ recv(connection->fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
+ status_t status = getAndExecuteCommand(connection, session, type);
if (status != OK) return status;
}
return OK;
}
-status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcSession>& session,
- const RpcWireHeader& command, CommandType type) {
+status_t RpcState::processCommand(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, const RpcWireHeader& command,
+ CommandType type) {
IPCThreadState* kernelBinderState = IPCThreadState::selfOrNull();
IPCThreadState::SpGuard spGuard{
.address = __builtin_frame_address(0),
@@ -559,9 +601,9 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS
switch (command.command) {
case RPC_COMMAND_TRANSACT:
if (type != CommandType::ANY) return BAD_TYPE;
- return processTransact(fd, session, command);
+ return processTransact(connection, session, command);
case RPC_COMMAND_DEC_STRONG:
- return processDecStrong(fd, session, command);
+ return processDecStrong(connection, session, command);
}
// We should always know the version of the opposing side, and since the
@@ -573,20 +615,20 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS
(void)session->shutdownAndWait(false);
return DEAD_OBJECT;
}
-status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
- const RpcWireHeader& command) {
+status_t RpcState::processTransact(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, const RpcWireHeader& command) {
LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_TRANSACT, "command: %d", command.command);
CommandData transactionData(command.bodySize);
if (!transactionData.valid()) {
return NO_MEMORY;
}
- if (status_t status = rpcRec(fd, session, "transaction body", transactionData.data(),
+ if (status_t status = rpcRec(connection, session, "transaction body", transactionData.data(),
transactionData.size());
status != OK)
return status;
- return processTransactInternal(fd, session, std::move(transactionData));
+ return processTransactInternal(connection, session, std::move(transactionData));
}
static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t dataSize,
@@ -598,7 +640,8 @@ static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t d
(void)objectsCount;
}
-status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session,
+status_t RpcState::processTransactInternal(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session,
CommandData transactionData) {
// for 'recursive' calls to this, we have already read and processed the
// binder from the transaction data and taken reference counts into account,
@@ -617,6 +660,7 @@ processTransactInternalTailCall:
// TODO(b/182939933): heap allocation just for lookup in mNodeForAddress,
// maybe add an RpcAddress 'view' if the type remains 'heavy'
auto addr = RpcAddress::fromRawEmbedded(&transaction->address);
+ bool oneway = transaction->flags & IBinder::FLAG_ONEWAY;
status_t replyStatus = OK;
sp<IBinder> target;
@@ -645,7 +689,7 @@ processTransactInternalTailCall:
addr.toString().c_str());
(void)session->shutdownAndWait(false);
replyStatus = BAD_VALUE;
- } else if (transaction->flags & IBinder::FLAG_ONEWAY) {
+ } else if (oneway) {
std::unique_lock<std::mutex> _l(mNodeMutex);
auto it = mNodeForAddress.find(addr);
if (it->second.binder.promote() != target) {
@@ -702,7 +746,12 @@ processTransactInternalTailCall:
data.markForRpc(session);
if (target) {
+ bool origAllowNested = connection->allowNested;
+ connection->allowNested = !oneway;
+
replyStatus = target->transact(transaction->code, data, &reply, transaction->flags);
+
+ connection->allowNested = origAllowNested;
} else {
LOG_RPC_DETAIL("Got special transaction %u", transaction->code);
@@ -713,13 +762,13 @@ processTransactInternalTailCall:
}
case RPC_SPECIAL_TRANSACT_GET_SESSION_ID: {
// for client connections, this should always report the value
- // originally returned from the server
- int32_t id = session->mId.value();
- replyStatus = reply.writeInt32(id);
+ // originally returned from the server, so this is asserting
+ // that it exists
+ replyStatus = session->mId.value().writeToParcel(&reply);
break;
}
default: {
- sp<RpcServer> server = session->server().promote();
+ sp<RpcServer> server = session->server();
if (server) {
switch (transaction->code) {
case RPC_SPECIAL_TRANSACT_GET_ROOT: {
@@ -738,7 +787,7 @@ processTransactInternalTailCall:
}
}
- if (transaction->flags & IBinder::FLAG_ONEWAY) {
+ if (oneway) {
if (replyStatus != OK) {
ALOGW("Oneway call failed with error: %d", replyStatus);
}
@@ -811,11 +860,11 @@ processTransactInternalTailCall:
memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
reply.dataSize());
- return rpcSend(fd, session, "reply", replyData.data(), replyData.size());
+ return rpcSend(connection, session, "reply", replyData.data(), replyData.size());
}
-status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
- const RpcWireHeader& command) {
+status_t RpcState::processDecStrong(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, const RpcWireHeader& command) {
LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command);
CommandData commandData(command.bodySize);
@@ -823,7 +872,7 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi
return NO_MEMORY;
}
if (status_t status =
- rpcRec(fd, session, "dec ref body", commandData.data(), commandData.size());
+ rpcRec(connection, session, "dec ref body", commandData.data(), commandData.size());
status != OK)
return status;
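
The two new validations in onBinderEntering above boil down to predicates over the address's option bits; condensed as a sketch (free functions here are illustrative, not RpcState methods):

// Condensed sketch of the onBinderEntering validations (illustrative free
// functions, not RpcState methods).
#include <cstdint>

constexpr uint64_t kOptionCreated = 1 << 0;
constexpr uint64_t kOptionForServer = 1 << 1;

// Reject addresses carrying option bits this version doesn't know, so a
// newer protocol can't trick an old process into proxying a binder.
bool recognized(uint64_t options) {
    return (options & ~(kOptionCreated | kOptionForServer)) == 0;
}

// An unknown address whose 'for server' bit matches our own side must be
// bogus: we would have created and recorded such an address ourselves.
bool plausiblyCreatedByPeer(uint64_t options, bool weAreTheServer) {
    return ((options & kOptionForServer) != 0) != weAreTheServer;
}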
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index 5bfef69c32..529dee534c 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -51,34 +51,37 @@ public:
RpcState();
~RpcState();
- status_t sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
- status_t readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
+ status_t sendConnectionInit(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session);
+ status_t readConnectionInit(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session);
// TODO(b/182940634): combine some special transactions into one "getServerInfo" call?
- sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session);
- status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session,
- size_t* maxThreadsOut);
- status_t getSessionId(const base::unique_fd& fd, const sp<RpcSession>& session,
- int32_t* sessionIdOut);
-
- [[nodiscard]] status_t transact(const base::unique_fd& fd, const sp<IBinder>& address,
- uint32_t code, const Parcel& data,
+ sp<IBinder> getRootObject(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session);
+ status_t getMaxThreads(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, size_t* maxThreadsOut);
+ status_t getSessionId(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, RpcAddress* sessionIdOut);
+
+ [[nodiscard]] status_t transact(const sp<RpcSession::RpcConnection>& connection,
+ const sp<IBinder>& address, uint32_t code, const Parcel& data,
const sp<RpcSession>& session, Parcel* reply, uint32_t flags);
- [[nodiscard]] status_t transactAddress(const base::unique_fd& fd, const RpcAddress& address,
- uint32_t code, const Parcel& data,
- const sp<RpcSession>& session, Parcel* reply,
- uint32_t flags);
- [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
- const RpcAddress& address);
+ [[nodiscard]] status_t transactAddress(const sp<RpcSession::RpcConnection>& connection,
+ const RpcAddress& address, uint32_t code,
+ const Parcel& data, const sp<RpcSession>& session,
+ Parcel* reply, uint32_t flags);
+ [[nodiscard]] status_t sendDecStrong(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, const RpcAddress& address);
enum class CommandType {
ANY,
CONTROL_ONLY,
};
- [[nodiscard]] status_t getAndExecuteCommand(const base::unique_fd& fd,
+ [[nodiscard]] status_t getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection,
const sp<RpcSession>& session, CommandType type);
- [[nodiscard]] status_t drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session,
- CommandType type);
+ [[nodiscard]] status_t drainCommands(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, CommandType type);
/**
* Called by Parcel for outgoing binders. This implies one refcount of
@@ -133,22 +136,25 @@ private:
size_t mSize;
};
- [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
- const char* what, const void* data, size_t size);
- [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
- const char* what, void* data, size_t size);
-
- [[nodiscard]] status_t waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session,
- Parcel* reply);
- [[nodiscard]] status_t processServerCommand(const base::unique_fd& fd,
- const sp<RpcSession>& session,
- const RpcWireHeader& command, CommandType type);
- [[nodiscard]] status_t processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
+ [[nodiscard]] status_t rpcSend(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, const char* what,
+ const void* data, size_t size);
+ [[nodiscard]] status_t rpcRec(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, const char* what, void* data,
+ size_t size);
+
+ [[nodiscard]] status_t waitForReply(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session, Parcel* reply);
+ [[nodiscard]] status_t processCommand(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session,
+ const RpcWireHeader& command, CommandType type);
+ [[nodiscard]] status_t processTransact(const sp<RpcSession::RpcConnection>& connection,
+ const sp<RpcSession>& session,
const RpcWireHeader& command);
- [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd,
+ [[nodiscard]] status_t processTransactInternal(const sp<RpcSession::RpcConnection>& connection,
const sp<RpcSession>& session,
CommandData transactionData);
- [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd,
+ [[nodiscard]] status_t processDecStrong(const sp<RpcSession::RpcConnection>& connection,
const sp<RpcSession>& session,
const RpcWireHeader& command);
diff --git a/libs/binder/RpcWireFormat.h b/libs/binder/RpcWireFormat.h
index b5e5bc1e0f..2016483138 100644
--- a/libs/binder/RpcWireFormat.h
+++ b/libs/binder/RpcWireFormat.h
@@ -20,20 +20,26 @@ namespace android {
#pragma clang diagnostic push
#pragma clang diagnostic error "-Wpadded"
-constexpr int32_t RPC_SESSION_ID_NEW = -1;
-
enum : uint8_t {
RPC_CONNECTION_OPTION_REVERSE = 0x1,
};
+constexpr uint64_t RPC_WIRE_ADDRESS_OPTION_CREATED = 1 << 0; // distinguish from '0' address
+constexpr uint64_t RPC_WIRE_ADDRESS_OPTION_FOR_SERVER = 1 << 1;
+
+struct RpcWireAddress {
+ uint64_t options;
+ uint8_t address[32];
+};
+
/**
 * This is sent to an RpcServer in order to request that a new connection be
 * created, either as part of a new session or an existing session.
*/
struct RpcConnectionHeader {
- int32_t sessionId;
+ RpcWireAddress sessionId;
uint8_t options;
- uint8_t reserved[3];
+ uint8_t reserved[7];
};
#define RPC_CONNECTION_INIT_OKAY "cci"
@@ -43,7 +49,7 @@ struct RpcConnectionHeader {
 * transaction. The main use of this is to control the timing of when a
 * reverse connection is set up.
*/
-struct RpcClientConnectionInit {
+struct RpcOutgoingConnectionInit {
char msg[4];
uint8_t reserved[4];
};
@@ -89,10 +95,6 @@ struct RpcWireHeader {
uint32_t reserved[2];
};
-struct RpcWireAddress {
- uint8_t address[32];
-};
-
struct RpcWireTransaction {
RpcWireAddress address;
uint32_t code;
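
Since these structs are compiled under -Wpadded (see the pragma at the top of the file), the new layouts are fixed; a quick standalone check that mirrors the static_assert added in RpcState.cpp:

// Layout check for the new wire structs (mirrors the static_assert added
// in RpcState.cpp). The structs are redeclared here only for the sketch.
#include <cstdint>

struct RpcWireAddress {
    uint64_t options;
    uint8_t address[32];
};

struct RpcConnectionHeader {
    RpcWireAddress sessionId;
    uint8_t options;
    uint8_t reserved[7];
};

static_assert(sizeof(RpcWireAddress) == 40, "8 option bytes + 32 address bytes");
static_assert(sizeof(RpcConnectionHeader) == 48, "40 + 1 + 7, no implicit padding");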
diff --git a/libs/binder/include/binder/RpcAddress.h b/libs/binder/include/binder/RpcAddress.h
index 5a3f3a6afa..e428908c88 100644
--- a/libs/binder/include/binder/RpcAddress.h
+++ b/libs/binder/include/binder/RpcAddress.h
@@ -29,11 +29,7 @@ class Parcel;
struct RpcWireAddress;
/**
- * This class represents an identifier of a binder object.
- *
- * The purpose of this class it to hide the ABI of an RpcWireAddress, and
- * potentially allow us to change the size of it in the future (RpcWireAddress
- * is PIMPL, essentially - although the type that is used here is not exposed).
+ * This class represents an identifier across an RPC boundary.
*/
class RpcAddress {
public:
@@ -46,9 +42,20 @@ public:
bool isZero() const;
/**
- * Create a new address which is unique
+ * Create a new random address.
+ */
+ static RpcAddress random(bool forServer);
+
+ /**
+ * Whether this address was created with 'bool forServer' true
+ */
+ bool isForServer() const;
+
+ /**
+ * Whether this address is one that could be created with this version of
+ * libbinder.
*/
- static RpcAddress unique();
+ bool isRecognizedType() const;
/**
* Creates a new address as a copy of an embedded object.
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 4e6934b276..c8d2857347 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -17,6 +17,7 @@
#include <android-base/unique_fd.h>
#include <binder/IBinder.h>
+#include <binder/RpcAddress.h>
#include <binder/RpcSession.h>
#include <utils/Errors.h>
#include <utils/RefBase.h>
@@ -155,8 +156,8 @@ private:
friend sp<RpcServer>;
RpcServer();
- void onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) override;
- void onSessionServerThreadEnded() override;
+ void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) override;
+ void onSessionIncomingThreadEnded() override;
static void establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd);
bool setupSocketServer(const RpcSocketAddress& address);
@@ -171,8 +172,7 @@ private:
std::map<std::thread::id, std::thread> mConnectingThreads;
sp<IBinder> mRootObject;
wp<IBinder> mRootObjectWeak;
- std::map<int32_t, sp<RpcSession>> mSessions;
- int32_t mSessionIdCounter = 0;
+ std::map<RpcAddress, sp<RpcSession>> mSessions;
std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
std::condition_variable mShutdownCv;
};
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 4ddf422850..69c2a1a956 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -118,7 +118,11 @@ public:
~RpcSession();
- wp<RpcServer> server();
+ /**
+     * The RpcServer this session belongs to, if it was created on the server
+     * side of a connection. Otherwise, nullptr.
+ */
+ sp<RpcServer> server();
// internal only
const std::unique_ptr<RpcState>& state() { return mState; }
@@ -170,14 +174,14 @@ private:
class EventListener : public virtual RefBase {
public:
- virtual void onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) = 0;
- virtual void onSessionServerThreadEnded() = 0;
+ virtual void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0;
+ virtual void onSessionIncomingThreadEnded() = 0;
};
class WaitForShutdownListener : public EventListener {
public:
- void onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) override;
- void onSessionServerThreadEnded() override;
+ void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) override;
+ void onSessionIncomingThreadEnded() override;
void waitForShutdown(std::unique_lock<std::mutex>& lock);
private:
@@ -191,6 +195,8 @@ private:
// whether this or another thread is currently using this fd to make
// or receive transactions.
std::optional<pid_t> exclusiveTid;
+
+ bool allowNested = false;
};
status_t readId();
@@ -215,14 +221,14 @@ private:
static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result);
[[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address);
- [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
- bool server);
- [[nodiscard]] bool addClientConnection(base::unique_fd fd);
+ [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address,
+ const RpcAddress& sessionId, bool server);
+ [[nodiscard]] bool addOutgoingConnection(base::unique_fd fd, bool init);
[[nodiscard]] bool setForServer(const wp<RpcServer>& server,
const wp<RpcSession::EventListener>& eventListener,
- int32_t sessionId);
- sp<RpcConnection> assignServerToThisThread(base::unique_fd fd);
- [[nodiscard]] bool removeServerConnection(const sp<RpcConnection>& connection);
+ const RpcAddress& sessionId);
+ sp<RpcConnection> assignIncomingConnectionToThisThread(base::unique_fd fd);
+ [[nodiscard]] bool removeIncomingConnection(const sp<RpcConnection>& connection);
enum class ConnectionUse {
CLIENT,
@@ -237,7 +243,7 @@ private:
ExclusiveConnection* connection);
~ExclusiveConnection();
- const base::unique_fd& fd() { return mConnection->fd; }
+ const sp<RpcConnection>& get() { return mConnection; }
private:
static void findConnection(pid_t tid, sp<RpcConnection>* exclusive,
@@ -254,13 +260,13 @@ private:
bool mReentrant = false;
};
- // On the other side of a session, for each of mClientConnections here, there should
- // be one of mServerConnections on the other side (and vice versa).
+ // On the other side of a session, for each of mOutgoingConnections here, there should
+ // be one of mIncomingConnections on the other side (and vice versa).
//
// For the simplest session, a single server with one client, you would
// have:
- // - the server has a single 'mServerConnections' and a thread listening on this
- // - the client has a single 'mClientConnections' and makes calls to this
+ // - the server has a single 'mIncomingConnections' and a thread listening on this
+ // - the client has a single 'mOutgoingConnections' and makes calls to this
// - here, when the client makes a call, the server can call back into it
// (nested calls), but outside of this, the client will only ever read
// calls from the server when it makes a call itself.
@@ -272,8 +278,7 @@ private:
sp<WaitForShutdownListener> mShutdownListener; // used for client sessions
wp<EventListener> mEventListener; // mForServer if server, mShutdownListener if client
- // TODO(b/183988761): this shouldn't be guessable
- std::optional<int32_t> mId;
+ std::optional<RpcAddress> mId;
std::unique_ptr<FdTrigger> mShutdownTrigger;
@@ -286,9 +291,9 @@ private:
std::condition_variable mAvailableConnectionCv; // for mWaitingThreads
size_t mWaitingThreads = 0;
// hint index into clients, ++ when sending an async transaction
- size_t mClientConnectionsOffset = 0;
- std::vector<sp<RpcConnection>> mClientConnections;
- std::vector<sp<RpcConnection>> mServerConnections;
+ size_t mOutgoingConnectionsOffset = 0;
+ std::vector<sp<RpcConnection>> mOutgoingConnections;
+ std::vector<sp<RpcConnection>> mIncomingConnections;
std::map<std::thread::id, std::thread> mThreads;
};
diff --git a/libs/binder/tests/IBinderRpcTest.aidl b/libs/binder/tests/IBinderRpcTest.aidl
index b0c8b2d8b3..9e1078870c 100644
--- a/libs/binder/tests/IBinderRpcTest.aidl
+++ b/libs/binder/tests/IBinderRpcTest.aidl
@@ -55,6 +55,7 @@ interface IBinderRpcTest {
oneway void sleepMsAsync(int ms);
void doCallback(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value);
+ oneway void doCallbackAsync(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value);
void die(boolean cleanup);
void scheduleShutdown();
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index a79295a792..69682b0ae5 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -214,7 +214,8 @@ public:
if (delayed) {
std::thread([=]() {
ALOGE("Executing delayed callback: '%s'", value.c_str());
- (void)doCallback(callback, oneway, false, value);
+ Status status = doCallback(callback, oneway, false, value);
+ ALOGE("Delayed callback status: '%s'", status.toString8().c_str());
}).detach();
return Status::ok();
}
@@ -226,6 +227,11 @@ public:
return callback->sendCallback(value);
}
+ Status doCallbackAsync(const sp<IBinderRpcCallback>& callback, bool oneway, bool delayed,
+ const std::string& value) override {
+ return doCallback(callback, oneway, delayed, value);
+ }
+
Status die(bool cleanup) override {
if (cleanup) {
exit(1);
@@ -978,31 +984,42 @@ TEST_P(BinderRpc, OnewayCallExhaustion) {
TEST_P(BinderRpc, Callbacks) {
const static std::string kTestString = "good afternoon!";
- for (bool oneway : {true, false}) {
- for (bool delayed : {true, false}) {
- auto proc = createRpcTestSocketServerProcess(1, 1, 1);
- auto cb = sp<MyBinderRpcCallback>::make();
-
- EXPECT_OK(proc.rootIface->doCallback(cb, oneway, delayed, kTestString));
-
- using std::literals::chrono_literals::operator""s;
- std::unique_lock<std::mutex> _l(cb->mMutex);
- cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); });
-
- EXPECT_EQ(cb->mValues.size(), 1) << "oneway: " << oneway << "delayed: " << delayed;
- if (cb->mValues.empty()) continue;
- EXPECT_EQ(cb->mValues.at(0), kTestString)
- << "oneway: " << oneway << "delayed: " << delayed;
-
- // since we are severing the connection, we need to go ahead and
- // tell the server to shutdown and exit so that waitpid won't hang
- EXPECT_OK(proc.rootIface->scheduleShutdown());
-
- // since this session has a reverse connection w/ a threadpool, we
- // need to manually shut it down
- EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
-
- proc.expectAlreadyShutdown = true;
+ for (bool callIsOneway : {true, false}) {
+ for (bool callbackIsOneway : {true, false}) {
+ for (bool delayed : {true, false}) {
+ auto proc = createRpcTestSocketServerProcess(1, 1, 1);
+ auto cb = sp<MyBinderRpcCallback>::make();
+
+ if (callIsOneway) {
+ EXPECT_OK(proc.rootIface->doCallbackAsync(cb, callbackIsOneway, delayed,
+ kTestString));
+ } else {
+ EXPECT_OK(
+ proc.rootIface->doCallback(cb, callbackIsOneway, delayed, kTestString));
+ }
+
+ using std::literals::chrono_literals::operator""s;
+ std::unique_lock<std::mutex> _l(cb->mMutex);
+ cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); });
+
+ EXPECT_EQ(cb->mValues.size(), 1)
+ << "callIsOneway: " << callIsOneway
+ << " callbackIsOneway: " << callbackIsOneway << " delayed: " << delayed;
+ if (cb->mValues.empty()) continue;
+ EXPECT_EQ(cb->mValues.at(0), kTestString)
+ << "callIsOneway: " << callIsOneway
+ << " callbackIsOneway: " << callbackIsOneway << " delayed: " << delayed;
+
+ // since we are severing the connection, we need to go ahead and
+ // tell the server to shutdown and exit so that waitpid won't hang
+ EXPECT_OK(proc.rootIface->scheduleShutdown());
+
+ // since this session has a reverse connection w/ a threadpool, we
+ // need to manually shut it down
+ EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
+
+ proc.expectAlreadyShutdown = true;
+ }
}
}
}