diff options
-rw-r--r-- | libs/binder/RpcConnection.cpp | 28 | ||||
-rw-r--r-- | libs/binder/RpcServer.cpp | 77 | ||||
-rw-r--r-- | libs/binder/RpcWireFormat.h | 2 | ||||
-rw-r--r-- | libs/binder/include/binder/RpcConnection.h | 12 | ||||
-rw-r--r-- | libs/binder/include/binder/RpcServer.h | 5 | ||||
-rw-r--r-- | libs/binder/tests/IBinderRpcTest.aidl | 4 | ||||
-rw-r--r-- | libs/binder/tests/binderRpcTest.cpp | 152 |
7 files changed, 183 insertions, 97 deletions
diff --git a/libs/binder/RpcConnection.cpp b/libs/binder/RpcConnection.cpp index 95eba87b68..4b3a53ffe9 100644 --- a/libs/binder/RpcConnection.cpp +++ b/libs/binder/RpcConnection.cpp @@ -133,6 +133,21 @@ status_t RpcConnection::readId() { return OK; } +void RpcConnection::startThread(unique_fd client) { + std::lock_guard<std::mutex> _l(mSocketMutex); + sp<RpcConnection> holdThis = sp<RpcConnection>::fromExisting(this); + int fd = client.release(); + auto thread = std::thread([=] { + holdThis->join(unique_fd(fd)); + { + std::lock_guard<std::mutex> _l(holdThis->mSocketMutex); + size_t erased = mThreads.erase(std::this_thread::get_id()); + LOG_ALWAYS_FATAL_IF(erased != 0, "Could not erase thread."); + } + }); + mThreads[thread.get_id()] = std::move(thread); +} + void RpcConnection::join(unique_fd client) { // must be registered to allow arbitrary client code executing commands to // be able to do nested calls (we can't only read from it) @@ -164,7 +179,7 @@ bool RpcConnection::setupSocketClient(const RpcSocketAddress& addr) { mClients.size()); } - if (!setupOneSocketClient(addr)) return false; + if (!setupOneSocketClient(addr, RPC_CONNECTION_ID_NEW)) return false; // TODO(b/185167543): we should add additional connections dynamically // instead of all at once. @@ -186,7 +201,7 @@ bool RpcConnection::setupSocketClient(const RpcSocketAddress& addr) { for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { // TODO(b/185167543): avoid race w/ accept4 not being called on server for (size_t tries = 0; tries < 5; tries++) { - if (setupOneSocketClient(addr)) break; + if (setupOneSocketClient(addr, mId.value())) break; usleep(10000); } } @@ -194,7 +209,7 @@ bool RpcConnection::setupSocketClient(const RpcSocketAddress& addr) { return true; } -bool RpcConnection::setupOneSocketClient(const RpcSocketAddress& addr) { +bool RpcConnection::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id) { unique_fd serverFd( TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0))); if (serverFd == -1) { @@ -209,6 +224,13 @@ bool RpcConnection::setupOneSocketClient(const RpcSocketAddress& addr) { return false; } + if (sizeof(id) != TEMP_FAILURE_RETRY(write(serverFd.get(), &id, sizeof(id)))) { + int savedErrno = errno; + ALOGE("Could not write id to socket at %s: %s", addr.toString().c_str(), + strerror(savedErrno)); + return false; + } + LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); addClient(std::move(serverFd)); diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index 5f024caf32..4df12ce2c6 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -126,40 +126,61 @@ void RpcServer::join() { { std::lock_guard<std::mutex> _l(mLock); LOG_ALWAYS_FATAL_IF(mServer.get() == -1, "RpcServer must be setup to join."); - // TODO(b/185167543): support more than one client at once - mConnection = RpcConnection::make(); - mConnection->setForServer(sp<RpcServer>::fromExisting(this), 42 /*placeholder id*/); - - mStarted = true; - for (size_t i = 0; i < mMaxThreads; i++) { - pool.push_back(std::thread([=] { - // TODO(b/185167543): do this dynamically, instead of from a static number - // of threads - unique_fd clientFd(TEMP_FAILURE_RETRY( - accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC))); - if (clientFd < 0) { - // If this log becomes confusing, should save more state from - // setupUnixDomainServer in order to output here. - ALOGE("Could not accept4 socket: %s", strerror(errno)); - return; - } - - LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); - - mConnection->join(std::move(clientFd)); - })); - } } - // TODO(b/185167543): don't waste extra thread for join, and combine threads - // between clients - for (auto& t : pool) t.join(); + while (true) { + unique_fd clientFd( + TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC))); + + if (clientFd < 0) { + ALOGE("Could not accept4 socket: %s", strerror(errno)); + continue; + } + LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get()); + + // TODO(b/183988761): cannot trust this simple ID + LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!"); + int32_t id; + if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) { + ALOGE("Could not read ID from fd %d", clientFd.get()); + continue; + } + + { + std::lock_guard<std::mutex> _l(mLock); + + sp<RpcConnection> connection; + if (id == RPC_CONNECTION_ID_NEW) { + // new client! + LOG_ALWAYS_FATAL_IF(mConnectionIdCounter >= INT32_MAX, "Out of connection IDs"); + mConnectionIdCounter++; + + connection = RpcConnection::make(); + connection->setForServer(wp<RpcServer>::fromExisting(this), mConnectionIdCounter); + + mConnections[mConnectionIdCounter] = connection; + } else { + auto it = mConnections.find(id); + if (it == mConnections.end()) { + ALOGE("Cannot add thread, no record of connection with ID %d", id); + continue; + } + connection = it->second; + } + + connection->startThread(std::move(clientFd)); + } + } } std::vector<sp<RpcConnection>> RpcServer::listConnections() { std::lock_guard<std::mutex> _l(mLock); - if (mConnection == nullptr) return {}; - return {mConnection}; + std::vector<sp<RpcConnection>> connections; + for (auto& [id, connection] : mConnections) { + (void)id; + connections.push_back(connection); + } + return connections; } bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) { diff --git a/libs/binder/RpcWireFormat.h b/libs/binder/RpcWireFormat.h index 56af0d3ab5..a7e8a52eea 100644 --- a/libs/binder/RpcWireFormat.h +++ b/libs/binder/RpcWireFormat.h @@ -51,6 +51,8 @@ enum : uint32_t { RPC_SPECIAL_TRANSACT_GET_CONNECTION_ID = 2, }; +constexpr int32_t RPC_CONNECTION_ID_NEW = -1; + // serialization is like: // |RpcWireHeader|struct desginated by 'command'| (over and over again) diff --git a/libs/binder/include/binder/RpcConnection.h b/libs/binder/include/binder/RpcConnection.h index 7e31e8a4fc..87984d7ee9 100644 --- a/libs/binder/include/binder/RpcConnection.h +++ b/libs/binder/include/binder/RpcConnection.h @@ -21,7 +21,9 @@ #include <utils/Errors.h> #include <utils/RefBase.h> +#include <map> #include <optional> +#include <thread> #include <vector> // WARNING: This is a feature which is still in development, and it is subject @@ -113,6 +115,7 @@ private: status_t readId(); + void startThread(base::unique_fd client); void join(base::unique_fd client); struct ConnectionSocket : public RefBase { @@ -124,7 +127,7 @@ private: }; bool setupSocketClient(const RpcSocketAddress& address); - bool setupOneSocketClient(const RpcSocketAddress& address); + bool setupOneSocketClient(const RpcSocketAddress& address, int32_t connectionId); void addClient(base::unique_fd fd); void setForServer(const wp<RpcServer>& server, int32_t connectionId); sp<ConnectionSocket> assignServerToThisThread(base::unique_fd fd); @@ -179,11 +182,18 @@ private: std::unique_ptr<RpcState> mState; std::mutex mSocketMutex; // for all below + std::condition_variable mSocketCv; // for mWaitingThreads size_t mWaitingThreads = 0; size_t mClientsOffset = 0; // hint index into clients, ++ when sending an async transaction std::vector<sp<ConnectionSocket>> mClients; std::vector<sp<ConnectionSocket>> mServers; + + // TODO(b/185167543): use for reverse connections (allow client to also + // serve calls on a connection). + // TODO(b/185167543): allow sharing between different connections in a + // process? (or combine with mServers) + std::map<std::thread::id, std::thread> mThreads; }; } // namespace android diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h index 5535d8a87c..81ea3a7e6b 100644 --- a/libs/binder/include/binder/RpcServer.h +++ b/libs/binder/include/binder/RpcServer.h @@ -97,6 +97,8 @@ public: /** * You must have at least one client connection before calling this. + * + * TODO(b/185167543): way to shut down? */ void join(); @@ -120,7 +122,8 @@ private: std::mutex mLock; // for below sp<IBinder> mRootObject; - sp<RpcConnection> mConnection; + std::map<int32_t, sp<RpcConnection>> mConnections; + int32_t mConnectionIdCounter = 0; }; } // namespace android diff --git a/libs/binder/tests/IBinderRpcTest.aidl b/libs/binder/tests/IBinderRpcTest.aidl index 2bdb26483f..814e0945bc 100644 --- a/libs/binder/tests/IBinderRpcTest.aidl +++ b/libs/binder/tests/IBinderRpcTest.aidl @@ -18,8 +18,8 @@ interface IBinderRpcTest { oneway void sendString(@utf8InCpp String str); @utf8InCpp String doubleString(@utf8InCpp String str); - // number of known RPC binders to process, RpcState::countBinders - int countBinders(); + // number of known RPC binders to process, RpcState::countBinders by connection + int[] countBinders(); // Caller sends server, callee pings caller's server and returns error code. int pingMe(IBinder binder); diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp index d23df8eaec..50bff91f71 100644 --- a/libs/binder/tests/binderRpcTest.cpp +++ b/libs/binder/tests/binderRpcTest.cpp @@ -88,24 +88,21 @@ public: *strstr = str + str; return Status::ok(); } - Status countBinders(int32_t* out) override { + Status countBinders(std::vector<int32_t>* out) override { sp<RpcServer> spServer = server.promote(); if (spServer == nullptr) { return Status::fromExceptionCode(Status::EX_NULL_POINTER); } - size_t count = 0; + out->clear(); for (auto connection : spServer->listConnections()) { - count += connection->state()->countBinders(); - } - // help debugging if we don't have one binder (this call is always made - // in this test when exactly one binder is held, which is held only to - // call this method - all other binders should be cleaned up) - if (count != 1) { - for (auto connection : spServer->listConnections()) { + size_t count = connection->state()->countBinders(); + if (count != 1) { + // this is called when there is only one binder held remaining, + // so to aid debugging connection->state()->dump(); } + out->push_back(count); } - *out = count; return Status::ok(); } Status pingMe(const sp<IBinder>& binder, int32_t* out) override { @@ -232,25 +229,33 @@ struct ProcessConnection { // reference to process hosting a socket server Process host; - // client connection object associated with other process - sp<RpcConnection> connection; + struct ConnectionInfo { + sp<RpcConnection> connection; + sp<IBinder> root; + }; - // pre-fetched root object - sp<IBinder> rootBinder; - - // whether connection should be invalidated by end of run - bool expectInvalid = false; + // client connection objects associated with other process + // each one represents a separate connection + std::vector<ConnectionInfo> connections; ProcessConnection(ProcessConnection&&) = default; ~ProcessConnection() { - rootBinder = nullptr; - EXPECT_NE(nullptr, connection); - EXPECT_NE(nullptr, connection->state()); - EXPECT_EQ(0, connection->state()->countBinders()) << (connection->state()->dump(), "dump:"); + for (auto& connection : connections) { + connection.root = nullptr; + } - wp<RpcConnection> weakConnection = connection; - connection = nullptr; - EXPECT_EQ(nullptr, weakConnection.promote()) << "Leaked connection"; + for (auto& info : connections) { + sp<RpcConnection>& connection = info.connection; + + EXPECT_NE(nullptr, connection); + EXPECT_NE(nullptr, connection->state()); + EXPECT_EQ(0, connection->state()->countBinders()) + << (connection->state()->dump(), "dump:"); + + wp<RpcConnection> weakConnection = connection; + connection = nullptr; + EXPECT_EQ(nullptr, weakConnection.promote()) << "Leaked connection"; + } } }; @@ -259,19 +264,25 @@ struct ProcessConnection { struct BinderRpcTestProcessConnection { ProcessConnection proc; - // pre-fetched root object + // pre-fetched root object (for first connection) sp<IBinder> rootBinder; - // pre-casted root object + // pre-casted root object (for first connection) sp<IBinderRpcTest> rootIface; + // whether connection should be invalidated by end of run + bool expectInvalid = false; + BinderRpcTestProcessConnection(BinderRpcTestProcessConnection&&) = default; ~BinderRpcTestProcessConnection() { - if (!proc.expectInvalid) { - int32_t remoteBinders = 0; - EXPECT_OK(rootIface->countBinders(&remoteBinders)); - // should only be the root binder object, iface - EXPECT_EQ(remoteBinders, 1); + if (!expectInvalid) { + std::vector<int32_t> remoteCounts; + // calling over any connections counts across all connections + EXPECT_OK(rootIface->countBinders(&remoteCounts)); + EXPECT_EQ(remoteCounts.size(), proc.connections.size()); + for (auto remoteCount : remoteCounts) { + EXPECT_EQ(remoteCount, 1); + } } rootIface = nullptr; @@ -306,7 +317,10 @@ public: // This creates a new process serving an interface on a certain number of // threads. ProcessConnection createRpcTestSocketServerProcess( - size_t numThreads, const std::function<void(const sp<RpcServer>&)>& configure) { + size_t numThreads, size_t numConnections, + const std::function<void(const sp<RpcServer>&)>& configure) { + CHECK_GE(numConnections, 1) << "Must have at least one connection to a server"; + SocketType socketType = GetParam(); std::string addr = allocateSocketAddress(); @@ -346,7 +360,6 @@ public: server->join(); }), - .connection = RpcConnection::make(), }; unsigned int inetPort = 0; @@ -356,35 +369,37 @@ public: CHECK_NE(0, inetPort); } - // create remainder of connections - for (size_t tries = 0; tries < 10; tries++) { - usleep(10000); - switch (socketType) { - case SocketType::UNIX: - if (ret.connection->setupUnixDomainClient(addr.c_str())) goto success; - break; + for (size_t i = 0; i < numConnections; i++) { + sp<RpcConnection> connection = RpcConnection::make(); + for (size_t tries = 0; tries < 10; tries++) { + usleep(10000); + switch (socketType) { + case SocketType::UNIX: + if (connection->setupUnixDomainClient(addr.c_str())) goto success; + break; #ifdef __BIONIC__ - case SocketType::VSOCK: - if (ret.connection->setupVsockClient(VMADDR_CID_LOCAL, vsockPort)) goto success; - break; + case SocketType::VSOCK: + if (connection->setupVsockClient(VMADDR_CID_LOCAL, vsockPort)) goto success; + break; #endif // __BIONIC__ - case SocketType::INET: - if (ret.connection->setupInetClient("127.0.0.1", inetPort)) goto success; - break; - default: - LOG_ALWAYS_FATAL("Unknown socket type"); + case SocketType::INET: + if (connection->setupInetClient("127.0.0.1", inetPort)) goto success; + break; + default: + LOG_ALWAYS_FATAL("Unknown socket type"); + } } + LOG_ALWAYS_FATAL("Could not connect"); + success: + ret.connections.push_back({connection, connection->getRootObject()}); } - LOG_ALWAYS_FATAL("Could not connect"); - success: - - ret.rootBinder = ret.connection->getRootObject(); return ret; } - BinderRpcTestProcessConnection createRpcTestSocketServerProcess(size_t numThreads) { + BinderRpcTestProcessConnection createRpcTestSocketServerProcess(size_t numThreads, + size_t numConnections = 1) { BinderRpcTestProcessConnection ret{ - .proc = createRpcTestSocketServerProcess(numThreads, + .proc = createRpcTestSocketServerProcess(numThreads, numConnections, [&](const sp<RpcServer>& server) { sp<MyBinderRpcTest> service = new MyBinderRpcTest; @@ -393,7 +408,7 @@ public: }), }; - ret.rootBinder = ret.proc.rootBinder; + ret.rootBinder = ret.proc.connections.at(0).root; ret.rootIface = interface_cast<IBinderRpcTest>(ret.rootBinder); return ret; @@ -401,16 +416,12 @@ public: }; TEST_P(BinderRpc, RootObjectIsNull) { - auto proc = createRpcTestSocketServerProcess(1, [](const sp<RpcServer>& server) { + auto proc = createRpcTestSocketServerProcess(1, 1, [](const sp<RpcServer>& server) { // this is the default, but to be explicit server->setRootObject(nullptr); }); - // retrieved by getRootObject when process is created above - EXPECT_EQ(nullptr, proc.rootBinder); - - // make sure we can retrieve it again (process doesn't crash) - EXPECT_EQ(nullptr, proc.connection->getRootObject()); + EXPECT_EQ(nullptr, proc.connections.at(0).root); } TEST_P(BinderRpc, Ping) { @@ -425,6 +436,14 @@ TEST_P(BinderRpc, GetInterfaceDescriptor) { EXPECT_EQ(IBinderRpcTest::descriptor, proc.rootBinder->getInterfaceDescriptor()); } +TEST_P(BinderRpc, MultipleConnections) { + auto proc = createRpcTestSocketServerProcess(1 /*threads*/, 5 /*connections*/); + for (auto connection : proc.proc.connections) { + ASSERT_NE(nullptr, connection.root); + EXPECT_EQ(OK, connection.root->pingBinder()); + } +} + TEST_P(BinderRpc, TransactionsMustBeMarkedRpc) { auto proc = createRpcTestSocketServerProcess(1); Parcel data; @@ -572,6 +591,15 @@ TEST_P(BinderRpc, CannotMixBindersBetweenUnrelatedSocketConnections) { proc1.rootIface->repeatBinder(proc2.rootBinder, &outBinder).transactionError()); } +TEST_P(BinderRpc, CannotMixBindersBetweenTwoConnectionsToTheSameServer) { + auto proc = createRpcTestSocketServerProcess(1 /*threads*/, 2 /*connections*/); + + sp<IBinder> outBinder; + EXPECT_EQ(INVALID_OPERATION, + proc.rootIface->repeatBinder(proc.proc.connections.at(1).root, &outBinder) + .transactionError()); +} + TEST_P(BinderRpc, CannotSendRegularBinderOverSocketBinder) { auto proc = createRpcTestSocketServerProcess(1); @@ -856,7 +884,7 @@ TEST_P(BinderRpc, Die) { EXPECT_EQ(DEAD_OBJECT, proc.rootIface->die(doDeathCleanup).transactionError()) << "Do death cleanup: " << doDeathCleanup; - proc.proc.expectInvalid = true; + proc.expectInvalid = true; } } |