author     2021-04-24 01:54:26 +0000
committer  2021-04-28 17:37:27 +0000
commit     f137de90de786d4d794362ef324ae29ef12b0085
tree       8361674fd2a4cc3edcc7e7a88cd006eaa32469e8
parent     27f2ed6973774978dc123854fb389ad962ec08e5
libbinder: finalize connect/server APIs
Before, you needed to manually set up the required number of sockets on
the client and server sides of a connection and manually set up threads.
Now, you configure the thread count on RpcServer and call join once, and
on the client side, you connect once, and the connection figures out how
many socket connections it will make.
Now we will be able to manage how these sockets/threads get set up
without affecting any client code in the various tests.
So, a server looks like this:

    sp<RpcServer> server = RpcServer::make();
    // still required until we are ready to open this up
    server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
    server->setMaxThreads(3 /* for example */);
    // call this for each client (currently this must be set up in
    // advance)
    sp<RpcConnection> connection = server->addClientConnection();
    // other server types are supported
    if (!connection->setupInetServer(1234 /*some port*/)) .. error ..
    // process requests for each client
    server->join();
And a client looks like this:

    sp<RpcConnection> connection = RpcConnection::make();
    if (!connection->setupInetClient(/*some IP address*/, 1234 /*some port*/))
        .. error ..
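Once connected, the client can fetch the server's root object and call
through it as usual; a minimal sketch, assuming a hypothetical AIDL
interface IFoo that the server registered via setRootObject:

    sp<IBinder> root = connection->getRootObject();
    // IFoo is a stand-in for whatever interface the server registered
    sp<IFoo> foo = interface_cast<IFoo>(root);
    if (foo != nullptr) foo->doSomething();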
The above code will create 3 threads on the server serving 3 separate
socket connections, which the client can use to make up to 3 simultaneous
sets of synchronous calls (these can't be shared because each socket may
be needed for in-flight binder calls).
This means that each address (IP + port) in this case can serve a single
client process.
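For example, a client could drive all 3 sockets at once by making
synchronous calls from 3 threads; a sketch, reusing the hypothetical IFoo
proxy from above (each in-flight call claims one of the connection's
sockets):

    std::vector<std::thread> threads;
    for (size_t i = 0; i < 3; i++) {
        // each thread's synchronous call occupies one of the 3 sockets
        threads.push_back(std::thread([&] { foo->doSomething(); }));
    }
    for (auto& t : threads) t.join();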
Future considerations:
- if we wanted, we could set up this connection dynamically, so that
  extra threads and sockets are only created as needed. This would be at
  parity with binder, but it also opens up the possibility of later
  errors. TODOs are added in the code for this.
- a single server should be able to share a threadpool between multiple
  clients. Currently a new threadpool is created for each client.
- new client connections should be able to be set up dynamically.
  Currently, once the threadpool is started, we don't support making
  more connections, but we should.
Bug: 185167543
Test: binderRpcTest
Change-Id: I4c11ab64bf7c1c19ca67f6a1c4be21de52358a5c
-rw-r--r--  libs/binder/RpcConnection.cpp              | 51
-rw-r--r--  libs/binder/RpcServer.cpp                  | 49
-rw-r--r--  libs/binder/RpcState.cpp                   | 57
-rw-r--r--  libs/binder/RpcState.h                     |  2
-rw-r--r--  libs/binder/RpcWireFormat.h                |  1
-rw-r--r--  libs/binder/include/binder/RpcConnection.h | 24
-rw-r--r--  libs/binder/include/binder/RpcServer.h     | 30
-rw-r--r--  libs/binder/tests/binderRpcBenchmark.cpp   |  4
-rw-r--r--  libs/binder/tests/binderRpcTest.cpp        | 46
9 files changed, 187 insertions, 77 deletions
diff --git a/libs/binder/RpcConnection.cpp b/libs/binder/RpcConnection.cpp
index 2502d1bf94..ee5f50868a 100644
--- a/libs/binder/RpcConnection.cpp
+++ b/libs/binder/RpcConnection.cpp
@@ -139,8 +139,8 @@ bool RpcConnection::setupUnixDomainServer(const char* path) {
     return setupSocketServer(UnixSocketAddress(path));
 }

-bool RpcConnection::addUnixDomainClient(const char* path) {
-    return addSocketClient(UnixSocketAddress(path));
+bool RpcConnection::setupUnixDomainClient(const char* path) {
+    return setupSocketClient(UnixSocketAddress(path));
 }

 #ifdef __BIONIC__
@@ -171,8 +171,8 @@ bool RpcConnection::setupVsockServer(unsigned int port) {
     return setupSocketServer(VsockSocketAddress(kAnyCid, port));
 }

-bool RpcConnection::addVsockClient(unsigned int cid, unsigned int port) {
-    return addSocketClient(VsockSocketAddress(cid, port));
+bool RpcConnection::setupVsockClient(unsigned int cid, unsigned int port) {
+    return setupSocketClient(VsockSocketAddress(cid, port));
 }

 #endif // __BIONIC__
@@ -240,12 +240,12 @@ bool RpcConnection::setupInetServer(unsigned int port, unsigned int* assignedPor
     return false;
 }

-bool RpcConnection::addInetClient(const char* addr, unsigned int port) {
+bool RpcConnection::setupInetClient(const char* addr, unsigned int port) {
     auto aiStart = GetAddrInfo(addr, port);
     if (aiStart == nullptr) return false;
     for (auto ai = aiStart.get(); ai != nullptr; ai = ai->ai_next) {
         InetSocketAddress socketAddress(ai->ai_addr, ai->ai_addrlen, addr, port);
-        if (addSocketClient(socketAddress)) return true;
+        if (setupSocketClient(socketAddress)) return true;
     }
     ALOGE("None of the socket address resolved for %s:%u can be added as inet client.", addr, port);
     return false;
@@ -268,6 +268,11 @@ sp<IBinder> RpcConnection::getRootObject() {
     return state()->getRootObject(socket.fd(), sp<RpcConnection>::fromExisting(this));
 }

+status_t RpcConnection::getMaxThreads(size_t* maxThreads) {
+    ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this), SocketUse::CLIENT);
+    return state()->getMaxThreads(socket.fd(), sp<RpcConnection>::fromExisting(this), maxThreads);
+}
+
 status_t RpcConnection::transact(const RpcAddress& address, uint32_t code, const Parcel& data,
                                  Parcel* reply, uint32_t flags) {
     ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this),
@@ -348,7 +353,39 @@ bool RpcConnection::setupSocketServer(const SocketAddress& addr) {
     return true;
 }

-bool RpcConnection::addSocketClient(const SocketAddress& addr) {
+bool RpcConnection::setupSocketClient(const SocketAddress& addr) {
+    {
+        std::lock_guard<std::mutex> _l(mSocketMutex);
+        LOG_ALWAYS_FATAL_IF(mClients.size() != 0,
+                            "Must only setup connection once, but already has %zu clients",
+                            mClients.size());
+    }
+
+    if (!setupOneSocketClient(addr)) return false;
+
+    // TODO(b/185167543): we should add additional connections dynamically
+    // instead of all at once.
+    // TODO(b/186470974): first risk of blocking
+    size_t numThreadsAvailable;
+    if (status_t status = getMaxThreads(&numThreadsAvailable); status != OK) {
+        ALOGE("Could not get max threads after initial connection to %s: %s",
+              addr.toString().c_str(), statusToString(status).c_str());
+        return false;
+    }
+
+    // we've already setup one client
+    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
+        // TODO(b/185167543): avoid race w/ accept4 not being called on server
+        for (size_t tries = 0; tries < 5; tries++) {
+            if (setupOneSocketClient(addr)) break;
+            usleep(10000);
+        }
+    }
+
+    return true;
+}
+
+bool RpcConnection::setupOneSocketClient(const SocketAddress& addr) {
     unique_fd serverFd(
             TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
     if (serverFd == -1) {
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 9a0be921a7..8f2805fce4 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -19,6 +19,7 @@
 #include <sys/socket.h>
 #include <sys/un.h>

+#include <thread>
 #include <vector>

 #include <binder/Parcel.h>
@@ -41,16 +42,19 @@ void RpcServer::iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction()
     mAgreedExperimental = true;
 }

-sp<RpcConnection> RpcServer::addClientConnection() {
-    LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
-
-    auto connection = RpcConnection::make();
-    connection->setForServer(sp<RpcServer>::fromExisting(this));
+void RpcServer::setMaxThreads(size_t threads) {
+    LOG_ALWAYS_FATAL_IF(threads <= 0, "RpcServer is useless without threads");
     {
+        // this lock should only ever be needed in the error case
         std::lock_guard<std::mutex> _l(mLock);
-        mConnections.push_back(connection);
+        LOG_ALWAYS_FATAL_IF(mConnections.size() > 0,
+                            "Must specify max threads before creating a connection");
     }
-    return connection;
+    mMaxThreads = threads;
+}
+
+size_t RpcServer::getMaxThreads() {
+    return mMaxThreads;
 }

 void RpcServer::setRootObject(const sp<IBinder>& binder) {
@@ -63,4 +67,35 @@ sp<IBinder> RpcServer::getRootObject() {
     return mRootObject;
 }

+sp<RpcConnection> RpcServer::addClientConnection() {
+    LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
+
+    auto connection = RpcConnection::make();
+    connection->setForServer(sp<RpcServer>::fromExisting(this));
+    {
+        std::lock_guard<std::mutex> _l(mLock);
+        LOG_ALWAYS_FATAL_IF(mStarted,
+                            "currently only supports adding client connections at creation time");
+        mConnections.push_back(connection);
+    }
+    return connection;
+}
+
+void RpcServer::join() {
+    std::vector<std::thread> pool;
+    {
+        std::lock_guard<std::mutex> _l(mLock);
+        mStarted = true;
+        for (const sp<RpcConnection>& connection : mConnections) {
+            for (size_t i = 0; i < mMaxThreads; i++) {
+                pool.push_back(std::thread([=] { connection->join(); }));
+            }
+        }
+    }
+
+    // TODO(b/185167543): don't waste extra thread for join, and combine threads
+    // between clients
+    for (auto& t : pool) t.join();
+}
+
 } // namespace android
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index d9341369fa..6bfcc42469 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -248,6 +248,31 @@ sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd,
     return reply.readStrongBinder();
 }

+status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcConnection>& connection,
+                                 size_t* maxThreads) {
+    Parcel data;
+    data.markForRpc(connection);
+    Parcel reply;
+
+    status_t status = transact(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS, data,
+                               connection, &reply, 0);
+    if (status != OK) {
+        ALOGE("Error getting max threads: %s", statusToString(status).c_str());
+        return status;
+    }
+
+    int32_t threads;
+    status = reply.readInt32(&threads);
+    if (status != OK) return status;
+    if (threads <= 0) {
+        ALOGE("Error invalid max threads: %d", threads);
+        return BAD_VALUE;
+    }
+
+    *maxThreads = threads;
+    return OK;
+}
+
 status_t RpcState::transact(const base::unique_fd& fd, const RpcAddress& address, uint32_t code,
                             const Parcel& data, const sp<RpcConnection>& connection, Parcel* reply,
                             uint32_t flags) {
@@ -516,23 +541,25 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd,
         replyStatus = target->transact(transaction->code, data, &reply, transaction->flags);
     } else {
         LOG_RPC_DETAIL("Got special transaction %u", transaction->code);
-        // special case for 'zero' address (special server commands)
-        switch (transaction->code) {
-            case RPC_SPECIAL_TRANSACT_GET_ROOT: {
-                sp<IBinder> root;
-                sp<RpcServer> server = connection->server().promote();
-                if (server) {
-                    root = server->getRootObject();
-                } else {
-                    ALOGE("Root object requested, but no server attached.");
-                }
-                replyStatus = reply.writeStrongBinder(root);
-                break;
-            }
-            default: {
-                replyStatus = UNKNOWN_TRANSACTION;
+        sp<RpcServer> server = connection->server().promote();
+        if (server) {
+            // special case for 'zero' address (special server commands)
+            switch (transaction->code) {
+                case RPC_SPECIAL_TRANSACT_GET_ROOT: {
+                    replyStatus = reply.writeStrongBinder(server->getRootObject());
+                    break;
+                }
+                case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: {
+                    replyStatus = reply.writeInt32(server->getMaxThreads());
+                    break;
+                }
+                default: {
+                    replyStatus = UNKNOWN_TRANSACTION;
+                }
             }
+        } else {
+            ALOGE("Special command sent, but no server object attached.");
         }
     }
 }
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index f4f5151c73..1cfa406791 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -51,6 +51,8 @@ public:
     ~RpcState();

     sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcConnection>& connection);
+    status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcConnection>& connection,
+                           size_t* maxThreadsOut);

     [[nodiscard]] status_t transact(const base::unique_fd& fd, const RpcAddress& address,
                                     uint32_t code, const Parcel& data,
diff --git a/libs/binder/RpcWireFormat.h b/libs/binder/RpcWireFormat.h
index 60ec6c91bf..cc7cacb32d 100644
--- a/libs/binder/RpcWireFormat.h
+++ b/libs/binder/RpcWireFormat.h
@@ -47,6 +47,7 @@ enum : uint32_t {
  */
 enum : uint32_t {
     RPC_SPECIAL_TRANSACT_GET_ROOT = 0,
+    RPC_SPECIAL_TRANSACT_GET_MAX_THREADS = 1,
 };

 // serialization is like:
diff --git a/libs/binder/include/binder/RpcConnection.h b/libs/binder/include/binder/RpcConnection.h
index 09aed13dd3..3a2d8e581d 100644
--- a/libs/binder/include/binder/RpcConnection.h
+++ b/libs/binder/include/binder/RpcConnection.h
@@ -59,7 +59,7 @@ public:
      * This should be called once per thread, matching 'join' in the remote
      * process.
      */
-    [[nodiscard]] bool addUnixDomainClient(const char* path);
+    [[nodiscard]] bool setupUnixDomainClient(const char* path);

 #ifdef __BIONIC__
     /**
@@ -70,7 +70,7 @@ public:
     /**
     * Connects to an RPC server at the CVD & port.
     */
-    [[nodiscard]] bool addVsockClient(unsigned int cvd, unsigned int port);
+    [[nodiscard]] bool setupVsockClient(unsigned int cvd, unsigned int port);
 #endif // __BIONIC__

     /**
@@ -87,7 +87,7 @@ public:
     /**
     * Connects to an RPC server at the given address and port.
     */
-    [[nodiscard]] bool addInetClient(const char* addr, unsigned int port);
+    [[nodiscard]] bool setupInetClient(const char* addr, unsigned int port);

     /**
     * For debugging!
@@ -104,16 +104,16 @@
     */
     sp<IBinder> getRootObject();

+    /**
+     * Query the other side of the connection for the maximum number of threads
+     * it supports (maximum number of concurrent non-nested synchronous transactions)
+     */
+    status_t getMaxThreads(size_t* maxThreads);
+
     [[nodiscard]] status_t transact(const RpcAddress& address, uint32_t code, const Parcel& data,
                                     Parcel* reply, uint32_t flags);
     [[nodiscard]] status_t sendDecStrong(const RpcAddress& address);

-    /**
-     * Adds a server thread accepting connections. Must be called after
-     * setup*Server.
-     */
-    void join();
-
     ~RpcConnection();

     void setForServer(const wp<RpcServer>& server);
@@ -132,8 +132,11 @@ public:

 private:
     friend sp<RpcConnection>;
+    friend RpcServer;
     RpcConnection();

+    void join();
+
     struct ConnectionSocket : public RefBase {
         base::unique_fd fd;

@@ -143,7 +146,8 @@ private:
     };

     bool setupSocketServer(const SocketAddress& address);
-    bool addSocketClient(const SocketAddress& address);
+    bool setupSocketClient(const SocketAddress& address);
+    bool setupOneSocketClient(const SocketAddress& address);
     void addClient(base::unique_fd&& fd);
     sp<ConnectionSocket> assignServerToThisThread(base::unique_fd&& fd);
     bool removeServerSocket(const sp<ConnectionSocket>& socket);
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index a665fad43a..9247128b95 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -40,25 +40,37 @@ public:
     void iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();

     /**
-     * Setup a static connection, when the number of clients are known.
+     * This must be called before adding a client connection.
      *
-     * Each call to this function corresponds to a different client, and clients
-     * each have their own threadpools.
+     * If this is not specified, this will be a single-threaded server.
      *
-     * TODO(b/167966510): support dynamic creation of connections/threads
+     * TODO(b/185167543): these are currently created per client, but these
+     * should be shared.
      */
-    sp<RpcConnection> addClientConnection();
+    void setMaxThreads(size_t threads);
+    size_t getMaxThreads();

     /**
      * The root object can be retrieved by any client, without any
      * authentication. TODO(b/183988761)
      */
     void setRootObject(const sp<IBinder>& binder);
+    sp<IBinder> getRootObject();

     /**
-     * Root object set with setRootObject
+     * Setup a static connection, when the number of clients are known.
+     *
+     * Each call to this function corresponds to a different client, and clients
+     * each have their own threadpools.
+     *
+     * TODO(b/167966510): support dynamic creation of connections/threads
      */
-    sp<IBinder> getRootObject();
+    sp<RpcConnection> addClientConnection();
+
+    /**
+     * You must have at least one client connection before calling this.
+     */
+    void join();

     ~RpcServer();

@@ -67,8 +79,10 @@ private:
     RpcServer();

     bool mAgreedExperimental = false;
+    bool mStarted = false; // TODO(b/185167543): support dynamically added clients
+    size_t mMaxThreads = 1;

-    std::mutex mLock;
+    std::mutex mLock; // for below
     sp<IBinder> mRootObject;
     std::vector<sp<RpcConnection>> mConnections; // per-client
 };
diff --git a/libs/binder/tests/binderRpcBenchmark.cpp b/libs/binder/tests/binderRpcBenchmark.cpp
index 7c82226aef..b3282ffb18 100644
--- a/libs/binder/tests/binderRpcBenchmark.cpp
+++ b/libs/binder/tests/binderRpcBenchmark.cpp
@@ -127,12 +127,12 @@ int main(int argc, char** argv) {
         sp<RpcConnection> connection = server->addClientConnection();
         CHECK(connection->setupUnixDomainServer(addr.c_str()));

-        connection->join();
+        server->join();
     }).detach();

     for (size_t tries = 0; tries < 5; tries++) {
         usleep(10000);
-        if (gConnection->addUnixDomainClient(addr.c_str())) goto success;
+        if (gConnection->setupUnixDomainClient(addr.c_str())) goto success;
     }
     LOG(FATAL) << "Could not connect.";
 success:
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index ce69ea26a4..f3ec904d69 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -298,8 +298,6 @@ public:
     ProcessConnection createRpcTestSocketServerProcess(
             size_t numThreads,
             const std::function<void(const sp<RpcServer>&, const sp<RpcConnection>&)>& configure) {
-        CHECK_GT(numThreads, 0);
-
         SocketType socketType = GetParam();

         std::string addr = allocateSocketAddress();
@@ -312,6 +310,7 @@ public:
                     sp<RpcServer> server = RpcServer::make();

                     server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
+                    server->setMaxThreads(numThreads);

                     // server supporting one client on one socket
                     sp<RpcConnection> connection = server->addClientConnection();
@@ -339,13 +338,7 @@ public:

                     configure(server, connection);

-                    // accept 'numThreads' connections
-                    std::vector<std::thread> pool;
-                    for (size_t i = 0; i + 1 < numThreads; i++) {
-                        pool.push_back(std::thread([=] { connection->join(); }));
-                    }
-                    connection->join();
-                    for (auto& t : pool) t.join();
+                    server->join();
                 }),
                 .connection = RpcConnection::make(),
         };
@@ -358,29 +351,26 @@ public:
         }

         // create remainder of connections
-        for (size_t i = 0; i < numThreads; i++) {
-            for (size_t tries = 0; tries < 5; tries++) {
-                usleep(10000);
-                switch (socketType) {
-                    case SocketType::UNIX:
-                        if (ret.connection->addUnixDomainClient(addr.c_str())) goto success;
-                        break;
+        for (size_t tries = 0; tries < 10; tries++) {
+            usleep(10000);
+            switch (socketType) {
+                case SocketType::UNIX:
+                    if (ret.connection->setupUnixDomainClient(addr.c_str())) goto success;
+                    break;
 #ifdef __BIONIC__
-                    case SocketType::VSOCK:
-                        if (ret.connection->addVsockClient(VMADDR_CID_LOCAL, vsockPort))
-                            goto success;
-                        break;
+                case SocketType::VSOCK:
+                    if (ret.connection->setupVsockClient(VMADDR_CID_LOCAL, vsockPort)) goto success;
+                    break;
 #endif // __BIONIC__
-                    case SocketType::INET:
-                        if (ret.connection->addInetClient("127.0.0.1", inetPort)) goto success;
-                        break;
-                    default:
-                        LOG_ALWAYS_FATAL("Unknown socket type");
-                }
+                case SocketType::INET:
+                    if (ret.connection->setupInetClient("127.0.0.1", inetPort)) goto success;
+                    break;
+                default:
+                    LOG_ALWAYS_FATAL("Unknown socket type");
             }
-            LOG_ALWAYS_FATAL("Could not connect");
-        success:;
         }
+        LOG_ALWAYS_FATAL("Could not connect");
+    success:

         ret.rootBinder = ret.connection->getRootObject();
         return ret;