diff options
author | 2021-07-16 16:42:26 +0000 | |
---|---|---|
committer | 2021-07-16 16:42:26 +0000 | |
commit | 1bbd2eef582ceba6e038b59e74cb5f84ffe69821 (patch) | |
tree | 792b0f139a28bddd7a891382d3979252a3aa2763 /libs/binder/RpcSession.cpp | |
parent | f94b148bbd15df6a74679f578527f4a404e973a1 (diff) | |
parent | 4313d7efe7c49a77c3d8c603377a58caa1fa256d (diff) |
Merge changes Ie1fc2d92,Ie66e92cc,I09a4520a,Ia13d0dc1
* changes:
binderRpcTest: ServerProcess opts in struct
libbinder: respect 'reverse' cncts are 'incoming'
libbinder: 'RpcSession::addIncomingConnection'
libbinder: send is non-blocking and interruptible
Diffstat (limited to 'libs/binder/RpcSession.cpp')
-rw-r--r-- | libs/binder/RpcSession.cpp | 99 |
1 files changed, 63 insertions, 36 deletions
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index ee5e8bb5bb..f637804dca 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -175,9 +175,11 @@ bool RpcSession::FdTrigger::isTriggered() { return mWrite == -1; } -status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) { +status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { while (true) { - pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0}, + pollfd pfd[]{{.fd = fd.get(), + .events = static_cast<int16_t>(event | POLLHUP), + .revents = 0}, {.fd = mRead.get(), .events = POLLHUP, .revents = 0}}; int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1)); if (ret < 0) { @@ -189,10 +191,31 @@ status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) { if (pfd[1].revents & POLLHUP) { return -ECANCELED; } - return pfd[0].revents & POLLIN ? OK : DEAD_OBJECT; + return pfd[0].revents & event ? OK : DEAD_OBJECT; } } +status_t RpcSession::FdTrigger::interruptableWriteFully(base::borrowed_fd fd, const void* data, + size_t size) { + const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data); + const uint8_t* end = buffer + size; + + MAYBE_WAIT_IN_FLAKE_MODE; + + status_t status; + while ((status = triggerablePoll(fd, POLLOUT)) == OK) { + ssize_t writeSize = TEMP_FAILURE_RETRY(send(fd.get(), buffer, end - buffer, MSG_NOSIGNAL)); + if (writeSize == 0) return DEAD_OBJECT; + + if (writeSize < 0) { + return -errno; + } + buffer += writeSize; + if (buffer == end) return OK; + } + return status; +} + status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, void* data, size_t size) { uint8_t* buffer = reinterpret_cast<uint8_t*>(data); @@ -201,7 +224,7 @@ status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, voi MAYBE_WAIT_IN_FLAKE_MODE; status_t status; - while ((status = triggerablePollRead(fd)) == OK) { + while ((status = triggerablePoll(fd, POLLIN)) == OK) { ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL)); if (readSize == 0) return DEAD_OBJECT; // EOF @@ -330,7 +353,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { mOutgoingConnections.size()); } - if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*reverse*/)) return false; + if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*incoming*/)) return false; // TODO(b/189955605): we should add additional sessions dynamically // instead of all at once. @@ -351,7 +374,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { // we've already setup one client for (size_t i = 0; i + 1 < numThreadsAvailable; i++) { // TODO(b/189955605): shutdown existing connections? - if (!setupOneSocketConnection(addr, mId.value(), false /*reverse*/)) return false; + if (!setupOneSocketConnection(addr, mId.value(), false /*incoming*/)) return false; } // TODO(b/189955605): we should add additional sessions dynamically @@ -361,14 +384,14 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) { // any requests at all. for (size_t i = 0; i < mMaxThreads; i++) { - if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false; + if (!setupOneSocketConnection(addr, mId.value(), true /*incoming*/)) return false; } return true; } bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id, - bool reverse) { + bool incoming) { for (size_t tries = 0; tries < 5; tries++) { if (tries > 0) usleep(10000); @@ -395,7 +418,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp RpcConnectionHeader header{.options = 0}; memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress)); - if (reverse) header.options |= RPC_CONNECTION_OPTION_REVERSE; + if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING; if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) { int savedErrno = errno; @@ -406,33 +429,8 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); - if (reverse) { - std::mutex mutex; - std::condition_variable joinCv; - std::unique_lock<std::mutex> lock(mutex); - std::thread thread; - sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this); - bool ownershipTransferred = false; - thread = std::thread([&]() { - std::unique_lock<std::mutex> threadLock(mutex); - unique_fd fd = std::move(serverFd); - // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) - sp<RpcSession> session = thiz; - session->preJoinThreadOwnership(std::move(thread)); - - // only continue once we have a response or the connection fails - auto setupResult = session->preJoinSetup(std::move(fd)); - - ownershipTransferred = true; - threadLock.unlock(); - joinCv.notify_one(); - // do not use & vars below - - RpcSession::join(std::move(session), std::move(setupResult)); - }); - joinCv.wait(lock, [&] { return ownershipTransferred; }); - LOG_ALWAYS_FATAL_IF(!ownershipTransferred); - return true; + if (incoming) { + return addIncomingConnection(std::move(serverFd)); } else { return addOutgoingConnection(std::move(serverFd), true); } @@ -442,6 +440,35 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp return false; } +bool RpcSession::addIncomingConnection(unique_fd fd) { + std::mutex mutex; + std::condition_variable joinCv; + std::unique_lock<std::mutex> lock(mutex); + std::thread thread; + sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this); + bool ownershipTransferred = false; + thread = std::thread([&]() { + std::unique_lock<std::mutex> threadLock(mutex); + unique_fd movedFd = std::move(fd); + // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) + sp<RpcSession> session = thiz; + session->preJoinThreadOwnership(std::move(thread)); + + // only continue once we have a response or the connection fails + auto setupResult = session->preJoinSetup(std::move(movedFd)); + + ownershipTransferred = true; + threadLock.unlock(); + joinCv.notify_one(); + // do not use & vars below + + RpcSession::join(std::move(session), std::move(setupResult)); + }); + joinCv.wait(lock, [&] { return ownershipTransferred; }); + LOG_ALWAYS_FATAL_IF(!ownershipTransferred); + return true; +} + bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) { sp<RpcConnection> connection = sp<RpcConnection>::make(); { |