diff options
| -rw-r--r-- | cmds/service/Android.bp | 18 | ||||
| -rw-r--r-- | cmds/service/service.cpp | 7 | ||||
| -rw-r--r-- | libs/binder/IServiceManager.cpp | 18 | ||||
| -rw-r--r-- | libs/binder/RpcSession.cpp | 25 | ||||
| -rw-r--r-- | libs/binder/ServiceManagerHost.cpp | 7 | ||||
| -rw-r--r-- | libs/binder/ServiceManagerHost.h | 5 | ||||
| -rw-r--r-- | libs/binder/include/binder/IServiceManager.h | 11 | ||||
| -rw-r--r-- | libs/binder/include/binder/RpcSession.h | 15 | ||||
| -rw-r--r-- | libs/binder/tests/binderHostDeviceTest.cpp | 4 | ||||
| -rw-r--r-- | libs/binder/tests/binderRpcTest.cpp | 38 |
10 files changed, 125 insertions, 23 deletions
diff --git a/cmds/service/Android.bp b/cmds/service/Android.bp index 3e8e3f67f8..21ac11b4cf 100644 --- a/cmds/service/Android.bp +++ b/cmds/service/Android.bp @@ -52,3 +52,21 @@ cc_binary { "-Werror", ], } + +cc_binary_host { + name: "aservice", + + srcs: ["service.cpp"], + + shared_libs: [ + "libcutils", + "libutils", + "libbinder", + ], + + cflags: [ + "-DXP_UNIX", + "-Wall", + "-Werror", + ], +} diff --git a/cmds/service/service.cpp b/cmds/service/service.cpp index 18b6b58a9e..0b00c2da08 100644 --- a/cmds/service/service.cpp +++ b/cmds/service/service.cpp @@ -50,6 +50,7 @@ static String16 get_interface_name(sp<IBinder> service) { if (service != nullptr) { Parcel data, reply; + data.markForBinder(service); status_t err = service->transact(IBinder::INTERFACE_TRANSACTION, data, &reply); if (err == NO_ERROR) { return reply.readString16(); @@ -96,6 +97,9 @@ int main(int argc, char* const argv[]) #ifdef VENDORSERVICES ProcessState::initWithDriver("/dev/vndbinder"); #endif +#ifndef __ANDROID__ + setDefaultServiceManager(createRpcDelegateServiceManager({.maxOutgoingThreads = 1})); +#endif sp<IServiceManager> sm = defaultServiceManager(); fflush(stdout); if (sm == nullptr) { @@ -138,6 +142,7 @@ int main(int argc, char* const argv[]) int32_t code = atoi(argv[optind++]); if (service != nullptr && ifName.size() > 0) { Parcel data, reply; + data.markForBinder(service); // the interface name is first data.writeInterfaceToken(ifName); @@ -229,7 +234,7 @@ int main(int argc, char* const argv[]) int afd = ashmem_create_region("test", statbuf.st_size); void* ptr = mmap(NULL, statbuf.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, afd, 0); - read(fd, ptr, statbuf.st_size); + (void)read(fd, ptr, statbuf.st_size); close(fd); data.writeFileDescriptor(afd, true /* take ownership */); } else if (strcmp(argv[optind], "nfd") == 0) { diff --git a/libs/binder/IServiceManager.cpp b/libs/binder/IServiceManager.cpp index aff9e0d48d..81e61daae1 100644 --- a/libs/binder/IServiceManager.cpp +++ b/libs/binder/IServiceManager.cpp @@ -448,21 +448,27 @@ std::optional<IServiceManager::ConnectionInfo> ServiceManagerShim::getConnection // on-device service manager. class ServiceManagerHostShim : public ServiceManagerShim { public: - using ServiceManagerShim::ServiceManagerShim; + ServiceManagerHostShim(const sp<AidlServiceManager>& impl, + const RpcDelegateServiceManagerOptions& options) + : ServiceManagerShim(impl), mOptions(options) {} // ServiceManagerShim::getService is based on checkService, so no need to override it. sp<IBinder> checkService(const String16& name) const override { - return getDeviceService({String8(name).c_str()}); + return getDeviceService({String8(name).c_str()}, mOptions); } protected: // Override realGetService for ServiceManagerShim::waitForService. Status realGetService(const std::string& name, sp<IBinder>* _aidl_return) { - *_aidl_return = getDeviceService({"-g", name}); + *_aidl_return = getDeviceService({"-g", name}, mOptions); return Status::ok(); } + +private: + RpcDelegateServiceManagerOptions mOptions; }; -sp<IServiceManager> createRpcDelegateServiceManager() { - auto binder = getDeviceService({"manager"}); +sp<IServiceManager> createRpcDelegateServiceManager( + const RpcDelegateServiceManagerOptions& options) { + auto binder = getDeviceService({"manager"}, options); if (binder == nullptr) { ALOGE("getDeviceService(\"manager\") returns null"); return nullptr; @@ -472,7 +478,7 @@ sp<IServiceManager> createRpcDelegateServiceManager() { ALOGE("getDeviceService(\"manager\") returns non service manager"); return nullptr; } - return sp<ServiceManagerHostShim>::make(interface); + return sp<ServiceManagerHostShim>::make(interface, options); } #endif diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index 486b67b62c..9eef3e8914 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -90,6 +90,20 @@ size_t RpcSession::getMaxIncomingThreads() { return mMaxIncomingThreads; } +void RpcSession::setMaxOutgoingThreads(size_t threads) { + std::lock_guard<std::mutex> _l(mMutex); + LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(), + "Must set max outgoing threads before setting up connections, but has %zu " + "client(s) and %zu server(s)", + mConnections.mOutgoing.size(), mConnections.mIncoming.size()); + mMaxOutgoingThreads = threads; +} + +size_t RpcSession::getMaxOutgoingThreads() { + std::lock_guard<std::mutex> _l(mMutex); + return mMaxOutgoingThreads; +} + bool RpcSession::setProtocolVersion(uint32_t version) { if (version >= RPC_WIRE_PROTOCOL_VERSION_NEXT && version != RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL) { @@ -473,6 +487,12 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector< return status; } + size_t outgoingThreads = std::min(numThreadsAvailable, mMaxOutgoingThreads); + ALOGI_IF(outgoingThreads != numThreadsAvailable, + "Server hints client to start %zu outgoing threads, but client will only start %zu " + "because it is preconfigured to start at most %zu outgoing threads.", + numThreadsAvailable, outgoingThreads, mMaxOutgoingThreads); + // TODO(b/189955605): we should add additional sessions dynamically // instead of all at once - the other side should be responsible for setting // up additional connections. We need to create at least one (unless 0 are @@ -480,7 +500,10 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector< // any requests at all. // we've already setup one client - for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { + LOG_RPC_DETAIL("RpcSession::setupClient() instantiating %zu outgoing (server max: %zu) and %zu " + "incoming threads", + outgoingThreads, numThreadsAvailable, mMaxIncomingThreads); + for (size_t i = 0; i + 1 < outgoingThreads; i++) { if (status_t status = connectAndInit(mId, false /*incoming*/); status != OK) return status; } diff --git a/libs/binder/ServiceManagerHost.cpp b/libs/binder/ServiceManagerHost.cpp index 27cc563adc..194254ac69 100644 --- a/libs/binder/ServiceManagerHost.cpp +++ b/libs/binder/ServiceManagerHost.cpp @@ -124,7 +124,8 @@ void cleanupCommandResult(const void* id, void* obj, void* /* cookie */) { } // namespace -sp<IBinder> getDeviceService(std::vector<std::string>&& serviceDispatcherArgs) { +sp<IBinder> getDeviceService(std::vector<std::string>&& serviceDispatcherArgs, + const RpcDelegateServiceManagerOptions& options) { std::vector<std::string> prefix{"adb", "shell", "servicedispatcher"}; serviceDispatcherArgs.insert(serviceDispatcherArgs.begin(), prefix.begin(), prefix.end()); @@ -158,6 +159,10 @@ sp<IBinder> getDeviceService(std::vector<std::string>&& serviceDispatcherArgs) { LOG_ALWAYS_FATAL_IF(!forwardResult->hostPort().has_value()); auto rpcSession = RpcSession::make(); + if (options.maxOutgoingThreads.has_value()) { + rpcSession->setMaxOutgoingThreads(*options.maxOutgoingThreads); + } + if (status_t status = rpcSession->setupInetClient("127.0.0.1", *forwardResult->hostPort()); status != OK) { ALOGE("Unable to set up inet client on host port %u: %s", *forwardResult->hostPort(), diff --git a/libs/binder/ServiceManagerHost.h b/libs/binder/ServiceManagerHost.h index e59724c391..c5310dac20 100644 --- a/libs/binder/ServiceManagerHost.h +++ b/libs/binder/ServiceManagerHost.h @@ -21,11 +21,14 @@ namespace android { +struct RpcDelegateServiceManagerOptions; + // Get a service on device by running servicedispatcher with the given args, e.g. // getDeviceService({"foo"}); // Return nullptr on any error. // When the returned binder object is destroyed, remove adb forwarding and kills // the long-running servicedispatcher process. -sp<IBinder> getDeviceService(std::vector<std::string>&& serviceDispatcherArgs); +sp<IBinder> getDeviceService(std::vector<std::string>&& serviceDispatcherArgs, + const RpcDelegateServiceManagerOptions& options); } // namespace android diff --git a/libs/binder/include/binder/IServiceManager.h b/libs/binder/include/binder/IServiceManager.h index a48075dad1..240e3c2b26 100644 --- a/libs/binder/include/binder/IServiceManager.h +++ b/libs/binder/include/binder/IServiceManager.h @@ -188,7 +188,16 @@ bool checkPermission(const String16& permission, pid_t pid, uid_t uid, // // ... // } // Resources are cleaned up when the object is destroyed. -sp<IServiceManager> createRpcDelegateServiceManager(); +// +// For each returned binder object, at most |maxOutgoingThreads| outgoing threads are instantiated. +// Hence, only |maxOutgoingThreads| calls can be made simultaneously. Additional calls are blocked +// if there are |maxOutgoingThreads| ongoing calls. See RpcSession::setMaxOutgoingThreads. +// If |maxOutgoingThreads| is not set, default is |RpcSession::kDefaultMaxOutgoingThreads|. +struct RpcDelegateServiceManagerOptions { + std::optional<size_t> maxOutgoingThreads; +}; +sp<IServiceManager> createRpcDelegateServiceManager( + const RpcDelegateServiceManagerOptions& options); #endif } // namespace android diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index e64572b234..f5505da128 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -50,6 +50,8 @@ constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION = RPC_WIRE_PROTOCOL_VERSION_EXPERIM */ class RpcSession final : public virtual RefBase { public: + static constexpr size_t kDefaultMaxOutgoingThreads = 10; + // Create an RpcSession with default configuration (raw sockets). static sp<RpcSession> make(); @@ -72,6 +74,18 @@ public: size_t getMaxIncomingThreads(); /** + * Set the maximum number of outgoing threads allowed to be made. + * By default, this is |kDefaultMaxOutgoingThreads|. This must be called before setting up this + * connection as a client. + * + * This limits the number of outgoing threads on top of the remote peer setting. This RpcSession + * will only instantiate |min(maxOutgoingThreads, remoteMaxThreads)| outgoing threads, where + * |remoteMaxThreads| can be retrieved from the remote peer via |getRemoteMaxThreads()|. + */ + void setMaxOutgoingThreads(size_t threads); + size_t getMaxOutgoingThreads(); + + /** * By default, the minimum of the supported versions of the client and the * server will be used. Usually, this API should only be used for debugging. */ @@ -308,6 +322,7 @@ private: std::mutex mMutex; // for all below size_t mMaxIncomingThreads = 0; + size_t mMaxOutgoingThreads = kDefaultMaxOutgoingThreads; std::optional<uint32_t> mProtocolVersion; std::condition_variable mAvailableConnectionCv; // for mWaitingThreads diff --git a/libs/binder/tests/binderHostDeviceTest.cpp b/libs/binder/tests/binderHostDeviceTest.cpp index eec3b447de..464da60dde 100644 --- a/libs/binder/tests/binderHostDeviceTest.cpp +++ b/libs/binder/tests/binderHostDeviceTest.cpp @@ -65,7 +65,9 @@ MATCHER_P(StatusEq, expected, (negation ? "not " : "") + statusToString(expected void initHostRpcServiceManagerOnce() { static std::once_flag gSmOnce; - std::call_once(gSmOnce, [] { setDefaultServiceManager(createRpcDelegateServiceManager()); }); + std::call_once(gSmOnce, [] { + setDefaultServiceManager(createRpcDelegateServiceManager({.maxOutgoingThreads = 1})); + }); } // Test for host service manager. diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp index 2011801f27..d68c6fffd1 100644 --- a/libs/binder/tests/binderRpcTest.cpp +++ b/libs/binder/tests/binderRpcTest.cpp @@ -481,6 +481,7 @@ public: size_t numThreads = 1; size_t numSessions = 1; size_t numIncomingConnections = 0; + size_t numOutgoingConnections = SIZE_MAX; }; static inline std::string PrintParamInfo(const testing::TestParamInfo<ParamType>& info) { @@ -614,6 +615,7 @@ public: for (const auto& session : sessions) { session->setMaxIncomingThreads(options.numIncomingConnections); + session->setMaxOutgoingThreads(options.numOutgoingConnections); switch (socketType) { case SocketType::PRECONNECTED: @@ -655,6 +657,9 @@ public: return ret; } + + void testThreadPoolOverSaturated(sp<IBinderRpcTest> iface, size_t numCalls, + size_t sleepMs = 500); }; TEST_P(BinderRpc, Ping) { @@ -996,28 +1001,39 @@ TEST_P(BinderRpc, ThreadPoolGreaterThanEqualRequested) { for (auto& t : ts) t.join(); } -TEST_P(BinderRpc, ThreadPoolOverSaturated) { - constexpr size_t kNumThreads = 10; - constexpr size_t kNumCalls = kNumThreads + 3; - constexpr size_t kSleepMs = 500; - - auto proc = createRpcTestSocketServerProcess({.numThreads = kNumThreads}); - +void BinderRpc::testThreadPoolOverSaturated(sp<IBinderRpcTest> iface, size_t numCalls, + size_t sleepMs) { size_t epochMsBefore = epochMillis(); std::vector<std::thread> ts; - for (size_t i = 0; i < kNumCalls; i++) { - ts.push_back(std::thread([&] { proc.rootIface->sleepMs(kSleepMs); })); + for (size_t i = 0; i < numCalls; i++) { + ts.push_back(std::thread([&] { iface->sleepMs(sleepMs); })); } for (auto& t : ts) t.join(); size_t epochMsAfter = epochMillis(); - EXPECT_GE(epochMsAfter, epochMsBefore + 2 * kSleepMs); + EXPECT_GE(epochMsAfter, epochMsBefore + 2 * sleepMs); // Potential flake, but make sure calls are handled in parallel. - EXPECT_LE(epochMsAfter, epochMsBefore + 3 * kSleepMs); + EXPECT_LE(epochMsAfter, epochMsBefore + 3 * sleepMs); +} + +TEST_P(BinderRpc, ThreadPoolOverSaturated) { + constexpr size_t kNumThreads = 10; + constexpr size_t kNumCalls = kNumThreads + 3; + auto proc = createRpcTestSocketServerProcess({.numThreads = kNumThreads}); + testThreadPoolOverSaturated(proc.rootIface, kNumCalls); +} + +TEST_P(BinderRpc, ThreadPoolLimitOutgoing) { + constexpr size_t kNumThreads = 20; + constexpr size_t kNumOutgoingConnections = 10; + constexpr size_t kNumCalls = kNumOutgoingConnections + 3; + auto proc = createRpcTestSocketServerProcess( + {.numThreads = kNumThreads, .numOutgoingConnections = kNumOutgoingConnections}); + testThreadPoolOverSaturated(proc.rootIface, kNumCalls); } TEST_P(BinderRpc, ThreadingStressTest) { |