diff options
| author | 2021-08-09 23:24:59 +0000 | |
|---|---|---|
| committer | 2021-08-09 23:24:59 +0000 | |
| commit | 93b2cc9617f617da3b2406c1faeb73dfed40f023 (patch) | |
| tree | b80ab9de55eb138547560885928d6845f95b0d24 | |
| parent | bf5fdedf6f49c5dce1725699484b5a1a26761573 (diff) | |
| parent | f6d4229b65253d2ff91ecbc93105a8114d0d3584 (diff) | |
Merge changes from topic "binder_rpc_non_blocking"
* changes:
binder: RpcTranpsortCtx::newTransport Add FdTrigger arg.
binder: RpcSession handle post-connect
binder: RPC uses non-blocking sockets.
binder: Refactor: move FdTrigger to its own file / class.
binder: RpcSession initialize mShutdownTrigger earlier
| -rw-r--r-- | libs/binder/Android.bp | 1 | ||||
| -rw-r--r-- | libs/binder/FdTrigger.cpp | 62 | ||||
| -rw-r--r-- | libs/binder/FdTrigger.h | 56 | ||||
| -rw-r--r-- | libs/binder/RpcServer.cpp | 19 | ||||
| -rw-r--r-- | libs/binder/RpcSession.cpp | 169 | ||||
| -rw-r--r-- | libs/binder/RpcState.cpp | 8 | ||||
| -rw-r--r-- | libs/binder/RpcTransportRaw.cpp | 57 | ||||
| -rw-r--r-- | libs/binder/Utils.cpp | 16 | ||||
| -rw-r--r-- | libs/binder/Utils.h | 5 | ||||
| -rw-r--r-- | libs/binder/include/binder/RpcServer.h | 3 | ||||
| -rw-r--r-- | libs/binder/include/binder/RpcSession.h | 47 | ||||
| -rw-r--r-- | libs/binder/include/binder/RpcTransport.h | 47 |
12 files changed, 287 insertions, 203 deletions
diff --git a/libs/binder/Android.bp b/libs/binder/Android.bp index f34672ced1..34121d2bb2 100644 --- a/libs/binder/Android.bp +++ b/libs/binder/Android.bp @@ -104,6 +104,7 @@ cc_library { "BpBinder.cpp", "BufferedTextOutput.cpp", "Debug.cpp", + "FdTrigger.cpp", "IInterface.cpp", "IMemory.cpp", "IPCThreadState.cpp", diff --git a/libs/binder/FdTrigger.cpp b/libs/binder/FdTrigger.cpp new file mode 100644 index 0000000000..e38ac63a21 --- /dev/null +++ b/libs/binder/FdTrigger.cpp @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "FdTrigger" +#include <log/log.h> + +#include <poll.h> + +#include <android-base/macros.h> + +#include "FdTrigger.h" +namespace android { + +std::unique_ptr<FdTrigger> FdTrigger::make() { + auto ret = std::make_unique<FdTrigger>(); + if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) { + ALOGE("Could not create pipe %s", strerror(errno)); + return nullptr; + } + return ret; +} + +void FdTrigger::trigger() { + mWrite.reset(); +} + +bool FdTrigger::isTriggered() { + return mWrite == -1; +} + +status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { + while (true) { + pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, + {.fd = mRead.get(), .events = POLLHUP, .revents = 0}}; + int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1)); + if (ret < 0) { + return -errno; + } + if (ret == 0) { + continue; + } + if (pfd[1].revents & POLLHUP) { + return -ECANCELED; + } + return pfd[0].revents & event ? OK : DEAD_OBJECT; + } +} + +} // namespace android diff --git a/libs/binder/FdTrigger.h b/libs/binder/FdTrigger.h new file mode 100644 index 0000000000..984e685ae1 --- /dev/null +++ b/libs/binder/FdTrigger.h @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <memory> + +#include <android-base/unique_fd.h> +#include <utils/Errors.h> + +namespace android { + +/** This is not a pipe. */ +class FdTrigger { +public: + /** Returns nullptr for error case */ + static std::unique_ptr<FdTrigger> make(); + + /** + * Close the write end of the pipe so that the read end receives POLLHUP. + * Not threadsafe. + */ + void trigger(); + + /** + * Whether this has been triggered. + */ + bool isTriggered(); + + /** + * Poll for a read event. + * + * event - for pollfd + * + * Return: + * true - time to read! + * false - trigger happened + */ + status_t triggerablePoll(base::borrowed_fd fd, int16_t event); + +private: + base::unique_fd mWrite; + base::unique_fd mRead; +}; +} // namespace android diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index 4fa99c0e45..a20445b7d7 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -29,6 +29,7 @@ #include <binder/RpcTransportRaw.h> #include <log/log.h> +#include "FdTrigger.h" #include "RpcSocketAddress.h" #include "RpcState.h" #include "RpcWireFormat.h" @@ -156,7 +157,7 @@ void RpcServer::join() { LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join."); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined"); mJoinThreadRunning = true; - mShutdownTrigger = RpcSession::FdTrigger::make(); + mShutdownTrigger = FdTrigger::make(); LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler"); mCtx = mRpcTransportCtxFactory->newServerCtx(); @@ -167,7 +168,7 @@ void RpcServer::join() { status_t status; while ((status = mShutdownTrigger->triggerablePoll(mServer, POLLIN)) == OK) { unique_fd clientFd(TEMP_FAILURE_RETRY( - accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC))); + accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC | SOCK_NONBLOCK))); if (clientFd < 0) { ALOGE("Could not accept4 socket: %s", strerror(errno)); @@ -259,7 +260,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie status_t status = OK; int clientFdForLog = clientFd.get(); - auto client = server->mCtx->newTransport(std::move(clientFd)); + auto client = server->mCtx->newTransport(std::move(clientFd), server->mShutdownTrigger.get()); if (client == nullptr) { ALOGE("Dropping accept4()-ed socket because sslAccept fails"); status = DEAD_OBJECT; @@ -270,8 +271,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie RpcConnectionHeader header; if (status == OK) { - status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header, - sizeof(header)); + status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header, + sizeof(header)); if (status != OK) { ALOGE("Failed to read ID for client connecting to RPC server: %s", statusToString(status).c_str()); @@ -296,8 +297,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie .version = protocolVersion, }; - status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response, - sizeof(response)); + status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response, + sizeof(response)); if (status != OK) { ALOGE("Failed to send new session response: %s", statusToString(status).c_str()); // still need to cleanup before we can return @@ -387,8 +388,8 @@ status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) { LOG_RPC_DETAIL("Setting up socket server %s", addr.toString().c_str()); LOG_ALWAYS_FATAL_IF(hasServer(), "Each RpcServer can only have one server."); - unique_fd serverFd( - TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0))); + unique_fd serverFd(TEMP_FAILURE_RETRY( + socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0))); if (serverFd == -1) { int savedErrno = errno; ALOGE("Could not create socket: %s", strerror(savedErrno)); diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index 107210b543..4c47005c7a 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -35,9 +35,11 @@ #include <jni.h> #include <utils/String8.h> +#include "FdTrigger.h" #include "RpcSocketAddress.h" #include "RpcState.h" #include "RpcWireFormat.h" +#include "Utils.h" #ifdef __GLIBC__ extern "C" pid_t gettid(); @@ -133,12 +135,18 @@ status_t RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_ fd = request(); if (!fd.ok()) return BAD_VALUE; } + if (auto res = setNonBlocking(fd); !res.ok()) { + ALOGE("setupPreconnectedClient: %s", res.error().message().c_str()); + return res.error().code() == 0 ? UNKNOWN_ERROR : -res.error().code(); + } return initAndAddConnection(std::move(fd), sessionId, incoming); }); } status_t RpcSession::addNullDebuggingClient() { // Note: only works on raw sockets. + if (auto status = initShutdownTrigger(); status != OK) return status; + unique_fd serverFd(TEMP_FAILURE_RETRY(open("/dev/null", O_WRONLY | O_CLOEXEC))); if (serverFd == -1) { @@ -152,7 +160,7 @@ status_t RpcSession::addNullDebuggingClient() { ALOGE("Unable to create RpcTransportCtx for null debugging client"); return NO_MEMORY; } - auto server = ctx->newTransport(std::move(serverFd)); + auto server = ctx->newTransport(std::move(serverFd), mShutdownTrigger.get()); if (server == nullptr) { ALOGE("Unable to set up RpcTransport"); return UNKNOWN_ERROR; @@ -216,91 +224,6 @@ status_t RpcSession::sendDecStrong(const RpcAddress& address) { return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address); } -std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() { - auto ret = std::make_unique<RpcSession::FdTrigger>(); - if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) { - ALOGE("Could not create pipe %s", strerror(errno)); - return nullptr; - } - return ret; -} - -void RpcSession::FdTrigger::trigger() { - mWrite.reset(); -} - -bool RpcSession::FdTrigger::isTriggered() { - return mWrite == -1; -} - -status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) { - return triggerablePoll(rpcTransport->pollSocket(), event); -} - -status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { - while (true) { - pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, - {.fd = mRead.get(), .events = POLLHUP, .revents = 0}}; - int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1)); - if (ret < 0) { - return -errno; - } - if (ret == 0) { - continue; - } - if (pfd[1].revents & POLLHUP) { - return -ECANCELED; - } - return pfd[0].revents & event ? OK : DEAD_OBJECT; - } -} - -status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport, - const void* data, size_t size) { - const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data); - const uint8_t* end = buffer + size; - - MAYBE_WAIT_IN_FLAKE_MODE; - - status_t status; - while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) { - auto writeSize = rpcTransport->send(buffer, end - buffer); - if (!writeSize.ok()) { - LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str()); - return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code(); - } - - if (*writeSize == 0) return DEAD_OBJECT; - - buffer += *writeSize; - if (buffer == end) return OK; - } - return status; -} - -status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data, - size_t size) { - uint8_t* buffer = reinterpret_cast<uint8_t*>(data); - uint8_t* end = buffer + size; - - MAYBE_WAIT_IN_FLAKE_MODE; - - status_t status; - while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) { - auto readSize = rpcTransport->recv(buffer, end - buffer); - if (!readSize.ok()) { - LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str()); - return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code(); - } - - if (*readSize == 0) return DEAD_OBJECT; // EOF - - buffer += *readSize; - if (buffer == end) return OK; - } - return status; -} - status_t RpcSession::readId() { { std::lock_guard<std::mutex> _l(mMutex); @@ -484,6 +407,7 @@ status_t RpcSession::setupClient( "Must only setup session once, but already has %zu clients", mOutgoingConnections.size()); } + if (auto status = initShutdownTrigger(); status != OK) return status; if (status_t status = connectAndInit(RpcAddress::zero(), false /*incoming*/); status != OK) return status; @@ -550,8 +474,8 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, for (size_t tries = 0; tries < 5; tries++) { if (tries > 0) usleep(10000); - unique_fd serverFd( - TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0))); + unique_fd serverFd(TEMP_FAILURE_RETRY( + socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0))); if (serverFd == -1) { int savedErrno = errno; ALOGE("Could not create socket at %s: %s", addr.toString().c_str(), @@ -564,10 +488,34 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, ALOGW("Connection reset on %s", addr.toString().c_str()); continue; } - int savedErrno = errno; - ALOGE("Could not connect socket at %s: %s", addr.toString().c_str(), - strerror(savedErrno)); - return -savedErrno; + if (errno != EAGAIN && errno != EINPROGRESS) { + int savedErrno = errno; + ALOGE("Could not connect socket at %s: %s", addr.toString().c_str(), + strerror(savedErrno)); + return -savedErrno; + } + // For non-blocking sockets, connect() may return EAGAIN (for unix domain socket) or + // EINPROGRESS (for others). Call poll() and getsockopt() to get the error. + status_t pollStatus = mShutdownTrigger->triggerablePoll(serverFd, POLLOUT); + if (pollStatus != OK) { + ALOGE("Could not POLLOUT after connect() on non-blocking socket: %s", + statusToString(pollStatus).c_str()); + return pollStatus; + } + int soError; + socklen_t soErrorLen = sizeof(soError); + int ret = getsockopt(serverFd.get(), SOL_SOCKET, SO_ERROR, &soError, &soErrorLen); + if (ret == -1) { + int savedErrno = errno; + ALOGE("Could not getsockopt() after connect() on non-blocking socket: %s", + strerror(savedErrno)); + return -savedErrno; + } + if (soError != 0) { + ALOGE("After connect(), getsockopt() returns error for socket at %s: %s", + addr.toString().c_str(), strerror(soError)); + return -soError; + } } LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get()); @@ -580,13 +528,14 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, bool incoming) { + LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr); auto ctx = mRpcTransportCtxFactory->newClientCtx(); if (ctx == nullptr) { ALOGE("Unable to create client RpcTransportCtx with %s sockets", mRpcTransportCtxFactory->toCString()); return NO_MEMORY; } - auto server = ctx->newTransport(std::move(fd)); + auto server = ctx->newTransport(std::move(fd), mShutdownTrigger.get()); if (server == nullptr) { ALOGE("Unable to set up RpcTransport in %s context", mRpcTransportCtxFactory->toCString()); return UNKNOWN_ERROR; @@ -602,16 +551,12 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessio if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING; - auto sentHeader = server->send(&header, sizeof(header)); - if (!sentHeader.ok()) { + auto sendHeaderStatus = + server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header)); + if (sendHeaderStatus != OK) { ALOGE("Could not write connection header to socket: %s", - sentHeader.error().message().c_str()); - return -sentHeader.error().code(); - } - if (*sentHeader != sizeof(header)) { - ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd", - *sentHeader, sizeof(header)); - return UNKNOWN_ERROR; + statusToString(sendHeaderStatus).c_str()); + return sendHeaderStatus; } LOG_RPC_DETAIL("Socket at client: header sent"); @@ -652,19 +597,21 @@ status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTran return OK; } +status_t RpcSession::initShutdownTrigger() { + // first client connection added, but setForServer not called, so + // initializaing for a client. + if (mShutdownTrigger == nullptr) { + mShutdownTrigger = FdTrigger::make(); + mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); + if (mShutdownTrigger == nullptr) return INVALID_OPERATION; + } + return OK; +} + status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) { sp<RpcConnection> connection = sp<RpcConnection>::make(); { std::lock_guard<std::mutex> _l(mMutex); - - // first client connection added, but setForServer not called, so - // initializaing for a client. - if (mShutdownTrigger == nullptr) { - mShutdownTrigger = FdTrigger::make(); - mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make(); - if (mShutdownTrigger == nullptr) return INVALID_OPERATION; - } - connection->rpcTransport = std::move(rpcTransport); connection->exclusiveTid = gettid(); mOutgoingConnections.push_back(connection); diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp index 23382c3858..b58f1b38ff 100644 --- a/libs/binder/RpcState.cpp +++ b/libs/binder/RpcState.cpp @@ -283,8 +283,8 @@ status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection, } if (status_t status = - session->mShutdownTrigger->interruptableWriteFully(connection->rpcTransport.get(), - data, size); + connection->rpcTransport->interruptableWriteFully(session->mShutdownTrigger.get(), + data, size); status != OK) { LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size, connection->rpcTransport.get(), statusToString(status).c_str()); @@ -305,8 +305,8 @@ status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection, } if (status_t status = - session->mShutdownTrigger->interruptableReadFully(connection->rpcTransport.get(), - data, size); + connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(), + data, size); status != OK) { LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size, connection->rpcTransport.get(), statusToString(status).c_str()); diff --git a/libs/binder/RpcTransportRaw.cpp b/libs/binder/RpcTransportRaw.cpp index 2fc19455ed..d77fc52c61 100644 --- a/libs/binder/RpcTransportRaw.cpp +++ b/libs/binder/RpcTransportRaw.cpp @@ -17,8 +17,11 @@ #define LOG_TAG "RpcRawTransport" #include <log/log.h> +#include <poll.h> + #include <binder/RpcTransportRaw.h> +#include "FdTrigger.h" #include "RpcState.h" using android::base::ErrnoError; @@ -32,14 +35,14 @@ namespace { class RpcTransportRaw : public RpcTransport { public: explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {} - Result<size_t> send(const void *buf, size_t size) override { + Result<size_t> send(const void* buf, size_t size) { ssize_t ret = TEMP_FAILURE_RETRY(::send(mSocket.get(), buf, size, MSG_NOSIGNAL)); if (ret < 0) { return ErrnoError() << "send()"; } return ret; } - Result<size_t> recv(void *buf, size_t size) override { + Result<size_t> recv(void* buf, size_t size) { ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_NOSIGNAL)); if (ret < 0) { return ErrnoError() << "recv()"; @@ -47,14 +50,56 @@ public: return ret; } Result<size_t> peek(void *buf, size_t size) override { - ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_PEEK | MSG_DONTWAIT)); + ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_PEEK)); if (ret < 0) { return ErrnoError() << "recv(MSG_PEEK)"; } return ret; } - bool pending() override { return false; } - android::base::borrowed_fd pollSocket() const override { return mSocket; } + + status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override { + const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data); + const uint8_t* end = buffer + size; + + MAYBE_WAIT_IN_FLAKE_MODE; + + status_t status; + while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLOUT)) == OK) { + auto writeSize = this->send(buffer, end - buffer); + if (!writeSize.ok()) { + LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str()); + return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code(); + } + + if (*writeSize == 0) return DEAD_OBJECT; + + buffer += *writeSize; + if (buffer == end) return OK; + } + return status; + } + + status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override { + uint8_t* buffer = reinterpret_cast<uint8_t*>(data); + uint8_t* end = buffer + size; + + MAYBE_WAIT_IN_FLAKE_MODE; + + status_t status; + while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLIN)) == OK) { + auto readSize = this->recv(buffer, end - buffer); + if (!readSize.ok()) { + LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str()); + return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code(); + } + + if (*readSize == 0) return DEAD_OBJECT; // EOF + + buffer += *readSize; + if (buffer == end) return OK; + } + return status; + } private: android::base::unique_fd mSocket; @@ -63,7 +108,7 @@ private: // RpcTransportCtx with TLS disabled. class RpcTransportCtxRaw : public RpcTransportCtx { public: - std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd) const { + std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd, FdTrigger*) const { return std::make_unique<RpcTransportRaw>(std::move(fd)); } }; diff --git a/libs/binder/Utils.cpp b/libs/binder/Utils.cpp index 90a4502ec5..d2a5be1102 100644 --- a/libs/binder/Utils.cpp +++ b/libs/binder/Utils.cpp @@ -18,10 +18,24 @@ #include <string.h> +using android::base::ErrnoError; +using android::base::Result; + namespace android { void zeroMemory(uint8_t* data, size_t size) { memset(data, 0, size); } -} // namespace android +Result<void> setNonBlocking(android::base::borrowed_fd fd) { + int flags = TEMP_FAILURE_RETRY(fcntl(fd.get(), F_GETFL)); + if (flags == -1) { + return ErrnoError() << "Could not get flags for fd"; + } + if (int ret = TEMP_FAILURE_RETRY(fcntl(fd.get(), F_SETFL, flags | O_NONBLOCK)); ret == -1) { + return ErrnoError() << "Could not set non-blocking flag for fd"; + } + return {}; +} + +} // namespace android diff --git a/libs/binder/Utils.h b/libs/binder/Utils.h index f94b158404..1e383da095 100644 --- a/libs/binder/Utils.h +++ b/libs/binder/Utils.h @@ -17,9 +17,14 @@ #include <cstdint> #include <stddef.h> +#include <android-base/result.h> +#include <android-base/unique_fd.h> + namespace android { // avoid optimizations void zeroMemory(uint8_t* data, size_t size); +android::base::Result<void> setNonBlocking(android::base::borrowed_fd fd); + } // namespace android diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h index f79d85f5af..bf3e7e07f5 100644 --- a/libs/binder/include/binder/RpcServer.h +++ b/libs/binder/include/binder/RpcServer.h @@ -32,6 +32,7 @@ namespace android { +class FdTrigger; class RpcSocketAddress; /** @@ -190,7 +191,7 @@ private: sp<IBinder> mRootObject; wp<IBinder> mRootObjectWeak; std::map<RpcAddress, sp<RpcSession>> mSessions; - std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger; + std::unique_ptr<FdTrigger> mShutdownTrigger; std::condition_variable mShutdownCv; std::unique_ptr<RpcTransportCtx> mCtx; }; diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index 7ed6e4373e..761c50d822 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -39,6 +39,7 @@ class RpcServer; class RpcSocketAddress; class RpcState; class RpcTransport; +class FdTrigger; constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_NEXT = 0; constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL = 0xF0000000; @@ -161,50 +162,6 @@ private: friend RpcState; explicit RpcSession(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory); - /** This is not a pipe. */ - struct FdTrigger { - /** Returns nullptr for error case */ - static std::unique_ptr<FdTrigger> make(); - - /** - * Close the write end of the pipe so that the read end receives POLLHUP. - * Not threadsafe. - */ - void trigger(); - - /** - * Whether this has been triggered. - */ - bool isTriggered(); - - /** - * Poll for a read event. - * - * event - for pollfd - * - * Return: - * true - time to read! - * false - trigger happened - */ - status_t triggerablePoll(base::borrowed_fd fd, int16_t event); - - /** - * Read (or write), but allow to be interrupted by this trigger. - * - * Return: - * true - succeeded in completely processing 'size' - * false - interrupted (failure or trigger) - */ - status_t interruptableReadFully(RpcTransport* rpcTransport, void* data, size_t size); - status_t interruptableWriteFully(RpcTransport* rpcTransport, const void* data, size_t size); - - private: - status_t triggerablePoll(RpcTransport* rpcTransport, int16_t event); - - base::unique_fd mWrite; - base::unique_fd mRead; - }; - class EventListener : public virtual RefBase { public: virtual void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0; @@ -271,6 +228,8 @@ private: std::unique_ptr<RpcTransport> rpcTransport); [[nodiscard]] bool removeIncomingConnection(const sp<RpcConnection>& connection); + status_t initShutdownTrigger(); + enum class ConnectionUse { CLIENT, CLIENT_ASYNC, diff --git a/libs/binder/include/binder/RpcTransport.h b/libs/binder/include/binder/RpcTransport.h index 11646006c7..1b6951986e 100644 --- a/libs/binder/include/binder/RpcTransport.h +++ b/libs/binder/include/binder/RpcTransport.h @@ -23,42 +23,30 @@ #include <android-base/result.h> #include <android-base/unique_fd.h> +#include <utils/Errors.h> namespace android { +class FdTrigger; + // Represents a socket connection. class RpcTransport { public: virtual ~RpcTransport() = default; - // replacement of ::send(). errno may not be set if TLS is enabled. - virtual android::base::Result<size_t> send(const void *buf, size_t size) = 0; - - // replacement of ::recv(). errno may not be set if TLS is enabled. - virtual android::base::Result<size_t> recv(void *buf, size_t size) = 0; - - // replacement of ::recv(MSG_PEEK). errno may not be set if TLS is enabled. - // - // Implementation details: - // - For TLS, this may invoke syscalls and read data from the transport - // into an internal buffer in userspace. After that, pending() == true. - // - For raw sockets, this calls ::recv(MSG_PEEK), which leaves the data in the kernel buffer; - // pending() is always false. + // replacement of ::recv(MSG_PEEK). Error code may not be set if TLS is enabled. virtual android::base::Result<size_t> peek(void *buf, size_t size) = 0; - // Returns true if there are data pending in a userspace buffer that RpcTransport holds. - // - // Implementation details: - // - For TLS, this does not invoke any syscalls or read any data from the - // transport. This only returns whether there are data pending in the internal buffer in - // userspace. - // - For raw sockets, this always returns false. - virtual bool pending() = 0; - - // Returns fd for polling. - // - // Do not directly read / write on this raw fd! - [[nodiscard]] virtual android::base::borrowed_fd pollSocket() const = 0; + /** + * Read (or write), but allow to be interrupted by a trigger. + * + * Return: + * OK - succeeded in completely processing 'size' + * error - interrupted (failure or trigger) + */ + virtual status_t interruptableWriteFully(FdTrigger *fdTrigger, const void *buf, + size_t size) = 0; + virtual status_t interruptableReadFully(FdTrigger *fdTrigger, void *buf, size_t size) = 0; protected: RpcTransport() = default; @@ -68,8 +56,13 @@ protected: class RpcTransportCtx { public: virtual ~RpcTransportCtx() = default; + + // Create a new RpcTransport object. + // + // Implemenion details: for TLS, this function may incur I/O. |fdTrigger| may be used + // to interrupt I/O. This function blocks until handshake is finished. [[nodiscard]] virtual std::unique_ptr<RpcTransport> newTransport( - android::base::unique_fd fd) const = 0; + android::base::unique_fd fd, FdTrigger *fdTrigger) const = 0; protected: RpcTransportCtx() = default; |