diff options
| -rw-r--r-- | media/libaah_rtp/Android.mk | 3 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_rx_player.h | 43 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_rx_player_core.cpp | 184 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_rx_player_ring_buffer.cpp | 17 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_rx_player_substream.cpp | 8 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_group.cpp | 181 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_group.h | 81 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_packet.cpp | 42 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_packet.h | 27 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_player.cpp | 79 | ||||
| -rw-r--r-- | media/libaah_rtp/aah_tx_player.h | 10 | ||||
| -rw-r--r-- | media/libaah_rtp/utils.cpp | 49 | ||||
| -rw-r--r-- | media/libaah_rtp/utils.h | 74 |
13 files changed, 555 insertions, 243 deletions
diff --git a/media/libaah_rtp/Android.mk b/media/libaah_rtp/Android.mk index 44351edef622..361c073d3093 100644 --- a/media/libaah_rtp/Android.mk +++ b/media/libaah_rtp/Android.mk @@ -17,7 +17,8 @@ LOCAL_SRC_FILES := \ aah_tx_group.cpp \ aah_tx_packet.cpp \ aah_tx_player.cpp \ - pipe_event.cpp + pipe_event.cpp \ + utils.cpp LOCAL_C_INCLUDES := \ frameworks/base/include \ diff --git a/media/libaah_rtp/aah_rx_player.h b/media/libaah_rtp/aah_rx_player.h index 53361b47707f..c9d0eb49978d 100644 --- a/media/libaah_rtp/aah_rx_player.h +++ b/media/libaah_rtp/aah_rx_player.h @@ -31,6 +31,7 @@ #include "aah_decoder_pump.h" #include "pipe_event.h" +#include "utils.h" namespace android { @@ -173,8 +174,7 @@ class AAH_RXPlayer : public MediaPlayerInterface { bool waiting_for_fast_start_; bool fetched_first_packet_; - uint64_t rtp_activity_timeout_; - bool rtp_activity_timeout_valid_; + Timeout rtp_activity_timeout_; DISALLOW_EVIL_CONSTRUCTORS(RXRingBuffer); }; @@ -194,9 +194,26 @@ class AAH_RXPlayer : public MediaPlayerInterface { bool isAboutToUnderflow(); uint32_t getSSRC() const { return ssrc_; } - uint16_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; } + uint8_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; } status_t getStatus() const { return status_; } + void clearInactivityTimeout() { + inactivity_timeout_.setTimeout(-1); + } + + void resetInactivityTimeout() { + inactivity_timeout_.setTimeout(kInactivityTimeoutMsec); + } + + bool shouldExpire() { + // Substreams should always have a positive time until timeout. A + // timeout value of 0 indicates that the timer has expired, while a + // negative timeout (normally meaning no timeout) is used by some of + // the core code to implement a mark and sweep pattern for cleaning + // out no longer relevant substreams. + return (inactivity_timeout_.msecTillTimeout() <= 0); + } + protected: virtual ~Substream(); @@ -228,8 +245,10 @@ class AAH_RXPlayer : public MediaPlayerInterface { uint32_t aux_data_expected_size_; sp<AAH_DecoderPump> decoder_; + Timeout inactivity_timeout_; - static int64_t kAboutToUnderflowThreshold; + static const int64_t kAboutToUnderflowThreshold; + static const int kInactivityTimeoutMsec; DISALLOW_EVIL_CONSTRUCTORS(Substream); }; @@ -247,7 +266,8 @@ class AAH_RXPlayer : public MediaPlayerInterface { void processRingBuffer(); void processCommandPacket(PacketBuffer* pb); bool processGaps(); - int computeNextGapRetransmitTimeout(); + void setGapStatus(GapStatus status); + void cleanoutExpiredSubstreams(); void fetchAudioFlinger(); PipeEvent wakeup_work_thread_evt_; @@ -268,7 +288,9 @@ class AAH_RXPlayer : public MediaPlayerInterface { SeqNoGap current_gap_; GapStatus current_gap_status_; - uint64_t next_retrans_req_time_; + Timeout next_retrans_req_timeout_; + + Timeout ss_cleanout_timeout_; RXRingBuffer ring_buffer_; SubstreamVec substreams_; @@ -282,15 +304,14 @@ class AAH_RXPlayer : public MediaPlayerInterface { static const uint32_t kRetransRequestMagic; static const uint32_t kFastStartRequestMagic; static const uint32_t kRetransNAKMagic; - static const uint32_t kGapRerequestTimeoutUSec; - static const uint32_t kFastStartTimeoutUSec; - static const uint32_t kRTPActivityTimeoutUSec; + static const uint32_t kGapRerequestTimeoutMsec; + static const uint32_t kFastStartTimeoutMsec; + static const uint32_t kRTPActivityTimeoutMsec; + static const uint32_t kSSCleanoutTimeoutMsec; static const uint32_t INVOKE_GET_MASTER_VOLUME = 3; static const uint32_t INVOKE_SET_MASTER_VOLUME = 4; - static uint64_t monotonicUSecNow(); - DISALLOW_EVIL_CONSTRUCTORS(AAH_RXPlayer); }; diff --git a/media/libaah_rtp/aah_rx_player_core.cpp b/media/libaah_rtp/aah_rx_player_core.cpp index 616df0fb3684..33a548a0a3a8 100644 --- a/media/libaah_rtp/aah_rx_player_core.cpp +++ b/media/libaah_rtp/aah_rx_player_core.cpp @@ -37,9 +37,10 @@ const uint32_t AAH_RXPlayer::kRetransNAKMagic = FOURCC('T','n','a','k'); const uint32_t AAH_RXPlayer::kFastStartRequestMagic = FOURCC('T','f','s','t'); -const uint32_t AAH_RXPlayer::kGapRerequestTimeoutUSec = 75000; -const uint32_t AAH_RXPlayer::kFastStartTimeoutUSec = 800000; -const uint32_t AAH_RXPlayer::kRTPActivityTimeoutUSec = 10000000; +const uint32_t AAH_RXPlayer::kGapRerequestTimeoutMsec = 75; +const uint32_t AAH_RXPlayer::kFastStartTimeoutMsec = 800; +const uint32_t AAH_RXPlayer::kRTPActivityTimeoutMsec = 10000; +const uint32_t AAH_RXPlayer::kSSCleanoutTimeoutMsec = 1000; static inline int16_t fetchInt16(uint8_t* data) { return static_cast<int16_t>(U16_AT(data)); @@ -53,20 +54,10 @@ static inline int64_t fetchInt64(uint8_t* data) { return static_cast<int64_t>(U64_AT(data)); } -uint64_t AAH_RXPlayer::monotonicUSecNow() { - struct timespec now; - int res = clock_gettime(CLOCK_MONOTONIC, &now); - CHECK(res >= 0); - - uint64_t ret = static_cast<uint64_t>(now.tv_sec) * 1000000; - ret += now.tv_nsec / 1000; - - return ret; -} - status_t AAH_RXPlayer::startWorkThread() { status_t res; stopWorkThread(); + ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec); res = thread_wrapper_->run("TRX_Player", PRIORITY_AUDIO); if (res != OK) { @@ -125,7 +116,7 @@ void AAH_RXPlayer::resetPipeline() { substreams_.clear(); - current_gap_status_ = kGS_NoGap; + setGapStatus(kGS_NoGap); } bool AAH_RXPlayer::setupSocket() { @@ -231,25 +222,26 @@ bool AAH_RXPlayer::threadLoop() { while (!thread_wrapper_->exitPending()) { // Step 1: Wait until there is something to do. - int gap_timeout = computeNextGapRetransmitTimeout(); + int gap_timeout = next_retrans_req_timeout_.msecTillTimeout(); int ring_timeout = ring_buffer_.computeInactivityTimeout(); + int ss_cleanout_timeout = ss_cleanout_timeout_.msecTillTimeout(); int timeout = -1; if (!ring_timeout) { LOGW("RTP inactivity timeout reached, resetting pipeline."); resetPipeline(); - timeout = gap_timeout; - } else { - if (gap_timeout < 0) { - timeout = ring_timeout; - } else if (ring_timeout < 0) { - timeout = gap_timeout; - } else { - timeout = (gap_timeout < ring_timeout) ? gap_timeout - : ring_timeout; - } + continue; + } + + if (!ss_cleanout_timeout) { + cleanoutExpiredSubstreams(); + continue; } + timeout = minTimeout(gap_timeout, timeout); + timeout = minTimeout(ring_timeout, timeout); + timeout = minTimeout(ss_cleanout_timeout, timeout); + if ((0 != timeout) && (!process_more_right_now)) { // Set up the events to wait on. Start with the wakeup pipe. memset(&poll_fds, 0, sizeof(poll_fds)); @@ -564,7 +556,7 @@ void AAH_RXPlayer::processRingBuffer() { } } - // Is this a command packet? If so, its not necessarily associate + // Is this a command packet? If so, its not necessarily associated // with one particular substream. Just give it to the command // packet handler and then move on. if (4 == payload_type) { @@ -620,7 +612,7 @@ void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) { } uint8_t trtp_version = data[12]; - uint8_t trtp_flags = data[13] & 0xF; + uint8_t trtp_flags = data[13] & 0xF; if (1 != trtp_version) { LOGV("Dropping packet, bad trtp version %hhu", trtp_version); @@ -643,32 +635,85 @@ void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) { return; } + bool do_cleanup_pass = false; uint16_t command_id = U16_AT(data + offset); + offset += 2; switch (command_id) { case TRTPControlPacket::kCommandNop: + // Note: NOPs are frequently used to carry timestamp transformation + // updates. If there was a timestamp transform attached to this + // payload, it was already taken care of by processRX. break; case TRTPControlPacket::kCommandEOS: + // TODO need to differentiate between flush and EOS. Substreams + // which have hit EOS need a chance to drain before being destroyed. + case TRTPControlPacket::kCommandFlush: { - uint16_t program_id = (U32_AT(data + 8) >> 5) & 0x1F; + uint8_t program_id = (U32_AT(data + 8) >> 5) & 0x1F; LOGI("*** %s flushing program_id=%d", __PRETTY_FUNCTION__, program_id); - Vector<uint32_t> substreams_to_remove; + // Flag any programs with the given program ID for cleanup. for (size_t i = 0; i < substreams_.size(); ++i) { - sp<Substream> iter = substreams_.valueAt(i); - if (iter->getProgramID() == program_id) { - iter->shutdown(); - substreams_to_remove.add(iter->getSSRC()); + const sp<Substream>& stream = substreams_.valueAt(i); + if (stream->getProgramID() == program_id) { + stream->clearInactivityTimeout(); } } - for (size_t i = 0; i < substreams_to_remove.size(); ++i) { - substreams_.removeItem(substreams_to_remove[i]); + // Make sure we do our cleanup pass at the end of this. + do_cleanup_pass = true; + } break; + + case TRTPControlPacket::kCommandAPU: { + // Active program update packet. Go over all of our substreams and + // either reset the inactivity timer for the substreams listed in + // this update packet, or clear the inactivity timer for the + // substreams not listed in this update packet. A cleared + // inactivity timer will flag a substream for deletion in the + // cleanup pass at the end of this function. + + // The packet must contain at least the 1 byte numActivePrograms + // field. + if (amt < offset + 1) { + return; } + uint8_t numActivePrograms = data[offset++]; + + // If the payload is not long enough to contain the list it promises + // to have, just skip it. + if (amt < (offset + numActivePrograms)) { + return; + } + + // Clear all inactivity timers. + for (size_t i = 0; i < substreams_.size(); ++i) { + const sp<Substream>& stream = substreams_.valueAt(i); + stream->clearInactivityTimeout(); + } + + // Now go over the list of active programs and reset the inactivity + // timers for those streams which are currently in the active + // program update packet. + for (uint8_t j = 0; j < numActivePrograms; ++j) { + uint8_t pid = (data[offset + j] & 0x1F); + for (size_t i = 0; i < substreams_.size(); ++i) { + const sp<Substream>& stream = substreams_.valueAt(i); + if (stream->getProgramID() == pid) { + stream->resetInactivityTimeout(); + } + } + } + + // Make sure we do our cleanup pass at the end of this. + do_cleanup_pass = true; } break; } + + if (do_cleanup_pass) + cleanoutExpiredSubstreams(); } bool AAH_RXPlayer::processGaps() { @@ -705,18 +750,18 @@ bool AAH_RXPlayer::processGaps() { // this gap and move on. if (!send_retransmit_request && (kGS_NoGap != current_gap_status_) && - (0 == computeNextGapRetransmitTimeout())) { - + (0 == next_retrans_req_timeout_.msecTillTimeout())) { // If out current gap is the fast-start gap, don't bother to skip it // because substreams look like the are about to underflow. if ((kGS_FastStartGap != gap_status) || (current_gap_.end_seq_ != gap.end_seq_)) { + for (size_t i = 0; i < substreams_.size(); ++i) { if (substreams_.valueAt(i)->isAboutToUnderflow()) { - LOGV("About to underflow, giving up on gap [%hu, %hu]", + LOGI("About to underflow, giving up on gap [%hu, %hu]", gap.start_seq_, gap.end_seq_); ring_buffer_.processNAK(); - current_gap_status_ = kGS_NoGap; + setGapStatus(kGS_NoGap); return true; } } @@ -727,7 +772,7 @@ bool AAH_RXPlayer::processGaps() { send_retransmit_request = true; } } else { - current_gap_status_ = kGS_NoGap; + setGapStatus(kGS_NoGap); } if (send_retransmit_request) { @@ -738,7 +783,7 @@ bool AAH_RXPlayer::processGaps() { (current_gap_.end_seq_ == gap.end_seq_)) { LOGV("Fast start is taking forever; giving up."); ring_buffer_.processNAK(); - current_gap_status_ = kGS_NoGap; + setGapStatus(kGS_NoGap); return true; } @@ -777,32 +822,53 @@ bool AAH_RXPlayer::processGaps() { // Update the current gap info. current_gap_ = gap; - current_gap_status_ = gap_status; - next_retrans_req_time_ = monotonicUSecNow() + - ((kGS_FastStartGap == current_gap_status_) - ? kFastStartTimeoutUSec - : kGapRerequestTimeoutUSec); + setGapStatus(gap_status); } return false; } -// Compute when its time to send the next gap retransmission in milliseconds. -// Returns < 0 for an infinite timeout (no gap) and 0 if its time to retransmit -// right now. -int AAH_RXPlayer::computeNextGapRetransmitTimeout() { - if (kGS_NoGap == current_gap_status_) { - return -1; - } +void AAH_RXPlayer::setGapStatus(GapStatus status) { + current_gap_status_ = status; + + switch(current_gap_status_) { + case kGS_NormalGap: + next_retrans_req_timeout_.setTimeout(kGapRerequestTimeoutMsec); + break; - int64_t timeout_delta = next_retrans_req_time_ - monotonicUSecNow(); + case kGS_FastStartGap: + next_retrans_req_timeout_.setTimeout(kFastStartTimeoutMsec); + break; - timeout_delta /= 1000; - if (timeout_delta <= 0) { - return 0; + case kGS_NoGap: + default: + next_retrans_req_timeout_.setTimeout(-1); + break; } +} + +void AAH_RXPlayer::cleanoutExpiredSubstreams() { + static const size_t kMaxPerPass = 32; + uint32_t to_remove[kMaxPerPass]; + size_t cnt, i; + + do { + for (i = 0, cnt = 0; + (i < substreams_.size()) && (cnt < kMaxPerPass); + ++i) { + const sp<Substream>& stream = substreams_.valueAt(i); + if (stream->shouldExpire()) { + to_remove[cnt++] = stream->getSSRC(); + } + } + + for (i = 0; i < cnt; ++i) { + LOGI("Purging substream with SSRC 0x%08x", to_remove[i]); + substreams_.removeItem(to_remove[i]); + } + } while (cnt >= kMaxPerPass); - return static_cast<uint32_t>(timeout_delta); + ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec); } } // namespace android diff --git a/media/libaah_rtp/aah_rx_player_ring_buffer.cpp b/media/libaah_rtp/aah_rx_player_ring_buffer.cpp index 22467f79e8d1..b59185060c4b 100644 --- a/media/libaah_rtp/aah_rx_player_ring_buffer.cpp +++ b/media/libaah_rtp/aah_rx_player_ring_buffer.cpp @@ -53,7 +53,7 @@ void AAH_RXPlayer::RXRingBuffer::reset() { rd_seq_known_ = false; waiting_for_fast_start_ = true; fetched_first_packet_ = false; - rtp_activity_timeout_valid_ = false; + rtp_activity_timeout_.setTimeout(-1); } bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf, @@ -62,8 +62,7 @@ bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf, CHECK(NULL != ring_); CHECK(NULL != buf); - rtp_activity_timeout_valid_ = true; - rtp_activity_timeout_ = monotonicUSecNow() + kRTPActivityTimeoutUSec; + rtp_activity_timeout_.setTimeout(kRTPActivityTimeoutMsec); // If the ring buffer is totally reset (we have never received a single // payload) then we don't know the rd sequence number and this should be @@ -328,17 +327,7 @@ void AAH_RXPlayer::RXRingBuffer::processNAK(SeqNoGap* nak) { int AAH_RXPlayer::RXRingBuffer::computeInactivityTimeout() { AutoMutex lock(&lock_); - - if (!rtp_activity_timeout_valid_) { - return -1; - } - - uint64_t now = monotonicUSecNow(); - if (rtp_activity_timeout_ <= now) { - return 0; - } - - return (rtp_activity_timeout_ - now) / 1000; + return rtp_activity_timeout_.msecTillTimeout(); } AAH_RXPlayer::PacketBuffer* diff --git a/media/libaah_rtp/aah_rx_player_substream.cpp b/media/libaah_rtp/aah_rx_player_substream.cpp index 3e3a95c47620..c72215b0d3eb 100644 --- a/media/libaah_rtp/aah_rx_player_substream.cpp +++ b/media/libaah_rtp/aah_rx_player_substream.cpp @@ -33,8 +33,9 @@ namespace android { -int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold = +const int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold = 50ull * 1000; +const int AAH_RXPlayer::Substream::kInactivityTimeoutMsec = 10000; AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) { ssrc_ = ssrc; @@ -54,6 +55,7 @@ AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) { // cleanupBufferInProgress will reset most of the internal state variables. // Just need to make sure that buffer_in_progress_ is NULL before calling. cleanupBufferInProgress(); + resetInactivityTimeout(); } AAH_RXPlayer::Substream::~Substream() { @@ -108,6 +110,8 @@ void AAH_RXPlayer::Substream::processPayloadStart(uint8_t* buf, return; } + resetInactivityTimeout(); + // Do we have a buffer in progress already? If so, abort the buffer. In // theory, this should never happen. If there were a discontinutity in the // stream, the discon in the seq_nos at the RTP level should have already @@ -362,6 +366,8 @@ void AAH_RXPlayer::Substream::processPayloadCont(uint8_t* buf, return; } + resetInactivityTimeout(); + if (NULL == buffer_in_progress_) { LOGV("TRTP Receiver skipping payload continuation; no buffer currently" " in progress."); diff --git a/media/libaah_rtp/aah_tx_group.cpp b/media/libaah_rtp/aah_tx_group.cpp index 22dd0260a994..2e3df0a73d5a 100644 --- a/media/libaah_rtp/aah_tx_group.cpp +++ b/media/libaah_rtp/aah_tx_group.cpp @@ -29,6 +29,7 @@ #include "aah_tx_group.h" #include "aah_tx_player.h" +#include "utils.h" //#define DROP_PACKET_TEST #ifdef DROP_PACKET_TEST @@ -101,15 +102,17 @@ static ssize_t droptest_recvfrom(int sockfd, void *buf, namespace android { const int AAH_TXGroup::kRetryTrimIntervalMsec = 100; -const int AAH_TXGroup::kHeartbeatIntervalMsec = 1000; +const int AAH_TXGroup::kHeartbeatIntervalMsec = 500; const int AAH_TXGroup::kTXGroupLingerTimeMsec = 10000; const int AAH_TXGroup::kUnicastClientTimeoutMsec = 5000; const size_t AAH_TXGroup::kRetryBufferCapacity = 100; -const size_t AAH_TXGroup::kMaxUnicastTargets = 16; +const size_t AAH_TXGroup::kMaxAllowedUnicastTargets = 16; const size_t AAH_TXGroup::kInitialUnicastTargetCapacity = 4; -const size_t AAH_TXGroup::kMaxAllowedTXGroups = 16; +const size_t AAH_TXGroup::kMaxAllowedTXGroups = 8; const size_t AAH_TXGroup::kInitialActiveTXGroupsCapacity = 4; +const size_t AAH_TXGroup::kMaxAllowedPlayerClients = 4; +const size_t AAH_TXGroup::kInitialPlayerClientCapacity = 2; const uint32_t AAH_TXGroup::kCNC_RetryRequestID = 'Treq'; const uint32_t AAH_TXGroup::kCNC_FastStartRequestID = 'Tfst'; @@ -124,33 +127,24 @@ sp<AAH_TXGroup::CmdAndControlRXer> AAH_TXGroup::mCmdAndControlRXer; uint32_t AAH_TXGroup::sNextEpoch; bool AAH_TXGroup::sNextEpochValid = false; -static inline bool matchSockaddrs(const struct sockaddr_in* a, - const struct sockaddr_in* b) { - CHECK(NULL != a); - CHECK(NULL != b); - return ((a->sin_family == b->sin_family) && - (a->sin_addr.s_addr == b->sin_addr.s_addr) && - (a->sin_port == b->sin_port)); -} - AAH_TXGroup::AAH_TXGroup() : mRetryBuffer(kRetryBufferCapacity) { // Initialize members with no constructor to sensible defaults. - mClientRefCount = 0; mTRTPSeqNumber = 0; mNextProgramID = 1; mEpoch = getNextEpoch(); mMulticastTargetValid = false; mSocket = -1; mCmdAndControlPort = 0; - mClientRefCount = 0; mUnicastTargets.setCapacity(kInitialUnicastTargetCapacity); + mActiveClients.setCapacity(kInitialPlayerClientCapacity); + mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec); } AAH_TXGroup::~AAH_TXGroup() { - CHECK(0 == mClientRefCount); + CHECK(mActiveClients.size() == 0); if (mSocket >= 0) { ::close(mSocket); @@ -237,7 +231,8 @@ bailout: return ret_val; } -sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) { +sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port, + const sp<AAH_TXPlayer>& client) { sp<AAH_TXGroup> ret_val; // If port is non-zero, we are creating a new group. Otherwise, we are @@ -250,7 +245,13 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) { for (size_t i = 0; i < sActiveTXGroups.size(); ++i) { if (port == sActiveTXGroups[i]->getCmdAndControlPort()) { ret_val = sActiveTXGroups[i]; - ret_val->addClientReference(); + + if (!ret_val->registerClient(client)) { + // No need to log an error, registerClient has already done + // so for us. + ret_val = NULL; + } + break; } } @@ -312,6 +313,13 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) { } } + // Register the client with the newly created group. + if (!ret_val->registerClient(client)) { + // No need to log an error, registerClient has already done so + // for us. + goto bailout; + } + // Make sure we are at least at minimum capacity in the // ActiveTXGroups vector. if (sActiveTXGroups.capacity() < kInitialActiveTXGroupsCapacity) { @@ -321,11 +329,10 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) { // Add ourselves to the list of active TXGroups. if (sActiveTXGroups.add(ret_val) < 0) { LOGE("Failed to add new TX Group to Active Group list"); + ret_val->unregisterClient(client); goto bailout; } - ret_val->addClientReference(); - LOGI("Created TX Group with C&C Port %hu. %d/%d groups now" " active.", ret_val->getCmdAndControlPort(), sActiveTXGroups.size(), kMaxAllowedTXGroups); @@ -342,7 +349,8 @@ bailout: return sp<AAH_TXGroup>(NULL); } -sp<AAH_TXGroup> AAH_TXGroup::getGroup(const struct sockaddr_in* target) { +sp<AAH_TXGroup> AAH_TXGroup::getGroup(const struct sockaddr_in* target, + const sp<AAH_TXPlayer>& client) { // Hold the static lock while we search for a TX Group which has the // multicast target passed to us. Mutex::Autolock lock(sLock); @@ -359,32 +367,71 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(const struct sockaddr_in* target) { } } - if (ret_val != NULL) - ret_val->addClientReference(); + if (ret_val != NULL) { + if (!ret_val->registerClient(client)) { + // No need to log an error, registerClient has already done so for + // us. + ret_val = NULL; + } + } return ret_val; } -void AAH_TXGroup::dropClientReference() { +void AAH_TXGroup::unregisterClient(const sp<AAH_TXPlayer>& client) { Mutex::Autolock lock(mLock); - CHECK(mClientRefCount > 0); - --mClientRefCount; - if (!mClientRefCount) { + LOGI("TXPlayer leaving TXGroup listening on C&C port %hu", + mCmdAndControlPort); + + bool found_it = false; + for (size_t i = 0; i < mActiveClients.size(); ++i) { + if (mActiveClients[i].get() == client.get()) { + found_it = true; + mActiveClients.removeAt(i); + break; + } + } + CHECK(found_it); + + if (!mActiveClients.size()) { mCleanupTimeout.setTimeout(kTXGroupLingerTimeMsec); } } -void AAH_TXGroup::addClientReference() { +bool AAH_TXGroup::registerClient(const sp<AAH_TXPlayer>& client) { + // ASSERT holding sLock Mutex::Autolock lock(mLock); - ++mClientRefCount; + + CHECK(client != NULL); + + // Check the client limit. + if (mActiveClients.size() >= kMaxAllowedPlayerClients) { + LOGE("Cannot register new client with C&C group listening on port %hu." + " %d/%d clients are already active", mCmdAndControlPort, + mActiveClients.size(), kMaxAllowedPlayerClients); + return false; + } + + // Try to add the client to the list. + if (mActiveClients.add(client) < 0) { + LOGE("Failed to register new client with C&C group listening on port" + " %hu. %d/%d clients are currently active", mCmdAndControlPort, + mActiveClients.size(), kMaxAllowedPlayerClients); + return false; + } + + // Assign our new client's program ID, cancel the cleanup timeout and get + // out. + client->setProgramID(getNewProgramID()); mCleanupTimeout.setTimeout(-1); + return true; } bool AAH_TXGroup::shouldExpire() { Mutex::Autolock lock(mLock); - if (mClientRefCount) { + if (mActiveClients.size()) { return false; } @@ -395,9 +442,12 @@ bool AAH_TXGroup::shouldExpire() { return true; } -uint16_t AAH_TXGroup::getNewProgramID() { - int tmp = android_atomic_inc(&mNextProgramID); - return static_cast<uint16_t>(tmp & 0xFFFF); +uint8_t AAH_TXGroup::getNewProgramID() { + uint8_t tmp; + do { + tmp = static_cast<uint8_t>(android_atomic_inc(&mNextProgramID) & 0x1F); + } while(!tmp); + return tmp; } status_t AAH_TXGroup::sendPacket(const sp<TRTPPacket>& packet) { @@ -449,13 +499,10 @@ status_t AAH_TXGroup::sendPacket_l(const sp<TRTPPacket>& packet) { LOGI("TXGroup on port %hu removing client at %d.%d.%d.%d:%hu due to" " timeout. Now serving %d/%d unicast clients.", mCmdAndControlPort, IP_PRINTF_HELPER(addr), port, - mUnicastTargets.size(), kMaxUnicastTargets); + mUnicastTargets.size(), kMaxAllowedUnicastTargets); } } - // reset our heartbeat timer. - mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec); - // Done; see comments sendToTarget discussing error handling behavior return OK; } @@ -562,19 +609,22 @@ void AAH_TXGroup::sendHeartbeatIfNeeded() { Mutex::Autolock lock(mLock); if (!mHeartbeatTimeout.msecTillTimeout()) { - sp<TRTPControlPacket> packet = new TRTPControlPacket(); + sp<TRTPActiveProgramUpdatePacket> packet = + new TRTPActiveProgramUpdatePacket(); if (packet != NULL) { - packet->setCommandID(TRTPControlPacket::kCommandNop); + for (size_t i = 0; i < mActiveClients.size(); ++i) { + packet->pushProgramID(mActiveClients[i]->getProgramID()); + } - // Note: the act of calling sendPacket will reset our heartbeat - // timer. sendPacket_l(packet); } else { LOGE("Failed to allocate TRTP packet for heartbeat on TX Group with" " C&C port %hu", mCmdAndControlPort); - mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec); } + + // reset our heartbeat timer. + mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec); } } @@ -828,7 +878,7 @@ void AAH_TXGroup::handleJoinGroup(const struct sockaddr_in* src_addr) { // Looks like we have a new client. Check to see if we have room to add it // before proceeding. If not, send a NAK back so it knows to signal an // error to its application level. - if (mUnicastTargets.size() >= kMaxUnicastTargets) { + if (mUnicastTargets.size() >= kMaxAllowedUnicastTargets) { uint32_t nak_payload = htonl(kCNC_NakJoinGroupID); if (sendto(mSocket, &nak_payload, sizeof(nak_payload), @@ -865,7 +915,7 @@ void AAH_TXGroup::handleJoinGroup(const struct sockaddr_in* src_addr) { LOGI("TXGroup on port %hu added new client at %d.%d.%d.%d:%hu. " "Now serving %d/%d unicast clients.", mCmdAndControlPort, IP_PRINTF_HELPER(addr), port, - mUnicastTargets.size(), kMaxUnicastTargets); + mUnicastTargets.size(), kMaxAllowedUnicastTargets); } void AAH_TXGroup::handleLeaveGroup(const struct sockaddr_in* src_addr) { @@ -888,41 +938,13 @@ void AAH_TXGroup::handleLeaveGroup(const struct sockaddr_in* src_addr) { LOGI("TXGroup on port %hu removing client at %d.%d.%d.%d:%hu due to" " leave request. Now serving %d/%d unicast clients.", mCmdAndControlPort, IP_PRINTF_HELPER(addr), port, - mUnicastTargets.size(), kMaxUnicastTargets); + mUnicastTargets.size(), kMaxAllowedUnicastTargets); return; } } } -void AAH_TXGroup::Timeout::setTimeout(int msec) { - if (msec < 0) { - mSystemEndTime = 0; - return; - } - - mSystemEndTime = systemTime() + (static_cast<nsecs_t>(msec) * 1000000); -} - -int AAH_TXGroup::Timeout::msecTillTimeout(nsecs_t nowTime) { - if (!mSystemEndTime) { - return -1; - } - - if (mSystemEndTime < nowTime) { - return 0; - } - - nsecs_t delta = mSystemEndTime - nowTime; - delta += 999999; - delta /= 1000000; - if (delta > 0x7FFFFFFF) { - return 0x7FFFFFFF; - } - - return static_cast<int>(delta); -} - AAH_TXGroup::CmdAndControlRXer::CmdAndControlRXer() { mWakeupEventFD = -1; } @@ -1003,25 +1025,16 @@ bool AAH_TXGroup::CmdAndControlRXer::threadLoop() { // Check the heartbeat timeout for this group. tmp = txGroups[i]->mHeartbeatTimeout.msecTillTimeout(now); - if (static_cast<unsigned int>(tmp) < - static_cast<unsigned int>(nextTimeout)) { - nextTimeout = tmp; - } + nextTimeout = minTimeout(nextTimeout, tmp); // Check the cleanup timeout for this group. tmp = txGroups[i]->mCleanupTimeout.msecTillTimeout(now); - if (static_cast<unsigned int>(tmp) < - static_cast<unsigned int>(nextTimeout)) { - nextTimeout = tmp; - } + nextTimeout = minTimeout(nextTimeout, tmp); } // Take into account the common trim timeout. tmp = mTrimRetryTimeout.msecTillTimeout(now); - if (static_cast<unsigned int>(tmp) < - static_cast<unsigned int>(nextTimeout)) { - nextTimeout = tmp; - } + nextTimeout = minTimeout(nextTimeout, tmp); } // Step 3: OK - time to wait for there to be something to do. Release our diff --git a/media/libaah_rtp/aah_tx_group.h b/media/libaah_rtp/aah_tx_group.h index 4736ea0473d9..276e81933f4b 100644 --- a/media/libaah_rtp/aah_tx_group.h +++ b/media/libaah_rtp/aah_tx_group.h @@ -26,12 +26,15 @@ #include <utils/Vector.h> #include "aah_tx_packet.h" +#include "utils.h" #define IP_PRINTF_HELPER(a) ((a >> 24) & 0xFF), ((a >> 16) & 0xFF), \ ((a >> 8) & 0xFF), (a & 0xFF) namespace android { +class AAH_TXPlayer; + template <typename T> class CircularBuffer { public: CircularBuffer(size_t capacity); @@ -69,26 +72,36 @@ class AAH_TXGroup : public virtual RefBase { // Obtain the instance of the TXGroup whose command and control socket is // currently listening on the specified port. Alternatively, if port is 0, // create a new TXGroup with an ephemerally bound command and control port. - static sp<AAH_TXGroup> getGroup(uint16_t port); + static sp<AAH_TXGroup> getGroup(uint16_t port, + const sp<AAH_TXPlayer>& client); // Obtain the instance of the TXGroup whose multicast transmit target is // currently set to target, or NULL if no such group exists. To create a // new transmit group with a new multicast target address, call getGroup(0) // followed by setMulticastTXTarget. - static sp<AAH_TXGroup> getGroup(const struct sockaddr_in* target); - - // AAH_TXGroups successfully obtained using calls to getGroup will have a - // client reference placed on them on behalf of the caller. When the caller - // is finished using the group, they must call dropClientReference to remove - // this reference. Once call client references have been released from a TX - // Group, the group will linger in the system for a short period of time - // before finally expiring and being cleaned up by the command and control - // thread. - // + static sp<AAH_TXGroup> getGroup(const struct sockaddr_in* target, + const sp<AAH_TXPlayer>& client); + + // AAH_TXGroups successfully obtained using calls to getGroup will hold a + // reference back to the client passed to getGroup. When the client is + // finished using the group, it must call unregisterClient to release this + // reference. + // + // While there exists active clients of transmit group, the TX group will + // periodically send heartbeat messages to receiver clients containing the + // program IDs of the currently active TX Player clients so that receivers + // have a chance to clean up orphaned programs in the case where all EOS + // messages got dropped on the way to the receiver. + // + // Once all clients references have been released from a TX Group, the group + // will linger in the system for a short period of time before finally + // expiring and being cleaned up by the command and control thread. + // // TODO : someday, expose the AAH_TXGroup as a top level object in the // android MediaAPIs so that applications may explicitly manage TX group // lifecycles instead of relying on this timeout/cleanup mechanism. - void dropClientReference(); + void unregisterClient(const sp<AAH_TXPlayer>& client); + // Fetch the UDP port on which this TXGroup is listening for command and // control messages. No need to hold any locks for this, the port is @@ -96,9 +109,6 @@ class AAH_TXGroup : public virtual RefBase { // afterwards. uint16_t getCmdAndControlPort() const { return mCmdAndControlPort; } - // Used by players to obtain a new program ID for this retransmission group. - uint16_t getNewProgramID(); - // Assign a TRTP sequence number to the supplied packet and send it to all // registered clients. Then place the packet into the RetryBuffer to // service future client retry requests. @@ -114,30 +124,6 @@ class AAH_TXGroup : public virtual RefBase { ~AAH_TXGroup(); private: - // Definition of a helper class used to track things like when we need to - // transmit a heartbeat, or when we will need to wake up and trim the retry - // buffers. - class Timeout { - public: - Timeout() : mSystemEndTime(0) { } - - // Set a timeout which should occur msec milliseconds from now. - // Negative values will cancel any current timeout; - void setTimeout(int msec); - - // Return the number of milliseconds until the timeout occurs, or -1 if - // no timeout is scheduled. - int msecTillTimeout(nsecs_t nowTime); - int msecTillTimeout() { return msecTillTimeout(systemTime()); } - - private: - // The systemTime() at which the timeout will be complete, or 0 if no - // timeout is currently scheduled. - nsecs_t mSystemEndTime; - - DISALLOW_EVIL_CONSTRUCTORS(Timeout); - }; - // Definition of the singleton command and control receiver who will handle // requests from RX clients. Requests include things like unicast group // management as well as retransmission requests. @@ -228,8 +214,11 @@ class AAH_TXGroup : public virtual RefBase { const uint8_t* payload, size_t length); - // Add a client ref to this TX Group and reset its cleanup timer. - void addClientReference(); + // Register a player client to this TX Group and reset its cleanup timer. + bool registerClient(const sp<AAH_TXPlayer>& client); + + // Obtain a new program ID for a newly registered client. + uint8_t getNewProgramID(); // Test used by the C&C thread to see if its time to expire and cleanup a // client. @@ -263,12 +252,12 @@ class AAH_TXGroup : public virtual RefBase { // Lock we use to serialize access to instance variables. Mutex mLock; - + // The list of packets we hold for servicing retry requests. RetryBuffer mRetryBuffer; - // The number of TX Player clients currently using this TX group. - uint32_t mClientRefCount; + // The current set of active TX Player clients using this TX group. + Vector< sp<AAH_TXPlayer> > mActiveClients; // The sequence number to assign to the next transmitted TRTP packet. uint16_t mTRTPSeqNumber; @@ -322,10 +311,12 @@ class AAH_TXGroup : public virtual RefBase { static const int kUnicastClientTimeoutMsec; static const size_t kRetryBufferCapacity; - static const size_t kMaxUnicastTargets; + static const size_t kMaxAllowedUnicastTargets; static const size_t kInitialUnicastTargetCapacity; static const size_t kMaxAllowedTXGroups; static const size_t kInitialActiveTXGroupsCapacity; + static const size_t kMaxAllowedPlayerClients; + static const size_t kInitialPlayerClientCapacity; static const uint32_t kCNC_RetryRequestID; static const uint32_t kCNC_FastStartRequestID; diff --git a/media/libaah_rtp/aah_tx_packet.cpp b/media/libaah_rtp/aah_tx_packet.cpp index c7ad3e0d2c96..d41ba8c1703c 100644 --- a/media/libaah_rtp/aah_tx_packet.cpp +++ b/media/libaah_rtp/aah_tx_packet.cpp @@ -74,7 +74,7 @@ void TRTPPacket::setEpoch(uint32_t val) { } } -void TRTPPacket::setProgramID(uint16_t val) { +void TRTPPacket::setProgramID(uint8_t val) { CHECK(!mIsPacked); mProgramID = val; } @@ -341,4 +341,44 @@ bool TRTPControlPacket::pack() { return true; } +bool TRTPActiveProgramUpdatePacket::pushProgramID(uint8_t id) { + if (mProgramIDCnt >= kMaxProgramIDs) + return false; + + mProgramIDs[mProgramIDCnt++] = id; + return true; +} + +bool TRTPActiveProgramUpdatePacket::pack() { + if (mIsPacked) { + return false; + } + + // Active program update packets contain a 2-byte command ID, 1 byte of + // length, and length bytes of program IDs. + int packetLen = kRTPHeaderLen + + TRTPHeaderLen() + + mProgramIDCnt + + 3; + + mPacket = new uint8_t[packetLen]; + if (!mPacket) { + return false; + } + + mPacketLen = packetLen; + + uint8_t* cur = mPacket; + + writeTRTPHeader(cur, true, packetLen); + writeU16(cur, mCommandID); + writeU8(cur, mProgramIDCnt); + for (uint8_t i = 0; i < mProgramIDCnt; ++i) { + writeU8(cur, mProgramIDs[i]); + } + + mIsPacked = true; + return true; +} + } // namespace android diff --git a/media/libaah_rtp/aah_tx_packet.h b/media/libaah_rtp/aah_tx_packet.h index b9f0fe6d2dea..670bf1afcac4 100644 --- a/media/libaah_rtp/aah_tx_packet.h +++ b/media/libaah_rtp/aah_tx_packet.h @@ -62,7 +62,7 @@ class TRTPPacket : public RefBase { int64_t getPTS() const; void setEpoch(uint32_t val); - void setProgramID(uint16_t val); + void setProgramID(uint8_t val); void setSubstreamID(uint16_t val); void setClockTransform(const LinearTransform& trans); @@ -103,7 +103,7 @@ class TRTPPacket : public RefBase { bool mPTSValid; int64_t mPTS; uint32_t mEpoch; - uint16_t mProgramID; + uint8_t mProgramID; uint16_t mSubstreamID; LinearTransform mClockTranform; bool mClockTranformValid; @@ -177,16 +177,37 @@ class TRTPControlPacket : public TRTPPacket { kCommandNop = 1, kCommandFlush = 2, kCommandEOS = 3, + kCommandAPU = 4, }; void setCommandID(TRTPCommandID val); virtual bool pack(); - private: + protected: + explicit TRTPControlPacket(TRTPCommandID commandID) + : TRTPPacket(kHeaderTypeControl) + , mCommandID(commandID) {} + TRTPCommandID mCommandID; }; +class TRTPActiveProgramUpdatePacket : public TRTPControlPacket { + public: + TRTPActiveProgramUpdatePacket() + : TRTPControlPacket(kCommandAPU) + , mProgramIDCnt(0) {} + + virtual bool pack(); + bool pushProgramID(uint8_t id); + + private: + static const uint8_t kMaxProgramIDs = 31; + + uint8_t mProgramIDCnt; + uint8_t mProgramIDs[kMaxProgramIDs]; +}; + } // namespace android #endif // __AAH_TX_PLAYER_H__ diff --git a/media/libaah_rtp/aah_tx_player.cpp b/media/libaah_rtp/aah_tx_player.cpp index c11e5024946b..476687ae39b7 100644 --- a/media/libaah_rtp/aah_tx_player.cpp +++ b/media/libaah_rtp/aah_tx_player.cpp @@ -34,6 +34,7 @@ #include "aah_tx_packet.h" #include "aah_tx_player.h" +#include "utils.h" namespace android { @@ -52,6 +53,7 @@ static const int64_t kAAHBufferTimeUs = 1000000LL; const int64_t AAH_TXPlayer::kAAHRetryKeepAroundTimeNs = kAAHBufferTimeUs * 1100; +const int AAH_TXPlayer::kPauseTSUpdateResendTimeoutMsec = 250; const int32_t AAH_TXPlayer::kInvokeGetCNCPort = 0xB33977; sp<MediaPlayerBase> createAAH_TXPlayer() { @@ -521,6 +523,7 @@ status_t AAH_TXPlayer::play_l() { status_t AAH_TXPlayer::stop() { status_t ret = pause(); + mPauseTSUpdateResendTimeout.setTimeout(-1); sendEOS_l(); return ret; } @@ -586,11 +589,22 @@ void AAH_TXPlayer::updateClockTransform_l(bool pause) { mCurrentClockTransform.a_to_b_denom = pause ? 0 : 1; // send a packet announcing the new transform + sendTSUpdateNop_l(); + + // if we are paused, schedule a periodic resend of the TS update, JiC the + // receiveing client misses it. + if (mPlayRateIsPaused) { + mPauseTSUpdateResendTimeout.setTimeout(kPauseTSUpdateResendTimeoutMsec); + } else { + mPauseTSUpdateResendTimeout.setTimeout(-1); + } +} + +void AAH_TXPlayer::sendEOS_l() { if (mAAH_TXGroup != NULL) { sp<TRTPControlPacket> packet = new TRTPControlPacket(); if (packet != NULL) { - packet->setClockTransform(mCurrentClockTransform); - packet->setCommandID(TRTPControlPacket::kCommandNop); + packet->setCommandID(TRTPControlPacket::kCommandEOS); sendPacket_l(packet); } else { LOGD("Failed to allocate TRTP packet at %s:%d", __FILE__, __LINE__); @@ -598,11 +612,12 @@ void AAH_TXPlayer::updateClockTransform_l(bool pause) { } } -void AAH_TXPlayer::sendEOS_l() { - if (mAAH_TXGroup != NULL) { +void AAH_TXPlayer::sendTSUpdateNop_l() { + if ((mAAH_TXGroup != NULL) && mCurrentClockTransformValid) { sp<TRTPControlPacket> packet = new TRTPControlPacket(); if (packet != NULL) { - packet->setCommandID(TRTPControlPacket::kCommandEOS); + packet->setClockTransform(mCurrentClockTransform); + packet->setCommandID(TRTPControlPacket::kCommandFlush); sendPacket_l(packet); } else { LOGD("Failed to allocate TRTP packet at %s:%d", __FILE__, __LINE__); @@ -754,20 +769,20 @@ void AAH_TXPlayer::reset_l() { mIsSeeking = false; mSeekTimeUs = 0; + mPauseTSUpdateResendTimeout.setTimeout(-1); + mUri.setTo(""); mUriHeaders.clear(); mFileSource.clear(); mBitrate = -1; - mProgramID = 0; if (mAAH_TXGroup != NULL) { - LOGI("TXPlayer leaving TXGroup listening on C&C port %hu", - mAAH_TXGroup->getCmdAndControlPort()); - mAAH_TXGroup->dropClientReference(); + mAAH_TXGroup->unregisterClient(sp<AAH_TXPlayer>(this)); mAAH_TXGroup = NULL; } + mProgramID = 0; mLastQueuedMediaTimePTSValid = false; mCurrentClockTransformValid = false; @@ -873,6 +888,7 @@ status_t AAH_TXPlayer::setRetransmitEndpoint( uint32_t addr = ntohl(endpoint->sin_addr.s_addr); uint16_t port = ntohs(endpoint->sin_port); + sp<AAH_TXPlayer> thiz(this); if ((addr & 0xF0000000) == 0xE0000000) { // Starting in multicast mode? We need to have a specified port to // multicast to, so sanity check that first. Then search for an @@ -884,10 +900,11 @@ status_t AAH_TXPlayer::setRetransmitEndpoint( return BAD_VALUE; } - mAAH_TXGroup = AAH_TXGroup::getGroup(endpoint); + mAAH_TXGroup = AAH_TXGroup::getGroup(endpoint, thiz); if (mAAH_TXGroup == NULL) { // No pre-existing group. Make a new one. - mAAH_TXGroup = AAH_TXGroup::getGroup(static_cast<uint16_t>(0)); + mAAH_TXGroup = AAH_TXGroup::getGroup(static_cast<uint16_t>(0), + thiz); // Still no group? Thats bad. We probably have exceeded our limit // on the number of simultaneous TX groups. @@ -907,7 +924,7 @@ status_t AAH_TXPlayer::setRetransmitEndpoint( } else if (addr == INADDR_ANY) { // Starting in unicast mode. A port of 0 means we need to create a new // group, a non-zero port means that we want to join an existing one. - mAAH_TXGroup = AAH_TXGroup::getGroup(port); + mAAH_TXGroup = AAH_TXGroup::getGroup(port, thiz); if (mAAH_TXGroup == NULL) { if (port) { @@ -929,7 +946,7 @@ status_t AAH_TXPlayer::setRetransmitEndpoint( } CHECK(mAAH_TXGroup != NULL); - mProgramID = mAAH_TXGroup->getNewProgramID(); + CHECK(mProgramID != 0); return OK; } @@ -1111,6 +1128,8 @@ void AAH_TXPlayer::onPumpAudio() { // of good options here. For now, signal an error up to the app level // and shut down the transmission pump. int64_t commonTimeNow; + int64_t mediaTimeNow; + bool mediaTimeNowValid = false; if (OK != mCCHelper.getCommonTime(&commonTimeNow)) { // Failed to get common time; either the service is down or common // time is not synced. Raise an error and shutdown the player. @@ -1121,20 +1140,34 @@ void AAH_TXPlayer::onPumpAudio() { break; } - if (mCurrentClockTransformValid && mLastQueuedMediaTimePTSValid) { - int64_t mediaTimeNow; - bool conversionResult = mCurrentClockTransform.doReverseTransform( - commonTimeNow, - &mediaTimeNow); - CHECK(conversionResult); + if (mCurrentClockTransformValid) { + mediaTimeNowValid = mCurrentClockTransform.doReverseTransform( + commonTimeNow, + &mediaTimeNow); + CHECK(mediaTimeNowValid); + } - if ((mediaTimeNow + - kAAHBufferTimeUs - - mLastQueuedMediaTimePTS) <= 0) { - break; + // Has our pause-timestamp-update timer fired? If so, take appropriate + // action. + if (!mPauseTSUpdateResendTimeout.msecTillTimeout()) { + if (mPlayRateIsPaused) { + // Send the update and schedule the next update. + sendTSUpdateNop_l(); + mPauseTSUpdateResendTimeout.setTimeout( + kPauseTSUpdateResendTimeoutMsec); + } else { + // Not paused; cancel the timer so it does not bug us anymore. + mPauseTSUpdateResendTimeout.setTimeout(-1); } } + // Stop if we have reached our buffer threshold. + if (mediaTimeNowValid && + mLastQueuedMediaTimePTSValid && + (mediaTimeNow + kAAHBufferTimeUs - mLastQueuedMediaTimePTS) <= 0) { + break; + } + MediaSource::ReadOptions options; if (mIsSeeking) { options.setSeekTo(mSeekTimeUs); diff --git a/media/libaah_rtp/aah_tx_player.h b/media/libaah_rtp/aah_tx_player.h index 210e8ff22e0a..05dcd73322f3 100644 --- a/media/libaah_rtp/aah_tx_player.h +++ b/media/libaah_rtp/aah_tx_player.h @@ -66,6 +66,9 @@ class AAH_TXPlayer : public MediaPlayerHWInterface { virtual status_t setRetransmitEndpoint(const struct sockaddr_in* endpoint); + void setProgramID(uint8_t programID) { mProgramID = programID; } + uint8_t getProgramID() const { return mProgramID; } + static const int64_t kAAHRetryKeepAroundTimeNs; static const int32_t kInvokeGetCNCPort; @@ -103,6 +106,7 @@ class AAH_TXPlayer : public MediaPlayerHWInterface { status_t seekTo_l(int64_t timeUs); void updateClockTransform_l(bool pause); void sendEOS_l(); + void sendTSUpdateNop_l(); void cancelPlayerEvents(bool keepBufferingGoing = false); void reset_l(); void notifyListener_l(int msg, int ext1 = 0, int ext2 = 0); @@ -139,6 +143,8 @@ class AAH_TXPlayer : public MediaPlayerHWInterface { bool mIsSeeking; int64_t mSeekTimeUs; + Timeout mPauseTSUpdateResendTimeout; + sp<TimedEventQueue::Event> mPumpAudioEvent; bool mPumpAudioEventPending; @@ -162,9 +168,11 @@ class AAH_TXPlayer : public MediaPlayerHWInterface { bool mPlayRateIsPaused; CCHelper mCCHelper; - uint16_t mProgramID; + uint8_t mProgramID; uint8_t mTRTPVolume; + static const int kPauseTSUpdateResendTimeoutMsec; + DISALLOW_EVIL_CONSTRUCTORS(AAH_TXPlayer); }; diff --git a/media/libaah_rtp/utils.cpp b/media/libaah_rtp/utils.cpp new file mode 100644 index 000000000000..3ed259944cf1 --- /dev/null +++ b/media/libaah_rtp/utils.cpp @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2012 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "utils.h" + +namespace android { + +void Timeout::setTimeout(int msec) { + if (msec < 0) { + mSystemEndTime = 0; + return; + } + + mSystemEndTime = systemTime() + (static_cast<nsecs_t>(msec) * 1000000); +} + +int Timeout::msecTillTimeout(nsecs_t nowTime) { + if (!mSystemEndTime) { + return -1; + } + + if (mSystemEndTime < nowTime) { + return 0; + } + + nsecs_t delta = mSystemEndTime - nowTime; + delta += 999999; + delta /= 1000000; + if (delta > 0x7FFFFFFF) { + return 0x7FFFFFFF; + } + + return static_cast<int>(delta); +} + +} // namespace android diff --git a/media/libaah_rtp/utils.h b/media/libaah_rtp/utils.h new file mode 100644 index 000000000000..1e3d326612f6 --- /dev/null +++ b/media/libaah_rtp/utils.h @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2012 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __UTILS_H__ +#define __UTILS_H__ + +#include <netinet/in.h> + +#include <media/stagefright/foundation/ABase.h> +#include <utils/Timers.h> + +namespace android { + +// Definition of a helper class used to track things like when we need to +// transmit a heartbeat, or when we will need to wake up and trim the retry +// buffers. +class Timeout { + public: + Timeout() : mSystemEndTime(0) { } + + // Set a timeout which should occur msec milliseconds from now. + // Negative values will cancel any current timeout; + void setTimeout(int msec); + + // Return the number of milliseconds until the timeout occurs, or -1 if + // no timeout is scheduled. + int msecTillTimeout(nsecs_t nowTime); + int msecTillTimeout() { return msecTillTimeout(systemTime()); } + + private: + // The systemTime() at which the timeout will be complete, or 0 if no + // timeout is currently scheduled. + nsecs_t mSystemEndTime; + + DISALLOW_EVIL_CONSTRUCTORS(Timeout); +}; + +inline bool matchSockaddrs(const struct sockaddr_in* a, + const struct sockaddr_in* b) { + return ((a->sin_family == b->sin_family) && + (a->sin_addr.s_addr == b->sin_addr.s_addr) && + (a->sin_port == b->sin_port)); +} + +// Return the minimum timeout between a and b where timeouts less than 0 are +// considered to be infinite +inline int minTimeout(int a, int b) { + if (a < 0) { + return b; + } + + if (b < 0) { + return a; + } + + return ((a < b) ? a : b); +} + +} // namespace android + +#endif // __UTILS_H__ |