audio: Fix IStreamIn.getActiveMicrophones test

Since "active" means "used by the stream for acquiring data,"
it was unreasonable to expect the list of active microphones
to be non-empty prior to actually starting data acquisition.
This change adds running of 'burst' commands before calling
'getActiveMicrophones'.

To reuse existing code some refactorings have been made.
Added 'AudioInputFlags::HOTWORD_TAP' to the list of port
config flags for which I/O testing is not performed.

Bug: 328010709
Bug: 328362233
Test: atest VtsHalAudioCoreTargetTest
(cherry picked from https://android-review.googlesource.com/q/commit:eee5499ba8381b50a29a541c2077b040a642fa18)
Merged-In: I876c0b6d7365e104ec9ed8cf5033a83f822006b6
Change-Id: I876c0b6d7365e104ec9ed8cf5033a83f822006b6
diff --git a/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp
index 8b08945..039695b 100644
--- a/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp
+++ b/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp
@@ -2883,6 +2883,177 @@
     std::unique_ptr<WithStream<Stream>> mStream;
 };
 
+class StreamLogicDefaultDriver : public StreamLogicDriver {
+  public:
+    StreamLogicDefaultDriver(std::shared_ptr<StateSequence> commands, size_t frameSizeBytes)
+        : mCommands(commands), mFrameSizeBytes(frameSizeBytes) {
+        mCommands->rewind();
+    }
+
+    // The three methods below is intended to be called after the worker
+    // thread has joined, thus no extra synchronization is needed.
+    bool hasObservablePositionIncrease() const { return mObservablePositionIncrease; }
+    bool hasRetrogradeObservablePosition() const { return mRetrogradeObservablePosition; }
+    std::string getUnexpectedStateTransition() const { return mUnexpectedTransition; }
+
+    bool done() override { return mCommands->done(); }
+    TransitionTrigger getNextTrigger(int maxDataSize, int* actualSize) override {
+        auto trigger = mCommands->getTrigger();
+        if (StreamDescriptor::Command* command = std::get_if<StreamDescriptor::Command>(&trigger);
+            command != nullptr) {
+            if (command->getTag() == StreamDescriptor::Command::Tag::burst) {
+                if (actualSize != nullptr) {
+                    // In the output scenario, reduce slightly the fmqByteCount to verify
+                    // that the HAL module always consumes all data from the MQ.
+                    if (maxDataSize > static_cast<int>(mFrameSizeBytes)) {
+                        LOG(DEBUG) << __func__ << ": reducing data size by " << mFrameSizeBytes;
+                        maxDataSize -= mFrameSizeBytes;
+                    }
+                    *actualSize = maxDataSize;
+                }
+                command->set<StreamDescriptor::Command::Tag::burst>(maxDataSize);
+            } else {
+                if (actualSize != nullptr) *actualSize = 0;
+            }
+        }
+        return trigger;
+    }
+    bool interceptRawReply(const StreamDescriptor::Reply&) override { return false; }
+    bool processValidReply(const StreamDescriptor::Reply& reply) override {
+        if (reply.observable.frames != StreamDescriptor::Position::UNKNOWN) {
+            if (mPreviousFrames.has_value()) {
+                if (reply.observable.frames > mPreviousFrames.value()) {
+                    mObservablePositionIncrease = true;
+                } else if (reply.observable.frames < mPreviousFrames.value()) {
+                    mRetrogradeObservablePosition = true;
+                }
+            }
+            mPreviousFrames = reply.observable.frames;
+        }
+
+        auto expected = mCommands->getExpectedStates();
+        if (expected.count(reply.state) == 0) {
+            std::string s =
+                    std::string("Unexpected transition from the state ")
+                            .append(mPreviousState.has_value() ? toString(mPreviousState.value())
+                                                               : "<initial state>")
+                            .append(" to ")
+                            .append(toString(reply.state))
+                            .append(" (expected one of ")
+                            .append(::android::internal::ToString(expected))
+                            .append(") caused by the ")
+                            .append(toString(mCommands->getTrigger()));
+            LOG(ERROR) << __func__ << ": " << s;
+            mUnexpectedTransition = std::move(s);
+            return false;
+        }
+        mCommands->advance(reply.state);
+        mPreviousState = reply.state;
+        return true;
+    }
+
+  protected:
+    std::shared_ptr<StateSequence> mCommands;
+    const size_t mFrameSizeBytes;
+    std::optional<StreamDescriptor::State> mPreviousState;
+    std::optional<int64_t> mPreviousFrames;
+    bool mObservablePositionIncrease = false;
+    bool mRetrogradeObservablePosition = false;
+    std::string mUnexpectedTransition;
+};
+
+// Defined later together with state transition sequences.
+std::shared_ptr<StateSequence> makeBurstCommands(bool isSync);
+
+// Certain types of ports can not be used without special preconditions.
+static bool skipStreamIoTestForMixPortConfig(const AudioPortConfig& portConfig) {
+    return (portConfig.flags.value().getTag() == AudioIoFlags::input &&
+            isAnyBitPositionFlagSet(portConfig.flags.value().template get<AudioIoFlags::input>(),
+                                    {AudioInputFlags::MMAP_NOIRQ, AudioInputFlags::VOIP_TX,
+                                     AudioInputFlags::HW_HOTWORD, AudioInputFlags::HOTWORD_TAP})) ||
+           (portConfig.flags.value().getTag() == AudioIoFlags::output &&
+            isAnyBitPositionFlagSet(
+                    portConfig.flags.value().template get<AudioIoFlags::output>(),
+                    {AudioOutputFlags::MMAP_NOIRQ, AudioOutputFlags::VOIP_RX,
+                     AudioOutputFlags::COMPRESS_OFFLOAD, AudioOutputFlags::INCALL_MUSIC}));
+}
+
+template <typename Stream>
+class StreamFixtureWithWorker {
+  public:
+    explicit StreamFixtureWithWorker(bool isSync) : mIsSync(isSync) {}
+
+    void SetUp(IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort) {
+        mStream = std::make_unique<StreamFixture<Stream>>();
+        ASSERT_NO_FATAL_FAILURE(
+                mStream->SetUpStreamForDevicePort(module, moduleConfig, devicePort));
+        MaybeSetSkipTestReason();
+    }
+
+    void SetUp(IModule* module, ModuleConfig* moduleConfig, const AudioPort& mixPort,
+               const AudioPort& devicePort) {
+        mStream = std::make_unique<StreamFixture<Stream>>();
+        ASSERT_NO_FATAL_FAILURE(
+                mStream->SetUpStreamForPortsPair(module, moduleConfig, mixPort, devicePort));
+        MaybeSetSkipTestReason();
+    }
+
+    void SendBurstCommands(bool validatePosition = true) {
+        ASSERT_NO_FATAL_FAILURE(StartWorkerToSendBurstCommands());
+        ASSERT_NO_FATAL_FAILURE(JoinWorkerAfterBurstCommands(validatePosition));
+    }
+
+    void StartWorkerToSendBurstCommands() {
+        const StreamContext* context = mStream->getStreamContext();
+        mWorkerDriver = std::make_unique<StreamLogicDefaultDriver>(makeBurstCommands(mIsSync),
+                                                                   context->getFrameSizeBytes());
+        mWorker = std::make_unique<typename IOTraits<Stream>::Worker>(
+                *context, mWorkerDriver.get(), mStream->getStreamEventReceiver());
+        LOG(DEBUG) << __func__ << ": starting " << IOTraits<Stream>::directionStr << " worker...";
+        ASSERT_TRUE(mWorker->start());
+    }
+
+    void JoinWorkerAfterBurstCommands(bool validatePosition = true) {
+        // Must call 'prepareToClose' before attempting to join because the stream may be stuck.
+        std::shared_ptr<IStreamCommon> common;
+        ASSERT_IS_OK(mStream->getStream()->getStreamCommon(&common));
+        ASSERT_IS_OK(common->prepareToClose());
+        LOG(DEBUG) << __func__ << ": joining " << IOTraits<Stream>::directionStr << " worker...";
+        mWorker->join();
+        EXPECT_FALSE(mWorker->hasError()) << mWorker->getError();
+        EXPECT_EQ("", mWorkerDriver->getUnexpectedStateTransition());
+        if (validatePosition) {
+            if (IOTraits<Stream>::is_input) {
+                EXPECT_TRUE(mWorkerDriver->hasObservablePositionIncrease());
+            }
+            EXPECT_FALSE(mWorkerDriver->hasRetrogradeObservablePosition());
+        }
+        mWorker.reset();
+        mWorkerDriver.reset();
+    }
+
+    void TeardownPatch() { mStream->TeardownPatch(); }
+
+    const AudioDevice& getDevice() const { return mStream->getDevice(); }
+    Stream* getStream() const { return mStream->getStream(); }
+    std::string skipTestReason() const {
+        return !mSkipTestReason.empty() ? mSkipTestReason : mStream->skipTestReason();
+    }
+
+  private:
+    void MaybeSetSkipTestReason() {
+        if (skipStreamIoTestForMixPortConfig(mStream->getPortConfig())) {
+            mSkipTestReason = "Mix port config is not supported for stream I/O tests";
+        }
+    }
+
+    const bool mIsSync;
+    std::string mSkipTestReason;
+    std::unique_ptr<StreamFixture<Stream>> mStream;
+    std::unique_ptr<StreamLogicDefaultDriver> mWorkerDriver;
+    std::unique_ptr<typename IOTraits<Stream>::Worker> mWorker;
+};
+
 template <typename Stream>
 class AudioStream : public AudioCoreModule {
   public:
@@ -3288,10 +3459,12 @@
         if (micDevicePorts.empty()) continue;
         atLeastOnePort = true;
         SCOPED_TRACE(port.toString());
-        StreamFixture<IStreamIn> stream;
-        ASSERT_NO_FATAL_FAILURE(stream.SetUpStreamForPortsPair(module.get(), moduleConfig.get(),
-                                                               port, micDevicePorts[0]));
+        StreamFixtureWithWorker<IStreamIn> stream(true /*isSync*/);
+        ASSERT_NO_FATAL_FAILURE(
+                stream.SetUp(module.get(), moduleConfig.get(), port, micDevicePorts[0]));
         if (!stream.skipTestReason().empty()) continue;
+
+        ASSERT_NO_FATAL_FAILURE(stream.SendBurstCommands(false /*validatePosition*/));
         std::vector<MicrophoneDynamicInfo> activeMics;
         EXPECT_IS_OK(stream.getStream()->getActiveMicrophones(&activeMics));
         EXPECT_FALSE(activeMics.empty());
@@ -3305,6 +3478,7 @@
             EXPECT_NE(0UL, mic.channelMapping.size())
                     << "No channels specified for the microphone \"" << mic.id << "\"";
         }
+
         stream.TeardownPatch();
         // Now the port of the stream is not connected, check that there are no active microphones.
         std::vector<MicrophoneDynamicInfo> emptyMics;
@@ -3682,85 +3856,6 @@
     }
 }
 
-class StreamLogicDefaultDriver : public StreamLogicDriver {
-  public:
-    StreamLogicDefaultDriver(std::shared_ptr<StateSequence> commands, size_t frameSizeBytes)
-        : mCommands(commands), mFrameSizeBytes(frameSizeBytes) {
-        mCommands->rewind();
-    }
-
-    // The three methods below is intended to be called after the worker
-    // thread has joined, thus no extra synchronization is needed.
-    bool hasObservablePositionIncrease() const { return mObservablePositionIncrease; }
-    bool hasRetrogradeObservablePosition() const { return mRetrogradeObservablePosition; }
-    std::string getUnexpectedStateTransition() const { return mUnexpectedTransition; }
-
-    bool done() override { return mCommands->done(); }
-    TransitionTrigger getNextTrigger(int maxDataSize, int* actualSize) override {
-        auto trigger = mCommands->getTrigger();
-        if (StreamDescriptor::Command* command = std::get_if<StreamDescriptor::Command>(&trigger);
-            command != nullptr) {
-            if (command->getTag() == StreamDescriptor::Command::Tag::burst) {
-                if (actualSize != nullptr) {
-                    // In the output scenario, reduce slightly the fmqByteCount to verify
-                    // that the HAL module always consumes all data from the MQ.
-                    if (maxDataSize > static_cast<int>(mFrameSizeBytes)) {
-                        LOG(DEBUG) << __func__ << ": reducing data size by " << mFrameSizeBytes;
-                        maxDataSize -= mFrameSizeBytes;
-                    }
-                    *actualSize = maxDataSize;
-                }
-                command->set<StreamDescriptor::Command::Tag::burst>(maxDataSize);
-            } else {
-                if (actualSize != nullptr) *actualSize = 0;
-            }
-        }
-        return trigger;
-    }
-    bool interceptRawReply(const StreamDescriptor::Reply&) override { return false; }
-    bool processValidReply(const StreamDescriptor::Reply& reply) override {
-        if (reply.observable.frames != StreamDescriptor::Position::UNKNOWN) {
-            if (mPreviousFrames.has_value()) {
-                if (reply.observable.frames > mPreviousFrames.value()) {
-                    mObservablePositionIncrease = true;
-                } else if (reply.observable.frames < mPreviousFrames.value()) {
-                    mRetrogradeObservablePosition = true;
-                }
-            }
-            mPreviousFrames = reply.observable.frames;
-        }
-
-        auto expected = mCommands->getExpectedStates();
-        if (expected.count(reply.state) == 0) {
-            std::string s =
-                    std::string("Unexpected transition from the state ")
-                            .append(mPreviousState.has_value() ? toString(mPreviousState.value())
-                                                               : "<initial state>")
-                            .append(" to ")
-                            .append(toString(reply.state))
-                            .append(" (expected one of ")
-                            .append(::android::internal::ToString(expected))
-                            .append(") caused by the ")
-                            .append(toString(mCommands->getTrigger()));
-            LOG(ERROR) << __func__ << ": " << s;
-            mUnexpectedTransition = std::move(s);
-            return false;
-        }
-        mCommands->advance(reply.state);
-        mPreviousState = reply.state;
-        return true;
-    }
-
-  protected:
-    std::shared_ptr<StateSequence> mCommands;
-    const size_t mFrameSizeBytes;
-    std::optional<StreamDescriptor::State> mPreviousState;
-    std::optional<int64_t> mPreviousFrames;
-    bool mObservablePositionIncrease = false;
-    bool mRetrogradeObservablePosition = false;
-    std::string mUnexpectedTransition;
-};
-
 enum {
     NAMED_CMD_NAME,
     NAMED_CMD_DELAY_MS,
@@ -3792,19 +3887,7 @@
         }
         for (const auto& portConfig : allPortConfigs) {
             SCOPED_TRACE(portConfig.toString());
-            // Certain types of ports can not be used without special preconditions.
-            if ((IOTraits<Stream>::is_input &&
-                 isAnyBitPositionFlagSet(
-                         portConfig.flags.value().template get<AudioIoFlags::Tag::input>(),
-                         {AudioInputFlags::MMAP_NOIRQ, AudioInputFlags::VOIP_TX,
-                          AudioInputFlags::HW_HOTWORD})) ||
-                (!IOTraits<Stream>::is_input &&
-                 isAnyBitPositionFlagSet(
-                         portConfig.flags.value().template get<AudioIoFlags::Tag::output>(),
-                         {AudioOutputFlags::MMAP_NOIRQ, AudioOutputFlags::VOIP_RX,
-                          AudioOutputFlags::COMPRESS_OFFLOAD, AudioOutputFlags::INCALL_MUSIC}))) {
-                continue;
-            }
+            if (skipStreamIoTestForMixPortConfig(portConfig)) continue;
             const bool isNonBlocking =
                     IOTraits<Stream>::is_input
                             ? false
@@ -4616,8 +4699,9 @@
 template <typename Stream>
 class WithRemoteSubmix {
   public:
-    WithRemoteSubmix() = default;
-    explicit WithRemoteSubmix(AudioDeviceAddress address) : mAddress(address) {}
+    WithRemoteSubmix() : mStream(true /*isSync*/) {}
+    explicit WithRemoteSubmix(AudioDeviceAddress address)
+        : mStream(true /*isSync*/), mAddress(address) {}
     WithRemoteSubmix(const WithRemoteSubmix&) = delete;
     WithRemoteSubmix& operator=(const WithRemoteSubmix&) = delete;
 
@@ -4637,57 +4721,31 @@
     void SetUp(IModule* module, ModuleConfig* moduleConfig) {
         auto devicePort = getRemoteSubmixAudioPort(moduleConfig, mAddress);
         ASSERT_TRUE(devicePort.has_value()) << "Device port for remote submix device not found";
-        ASSERT_NO_FATAL_FAILURE(SetUp(module, moduleConfig, *devicePort));
+        ASSERT_NO_FATAL_FAILURE(mStream.SetUp(module, moduleConfig, *devicePort));
+        mAddress = mStream.getDevice().address;
     }
 
-    void SendBurstCommandsStartWorker() {
-        const StreamContext* context = mStream->getStreamContext();
-        mWorkerDriver = std::make_unique<StreamLogicDefaultDriver>(makeBurstCommands(true),
-                                                                   context->getFrameSizeBytes());
-        mWorker = std::make_unique<typename IOTraits<Stream>::Worker>(
-                *context, mWorkerDriver.get(), mStream->getStreamEventReceiver());
-        LOG(DEBUG) << __func__ << ": starting " << IOTraits<Stream>::directionStr << " worker...";
-        ASSERT_TRUE(mWorker->start());
+    void StartWorkerToSendBurstCommands() {
+        ASSERT_NO_FATAL_FAILURE(mStream.StartWorkerToSendBurstCommands());
     }
 
-    void SendBurstCommandsJoinWorker() {
-        // Must call 'prepareToClose' before attempting to join because the stream may be
-        // stuck due to absence of activity from the other side of the remote submix pipe.
-        std::shared_ptr<IStreamCommon> common;
-        ASSERT_IS_OK(mStream->getStream()->getStreamCommon(&common));
-        ASSERT_IS_OK(common->prepareToClose());
-        LOG(DEBUG) << __func__ << ": joining " << IOTraits<Stream>::directionStr << " worker...";
-        mWorker->join();
-        EXPECT_FALSE(mWorker->hasError()) << mWorker->getError();
-        EXPECT_EQ("", mWorkerDriver->getUnexpectedStateTransition());
-        if (IOTraits<Stream>::is_input) {
-            EXPECT_TRUE(mWorkerDriver->hasObservablePositionIncrease());
-        }
-        EXPECT_FALSE(mWorkerDriver->hasRetrogradeObservablePosition());
-        mWorker.reset();
-        mWorkerDriver.reset();
+    void JoinWorkerAfterBurstCommands() {
+        ASSERT_NO_FATAL_FAILURE(mStream.JoinWorkerAfterBurstCommands());
     }
 
     void SendBurstCommands() {
-        ASSERT_NO_FATAL_FAILURE(SendBurstCommandsStartWorker());
-        ASSERT_NO_FATAL_FAILURE(SendBurstCommandsJoinWorker());
+        ASSERT_NO_FATAL_FAILURE(mStream.StartWorkerToSendBurstCommands());
+        ASSERT_NO_FATAL_FAILURE(mStream.JoinWorkerAfterBurstCommands());
     }
 
     std::optional<AudioDeviceAddress> getAudioDeviceAddress() const { return mAddress; }
-    std::string skipTestReason() const { return mStream->skipTestReason(); }
+    std::string skipTestReason() const { return mStream.skipTestReason(); }
 
   private:
-    void SetUp(IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort) {
-        mStream = std::make_unique<StreamFixture<Stream>>();
-        ASSERT_NO_FATAL_FAILURE(
-                mStream->SetUpStreamForDevicePort(module, moduleConfig, devicePort));
-        mAddress = mStream->getDevice().address;
-    }
+    void SetUp(IModule* module, ModuleConfig* moduleConfig, const AudioPort& devicePort) {}
 
+    StreamFixtureWithWorker<Stream> mStream;
     std::optional<AudioDeviceAddress> mAddress;
-    std::unique_ptr<StreamFixture<Stream>> mStream;
-    std::unique_ptr<StreamLogicDefaultDriver> mWorkerDriver;
-    std::unique_ptr<typename IOTraits<Stream>::Worker> mWorker;
 };
 
 class AudioModuleRemoteSubmix : public AudioCoreModule {
@@ -4737,10 +4795,10 @@
     ASSERT_EQ("", streamIn.skipTestReason());
 
     // Start writing into the output stream.
-    ASSERT_NO_FATAL_FAILURE(streamOut.SendBurstCommandsStartWorker());
+    ASSERT_NO_FATAL_FAILURE(streamOut.StartWorkerToSendBurstCommands());
     // Simultaneously, read from the input stream.
     ASSERT_NO_FATAL_FAILURE(streamIn.SendBurstCommands());
-    ASSERT_NO_FATAL_FAILURE(streamOut.SendBurstCommandsJoinWorker());
+    ASSERT_NO_FATAL_FAILURE(streamOut.JoinWorkerAfterBurstCommands());
 }
 
 TEST_P(AudioModuleRemoteSubmix, OpenInputMultipleTimes) {
@@ -4758,15 +4816,15 @@
         ASSERT_EQ("", streamIns[i]->skipTestReason());
     }
     // Start writing into the output stream.
-    ASSERT_NO_FATAL_FAILURE(streamOut.SendBurstCommandsStartWorker());
+    ASSERT_NO_FATAL_FAILURE(streamOut.StartWorkerToSendBurstCommands());
     // Simultaneously, read from input streams.
     for (size_t i = 0; i < streamInCount; i++) {
-        ASSERT_NO_FATAL_FAILURE(streamIns[i]->SendBurstCommandsStartWorker());
+        ASSERT_NO_FATAL_FAILURE(streamIns[i]->StartWorkerToSendBurstCommands());
     }
     for (size_t i = 0; i < streamInCount; i++) {
-        ASSERT_NO_FATAL_FAILURE(streamIns[i]->SendBurstCommandsJoinWorker());
+        ASSERT_NO_FATAL_FAILURE(streamIns[i]->JoinWorkerAfterBurstCommands());
     }
-    ASSERT_NO_FATAL_FAILURE(streamOut.SendBurstCommandsJoinWorker());
+    ASSERT_NO_FATAL_FAILURE(streamOut.JoinWorkerAfterBurstCommands());
     // Clean up input streams in the reverse order because the device connection is owned
     // by the first one.
     for (size_t i = streamInCount; i != 0; --i) {