diff options
| -rw-r--r-- | libs/binder/Android.bp | 1 | ||||
| -rw-r--r-- | libs/binder/FdTrigger.cpp | 62 | ||||
| -rw-r--r-- | libs/binder/FdTrigger.h | 56 | ||||
| -rw-r--r-- | libs/binder/RpcServer.cpp | 11 | ||||
| -rw-r--r-- | libs/binder/RpcSession.cpp | 101 | ||||
| -rw-r--r-- | libs/binder/RpcState.cpp | 8 | ||||
| -rw-r--r-- | libs/binder/RpcTransportRaw.cpp | 53 | ||||
| -rw-r--r-- | libs/binder/include/binder/RpcServer.h | 3 | ||||
| -rw-r--r-- | libs/binder/include/binder/RpcSession.h | 45 | ||||
| -rw-r--r-- | libs/binder/include/binder/RpcTransport.h | 40 |
10 files changed, 202 insertions, 178 deletions
diff --git a/libs/binder/Android.bp b/libs/binder/Android.bp index f34672ced1..34121d2bb2 100644 --- a/libs/binder/Android.bp +++ b/libs/binder/Android.bp @@ -104,6 +104,7 @@ cc_library { "BpBinder.cpp", "BufferedTextOutput.cpp", "Debug.cpp", + "FdTrigger.cpp", "IInterface.cpp", "IMemory.cpp", "IPCThreadState.cpp", diff --git a/libs/binder/FdTrigger.cpp b/libs/binder/FdTrigger.cpp new file mode 100644 index 0000000000..e38ac63a21 --- /dev/null +++ b/libs/binder/FdTrigger.cpp @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "FdTrigger" +#include <log/log.h> + +#include <poll.h> + +#include <android-base/macros.h> + +#include "FdTrigger.h" +namespace android { + +std::unique_ptr<FdTrigger> FdTrigger::make() { + auto ret = std::make_unique<FdTrigger>(); + if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) { + ALOGE("Could not create pipe %s", strerror(errno)); + return nullptr; + } + return ret; +} + +void FdTrigger::trigger() { + mWrite.reset(); +} + +bool FdTrigger::isTriggered() { + return mWrite == -1; +} + +status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { + while (true) { + pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, + {.fd = mRead.get(), .events = POLLHUP, .revents = 0}}; + int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1)); + if (ret < 0) { + return -errno; + } + if (ret == 0) { + continue; + } + if (pfd[1].revents & POLLHUP) { + return -ECANCELED; + } + return pfd[0].revents & event ? OK : DEAD_OBJECT; + } +} + +} // namespace android diff --git a/libs/binder/FdTrigger.h b/libs/binder/FdTrigger.h new file mode 100644 index 0000000000..984e685ae1 --- /dev/null +++ b/libs/binder/FdTrigger.h @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <memory> + +#include <android-base/unique_fd.h> +#include <utils/Errors.h> + +namespace android { + +/** This is not a pipe. */ +class FdTrigger { +public: + /** Returns nullptr for error case */ + static std::unique_ptr<FdTrigger> make(); + + /** + * Close the write end of the pipe so that the read end receives POLLHUP. + * Not threadsafe. + */ + void trigger(); + + /** + * Whether this has been triggered. + */ + bool isTriggered(); + + /** + * Poll for a read event. + * + * event - for pollfd + * + * Return: + * true - time to read! + * false - trigger happened + */ + status_t triggerablePoll(base::borrowed_fd fd, int16_t event); + +private: + base::unique_fd mWrite; + base::unique_fd mRead; +}; +} // namespace android diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp index 4fa99c0e45..66483edffb 100644 --- a/libs/binder/RpcServer.cpp +++ b/libs/binder/RpcServer.cpp @@ -29,6 +29,7 @@ #include <binder/RpcTransportRaw.h> #include <log/log.h> +#include "FdTrigger.h" #include "RpcSocketAddress.h" #include "RpcState.h" #include "RpcWireFormat.h" @@ -156,7 +157,7 @@ void RpcServer::join() { LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join."); LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined"); mJoinThreadRunning = true; - mShutdownTrigger = RpcSession::FdTrigger::make(); + mShutdownTrigger = FdTrigger::make(); LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler"); mCtx = mRpcTransportCtxFactory->newServerCtx(); @@ -270,8 +271,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie RpcConnectionHeader header; if (status == OK) { - status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header, - sizeof(header)); + status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header, + sizeof(header)); if (status != OK) { ALOGE("Failed to read ID for client connecting to RPC server: %s", statusToString(status).c_str()); @@ -296,8 +297,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie .version = protocolVersion, }; - status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response, - sizeof(response)); + status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response, + sizeof(response)); if (status != OK) { ALOGE("Failed to send new session response: %s", statusToString(status).c_str()); // still need to cleanup before we can return diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index fe12ed4de4..c756f2e8fb 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -35,6 +35,7 @@ #include <jni.h> #include <utils/String8.h> +#include "FdTrigger.h" #include "RpcSocketAddress.h" #include "RpcState.h" #include "RpcWireFormat.h" @@ -218,91 +219,6 @@ status_t RpcSession::sendDecStrong(const RpcAddress& address) { return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address); } -std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() { - auto ret = std::make_unique<RpcSession::FdTrigger>(); - if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) { - ALOGE("Could not create pipe %s", strerror(errno)); - return nullptr; - } - return ret; -} - -void RpcSession::FdTrigger::trigger() { - mWrite.reset(); -} - -bool RpcSession::FdTrigger::isTriggered() { - return mWrite == -1; -} - -status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) { - return triggerablePoll(rpcTransport->pollSocket(), event); -} - -status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) { - while (true) { - pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0}, - {.fd = mRead.get(), .events = POLLHUP, .revents = 0}}; - int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1)); - if (ret < 0) { - return -errno; - } - if (ret == 0) { - continue; - } - if (pfd[1].revents & POLLHUP) { - return -ECANCELED; - } - return pfd[0].revents & event ? OK : DEAD_OBJECT; - } -} - -status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport, - const void* data, size_t size) { - const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data); - const uint8_t* end = buffer + size; - - MAYBE_WAIT_IN_FLAKE_MODE; - - status_t status; - while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) { - auto writeSize = rpcTransport->send(buffer, end - buffer); - if (!writeSize.ok()) { - LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str()); - return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code(); - } - - if (*writeSize == 0) return DEAD_OBJECT; - - buffer += *writeSize; - if (buffer == end) return OK; - } - return status; -} - -status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data, - size_t size) { - uint8_t* buffer = reinterpret_cast<uint8_t*>(data); - uint8_t* end = buffer + size; - - MAYBE_WAIT_IN_FLAKE_MODE; - - status_t status; - while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) { - auto readSize = rpcTransport->recv(buffer, end - buffer); - if (!readSize.ok()) { - LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str()); - return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code(); - } - - if (*readSize == 0) return DEAD_OBJECT; // EOF - - buffer += *readSize; - if (buffer == end) return OK; - } - return status; -} - status_t RpcSession::readId() { { std::lock_guard<std::mutex> _l(mMutex); @@ -584,6 +500,7 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, bool incoming) { + LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr); auto ctx = mRpcTransportCtxFactory->newClientCtx(); if (ctx == nullptr) { ALOGE("Unable to create client RpcTransportCtx with %s sockets", @@ -606,16 +523,12 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessio if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING; - auto sentHeader = server->send(&header, sizeof(header)); - if (!sentHeader.ok()) { + auto sendHeaderStatus = + server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header)); + if (sendHeaderStatus != OK) { ALOGE("Could not write connection header to socket: %s", - sentHeader.error().message().c_str()); - return -sentHeader.error().code(); - } - if (*sentHeader != sizeof(header)) { - ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd", - *sentHeader, sizeof(header)); - return UNKNOWN_ERROR; + statusToString(sendHeaderStatus).c_str()); + return sendHeaderStatus; } LOG_RPC_DETAIL("Socket at client: header sent"); diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp index 23382c3858..b58f1b38ff 100644 --- a/libs/binder/RpcState.cpp +++ b/libs/binder/RpcState.cpp @@ -283,8 +283,8 @@ status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection, } if (status_t status = - session->mShutdownTrigger->interruptableWriteFully(connection->rpcTransport.get(), - data, size); + connection->rpcTransport->interruptableWriteFully(session->mShutdownTrigger.get(), + data, size); status != OK) { LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size, connection->rpcTransport.get(), statusToString(status).c_str()); @@ -305,8 +305,8 @@ status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection, } if (status_t status = - session->mShutdownTrigger->interruptableReadFully(connection->rpcTransport.get(), - data, size); + connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(), + data, size); status != OK) { LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size, connection->rpcTransport.get(), statusToString(status).c_str()); diff --git a/libs/binder/RpcTransportRaw.cpp b/libs/binder/RpcTransportRaw.cpp index 2fc19455ed..995c54234f 100644 --- a/libs/binder/RpcTransportRaw.cpp +++ b/libs/binder/RpcTransportRaw.cpp @@ -17,8 +17,11 @@ #define LOG_TAG "RpcRawTransport" #include <log/log.h> +#include <poll.h> + #include <binder/RpcTransportRaw.h> +#include "FdTrigger.h" #include "RpcState.h" using android::base::ErrnoError; @@ -32,14 +35,14 @@ namespace { class RpcTransportRaw : public RpcTransport { public: explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {} - Result<size_t> send(const void *buf, size_t size) override { + Result<size_t> send(const void* buf, size_t size) { ssize_t ret = TEMP_FAILURE_RETRY(::send(mSocket.get(), buf, size, MSG_NOSIGNAL)); if (ret < 0) { return ErrnoError() << "send()"; } return ret; } - Result<size_t> recv(void *buf, size_t size) override { + Result<size_t> recv(void* buf, size_t size) { ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_NOSIGNAL)); if (ret < 0) { return ErrnoError() << "recv()"; @@ -53,8 +56,50 @@ public: } return ret; } - bool pending() override { return false; } - android::base::borrowed_fd pollSocket() const override { return mSocket; } + + status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override { + const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data); + const uint8_t* end = buffer + size; + + MAYBE_WAIT_IN_FLAKE_MODE; + + status_t status; + while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLOUT)) == OK) { + auto writeSize = this->send(buffer, end - buffer); + if (!writeSize.ok()) { + LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str()); + return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code(); + } + + if (*writeSize == 0) return DEAD_OBJECT; + + buffer += *writeSize; + if (buffer == end) return OK; + } + return status; + } + + status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override { + uint8_t* buffer = reinterpret_cast<uint8_t*>(data); + uint8_t* end = buffer + size; + + MAYBE_WAIT_IN_FLAKE_MODE; + + status_t status; + while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLIN)) == OK) { + auto readSize = this->recv(buffer, end - buffer); + if (!readSize.ok()) { + LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str()); + return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code(); + } + + if (*readSize == 0) return DEAD_OBJECT; // EOF + + buffer += *readSize; + if (buffer == end) return OK; + } + return status; + } private: android::base::unique_fd mSocket; diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h index f79d85f5af..bf3e7e07f5 100644 --- a/libs/binder/include/binder/RpcServer.h +++ b/libs/binder/include/binder/RpcServer.h @@ -32,6 +32,7 @@ namespace android { +class FdTrigger; class RpcSocketAddress; /** @@ -190,7 +191,7 @@ private: sp<IBinder> mRootObject; wp<IBinder> mRootObjectWeak; std::map<RpcAddress, sp<RpcSession>> mSessions; - std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger; + std::unique_ptr<FdTrigger> mShutdownTrigger; std::condition_variable mShutdownCv; std::unique_ptr<RpcTransportCtx> mCtx; }; diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index 33206237cb..761c50d822 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -39,6 +39,7 @@ class RpcServer; class RpcSocketAddress; class RpcState; class RpcTransport; +class FdTrigger; constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_NEXT = 0; constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL = 0xF0000000; @@ -161,50 +162,6 @@ private: friend RpcState; explicit RpcSession(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory); - /** This is not a pipe. */ - struct FdTrigger { - /** Returns nullptr for error case */ - static std::unique_ptr<FdTrigger> make(); - - /** - * Close the write end of the pipe so that the read end receives POLLHUP. - * Not threadsafe. - */ - void trigger(); - - /** - * Whether this has been triggered. - */ - bool isTriggered(); - - /** - * Poll for a read event. - * - * event - for pollfd - * - * Return: - * true - time to read! - * false - trigger happened - */ - status_t triggerablePoll(base::borrowed_fd fd, int16_t event); - - /** - * Read (or write), but allow to be interrupted by this trigger. - * - * Return: - * true - succeeded in completely processing 'size' - * false - interrupted (failure or trigger) - */ - status_t interruptableReadFully(RpcTransport* rpcTransport, void* data, size_t size); - status_t interruptableWriteFully(RpcTransport* rpcTransport, const void* data, size_t size); - - private: - status_t triggerablePoll(RpcTransport* rpcTransport, int16_t event); - - base::unique_fd mWrite; - base::unique_fd mRead; - }; - class EventListener : public virtual RefBase { public: virtual void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0; diff --git a/libs/binder/include/binder/RpcTransport.h b/libs/binder/include/binder/RpcTransport.h index 11646006c7..afca5854d9 100644 --- a/libs/binder/include/binder/RpcTransport.h +++ b/libs/binder/include/binder/RpcTransport.h @@ -23,42 +23,30 @@ #include <android-base/result.h> #include <android-base/unique_fd.h> +#include <utils/Errors.h> namespace android { +class FdTrigger; + // Represents a socket connection. class RpcTransport { public: virtual ~RpcTransport() = default; - // replacement of ::send(). errno may not be set if TLS is enabled. - virtual android::base::Result<size_t> send(const void *buf, size_t size) = 0; - - // replacement of ::recv(). errno may not be set if TLS is enabled. - virtual android::base::Result<size_t> recv(void *buf, size_t size) = 0; - - // replacement of ::recv(MSG_PEEK). errno may not be set if TLS is enabled. - // - // Implementation details: - // - For TLS, this may invoke syscalls and read data from the transport - // into an internal buffer in userspace. After that, pending() == true. - // - For raw sockets, this calls ::recv(MSG_PEEK), which leaves the data in the kernel buffer; - // pending() is always false. + // replacement of ::recv(MSG_PEEK). Error code may not be set if TLS is enabled. virtual android::base::Result<size_t> peek(void *buf, size_t size) = 0; - // Returns true if there are data pending in a userspace buffer that RpcTransport holds. - // - // Implementation details: - // - For TLS, this does not invoke any syscalls or read any data from the - // transport. This only returns whether there are data pending in the internal buffer in - // userspace. - // - For raw sockets, this always returns false. - virtual bool pending() = 0; - - // Returns fd for polling. - // - // Do not directly read / write on this raw fd! - [[nodiscard]] virtual android::base::borrowed_fd pollSocket() const = 0; + /** + * Read (or write), but allow to be interrupted by a trigger. + * + * Return: + * OK - succeeded in completely processing 'size' + * error - interrupted (failure or trigger) + */ + virtual status_t interruptableWriteFully(FdTrigger *fdTrigger, const void *buf, + size_t size) = 0; + virtual status_t interruptableReadFully(FdTrigger *fdTrigger, void *buf, size_t size) = 0; protected: RpcTransport() = default; |