summaryrefslogtreecommitdiff
path: root/libs/binder/RpcSession.cpp
diff options
context:
space:
mode:
author Steven Moreland <smoreland@google.com> 2021-07-16 16:42:26 +0000
committer Gerrit Code Review <noreply-gerritcodereview@google.com> 2021-07-16 16:42:26 +0000
commit1bbd2eef582ceba6e038b59e74cb5f84ffe69821 (patch)
tree792b0f139a28bddd7a891382d3979252a3aa2763 /libs/binder/RpcSession.cpp
parentf94b148bbd15df6a74679f578527f4a404e973a1 (diff)
parent4313d7efe7c49a77c3d8c603377a58caa1fa256d (diff)
Merge changes Ie1fc2d92,Ie66e92cc,I09a4520a,Ia13d0dc1
* changes: binderRpcTest: ServerProcess opts in struct libbinder: respect 'reverse' cncts are 'incoming' libbinder: 'RpcSession::addIncomingConnection' libbinder: send is non-blocking and interruptible
Diffstat (limited to 'libs/binder/RpcSession.cpp')
-rw-r--r--libs/binder/RpcSession.cpp99
1 files changed, 63 insertions, 36 deletions
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index ee5e8bb5bb..f637804dca 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -175,9 +175,11 @@ bool RpcSession::FdTrigger::isTriggered() {
return mWrite == -1;
}
-status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
+status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
while (true) {
- pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
+ pollfd pfd[]{{.fd = fd.get(),
+ .events = static_cast<int16_t>(event | POLLHUP),
+ .revents = 0},
{.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
if (ret < 0) {
@@ -189,10 +191,31 @@ status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
if (pfd[1].revents & POLLHUP) {
return -ECANCELED;
}
- return pfd[0].revents & POLLIN ? OK : DEAD_OBJECT;
+ return pfd[0].revents & event ? OK : DEAD_OBJECT;
}
}
+status_t RpcSession::FdTrigger::interruptableWriteFully(base::borrowed_fd fd, const void* data,
+ size_t size) {
+ const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
+ const uint8_t* end = buffer + size;
+
+ MAYBE_WAIT_IN_FLAKE_MODE;
+
+ status_t status;
+ while ((status = triggerablePoll(fd, POLLOUT)) == OK) {
+ ssize_t writeSize = TEMP_FAILURE_RETRY(send(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
+ if (writeSize == 0) return DEAD_OBJECT;
+
+ if (writeSize < 0) {
+ return -errno;
+ }
+ buffer += writeSize;
+ if (buffer == end) return OK;
+ }
+ return status;
+}
+
status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, void* data,
size_t size) {
uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
@@ -201,7 +224,7 @@ status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, voi
MAYBE_WAIT_IN_FLAKE_MODE;
status_t status;
- while ((status = triggerablePollRead(fd)) == OK) {
+ while ((status = triggerablePoll(fd, POLLIN)) == OK) {
ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
if (readSize == 0) return DEAD_OBJECT; // EOF
@@ -330,7 +353,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
mOutgoingConnections.size());
}
- if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*reverse*/)) return false;
+ if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*incoming*/)) return false;
// TODO(b/189955605): we should add additional sessions dynamically
// instead of all at once.
@@ -351,7 +374,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
// we've already setup one client
for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
// TODO(b/189955605): shutdown existing connections?
- if (!setupOneSocketConnection(addr, mId.value(), false /*reverse*/)) return false;
+ if (!setupOneSocketConnection(addr, mId.value(), false /*incoming*/)) return false;
}
// TODO(b/189955605): we should add additional sessions dynamically
@@ -361,14 +384,14 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
// any requests at all.
for (size_t i = 0; i < mMaxThreads; i++) {
- if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false;
+ if (!setupOneSocketConnection(addr, mId.value(), true /*incoming*/)) return false;
}
return true;
}
bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id,
- bool reverse) {
+ bool incoming) {
for (size_t tries = 0; tries < 5; tries++) {
if (tries > 0) usleep(10000);
@@ -395,7 +418,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp
RpcConnectionHeader header{.options = 0};
memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress));
- if (reverse) header.options |= RPC_CONNECTION_OPTION_REVERSE;
+ if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) {
int savedErrno = errno;
@@ -406,33 +429,8 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp
LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());
- if (reverse) {
- std::mutex mutex;
- std::condition_variable joinCv;
- std::unique_lock<std::mutex> lock(mutex);
- std::thread thread;
- sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this);
- bool ownershipTransferred = false;
- thread = std::thread([&]() {
- std::unique_lock<std::mutex> threadLock(mutex);
- unique_fd fd = std::move(serverFd);
- // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
- sp<RpcSession> session = thiz;
- session->preJoinThreadOwnership(std::move(thread));
-
- // only continue once we have a response or the connection fails
- auto setupResult = session->preJoinSetup(std::move(fd));
-
- ownershipTransferred = true;
- threadLock.unlock();
- joinCv.notify_one();
- // do not use & vars below
-
- RpcSession::join(std::move(session), std::move(setupResult));
- });
- joinCv.wait(lock, [&] { return ownershipTransferred; });
- LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
- return true;
+ if (incoming) {
+ return addIncomingConnection(std::move(serverFd));
} else {
return addOutgoingConnection(std::move(serverFd), true);
}
@@ -442,6 +440,35 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp
return false;
}
+bool RpcSession::addIncomingConnection(unique_fd fd) {
+ std::mutex mutex;
+ std::condition_variable joinCv;
+ std::unique_lock<std::mutex> lock(mutex);
+ std::thread thread;
+ sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this);
+ bool ownershipTransferred = false;
+ thread = std::thread([&]() {
+ std::unique_lock<std::mutex> threadLock(mutex);
+ unique_fd movedFd = std::move(fd);
+ // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
+ sp<RpcSession> session = thiz;
+ session->preJoinThreadOwnership(std::move(thread));
+
+ // only continue once we have a response or the connection fails
+ auto setupResult = session->preJoinSetup(std::move(movedFd));
+
+ ownershipTransferred = true;
+ threadLock.unlock();
+ joinCv.notify_one();
+ // do not use & vars below
+
+ RpcSession::join(std::move(session), std::move(setupResult));
+ });
+ joinCv.wait(lock, [&] { return ownershipTransferred; });
+ LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
+ return true;
+}
+
bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) {
sp<RpcConnection> connection = sp<RpcConnection>::make();
{