diff options
author | 2021-05-01 04:27:25 +0000 | |
---|---|---|
committer | 2021-05-04 01:08:59 +0000 | |
commit | 736664b58bbf466ae025b789a33645b989539eaf (patch) | |
tree | ce153be7aaedd4bc41a7f9bf99539de9bcc308ad /libs/binder/RpcServer.cpp | |
parent | 64117a8dfb5051cfc1399459a018773a6b544b88 (diff) |
libbinder: dynamically accept clients
Server listens on a single port and add clients.
The server looks like this:
while True:
accept client
read client id
if new id:
create new rpc connection
else:
attach thread to existing rpc connection
Roadmap:
- having client add connections only when needed (currently they are
all added at initialization time) - when this change is made, the
server will also need to enforce the max threads per client.
- allowing RpcConnection to create reverse connections with an
threadpool to serve calls in the other direction
- replacing connection IDs with something like TLS
- access controls for who can connect to who in pKVM context
Bug: 185167543
Test: binderRpcTest
Change-Id: I510d23a50cf839c39bc8107c1b0dae24dee3bc7b
Diffstat (limited to 'libs/binder/RpcServer.cpp')
-rw-r--r-- | libs/binder/RpcServer.cpp | 77 |
1 files changed, 49 insertions, 28 deletions
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index 5f024caf32..4df12ce2c6 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -126,40 +126,61 @@ void RpcServer::join() { { std::lock_guard<std::mutex> _l(mLock); LOG_ALWAYS_FATAL_IF(mServer.get() == -1, "RpcServer must be setup to join."); - // TODO(b/185167543): support more than one client at once - mConnection = RpcConnection::make(); - mConnection->setForServer(sp<RpcServer>::fromExisting(this), 42 /*placeholder id*/); - - mStarted = true; - for (size_t i = 0; i < mMaxThreads; i++) { - pool.push_back(std::thread([=] { - // TODO(b/185167543): do this dynamically, instead of from a static number - // of threads - unique_fd clientFd(TEMP_FAILURE_RETRY( - accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC))); - if (clientFd < 0) { - // If this log becomes confusing, should save more state from - // setupUnixDomainServer in order to output here. - ALOGE("Could not accept4 socket: %s", strerror(errno)); - return; - } - - LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); - - mConnection->join(std::move(clientFd)); - })); - } } - // TODO(b/185167543): don't waste extra thread for join, and combine threads - // between clients - for (auto& t : pool) t.join(); + while (true) { + unique_fd clientFd( + TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC))); + + if (clientFd < 0) { + ALOGE("Could not accept4 socket: %s", strerror(errno)); + continue; + } + LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); + + // TODO(b/183988761): cannot trust this simple ID + LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); + int32_t id; + if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) { + ALOGE("Could not read ID from fd %d", clientFd.get()); + continue; + } + + { + std::lock_guard<std::mutex> _l(mLock); + + sp<RpcConnection> connection; + if (id == RPC_CONNECTION_ID_NEW) { + // new client! + LOG_ALWAYS_FATAL_IF(mConnectionIdCounter >= INT32_MAX, "Out of connection IDs"); + mConnectionIdCounter++; + + connection = RpcConnection::make(); + connection->setForServer(wp<RpcServer>::fromExisting(this), mConnectionIdCounter); + + mConnections[mConnectionIdCounter] = connection; + } else { + auto it = mConnections.find(id); + if (it == mConnections.end()) { + ALOGE("Cannot add thread, no record of connection with ID %d", id); + continue; + } + connection = it->second; + } + + connection->startThread(std::move(clientFd)); + } + } } std::vector<sp<RpcConnection>> RpcServer::listConnections() { std::lock_guard<std::mutex> _l(mLock); - if (mConnection == nullptr) return {}; - return {mConnection}; + std::vector<sp<RpcConnection>> connections; + for (auto& [id, connection] : mConnections) { + (void)id; + connections.push_back(connection); + } + return connections; } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { |