diff options
| -rw-r--r-- | media/libaah_rtp/Android.mk | 3 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_rx_player.h | 43 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_rx_player_core.cpp | 184 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_rx_player_ring_buffer.cpp | 17 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_rx_player_substream.cpp | 8 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_group.cpp | 181 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_group.h | 81 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_packet.cpp | 42 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_packet.h | 27 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_player.cpp | 79 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_player.h | 10 | ||||
| -rw-r--r-- | media/libaah_rtp/utils.cpp | 49 | ||||
| -rw-r--r-- | media/libaah_rtp/utils.h | 74 | 
13 files changed, 555 insertions, 243 deletions
diff --git a/media/libaah_rtp/Android.mk b/media/libaah_rtp/Android.mk index 44351edef622..361c073d3093 100644 --- a/media/libaah_rtp/Android.mk +++ b/media/libaah_rtp/Android.mk @@ -17,7 +17,8 @@ LOCAL_SRC_FILES := \      aah_tx_group.cpp \      aah_tx_packet.cpp \      aah_tx_player.cpp \ -    pipe_event.cpp +    pipe_event.cpp \ +    utils.cpp  LOCAL_C_INCLUDES := \      frameworks/base/include \ diff --git a/media/libaah_rtp/aah_rx_player.h b/media/libaah_rtp/aah_rx_player.h index 53361b47707f..c9d0eb49978d 100644 --- a/media/libaah_rtp/aah_rx_player.h +++ b/media/libaah_rtp/aah_rx_player.h @@ -31,6 +31,7 @@  #include "aah_decoder_pump.h"  #include "pipe_event.h" +#include "utils.h"  namespace android { @@ -173,8 +174,7 @@ class AAH_RXPlayer : public MediaPlayerInterface {          bool           waiting_for_fast_start_;          bool           fetched_first_packet_; -        uint64_t       rtp_activity_timeout_; -        bool           rtp_activity_timeout_valid_; +        Timeout        rtp_activity_timeout_;          DISALLOW_EVIL_CONSTRUCTORS(RXRingBuffer);      }; @@ -194,9 +194,26 @@ class AAH_RXPlayer : public MediaPlayerInterface {          bool     isAboutToUnderflow();          uint32_t getSSRC()      const { return ssrc_; } -        uint16_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; } +        uint8_t  getProgramID() const { return (ssrc_ >> 5) & 0x1F; }          status_t getStatus() const { return status_; } +        void clearInactivityTimeout() { +            inactivity_timeout_.setTimeout(-1); +        } + +        void resetInactivityTimeout() { +            inactivity_timeout_.setTimeout(kInactivityTimeoutMsec); +        } + +        bool shouldExpire() { +            // Substreams should always have a positive time until timeout.  A +            // timeout value of 0 indicates that the timer has expired, while a +            // negative timeout (normally meaning no timeout) is used by some of +            // the core code to implement a mark and sweep pattern for cleaning +            // out no longer relevant substreams. +            return (inactivity_timeout_.msecTillTimeout() <= 0); +        } +        protected:          virtual ~Substream(); @@ -228,8 +245,10 @@ class AAH_RXPlayer : public MediaPlayerInterface {          uint32_t            aux_data_expected_size_;          sp<AAH_DecoderPump> decoder_; +        Timeout             inactivity_timeout_; -        static int64_t      kAboutToUnderflowThreshold; +        static const int64_t    kAboutToUnderflowThreshold; +        static const int        kInactivityTimeoutMsec;          DISALLOW_EVIL_CONSTRUCTORS(Substream);      }; @@ -247,7 +266,8 @@ class AAH_RXPlayer : public MediaPlayerInterface {      void                processRingBuffer();      void                processCommandPacket(PacketBuffer* pb);      bool                processGaps(); -    int                 computeNextGapRetransmitTimeout(); +    void                setGapStatus(GapStatus status); +    void                cleanoutExpiredSubstreams();      void                fetchAudioFlinger();      PipeEvent           wakeup_work_thread_evt_; @@ -268,7 +288,9 @@ class AAH_RXPlayer : public MediaPlayerInterface {      SeqNoGap            current_gap_;      GapStatus           current_gap_status_; -    uint64_t            next_retrans_req_time_; +    Timeout             next_retrans_req_timeout_; + +    Timeout             ss_cleanout_timeout_;      RXRingBuffer        ring_buffer_;      SubstreamVec        substreams_; @@ -282,15 +304,14 @@ class AAH_RXPlayer : public MediaPlayerInterface {      static const uint32_t kRetransRequestMagic;      static const uint32_t kFastStartRequestMagic;      static const uint32_t kRetransNAKMagic; -    static const uint32_t kGapRerequestTimeoutUSec; -    static const uint32_t kFastStartTimeoutUSec; -    static const uint32_t kRTPActivityTimeoutUSec; +    static const uint32_t kGapRerequestTimeoutMsec; +    static const uint32_t kFastStartTimeoutMsec; +    static const uint32_t kRTPActivityTimeoutMsec; +    static const uint32_t kSSCleanoutTimeoutMsec;      static const uint32_t INVOKE_GET_MASTER_VOLUME = 3;      static const uint32_t INVOKE_SET_MASTER_VOLUME = 4; -    static uint64_t monotonicUSecNow(); -      DISALLOW_EVIL_CONSTRUCTORS(AAH_RXPlayer);  }; diff --git a/media/libaah_rtp/aah_rx_player_core.cpp b/media/libaah_rtp/aah_rx_player_core.cpp index 616df0fb3684..33a548a0a3a8 100644 --- a/media/libaah_rtp/aah_rx_player_core.cpp +++ b/media/libaah_rtp/aah_rx_player_core.cpp @@ -37,9 +37,10 @@ const uint32_t AAH_RXPlayer::kRetransNAKMagic =      FOURCC('T','n','a','k');  const uint32_t AAH_RXPlayer::kFastStartRequestMagic =      FOURCC('T','f','s','t'); -const uint32_t AAH_RXPlayer::kGapRerequestTimeoutUSec = 75000; -const uint32_t AAH_RXPlayer::kFastStartTimeoutUSec = 800000; -const uint32_t AAH_RXPlayer::kRTPActivityTimeoutUSec = 10000000; +const uint32_t AAH_RXPlayer::kGapRerequestTimeoutMsec = 75; +const uint32_t AAH_RXPlayer::kFastStartTimeoutMsec = 800; +const uint32_t AAH_RXPlayer::kRTPActivityTimeoutMsec = 10000; +const uint32_t AAH_RXPlayer::kSSCleanoutTimeoutMsec = 1000;  static inline int16_t fetchInt16(uint8_t* data) {      return static_cast<int16_t>(U16_AT(data)); @@ -53,20 +54,10 @@ static inline int64_t fetchInt64(uint8_t* data) {      return static_cast<int64_t>(U64_AT(data));  } -uint64_t AAH_RXPlayer::monotonicUSecNow() { -    struct timespec now; -    int res = clock_gettime(CLOCK_MONOTONIC, &now); -    CHECK(res >= 0); - -    uint64_t ret = static_cast<uint64_t>(now.tv_sec) * 1000000; -    ret += now.tv_nsec / 1000; - -    return ret; -} -  status_t AAH_RXPlayer::startWorkThread() {      status_t res;      stopWorkThread(); +    ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec);      res = thread_wrapper_->run("TRX_Player", PRIORITY_AUDIO);      if (res != OK) { @@ -125,7 +116,7 @@ void AAH_RXPlayer::resetPipeline() {      substreams_.clear(); -    current_gap_status_ = kGS_NoGap; +    setGapStatus(kGS_NoGap);  }  bool AAH_RXPlayer::setupSocket() { @@ -231,25 +222,26 @@ bool AAH_RXPlayer::threadLoop() {      while (!thread_wrapper_->exitPending()) {          // Step 1: Wait until there is something to do. -        int gap_timeout = computeNextGapRetransmitTimeout(); +        int gap_timeout = next_retrans_req_timeout_.msecTillTimeout();          int ring_timeout = ring_buffer_.computeInactivityTimeout(); +        int ss_cleanout_timeout = ss_cleanout_timeout_.msecTillTimeout();          int timeout = -1;          if (!ring_timeout) {              LOGW("RTP inactivity timeout reached, resetting pipeline.");              resetPipeline(); -            timeout = gap_timeout; -        } else { -            if (gap_timeout < 0) { -                timeout = ring_timeout; -            } else if (ring_timeout < 0) { -                timeout = gap_timeout; -            } else { -                timeout = (gap_timeout < ring_timeout) ? gap_timeout -                                                       : ring_timeout; -            } +            continue; +        } + +        if (!ss_cleanout_timeout) { +            cleanoutExpiredSubstreams(); +            continue;          } +        timeout = minTimeout(gap_timeout, timeout); +        timeout = minTimeout(ring_timeout, timeout); +        timeout = minTimeout(ss_cleanout_timeout, timeout); +          if ((0 != timeout) && (!process_more_right_now)) {              // Set up the events to wait on.  Start with the wakeup pipe.              memset(&poll_fds, 0, sizeof(poll_fds)); @@ -564,7 +556,7 @@ void AAH_RXPlayer::processRingBuffer() {                  }              } -            // Is this a command packet?  If so, its not necessarily associate +            // Is this a command packet?  If so, its not necessarily associated              // with one particular substream.  Just give it to the command              // packet handler and then move on.              if (4 == payload_type) { @@ -620,7 +612,7 @@ void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) {      }      uint8_t trtp_version =  data[12]; -    uint8_t trtp_flags   =  data[13]       & 0xF; +    uint8_t trtp_flags   =  data[13] & 0xF;      if (1 != trtp_version) {          LOGV("Dropping packet, bad trtp version %hhu", trtp_version); @@ -643,32 +635,85 @@ void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) {          return;      } +    bool do_cleanup_pass = false;      uint16_t command_id = U16_AT(data + offset); +    offset += 2;      switch (command_id) {          case TRTPControlPacket::kCommandNop: +            // Note: NOPs are frequently used to carry timestamp transformation +            // updates.  If there was a timestamp transform attached to this +            // payload, it was already taken care of by processRX.              break;          case TRTPControlPacket::kCommandEOS: +            // TODO need to differentiate between flush and EOS.  Substreams +            // which have hit EOS need a chance to drain before being destroyed. +          case TRTPControlPacket::kCommandFlush: { -            uint16_t program_id = (U32_AT(data + 8) >> 5) & 0x1F; +            uint8_t program_id = (U32_AT(data + 8) >> 5) & 0x1F;              LOGI("*** %s flushing program_id=%d",                   __PRETTY_FUNCTION__, program_id); -            Vector<uint32_t> substreams_to_remove; +            // Flag any programs with the given program ID for cleanup.              for (size_t i = 0; i < substreams_.size(); ++i) { -                sp<Substream> iter = substreams_.valueAt(i); -                if (iter->getProgramID() == program_id) { -                    iter->shutdown(); -                    substreams_to_remove.add(iter->getSSRC()); +                const sp<Substream>& stream = substreams_.valueAt(i); +                if (stream->getProgramID() == program_id) { +                    stream->clearInactivityTimeout();                  }              } -            for (size_t i = 0; i < substreams_to_remove.size(); ++i) { -                substreams_.removeItem(substreams_to_remove[i]); +            // Make sure we do our cleanup pass at the end of this. +            do_cleanup_pass = true; +        } break; + +        case TRTPControlPacket::kCommandAPU: { +            // Active program update packet.  Go over all of our substreams and +            // either reset the inactivity timer for the substreams listed in +            // this update packet, or clear the inactivity timer for the +            // substreams not listed in this update packet.  A cleared +            // inactivity timer will flag a substream for deletion in the +            // cleanup pass at the end of this function. + +            // The packet must contain at least the 1 byte numActivePrograms +            // field. +            if (amt < offset + 1) { +                return;              } +            uint8_t numActivePrograms = data[offset++]; + +            // If the payload is not long enough to contain the list it promises +            // to have, just skip it. +            if (amt < (offset + numActivePrograms)) { +                return; +            } + +            // Clear all inactivity timers. +            for (size_t i = 0; i < substreams_.size(); ++i) { +                const sp<Substream>& stream = substreams_.valueAt(i); +                stream->clearInactivityTimeout(); +            } + +            // Now go over the list of active programs and reset the inactivity +            // timers for those streams which are currently in the active +            // program update packet. +            for (uint8_t j = 0; j < numActivePrograms; ++j) { +                uint8_t pid = (data[offset + j] & 0x1F); +                for (size_t i = 0; i < substreams_.size(); ++i) { +                    const sp<Substream>& stream = substreams_.valueAt(i); +                    if (stream->getProgramID() == pid) { +                        stream->resetInactivityTimeout(); +                    } +                } +            } + +            // Make sure we do our cleanup pass at the end of this. +            do_cleanup_pass = true;          } break;      } + +    if (do_cleanup_pass) +        cleanoutExpiredSubstreams();  }  bool AAH_RXPlayer::processGaps() { @@ -705,18 +750,18 @@ bool AAH_RXPlayer::processGaps() {          // this gap and move on.          if (!send_retransmit_request &&             (kGS_NoGap != current_gap_status_) && -           (0 == computeNextGapRetransmitTimeout())) { - +           (0 == next_retrans_req_timeout_.msecTillTimeout())) {              // If out current gap is the fast-start gap, don't bother to skip it              // because substreams look like the are about to underflow.              if ((kGS_FastStartGap != gap_status) ||                  (current_gap_.end_seq_ != gap.end_seq_)) { +                  for (size_t i = 0; i < substreams_.size(); ++i) {                      if (substreams_.valueAt(i)->isAboutToUnderflow()) { -                        LOGV("About to underflow, giving up on gap [%hu, %hu]", +                        LOGI("About to underflow, giving up on gap [%hu, %hu]",                                  gap.start_seq_, gap.end_seq_);                          ring_buffer_.processNAK(); -                        current_gap_status_ = kGS_NoGap; +                        setGapStatus(kGS_NoGap);                          return true;                      }                  } @@ -727,7 +772,7 @@ bool AAH_RXPlayer::processGaps() {              send_retransmit_request = true;          }      } else { -        current_gap_status_ = kGS_NoGap; +        setGapStatus(kGS_NoGap);      }      if (send_retransmit_request) { @@ -738,7 +783,7 @@ bool AAH_RXPlayer::processGaps() {              (current_gap_.end_seq_ == gap.end_seq_)) {              LOGV("Fast start is taking forever; giving up.");              ring_buffer_.processNAK(); -            current_gap_status_ = kGS_NoGap; +            setGapStatus(kGS_NoGap);              return true;          } @@ -777,32 +822,53 @@ bool AAH_RXPlayer::processGaps() {          // Update the current gap info.          current_gap_ = gap; -        current_gap_status_ = gap_status; -        next_retrans_req_time_ = monotonicUSecNow() + -                               ((kGS_FastStartGap == current_gap_status_) -                                ? kFastStartTimeoutUSec -                                : kGapRerequestTimeoutUSec); +        setGapStatus(gap_status);      }      return false;  } -// Compute when its time to send the next gap retransmission in milliseconds. -// Returns < 0 for an infinite timeout (no gap) and 0 if its time to retransmit -// right now. -int AAH_RXPlayer::computeNextGapRetransmitTimeout() { -    if (kGS_NoGap == current_gap_status_) { -        return -1; -    } +void AAH_RXPlayer::setGapStatus(GapStatus status) { +    current_gap_status_ = status; + +    switch(current_gap_status_) { +        case kGS_NormalGap: +            next_retrans_req_timeout_.setTimeout(kGapRerequestTimeoutMsec); +            break; -    int64_t timeout_delta = next_retrans_req_time_ - monotonicUSecNow(); +        case kGS_FastStartGap: +            next_retrans_req_timeout_.setTimeout(kFastStartTimeoutMsec); +            break; -    timeout_delta /= 1000; -    if (timeout_delta <= 0) { -        return 0; +        case kGS_NoGap: +        default: +            next_retrans_req_timeout_.setTimeout(-1); +            break;      } +} + +void AAH_RXPlayer::cleanoutExpiredSubstreams() { +    static const size_t kMaxPerPass = 32; +    uint32_t to_remove[kMaxPerPass]; +    size_t cnt, i; + +    do { +        for (i = 0, cnt = 0; +            (i < substreams_.size()) && (cnt < kMaxPerPass); +            ++i) { +            const sp<Substream>& stream = substreams_.valueAt(i); +            if (stream->shouldExpire()) { +                to_remove[cnt++] = stream->getSSRC(); +            } +        } + +        for (i = 0; i < cnt; ++i) { +            LOGI("Purging substream with SSRC 0x%08x", to_remove[i]); +            substreams_.removeItem(to_remove[i]); +        } +    } while (cnt >= kMaxPerPass); -    return static_cast<uint32_t>(timeout_delta); +    ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec);  }  }  // namespace android diff --git a/media/libaah_rtp/aah_rx_player_ring_buffer.cpp b/media/libaah_rtp/aah_rx_player_ring_buffer.cpp index 22467f79e8d1..b59185060c4b 100644 --- a/media/libaah_rtp/aah_rx_player_ring_buffer.cpp +++ b/media/libaah_rtp/aah_rx_player_ring_buffer.cpp @@ -53,7 +53,7 @@ void AAH_RXPlayer::RXRingBuffer::reset() {      rd_seq_known_ = false;      waiting_for_fast_start_ = true;      fetched_first_packet_ = false; -    rtp_activity_timeout_valid_ = false; +    rtp_activity_timeout_.setTimeout(-1);  }  bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf, @@ -62,8 +62,7 @@ bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf,      CHECK(NULL != ring_);      CHECK(NULL != buf); -    rtp_activity_timeout_valid_ = true; -    rtp_activity_timeout_ = monotonicUSecNow() + kRTPActivityTimeoutUSec; +    rtp_activity_timeout_.setTimeout(kRTPActivityTimeoutMsec);      // If the ring buffer is totally reset (we have never received a single      // payload) then we don't know the rd sequence number and this should be @@ -328,17 +327,7 @@ void AAH_RXPlayer::RXRingBuffer::processNAK(SeqNoGap* nak) {  int AAH_RXPlayer::RXRingBuffer::computeInactivityTimeout() {      AutoMutex lock(&lock_); - -    if (!rtp_activity_timeout_valid_) { -        return -1; -    } - -    uint64_t now = monotonicUSecNow(); -    if (rtp_activity_timeout_ <= now) { -        return 0; -    } - -    return (rtp_activity_timeout_ - now) / 1000; +    return rtp_activity_timeout_.msecTillTimeout();  }  AAH_RXPlayer::PacketBuffer* diff --git a/media/libaah_rtp/aah_rx_player_substream.cpp b/media/libaah_rtp/aah_rx_player_substream.cpp index 3e3a95c47620..c72215b0d3eb 100644 --- a/media/libaah_rtp/aah_rx_player_substream.cpp +++ b/media/libaah_rtp/aah_rx_player_substream.cpp @@ -33,8 +33,9 @@  namespace android { -int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold = +const int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold =      50ull * 1000; +const int AAH_RXPlayer::Substream::kInactivityTimeoutMsec = 10000;  AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) {      ssrc_ = ssrc; @@ -54,6 +55,7 @@ AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) {      // cleanupBufferInProgress will reset most of the internal state variables.      // Just need to make sure that buffer_in_progress_ is NULL before calling.      cleanupBufferInProgress(); +    resetInactivityTimeout();  }  AAH_RXPlayer::Substream::~Substream() { @@ -108,6 +110,8 @@ void AAH_RXPlayer::Substream::processPayloadStart(uint8_t* buf,          return;      } +    resetInactivityTimeout(); +      // Do we have a buffer in progress already?  If so, abort the buffer.  In      // theory, this should never happen.  If there were a discontinutity in the      // stream, the discon in the seq_nos at the RTP level should have already @@ -362,6 +366,8 @@ void AAH_RXPlayer::Substream::processPayloadCont(uint8_t* buf,          return;      } +    resetInactivityTimeout(); +      if (NULL == buffer_in_progress_) {          LOGV("TRTP Receiver skipping payload continuation; no buffer currently"               " in progress."); diff --git a/media/libaah_rtp/aah_tx_group.cpp b/media/libaah_rtp/aah_tx_group.cpp index 22dd0260a994..2e3df0a73d5a 100644 --- a/media/libaah_rtp/aah_tx_group.cpp +++ b/media/libaah_rtp/aah_tx_group.cpp @@ -29,6 +29,7 @@  #include "aah_tx_group.h"  #include "aah_tx_player.h" +#include "utils.h"  //#define DROP_PACKET_TEST  #ifdef DROP_PACKET_TEST @@ -101,15 +102,17 @@ static ssize_t droptest_recvfrom(int sockfd, void *buf,  namespace android {  const int AAH_TXGroup::kRetryTrimIntervalMsec = 100; -const int AAH_TXGroup::kHeartbeatIntervalMsec = 1000; +const int AAH_TXGroup::kHeartbeatIntervalMsec = 500;  const int AAH_TXGroup::kTXGroupLingerTimeMsec = 10000;  const int AAH_TXGroup::kUnicastClientTimeoutMsec = 5000;  const size_t AAH_TXGroup::kRetryBufferCapacity = 100; -const size_t AAH_TXGroup::kMaxUnicastTargets = 16; +const size_t AAH_TXGroup::kMaxAllowedUnicastTargets = 16;  const size_t AAH_TXGroup::kInitialUnicastTargetCapacity = 4; -const size_t AAH_TXGroup::kMaxAllowedTXGroups = 16; +const size_t AAH_TXGroup::kMaxAllowedTXGroups = 8;  const size_t AAH_TXGroup::kInitialActiveTXGroupsCapacity = 4; +const size_t AAH_TXGroup::kMaxAllowedPlayerClients = 4; +const size_t AAH_TXGroup::kInitialPlayerClientCapacity = 2;  const uint32_t AAH_TXGroup::kCNC_RetryRequestID     = 'Treq';  const uint32_t AAH_TXGroup::kCNC_FastStartRequestID = 'Tfst'; @@ -124,33 +127,24 @@ sp<AAH_TXGroup::CmdAndControlRXer>  AAH_TXGroup::mCmdAndControlRXer;  uint32_t                            AAH_TXGroup::sNextEpoch;  bool                                AAH_TXGroup::sNextEpochValid = false; -static inline bool matchSockaddrs(const struct sockaddr_in* a, -                                  const struct sockaddr_in* b) { -    CHECK(NULL != a); -    CHECK(NULL != b); -    return ((a->sin_family      == b->sin_family)      && -            (a->sin_addr.s_addr == b->sin_addr.s_addr) && -            (a->sin_port        == b->sin_port)); -} -  AAH_TXGroup::AAH_TXGroup()      : mRetryBuffer(kRetryBufferCapacity)  {      // Initialize members with no constructor to sensible defaults. -    mClientRefCount = 0;      mTRTPSeqNumber = 0;      mNextProgramID = 1;      mEpoch = getNextEpoch();      mMulticastTargetValid = false;      mSocket = -1;      mCmdAndControlPort = 0; -    mClientRefCount = 0;      mUnicastTargets.setCapacity(kInitialUnicastTargetCapacity); +    mActiveClients.setCapacity(kInitialPlayerClientCapacity); +    mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec);  }  AAH_TXGroup::~AAH_TXGroup() { -    CHECK(0 == mClientRefCount); +    CHECK(mActiveClients.size() == 0);      if (mSocket >= 0) {          ::close(mSocket); @@ -237,7 +231,8 @@ bailout:      return ret_val;  } -sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) { +sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port, +                                      const sp<AAH_TXPlayer>& client) {      sp<AAH_TXGroup> ret_val;      // If port is non-zero, we are creating a new group.  Otherwise, we are @@ -250,7 +245,13 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) {          for (size_t i = 0; i < sActiveTXGroups.size(); ++i) {              if (port == sActiveTXGroups[i]->getCmdAndControlPort()) {                  ret_val = sActiveTXGroups[i]; -                ret_val->addClientReference(); + +                if (!ret_val->registerClient(client)) { +                    // No need to log an error, registerClient has already done +                    // so for us. +                    ret_val = NULL; +                } +                  break;              }          } @@ -312,6 +313,13 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) {                  }              } +            // Register the client with the newly created group. +            if (!ret_val->registerClient(client)) { +                // No need to log an error, registerClient has already done so +                // for us. +                goto bailout; +            } +              // Make sure we are at least at minimum capacity in the              // ActiveTXGroups vector.              if (sActiveTXGroups.capacity() < kInitialActiveTXGroupsCapacity) { @@ -321,11 +329,10 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) {              // Add ourselves to the list of active TXGroups.              if (sActiveTXGroups.add(ret_val) < 0) {                  LOGE("Failed to add new TX Group to Active Group list"); +                ret_val->unregisterClient(client);                  goto bailout;              } -            ret_val->addClientReference(); -              LOGI("Created TX Group with C&C Port %hu.  %d/%d groups now"                   " active.", ret_val->getCmdAndControlPort(),                   sActiveTXGroups.size(), kMaxAllowedTXGroups); @@ -342,7 +349,8 @@ bailout:      return sp<AAH_TXGroup>(NULL);  } -sp<AAH_TXGroup> AAH_TXGroup::getGroup(const struct sockaddr_in* target) { +sp<AAH_TXGroup> AAH_TXGroup::getGroup(const struct sockaddr_in* target, +                                      const sp<AAH_TXPlayer>& client) {      // Hold the static lock while we search for a TX Group which has the      // multicast target passed to us.      Mutex::Autolock lock(sLock); @@ -359,32 +367,71 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(const struct sockaddr_in* target) {          }      } -    if (ret_val != NULL) -        ret_val->addClientReference(); +    if (ret_val != NULL) { +        if (!ret_val->registerClient(client)) { +            // No need to log an error, registerClient has already done so for +            // us. +            ret_val = NULL; +        } +    }      return ret_val;  } -void AAH_TXGroup::dropClientReference() { +void AAH_TXGroup::unregisterClient(const sp<AAH_TXPlayer>& client) {      Mutex::Autolock lock(mLock); -    CHECK(mClientRefCount > 0); -    --mClientRefCount; -    if (!mClientRefCount) { +    LOGI("TXPlayer leaving TXGroup listening on C&C port %hu", +         mCmdAndControlPort); + +    bool found_it = false; +    for (size_t i = 0; i < mActiveClients.size(); ++i) { +        if (mActiveClients[i].get() == client.get()) { +            found_it = true; +            mActiveClients.removeAt(i); +            break; +        } +    } +    CHECK(found_it); + +    if (!mActiveClients.size()) {          mCleanupTimeout.setTimeout(kTXGroupLingerTimeMsec);      }  } -void AAH_TXGroup::addClientReference() { +bool AAH_TXGroup::registerClient(const sp<AAH_TXPlayer>& client) { +    // ASSERT holding sLock      Mutex::Autolock lock(mLock); -    ++mClientRefCount; + +    CHECK(client != NULL); + +    // Check the client limit. +    if (mActiveClients.size() >= kMaxAllowedPlayerClients) { +        LOGE("Cannot register new client with C&C group listening on port %hu." +             "  %d/%d clients are already active", mCmdAndControlPort, +             mActiveClients.size(), kMaxAllowedPlayerClients); +        return false; +    } + +    // Try to add the client to the list. +    if (mActiveClients.add(client) < 0) { +        LOGE("Failed to register new client with C&C group listening on port" +             " %hu.  %d/%d clients are currently active", mCmdAndControlPort, +             mActiveClients.size(), kMaxAllowedPlayerClients); +        return false; +    } + +    // Assign our new client's program ID, cancel the cleanup timeout and get +    // out. +    client->setProgramID(getNewProgramID());      mCleanupTimeout.setTimeout(-1); +    return true;  }  bool AAH_TXGroup::shouldExpire() {      Mutex::Autolock lock(mLock); -    if (mClientRefCount) { +    if (mActiveClients.size()) {          return false;      } @@ -395,9 +442,12 @@ bool AAH_TXGroup::shouldExpire() {      return true;  } -uint16_t AAH_TXGroup::getNewProgramID() { -    int tmp = android_atomic_inc(&mNextProgramID); -    return static_cast<uint16_t>(tmp & 0xFFFF); +uint8_t AAH_TXGroup::getNewProgramID() { +    uint8_t tmp; +    do { +        tmp = static_cast<uint8_t>(android_atomic_inc(&mNextProgramID) & 0x1F); +    } while(!tmp); +    return tmp;  }  status_t AAH_TXGroup::sendPacket(const sp<TRTPPacket>& packet) { @@ -449,13 +499,10 @@ status_t AAH_TXGroup::sendPacket_l(const sp<TRTPPacket>& packet) {              LOGI("TXGroup on port %hu removing client at %d.%d.%d.%d:%hu due to"                   " timeout.  Now serving %d/%d unicast clients.",                   mCmdAndControlPort, IP_PRINTF_HELPER(addr), port, -                 mUnicastTargets.size(), kMaxUnicastTargets); +                 mUnicastTargets.size(), kMaxAllowedUnicastTargets);          }      } -    // reset our heartbeat timer. -    mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec); -      // Done; see comments sendToTarget discussing error handling behavior      return OK;  } @@ -562,19 +609,22 @@ void AAH_TXGroup::sendHeartbeatIfNeeded() {      Mutex::Autolock lock(mLock);      if (!mHeartbeatTimeout.msecTillTimeout()) { -        sp<TRTPControlPacket> packet = new TRTPControlPacket(); +        sp<TRTPActiveProgramUpdatePacket> packet = +            new TRTPActiveProgramUpdatePacket();          if (packet != NULL) { -            packet->setCommandID(TRTPControlPacket::kCommandNop); +            for (size_t i = 0; i < mActiveClients.size(); ++i) { +                packet->pushProgramID(mActiveClients[i]->getProgramID()); +            } -            // Note: the act of calling sendPacket will reset our heartbeat -            // timer.              sendPacket_l(packet);          } else {              LOGE("Failed to allocate TRTP packet for heartbeat on TX Group with"                   " C&C port %hu", mCmdAndControlPort); -            mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec);          } + +        // reset our heartbeat timer. +        mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec);      }  } @@ -828,7 +878,7 @@ void AAH_TXGroup::handleJoinGroup(const struct sockaddr_in* src_addr) {      // Looks like we have a new client.  Check to see if we have room to add it      // before proceeding.  If not, send a NAK back so it knows to signal an      // error to its application level. -    if (mUnicastTargets.size() >= kMaxUnicastTargets) { +    if (mUnicastTargets.size() >= kMaxAllowedUnicastTargets) {          uint32_t nak_payload = htonl(kCNC_NakJoinGroupID);          if (sendto(mSocket, &nak_payload, sizeof(nak_payload), @@ -865,7 +915,7 @@ void AAH_TXGroup::handleJoinGroup(const struct sockaddr_in* src_addr) {      LOGI("TXGroup on port %hu added new client at %d.%d.%d.%d:%hu.  "           "Now serving %d/%d unicast clients.",           mCmdAndControlPort, IP_PRINTF_HELPER(addr), port, -         mUnicastTargets.size(), kMaxUnicastTargets); +         mUnicastTargets.size(), kMaxAllowedUnicastTargets);  }  void AAH_TXGroup::handleLeaveGroup(const struct sockaddr_in* src_addr) { @@ -888,41 +938,13 @@ void AAH_TXGroup::handleLeaveGroup(const struct sockaddr_in* src_addr) {              LOGI("TXGroup on port %hu removing client at %d.%d.%d.%d:%hu due to"                   " leave request.  Now serving %d/%d unicast clients.",                   mCmdAndControlPort, IP_PRINTF_HELPER(addr), port, -                 mUnicastTargets.size(), kMaxUnicastTargets); +                 mUnicastTargets.size(), kMaxAllowedUnicastTargets);              return;          }      }  } -void AAH_TXGroup::Timeout::setTimeout(int msec) { -    if (msec < 0) { -        mSystemEndTime = 0; -        return; -    } - -    mSystemEndTime = systemTime() + (static_cast<nsecs_t>(msec) * 1000000); -} - -int AAH_TXGroup::Timeout::msecTillTimeout(nsecs_t nowTime) { -    if (!mSystemEndTime) { -        return -1; -    } - -    if (mSystemEndTime < nowTime) { -        return 0; -    } - -    nsecs_t delta = mSystemEndTime - nowTime; -    delta += 999999; -    delta /= 1000000; -    if (delta > 0x7FFFFFFF) { -        return 0x7FFFFFFF; -    } - -    return static_cast<int>(delta); -} -  AAH_TXGroup::CmdAndControlRXer::CmdAndControlRXer() {      mWakeupEventFD = -1;  } @@ -1003,25 +1025,16 @@ bool AAH_TXGroup::CmdAndControlRXer::threadLoop() {              // Check the heartbeat timeout for this group.              tmp = txGroups[i]->mHeartbeatTimeout.msecTillTimeout(now); -            if (static_cast<unsigned int>(tmp) < -                static_cast<unsigned int>(nextTimeout)) { -                nextTimeout = tmp; -            } +            nextTimeout = minTimeout(nextTimeout, tmp);              // Check the cleanup timeout for this group.              tmp = txGroups[i]->mCleanupTimeout.msecTillTimeout(now); -            if (static_cast<unsigned int>(tmp) < -                static_cast<unsigned int>(nextTimeout)) { -                nextTimeout = tmp; -            } +            nextTimeout = minTimeout(nextTimeout, tmp);          }          // Take into account the common trim timeout.          tmp = mTrimRetryTimeout.msecTillTimeout(now); -        if (static_cast<unsigned int>(tmp) < -            static_cast<unsigned int>(nextTimeout)) { -            nextTimeout = tmp; -        } +        nextTimeout = minTimeout(nextTimeout, tmp);      }      // Step 3: OK - time to wait for there to be something to do.  Release our diff --git a/media/libaah_rtp/aah_tx_group.h b/media/libaah_rtp/aah_tx_group.h index 4736ea0473d9..276e81933f4b 100644 --- a/media/libaah_rtp/aah_tx_group.h +++ b/media/libaah_rtp/aah_tx_group.h @@ -26,12 +26,15 @@  #include <utils/Vector.h>  #include "aah_tx_packet.h" +#include "utils.h"  #define IP_PRINTF_HELPER(a) ((a >> 24) & 0xFF), ((a >> 16) & 0xFF), \                              ((a >>  8) & 0xFF),  (a        & 0xFF)  namespace android { +class AAH_TXPlayer; +  template <typename T> class CircularBuffer {    public:      CircularBuffer(size_t capacity); @@ -69,26 +72,36 @@ class AAH_TXGroup : public virtual RefBase {      // Obtain the instance of the TXGroup whose command and control socket is      // currently listening on the specified port.  Alternatively, if port is 0,      // create a new TXGroup with an ephemerally bound command and control port. -    static sp<AAH_TXGroup> getGroup(uint16_t port); +    static sp<AAH_TXGroup> getGroup(uint16_t port, +                                    const sp<AAH_TXPlayer>& client);      // Obtain the instance of the TXGroup whose multicast transmit target is      // currently set to target, or NULL if no such group exists.  To create a      // new transmit group with a new multicast target address, call getGroup(0)      // followed by setMulticastTXTarget. -    static sp<AAH_TXGroup> getGroup(const struct sockaddr_in* target); - -    // AAH_TXGroups successfully obtained using calls to getGroup will have a -    // client reference placed on them on behalf of the caller.  When the caller -    // is finished using the group, they must call dropClientReference to remove -    // this reference.  Once call client references have been released from a TX -    // Group, the group will linger in the system for a short period of time -    // before finally expiring and being cleaned up by the command and control -    // thread. -    //  +    static sp<AAH_TXGroup> getGroup(const struct sockaddr_in* target, +                                    const sp<AAH_TXPlayer>& client); + +    // AAH_TXGroups successfully obtained using calls to getGroup will hold a +    // reference back to the client passed to getGroup.  When the client is +    // finished using the group, it must call unregisterClient to release this +    // reference. +    // +    // While there exists active clients of transmit group, the TX group will +    // periodically send heartbeat messages to receiver clients containing the +    // program IDs of the currently active TX Player clients so that receivers +    // have a chance to clean up orphaned programs in the case where all EOS +    // messages got dropped on the way to the receiver. +    // +    // Once all clients references have been released from a TX Group, the group +    // will linger in the system for a short period of time before finally +    // expiring and being cleaned up by the command and control thread. +    //      // TODO : someday, expose the AAH_TXGroup as a top level object in the      // android MediaAPIs so that applications may explicitly manage TX group      // lifecycles instead of relying on this timeout/cleanup mechanism. -    void dropClientReference(); +    void unregisterClient(const sp<AAH_TXPlayer>& client); +      // Fetch the UDP port on which this TXGroup is listening for command and      // control messages.  No need to hold any locks for this, the port is @@ -96,9 +109,6 @@ class AAH_TXGroup : public virtual RefBase {      // afterwards.      uint16_t getCmdAndControlPort() const { return mCmdAndControlPort; } -    // Used by players to obtain a new program ID for this retransmission group. -    uint16_t getNewProgramID(); -      // Assign a TRTP sequence number to the supplied packet and send it to all      // registered clients.  Then place the packet into the RetryBuffer to      // service future client retry requests. @@ -114,30 +124,6 @@ class AAH_TXGroup : public virtual RefBase {      ~AAH_TXGroup();    private: -    // Definition of a helper class used to track things like when we need to -    // transmit a heartbeat, or when we will need to wake up and trim the retry -    // buffers. -    class Timeout { -      public: -        Timeout() : mSystemEndTime(0) { } - -        // Set a timeout which should occur msec milliseconds from now. -        // Negative values will cancel any current timeout; -        void setTimeout(int msec); - -        // Return the number of milliseconds until the timeout occurs, or -1 if -        // no timeout is scheduled. -        int msecTillTimeout(nsecs_t nowTime); -        int msecTillTimeout() { return msecTillTimeout(systemTime()); } - -      private: -        // The systemTime() at which the timeout will be complete, or 0 if no -        // timeout is currently scheduled. -        nsecs_t mSystemEndTime; - -        DISALLOW_EVIL_CONSTRUCTORS(Timeout); -    }; -      // Definition of the singleton command and control receiver who will handle      // requests from RX clients.  Requests include things like unicast group      // management as well as retransmission requests. @@ -228,8 +214,11 @@ class AAH_TXGroup : public virtual RefBase {                              const uint8_t* payload,                              size_t length); -    // Add a client ref to this TX Group and reset its cleanup timer. -    void addClientReference(); +    // Register a player client to this TX Group and reset its cleanup timer. +    bool registerClient(const sp<AAH_TXPlayer>& client); + +    // Obtain a new program ID for a newly registered client. +    uint8_t getNewProgramID();      // Test used by the C&C thread to see if its time to expire and cleanup a      // client. @@ -263,12 +252,12 @@ class AAH_TXGroup : public virtual RefBase {      // Lock we use to serialize access to instance variables.      Mutex mLock; -     +      // The list of packets we hold for servicing retry requests.      RetryBuffer mRetryBuffer; -    // The number of TX Player clients currently using this TX group. -    uint32_t mClientRefCount; +    // The current set of active TX Player clients using this TX group. +    Vector< sp<AAH_TXPlayer> > mActiveClients;      // The sequence number to assign to the next transmitted TRTP packet.      uint16_t mTRTPSeqNumber; @@ -322,10 +311,12 @@ class AAH_TXGroup : public virtual RefBase {      static const int kUnicastClientTimeoutMsec;      static const size_t kRetryBufferCapacity; -    static const size_t kMaxUnicastTargets; +    static const size_t kMaxAllowedUnicastTargets;      static const size_t kInitialUnicastTargetCapacity;      static const size_t kMaxAllowedTXGroups;      static const size_t kInitialActiveTXGroupsCapacity; +    static const size_t kMaxAllowedPlayerClients; +    static const size_t kInitialPlayerClientCapacity;      static const uint32_t kCNC_RetryRequestID;      static const uint32_t kCNC_FastStartRequestID; diff --git a/media/libaah_rtp/aah_tx_packet.cpp b/media/libaah_rtp/aah_tx_packet.cpp index c7ad3e0d2c96..d41ba8c1703c 100644 --- a/media/libaah_rtp/aah_tx_packet.cpp +++ b/media/libaah_rtp/aah_tx_packet.cpp @@ -74,7 +74,7 @@ void TRTPPacket::setEpoch(uint32_t val) {      }  } -void TRTPPacket::setProgramID(uint16_t val) { +void TRTPPacket::setProgramID(uint8_t val) {      CHECK(!mIsPacked);      mProgramID = val;  } @@ -341,4 +341,44 @@ bool TRTPControlPacket::pack() {      return true;  } +bool TRTPActiveProgramUpdatePacket::pushProgramID(uint8_t id) { +    if (mProgramIDCnt >= kMaxProgramIDs) +        return false; + +    mProgramIDs[mProgramIDCnt++] = id; +    return true; +} + +bool TRTPActiveProgramUpdatePacket::pack() { +    if (mIsPacked) { +        return false; +    } + +    // Active program update packets contain a 2-byte command ID, 1 byte of +    // length, and length bytes of program IDs. +    int packetLen = kRTPHeaderLen + +                    TRTPHeaderLen() + +                    mProgramIDCnt + +                    3; + +    mPacket = new uint8_t[packetLen]; +    if (!mPacket) { +        return false; +    } + +    mPacketLen = packetLen; + +    uint8_t* cur = mPacket; + +    writeTRTPHeader(cur, true, packetLen); +    writeU16(cur, mCommandID); +    writeU8(cur,  mProgramIDCnt); +    for (uint8_t i = 0; i < mProgramIDCnt; ++i) { +        writeU8(cur,  mProgramIDs[i]); +    } + +    mIsPacked = true; +    return true; +} +  }  // namespace android diff --git a/media/libaah_rtp/aah_tx_packet.h b/media/libaah_rtp/aah_tx_packet.h index b9f0fe6d2dea..670bf1afcac4 100644 --- a/media/libaah_rtp/aah_tx_packet.h +++ b/media/libaah_rtp/aah_tx_packet.h @@ -62,7 +62,7 @@ class TRTPPacket : public RefBase {      int64_t getPTS() const;      void setEpoch(uint32_t val); -    void setProgramID(uint16_t val); +    void setProgramID(uint8_t val);      void setSubstreamID(uint16_t val);      void setClockTransform(const LinearTransform& trans); @@ -103,7 +103,7 @@ class TRTPPacket : public RefBase {      bool mPTSValid;      int64_t  mPTS;      uint32_t mEpoch; -    uint16_t mProgramID; +    uint8_t mProgramID;      uint16_t mSubstreamID;      LinearTransform mClockTranform;      bool mClockTranformValid; @@ -177,16 +177,37 @@ class TRTPControlPacket : public TRTPPacket {          kCommandNop   = 1,          kCommandFlush = 2,          kCommandEOS   = 3, +        kCommandAPU   = 4,      };      void setCommandID(TRTPCommandID val);      virtual bool pack(); -  private: +  protected: +    explicit TRTPControlPacket(TRTPCommandID commandID) +        : TRTPPacket(kHeaderTypeControl) +        , mCommandID(commandID) {} +      TRTPCommandID mCommandID;  }; +class TRTPActiveProgramUpdatePacket : public TRTPControlPacket { +  public: +    TRTPActiveProgramUpdatePacket() +        : TRTPControlPacket(kCommandAPU) +        , mProgramIDCnt(0) {} + +    virtual bool pack(); +    bool pushProgramID(uint8_t id); + +  private: +    static const uint8_t kMaxProgramIDs = 31; + +    uint8_t mProgramIDCnt; +    uint8_t mProgramIDs[kMaxProgramIDs]; +}; +  }  // namespace android  #endif  // __AAH_TX_PLAYER_H__ diff --git a/media/libaah_rtp/aah_tx_player.cpp b/media/libaah_rtp/aah_tx_player.cpp index c11e5024946b..476687ae39b7 100644 --- a/media/libaah_rtp/aah_tx_player.cpp +++ b/media/libaah_rtp/aah_tx_player.cpp @@ -34,6 +34,7 @@  #include "aah_tx_packet.h"  #include "aah_tx_player.h" +#include "utils.h"  namespace android { @@ -52,6 +53,7 @@ static const int64_t kAAHBufferTimeUs = 1000000LL;  const int64_t AAH_TXPlayer::kAAHRetryKeepAroundTimeNs =      kAAHBufferTimeUs * 1100; +const int AAH_TXPlayer::kPauseTSUpdateResendTimeoutMsec = 250;  const int32_t AAH_TXPlayer::kInvokeGetCNCPort = 0xB33977;  sp<MediaPlayerBase> createAAH_TXPlayer() { @@ -521,6 +523,7 @@ status_t AAH_TXPlayer::play_l() {  status_t AAH_TXPlayer::stop() {      status_t ret = pause(); +    mPauseTSUpdateResendTimeout.setTimeout(-1);      sendEOS_l();      return ret;  } @@ -586,11 +589,22 @@ void AAH_TXPlayer::updateClockTransform_l(bool pause) {      mCurrentClockTransform.a_to_b_denom = pause ? 0 : 1;      // send a packet announcing the new transform +    sendTSUpdateNop_l(); + +    // if we are paused, schedule a periodic resend of the TS update, JiC the +    // receiveing client misses it. +    if (mPlayRateIsPaused) { +        mPauseTSUpdateResendTimeout.setTimeout(kPauseTSUpdateResendTimeoutMsec); +    } else { +        mPauseTSUpdateResendTimeout.setTimeout(-1); +    } +} + +void AAH_TXPlayer::sendEOS_l() {      if (mAAH_TXGroup != NULL) {          sp<TRTPControlPacket> packet = new TRTPControlPacket();          if (packet != NULL) { -            packet->setClockTransform(mCurrentClockTransform); -            packet->setCommandID(TRTPControlPacket::kCommandNop); +            packet->setCommandID(TRTPControlPacket::kCommandEOS);              sendPacket_l(packet);          } else {              LOGD("Failed to allocate TRTP packet at %s:%d", __FILE__, __LINE__); @@ -598,11 +612,12 @@ void AAH_TXPlayer::updateClockTransform_l(bool pause) {      }  } -void AAH_TXPlayer::sendEOS_l() { -    if (mAAH_TXGroup != NULL) { +void AAH_TXPlayer::sendTSUpdateNop_l() { +    if ((mAAH_TXGroup != NULL) && mCurrentClockTransformValid) {          sp<TRTPControlPacket> packet = new TRTPControlPacket();          if (packet != NULL) { -            packet->setCommandID(TRTPControlPacket::kCommandEOS); +            packet->setClockTransform(mCurrentClockTransform); +            packet->setCommandID(TRTPControlPacket::kCommandFlush);              sendPacket_l(packet);          } else {              LOGD("Failed to allocate TRTP packet at %s:%d", __FILE__, __LINE__); @@ -754,20 +769,20 @@ void AAH_TXPlayer::reset_l() {      mIsSeeking = false;      mSeekTimeUs = 0; +    mPauseTSUpdateResendTimeout.setTimeout(-1); +      mUri.setTo("");      mUriHeaders.clear();      mFileSource.clear();      mBitrate = -1; -    mProgramID = 0;      if (mAAH_TXGroup != NULL) { -        LOGI("TXPlayer leaving TXGroup listening on C&C port %hu", -             mAAH_TXGroup->getCmdAndControlPort()); -        mAAH_TXGroup->dropClientReference(); +        mAAH_TXGroup->unregisterClient(sp<AAH_TXPlayer>(this));          mAAH_TXGroup = NULL;      } +    mProgramID = 0;      mLastQueuedMediaTimePTSValid = false;      mCurrentClockTransformValid = false; @@ -873,6 +888,7 @@ status_t AAH_TXPlayer::setRetransmitEndpoint(      uint32_t addr = ntohl(endpoint->sin_addr.s_addr);      uint16_t port = ntohs(endpoint->sin_port); +    sp<AAH_TXPlayer> thiz(this);      if ((addr & 0xF0000000) == 0xE0000000) {          // Starting in multicast mode?  We need to have a specified port to          // multicast to, so sanity check that first.  Then search for an @@ -884,10 +900,11 @@ status_t AAH_TXPlayer::setRetransmitEndpoint(              return BAD_VALUE;          } -        mAAH_TXGroup = AAH_TXGroup::getGroup(endpoint); +        mAAH_TXGroup = AAH_TXGroup::getGroup(endpoint, thiz);          if (mAAH_TXGroup == NULL) {              // No pre-existing group.  Make a new one. -            mAAH_TXGroup = AAH_TXGroup::getGroup(static_cast<uint16_t>(0)); +            mAAH_TXGroup = AAH_TXGroup::getGroup(static_cast<uint16_t>(0), +                                                 thiz);              // Still no group?  Thats bad.  We probably have exceeded our limit              // on the number of simultaneous TX groups. @@ -907,7 +924,7 @@ status_t AAH_TXPlayer::setRetransmitEndpoint(      } else if (addr == INADDR_ANY) {          // Starting in unicast mode.  A port of 0 means we need to create a new          // group, a non-zero port means that we want to join an existing one. -        mAAH_TXGroup = AAH_TXGroup::getGroup(port); +        mAAH_TXGroup = AAH_TXGroup::getGroup(port, thiz);          if (mAAH_TXGroup == NULL) {              if (port) { @@ -929,7 +946,7 @@ status_t AAH_TXPlayer::setRetransmitEndpoint(      }      CHECK(mAAH_TXGroup != NULL); -    mProgramID = mAAH_TXGroup->getNewProgramID(); +    CHECK(mProgramID != 0);      return OK;  } @@ -1111,6 +1128,8 @@ void AAH_TXPlayer::onPumpAudio() {          // of good options here.  For now, signal an error up to the app level          // and shut down the transmission pump.          int64_t commonTimeNow; +        int64_t mediaTimeNow; +        bool mediaTimeNowValid = false;          if (OK != mCCHelper.getCommonTime(&commonTimeNow)) {              // Failed to get common time; either the service is down or common              // time is not synced.  Raise an error and shutdown the player. @@ -1121,20 +1140,34 @@ void AAH_TXPlayer::onPumpAudio() {              break;          } -        if (mCurrentClockTransformValid && mLastQueuedMediaTimePTSValid) { -            int64_t mediaTimeNow; -            bool conversionResult = mCurrentClockTransform.doReverseTransform( -                                        commonTimeNow, -                                        &mediaTimeNow); -            CHECK(conversionResult); +        if (mCurrentClockTransformValid) { +            mediaTimeNowValid = mCurrentClockTransform.doReverseTransform( +                                    commonTimeNow, +                                    &mediaTimeNow); +            CHECK(mediaTimeNowValid); +        } -            if ((mediaTimeNow + -                 kAAHBufferTimeUs - -                 mLastQueuedMediaTimePTS) <= 0) { -                break; +        // Has our pause-timestamp-update timer fired?  If so, take appropriate +        // action. +        if (!mPauseTSUpdateResendTimeout.msecTillTimeout()) { +            if (mPlayRateIsPaused) { +                // Send the update and schedule the next update. +                sendTSUpdateNop_l(); +                mPauseTSUpdateResendTimeout.setTimeout( +                        kPauseTSUpdateResendTimeoutMsec); +            } else { +                // Not paused; cancel the timer so it does not bug us anymore. +                mPauseTSUpdateResendTimeout.setTimeout(-1);              }          } +        // Stop if we have reached our buffer threshold. +        if (mediaTimeNowValid && +            mLastQueuedMediaTimePTSValid && +           (mediaTimeNow + kAAHBufferTimeUs - mLastQueuedMediaTimePTS) <= 0) { +            break; +        } +          MediaSource::ReadOptions options;          if (mIsSeeking) {              options.setSeekTo(mSeekTimeUs); diff --git a/media/libaah_rtp/aah_tx_player.h b/media/libaah_rtp/aah_tx_player.h index 210e8ff22e0a..05dcd73322f3 100644 --- a/media/libaah_rtp/aah_tx_player.h +++ b/media/libaah_rtp/aah_tx_player.h @@ -66,6 +66,9 @@ class AAH_TXPlayer : public MediaPlayerHWInterface {      virtual status_t    setRetransmitEndpoint(const struct sockaddr_in*                                                endpoint); +    void    setProgramID(uint8_t programID) { mProgramID = programID; } +    uint8_t getProgramID() const            { return mProgramID; } +      static const int64_t kAAHRetryKeepAroundTimeNs;      static const int32_t kInvokeGetCNCPort; @@ -103,6 +106,7 @@ class AAH_TXPlayer : public MediaPlayerHWInterface {      status_t seekTo_l(int64_t timeUs);      void updateClockTransform_l(bool pause);      void sendEOS_l(); +    void sendTSUpdateNop_l();      void cancelPlayerEvents(bool keepBufferingGoing = false);      void reset_l();      void notifyListener_l(int msg, int ext1 = 0, int ext2 = 0); @@ -139,6 +143,8 @@ class AAH_TXPlayer : public MediaPlayerHWInterface {      bool mIsSeeking;      int64_t mSeekTimeUs; +    Timeout mPauseTSUpdateResendTimeout; +      sp<TimedEventQueue::Event> mPumpAudioEvent;      bool mPumpAudioEventPending; @@ -162,9 +168,11 @@ class AAH_TXPlayer : public MediaPlayerHWInterface {      bool             mPlayRateIsPaused;      CCHelper         mCCHelper; -    uint16_t mProgramID; +    uint8_t mProgramID;      uint8_t mTRTPVolume; +    static const int kPauseTSUpdateResendTimeoutMsec; +      DISALLOW_EVIL_CONSTRUCTORS(AAH_TXPlayer);  }; diff --git a/media/libaah_rtp/utils.cpp b/media/libaah_rtp/utils.cpp new file mode 100644 index 000000000000..3ed259944cf1 --- /dev/null +++ b/media/libaah_rtp/utils.cpp @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2012 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "utils.h" + +namespace android { + +void Timeout::setTimeout(int msec) { +    if (msec < 0) { +        mSystemEndTime = 0; +        return; +    } + +    mSystemEndTime = systemTime() + (static_cast<nsecs_t>(msec) * 1000000); +} + +int Timeout::msecTillTimeout(nsecs_t nowTime) { +    if (!mSystemEndTime) { +        return -1; +    } + +    if (mSystemEndTime < nowTime) { +        return 0; +    } + +    nsecs_t delta = mSystemEndTime - nowTime; +    delta += 999999; +    delta /= 1000000; +    if (delta > 0x7FFFFFFF) { +        return 0x7FFFFFFF; +    } + +    return static_cast<int>(delta); +} + +}  // namespace android diff --git a/media/libaah_rtp/utils.h b/media/libaah_rtp/utils.h new file mode 100644 index 000000000000..1e3d326612f6 --- /dev/null +++ b/media/libaah_rtp/utils.h @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2012 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __UTILS_H__ +#define __UTILS_H__ + +#include <netinet/in.h> + +#include <media/stagefright/foundation/ABase.h> +#include <utils/Timers.h> + +namespace android { + +// Definition of a helper class used to track things like when we need to +// transmit a heartbeat, or when we will need to wake up and trim the retry +// buffers. +class Timeout { +  public: +    Timeout() : mSystemEndTime(0) { } + +    // Set a timeout which should occur msec milliseconds from now. +    // Negative values will cancel any current timeout; +    void setTimeout(int msec); + +    // Return the number of milliseconds until the timeout occurs, or -1 if +    // no timeout is scheduled. +    int msecTillTimeout(nsecs_t nowTime); +    int msecTillTimeout() { return msecTillTimeout(systemTime()); } + +  private: +    // The systemTime() at which the timeout will be complete, or 0 if no +    // timeout is currently scheduled. +    nsecs_t mSystemEndTime; + +    DISALLOW_EVIL_CONSTRUCTORS(Timeout); +}; + +inline bool matchSockaddrs(const struct sockaddr_in* a, +                           const struct sockaddr_in* b) { +    return ((a->sin_family      == b->sin_family)      && +            (a->sin_addr.s_addr == b->sin_addr.s_addr) && +            (a->sin_port        == b->sin_port)); +} + +// Return the minimum timeout between a and b where timeouts less than 0 are +// considered to be infinite +inline int minTimeout(int a, int b) { +    if (a < 0) { +        return b; +    } + +    if (b < 0) { +        return a; +    } + +    return ((a < b) ? a : b); +} + +}  // namespace android + +#endif  // __UTILS_H__  |