summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libs/binder/RpcServer.cpp6
-rw-r--r--libs/binder/RpcSession.cpp170
-rw-r--r--libs/binder/RpcState.cpp40
-rw-r--r--libs/binder/RpcState.h6
-rw-r--r--libs/binder/RpcWireFormat.h16
-rw-r--r--libs/binder/include/binder/RpcSession.h34
-rw-r--r--libs/binder/tests/binderRpcTest.cpp8
7 files changed, 201 insertions, 79 deletions
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 2d2eed2671..60be406f6f 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -307,13 +307,15 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
}
detachGuard.Disable();
- session->preJoin(std::move(thisThread));
+ session->preJoinThreadOwnership(std::move(thisThread));
}
+ auto setupResult = session->preJoinSetup(std::move(clientFd));
+
// avoid strong cycle
server = nullptr;
- RpcSession::join(std::move(session), std::move(clientFd));
+ RpcSession::join(std::move(session), std::move(setupResult));
}
bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index de9aa220f8..a759ae36c3 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -104,12 +104,18 @@ bool RpcSession::addNullDebuggingClient() {
}
sp<IBinder> RpcSession::getRootObject() {
- ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT);
+ ExclusiveConnection connection;
+ status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
+ ConnectionUse::CLIENT, &connection);
+ if (status != OK) return nullptr;
return state()->getRootObject(connection.fd(), sp<RpcSession>::fromExisting(this));
}
status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
- ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT);
+ ExclusiveConnection connection;
+ status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
+ ConnectionUse::CLIENT, &connection);
+ if (status != OK) return status;
return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
}
@@ -133,16 +139,22 @@ bool RpcSession::shutdownAndWait(bool wait) {
status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
Parcel* reply, uint32_t flags) {
- ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
- (flags & IBinder::FLAG_ONEWAY) ? ConnectionUse::CLIENT_ASYNC
- : ConnectionUse::CLIENT);
+ ExclusiveConnection connection;
+ status_t status =
+ ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
+ (flags & IBinder::FLAG_ONEWAY) ? ConnectionUse::CLIENT_ASYNC
+ : ConnectionUse::CLIENT,
+ &connection);
+ if (status != OK) return status;
return state()->transact(connection.fd(), binder, code, data,
sp<RpcSession>::fromExisting(this), reply, flags);
}
status_t RpcSession::sendDecStrong(const RpcAddress& address) {
- ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
- ConnectionUse::CLIENT_REFCOUNT);
+ ExclusiveConnection connection;
+ status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
+ ConnectionUse::CLIENT_REFCOUNT, &connection);
+ if (status != OK) return status;
return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
}
@@ -208,9 +220,12 @@ status_t RpcSession::readId() {
int32_t id;
- ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT);
- status_t status =
- state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id);
+ ExclusiveConnection connection;
+ status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
+ ConnectionUse::CLIENT, &connection);
+ if (status != OK) return status;
+
+ status = state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id);
if (status != OK) return status;
LOG_RPC_DETAIL("RpcSession %p has id %d", this, id);
@@ -236,7 +251,7 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::
}
}
-void RpcSession::preJoin(std::thread thread) {
+void RpcSession::preJoinThreadOwnership(std::thread thread) {
LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");
{
@@ -245,20 +260,36 @@ void RpcSession::preJoin(std::thread thread) {
}
}
-void RpcSession::join(sp<RpcSession>&& session, unique_fd client) {
+RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
// must be registered to allow arbitrary client code executing commands to
// be able to do nested calls (we can't only read from it)
- sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client));
+ sp<RpcConnection> connection = assignServerToThisThread(std::move(fd));
- while (true) {
- status_t error = session->state()->getAndExecuteCommand(connection->fd, session,
- RpcState::CommandType::ANY);
+ status_t status =
+ mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
- if (error != OK) {
- LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
- statusToString(error).c_str());
- break;
+ return PreJoinSetupResult{
+ .connection = std::move(connection),
+ .status = status,
+ };
+}
+
+void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult) {
+ sp<RpcConnection>& connection = setupResult.connection;
+
+ if (setupResult.status == OK) {
+ while (true) {
+ status_t status = session->state()->getAndExecuteCommand(connection->fd, session,
+ RpcState::CommandType::ANY);
+ if (status != OK) {
+ LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
+ statusToString(status).c_str());
+ break;
+ }
}
+ } else {
+ ALOGE("Connection failed to init, closing with status %s",
+ statusToString(setupResult.status).c_str());
}
LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection),
@@ -381,14 +412,17 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t
unique_fd fd = std::move(serverFd);
// NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
sp<RpcSession> session = thiz;
- session->preJoin(std::move(thread));
- ownershipTransferred = true;
- joinCv.notify_one();
+ session->preJoinThreadOwnership(std::move(thread));
+
+ // only continue once we have a response or the connection fails
+ auto setupResult = session->preJoinSetup(std::move(fd));
+ ownershipTransferred = true;
threadLock.unlock();
+ joinCv.notify_one();
// do not use & vars below
- RpcSession::join(std::move(session), std::move(fd));
+ RpcSession::join(std::move(session), std::move(setupResult));
});
joinCv.wait(lock, [&] { return ownershipTransferred; });
LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
@@ -403,20 +437,32 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t
}
bool RpcSession::addClientConnection(unique_fd fd) {
- std::lock_guard<std::mutex> _l(mMutex);
+ sp<RpcConnection> connection = sp<RpcConnection>::make();
+ {
+ std::lock_guard<std::mutex> _l(mMutex);
+
+ // first client connection added, but setForServer not called, so
+ // initializaing for a client.
+ if (mShutdownTrigger == nullptr) {
+ mShutdownTrigger = FdTrigger::make();
+ mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
+ if (mShutdownTrigger == nullptr) return false;
+ }
- // first client connection added, but setForServer not called, so
- // initializaing for a client.
- if (mShutdownTrigger == nullptr) {
- mShutdownTrigger = FdTrigger::make();
- mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
- if (mShutdownTrigger == nullptr) return false;
+ connection->fd = std::move(fd);
+ connection->exclusiveTid = gettid();
+ mClientConnections.push_back(connection);
}
- sp<RpcConnection> session = sp<RpcConnection>::make();
- session->fd = std::move(fd);
- mClientConnections.push_back(session);
- return true;
+ status_t status =
+ mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
+
+ {
+ std::lock_guard<std::mutex> _l(mMutex);
+ connection->exclusiveTid = std::nullopt;
+ }
+
+ return status == OK;
}
bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
@@ -462,13 +508,16 @@ bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) {
return false;
}
-RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& session,
- ConnectionUse use)
- : mSession(session) {
+status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, ConnectionUse use,
+ ExclusiveConnection* connection) {
+ connection->mSession = session;
+ connection->mConnection = nullptr;
+ connection->mReentrant = false;
+
pid_t tid = gettid();
- std::unique_lock<std::mutex> _l(mSession->mMutex);
+ std::unique_lock<std::mutex> _l(session->mMutex);
- mSession->mWaitingThreads++;
+ session->mWaitingThreads++;
while (true) {
sp<RpcConnection> exclusive;
sp<RpcConnection> available;
@@ -476,8 +525,8 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi
// CHECK FOR DEDICATED CLIENT SOCKET
//
// A server/looper should always use a dedicated connection if available
- findConnection(tid, &exclusive, &available, mSession->mClientConnections,
- mSession->mClientConnectionsOffset);
+ findConnection(tid, &exclusive, &available, session->mClientConnections,
+ session->mClientConnectionsOffset);
// WARNING: this assumes a server cannot request its client to send
// a transaction, as mServerConnections is excluded below.
@@ -490,8 +539,8 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi
// command. So, we move to considering the second available thread
// for subsequent calls.
if (use == ConnectionUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) {
- mSession->mClientConnectionsOffset =
- (mSession->mClientConnectionsOffset + 1) % mSession->mClientConnections.size();
+ session->mClientConnectionsOffset =
+ (session->mClientConnectionsOffset + 1) % session->mClientConnections.size();
}
// USE SERVING SOCKET (for nested transaction)
@@ -499,33 +548,36 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi
// asynchronous calls cannot be nested
if (use != ConnectionUse::CLIENT_ASYNC) {
// server connections are always assigned to a thread
- findConnection(tid, &exclusive, nullptr /*available*/, mSession->mServerConnections,
+ findConnection(tid, &exclusive, nullptr /*available*/, session->mServerConnections,
0 /* index hint */);
}
// if our thread is already using a connection, prioritize using that
if (exclusive != nullptr) {
- mConnection = exclusive;
- mReentrant = true;
+ connection->mConnection = exclusive;
+ connection->mReentrant = true;
break;
} else if (available != nullptr) {
- mConnection = available;
- mConnection->exclusiveTid = tid;
+ connection->mConnection = available;
+ connection->mConnection->exclusiveTid = tid;
break;
}
- // TODO(b/185167543): this should return an error, rather than crash a
- // server
- // in regular binder, this would usually be a deadlock :)
- LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0,
- "Session has no client connections. This is required for an RPC server "
- "to make any non-nested (e.g. oneway or on another thread) calls.");
+ if (session->mClientConnections.size() == 0) {
+ ALOGE("Session has no client connections. This is required for an RPC server to make "
+ "any non-nested (e.g. oneway or on another thread) calls. Use: %d. Server "
+ "connections: %zu",
+ static_cast<int>(use), session->mServerConnections.size());
+ return WOULD_BLOCK;
+ }
LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...",
- mSession->mClientConnections.size(), mSession->mServerConnections.size());
- mSession->mAvailableConnectionCv.wait(_l);
+ session->mClientConnections.size(), session->mServerConnections.size());
+ session->mAvailableConnectionCv.wait(_l);
}
- mSession->mWaitingThreads--;
+ session->mWaitingThreads--;
+
+ return OK;
}
void RpcSession::ExclusiveConnection::findConnection(pid_t tid, sp<RpcConnection>* exclusive,
@@ -559,7 +611,7 @@ RpcSession::ExclusiveConnection::~ExclusiveConnection() {
// reentrant use of a connection means something less deep in the call stack
// is using this fd, and it retains the right to it. So, we don't give up
// exclusive ownership, and no thread is freed.
- if (!mReentrant) {
+ if (!mReentrant && mConnection != nullptr) {
std::unique_lock<std::mutex> _l(mSession->mMutex);
mConnection->exclusiveTid = std::nullopt;
if (mSession->mWaitingThreads > 0) {
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 6899981e83..53eba5aea6 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -265,6 +265,27 @@ status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& sessi
return OK;
}
+status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
+ RpcClientConnectionInit init{
+ .msg = RPC_CONNECTION_INIT_OKAY,
+ };
+ return rpcSend(fd, session, "connection init", &init, sizeof(init));
+}
+
+status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
+ RpcClientConnectionInit init;
+ if (status_t status = rpcRec(fd, session, "connection init", &init, sizeof(init)); status != OK)
+ return status;
+
+ static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY));
+ if (0 != strncmp(init.msg, RPC_CONNECTION_INIT_OKAY, sizeof(init.msg))) {
+ ALOGE("Connection init message unrecognized %.*s", static_cast<int>(sizeof(init.msg)),
+ init.msg);
+ return BAD_VALUE;
+ }
+ return OK;
+}
+
sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) {
Parcel data;
data.markForRpc(session);
@@ -565,7 +586,7 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSessio
status != OK)
return status;
- return processTransactInternal(fd, session, std::move(transactionData), nullptr /*targetRef*/);
+ return processTransactInternal(fd, session, std::move(transactionData));
}
static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t dataSize,
@@ -578,7 +599,13 @@ static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t d
}
status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session,
- CommandData transactionData, sp<IBinder>&& targetRef) {
+ CommandData transactionData) {
+ // for 'recursive' calls to this, we have already read and processed the
+ // binder from the transaction data and taken reference counts into account,
+ // so it is cached here.
+ sp<IBinder> targetRef;
+processTransactInternalTailCall:
+
if (transactionData.size() < sizeof(RpcWireTransaction)) {
ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
sizeof(RpcWireTransaction), transactionData.size());
@@ -751,13 +778,12 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
// - gotta go fast
auto& todo = const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top());
- CommandData nextData = std::move(todo.data);
- sp<IBinder> nextRef = std::move(todo.ref);
+ // reset up arguments
+ transactionData = std::move(todo.data);
+ targetRef = std::move(todo.ref);
it->second.asyncTodo.pop();
- _l.unlock();
- return processTransactInternal(fd, session, std::move(nextData),
- std::move(nextRef));
+ goto processTransactInternalTailCall;
}
}
return OK;
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index 13c31154eb..5bfef69c32 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -51,6 +51,9 @@ public:
RpcState();
~RpcState();
+ status_t sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
+ status_t readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
+
// TODO(b/182940634): combine some special transactions into one "getServerInfo" call?
sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session);
status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -144,8 +147,7 @@ private:
const RpcWireHeader& command);
[[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd,
const sp<RpcSession>& session,
- CommandData transactionData,
- sp<IBinder>&& targetRef);
+ CommandData transactionData);
[[nodiscard]] status_t processDecStrong(const base::unique_fd& fd,
const sp<RpcSession>& session,
const RpcWireHeader& command);
diff --git a/libs/binder/RpcWireFormat.h b/libs/binder/RpcWireFormat.h
index 649c1eeb8e..b5e5bc1e0f 100644
--- a/libs/binder/RpcWireFormat.h
+++ b/libs/binder/RpcWireFormat.h
@@ -26,12 +26,28 @@ enum : uint8_t {
RPC_CONNECTION_OPTION_REVERSE = 0x1,
};
+/**
+ * This is sent to an RpcServer in order to request a new connection is created,
+ * either as part of a new session or an existing session
+ */
struct RpcConnectionHeader {
int32_t sessionId;
uint8_t options;
uint8_t reserved[3];
};
+#define RPC_CONNECTION_INIT_OKAY "cci"
+
+/**
+ * Whenever a client connection is setup, this is sent as the initial
+ * transaction. The main use of this is in order to control the timing for when
+ * a reverse connection is setup.
+ */
+struct RpcClientConnectionInit {
+ char msg[4];
+ uint8_t reserved[4];
+};
+
enum : uint32_t {
/**
* follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 4650cf21ca..4ddf422850 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -185,13 +185,6 @@ private:
bool mShutdown = false;
};
- status_t readId();
-
- // transfer ownership of thread
- void preJoin(std::thread thread);
- // join on thread passed to preJoin
- static void join(sp<RpcSession>&& session, base::unique_fd client);
-
struct RpcConnection : public RefBase {
base::unique_fd fd;
@@ -200,6 +193,27 @@ private:
std::optional<pid_t> exclusiveTid;
};
+ status_t readId();
+
+ // A thread joining a server must always call these functions in order, and
+ // cleanup is only programmed once into join. These are in separate
+ // functions in order to allow for different locks to be taken during
+ // different parts of setup.
+ //
+ // transfer ownership of thread (usually done while a lock is taken on the
+ // structure which originally owns the thread)
+ void preJoinThreadOwnership(std::thread thread);
+ // pass FD to thread and read initial connection information
+ struct PreJoinSetupResult {
+ // Server connection object associated with this
+ sp<RpcConnection> connection;
+ // Status of setup
+ status_t status;
+ };
+ PreJoinSetupResult preJoinSetup(base::unique_fd fd);
+ // join on thread passed to preJoinThreadOwnership
+ static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result);
+
[[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address);
[[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
bool server);
@@ -216,10 +230,12 @@ private:
CLIENT_REFCOUNT,
};
- // RAII object for session connection
+ // Object representing exclusive access to a connection.
class ExclusiveConnection {
public:
- explicit ExclusiveConnection(const sp<RpcSession>& session, ConnectionUse use);
+ static status_t find(const sp<RpcSession>& session, ConnectionUse use,
+ ExclusiveConnection* connection);
+
~ExclusiveConnection();
const base::unique_fd& fd() { return mConnection->fd; }
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 82f8a3e273..a79295a792 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -1007,6 +1007,14 @@ TEST_P(BinderRpc, Callbacks) {
}
}
+TEST_P(BinderRpc, OnewayCallbackWithNoThread) {
+ auto proc = createRpcTestSocketServerProcess(1);
+ auto cb = sp<MyBinderRpcCallback>::make();
+
+ Status status = proc.rootIface->doCallback(cb, true /*oneway*/, false /*delayed*/, "anything");
+ EXPECT_EQ(WOULD_BLOCK, status.transactionError());
+}
+
TEST_P(BinderRpc, Die) {
for (bool doDeathCleanup : {true, false}) {
auto proc = createRpcTestSocketServerProcess(1);