diff options
-rw-r--r-- | libs/binder/RpcSession.cpp | 24 | ||||
-rw-r--r-- | libs/binder/RpcState.cpp | 10 | ||||
-rw-r--r-- | libs/binder/include/binder/RpcSession.h | 2 | ||||
-rw-r--r-- | libs/binder/tests/IBinderRpcTest.aidl | 1 | ||||
-rw-r--r-- | libs/binder/tests/binderRpcTest.cpp | 69 |
5 files changed, 73 insertions, 33 deletions
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp index b2d1a1a869..4a6362a1ab 100644 --- a/libs/binder/RpcSession.cpp +++ b/libs/binder/RpcSession.cpp @@ -541,13 +541,27 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co (session->mClientConnectionsOffset + 1) % session->mClientConnections.size(); } - // USE SERVING SOCKET (for nested transaction) - // - // asynchronous calls cannot be nested + // USE SERVING SOCKET (e.g. nested transaction) if (use != ConnectionUse::CLIENT_ASYNC) { + sp<RpcConnection> exclusiveServer; // server connections are always assigned to a thread - findConnection(tid, &exclusive, nullptr /*available*/, session->mServerConnections, - 0 /* index hint */); + findConnection(tid, &exclusiveServer, nullptr /*available*/, + session->mServerConnections, 0 /* index hint */); + + // asynchronous calls cannot be nested, we currently allow ref count + // calls to be nested (so that you can use this without having extra + // threads). Note 'drainCommands' is used so that these ref counts can't + // build up. + if (exclusiveServer != nullptr) { + if (exclusiveServer->allowNested) { + // guaranteed to be processed as nested command + exclusive = exclusiveServer; + } else if (use == ConnectionUse::CLIENT_REFCOUNT && available == nullptr) { + // prefer available socket, but if we don't have one, don't + // wait for one + exclusive = exclusiveServer; + } + } } // if our thread is already using a connection, prioritize using that diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp index 5a2015691f..050f4fbeb3 100644 --- a/libs/binder/RpcState.cpp +++ b/libs/binder/RpcState.cpp @@ -633,6 +633,7 @@ processTransactInternalTailCall: // TODO(b/182939933): heap allocation just for lookup in mNodeForAddress, // maybe add an RpcAddress 'view' if the type remains 'heavy' auto addr = RpcAddress::fromRawEmbedded(&transaction->address); + bool oneway = transaction->flags & IBinder::FLAG_ONEWAY; status_t replyStatus = OK; sp<IBinder> target; @@ -661,7 +662,7 @@ processTransactInternalTailCall: addr.toString().c_str()); (void)session->shutdownAndWait(false); replyStatus = BAD_VALUE; - } else if (transaction->flags & IBinder::FLAG_ONEWAY) { + } else if (oneway) { std::unique_lock<std::mutex> _l(mNodeMutex); auto it = mNodeForAddress.find(addr); if (it->second.binder.promote() != target) { @@ -718,7 +719,12 @@ processTransactInternalTailCall: data.markForRpc(session); if (target) { + bool origAllowNested = connection->allowNested; + connection->allowNested = !oneway; + replyStatus = target->transact(transaction->code, data, &reply, transaction->flags); + + connection->allowNested = origAllowNested; } else { LOG_RPC_DETAIL("Got special transaction %u", transaction->code); @@ -754,7 +760,7 @@ processTransactInternalTailCall: } } - if (transaction->flags & IBinder::FLAG_ONEWAY) { + if (oneway) { if (replyStatus != OK) { ALOGW("Oneway call failed with error: %d", replyStatus); } diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h index 27baa9d8d9..e40154bb4a 100644 --- a/libs/binder/include/binder/RpcSession.h +++ b/libs/binder/include/binder/RpcSession.h @@ -191,6 +191,8 @@ private: // whether this or another thread is currently using this fd to make // or receive transactions. std::optional<pid_t> exclusiveTid; + + bool allowNested = false; }; status_t readId(); diff --git a/libs/binder/tests/IBinderRpcTest.aidl b/libs/binder/tests/IBinderRpcTest.aidl index b0c8b2d8b3..9e1078870c 100644 --- a/libs/binder/tests/IBinderRpcTest.aidl +++ b/libs/binder/tests/IBinderRpcTest.aidl @@ -55,6 +55,7 @@ interface IBinderRpcTest { oneway void sleepMsAsync(int ms); void doCallback(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value); + oneway void doCallbackAsync(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value); void die(boolean cleanup); void scheduleShutdown(); diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp index a79295a792..69682b0ae5 100644 --- a/libs/binder/tests/binderRpcTest.cpp +++ b/libs/binder/tests/binderRpcTest.cpp @@ -214,7 +214,8 @@ public: if (delayed) { std::thread([=]() { ALOGE("Executing delayed callback: '%s'", value.c_str()); - (void)doCallback(callback, oneway, false, value); + Status status = doCallback(callback, oneway, false, value); + ALOGE("Delayed callback status: '%s'", status.toString8().c_str()); }).detach(); return Status::ok(); } @@ -226,6 +227,11 @@ public: return callback->sendCallback(value); } + Status doCallbackAsync(const sp<IBinderRpcCallback>& callback, bool oneway, bool delayed, + const std::string& value) override { + return doCallback(callback, oneway, delayed, value); + } + Status die(bool cleanup) override { if (cleanup) { exit(1); @@ -978,31 +984,42 @@ TEST_P(BinderRpc, OnewayCallExhaustion) { TEST_P(BinderRpc, Callbacks) { const static std::string kTestString = "good afternoon!"; - for (bool oneway : {true, false}) { - for (bool delayed : {true, false}) { - auto proc = createRpcTestSocketServerProcess(1, 1, 1); - auto cb = sp<MyBinderRpcCallback>::make(); - - EXPECT_OK(proc.rootIface->doCallback(cb, oneway, delayed, kTestString)); - - using std::literals::chrono_literals::operator""s; - std::unique_lock<std::mutex> _l(cb->mMutex); - cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); }); - - EXPECT_EQ(cb->mValues.size(), 1) << "oneway: " << oneway << "delayed: " << delayed; - if (cb->mValues.empty()) continue; - EXPECT_EQ(cb->mValues.at(0), kTestString) - << "oneway: " << oneway << "delayed: " << delayed; - - // since we are severing the connection, we need to go ahead and - // tell the server to shutdown and exit so that waitpid won't hang - EXPECT_OK(proc.rootIface->scheduleShutdown()); - - // since this session has a reverse connection w/ a threadpool, we - // need to manually shut it down - EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true)); - - proc.expectAlreadyShutdown = true; + for (bool callIsOneway : {true, false}) { + for (bool callbackIsOneway : {true, false}) { + for (bool delayed : {true, false}) { + auto proc = createRpcTestSocketServerProcess(1, 1, 1); + auto cb = sp<MyBinderRpcCallback>::make(); + + if (callIsOneway) { + EXPECT_OK(proc.rootIface->doCallbackAsync(cb, callbackIsOneway, delayed, + kTestString)); + } else { + EXPECT_OK( + proc.rootIface->doCallback(cb, callbackIsOneway, delayed, kTestString)); + } + + using std::literals::chrono_literals::operator""s; + std::unique_lock<std::mutex> _l(cb->mMutex); + cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); }); + + EXPECT_EQ(cb->mValues.size(), 1) + << "callIsOneway: " << callIsOneway + << " callbackIsOneway: " << callbackIsOneway << " delayed: " << delayed; + if (cb->mValues.empty()) continue; + EXPECT_EQ(cb->mValues.at(0), kTestString) + << "callIsOneway: " << callIsOneway + << " callbackIsOneway: " << callbackIsOneway << " delayed: " << delayed; + + // since we are severing the connection, we need to go ahead and + // tell the server to shutdown and exit so that waitpid won't hang + EXPECT_OK(proc.rootIface->scheduleShutdown()); + + // since this session has a reverse connection w/ a threadpool, we + // need to manually shut it down + EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true)); + + proc.expectAlreadyShutdown = true; + } } } } |