summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--media/libaah_rtp/Android.mk3
-rw-r--r--media/libaah_rtp/aah_rx_player.h43
-rw-r--r--media/libaah_rtp/aah_rx_player_core.cpp184
-rw-r--r--media/libaah_rtp/aah_rx_player_ring_buffer.cpp17
-rw-r--r--media/libaah_rtp/aah_rx_player_substream.cpp8
-rw-r--r--media/libaah_rtp/aah_tx_group.cpp181
-rw-r--r--media/libaah_rtp/aah_tx_group.h81
-rw-r--r--media/libaah_rtp/aah_tx_packet.cpp42
-rw-r--r--media/libaah_rtp/aah_tx_packet.h27
-rw-r--r--media/libaah_rtp/aah_tx_player.cpp79
-rw-r--r--media/libaah_rtp/aah_tx_player.h10
-rw-r--r--media/libaah_rtp/utils.cpp49
-rw-r--r--media/libaah_rtp/utils.h74
13 files changed, 555 insertions, 243 deletions
diff --git a/media/libaah_rtp/Android.mk b/media/libaah_rtp/Android.mk
index 44351edef622..361c073d3093 100644
--- a/media/libaah_rtp/Android.mk
+++ b/media/libaah_rtp/Android.mk
@@ -17,7 +17,8 @@ LOCAL_SRC_FILES := \
aah_tx_group.cpp \
aah_tx_packet.cpp \
aah_tx_player.cpp \
- pipe_event.cpp
+ pipe_event.cpp \
+ utils.cpp
LOCAL_C_INCLUDES := \
frameworks/base/include \
diff --git a/media/libaah_rtp/aah_rx_player.h b/media/libaah_rtp/aah_rx_player.h
index 53361b47707f..c9d0eb49978d 100644
--- a/media/libaah_rtp/aah_rx_player.h
+++ b/media/libaah_rtp/aah_rx_player.h
@@ -31,6 +31,7 @@
#include "aah_decoder_pump.h"
#include "pipe_event.h"
+#include "utils.h"
namespace android {
@@ -173,8 +174,7 @@ class AAH_RXPlayer : public MediaPlayerInterface {
bool waiting_for_fast_start_;
bool fetched_first_packet_;
- uint64_t rtp_activity_timeout_;
- bool rtp_activity_timeout_valid_;
+ Timeout rtp_activity_timeout_;
DISALLOW_EVIL_CONSTRUCTORS(RXRingBuffer);
};
@@ -194,9 +194,26 @@ class AAH_RXPlayer : public MediaPlayerInterface {
bool isAboutToUnderflow();
uint32_t getSSRC() const { return ssrc_; }
- uint16_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; }
+ uint8_t getProgramID() const { return (ssrc_ >> 5) & 0x1F; }
status_t getStatus() const { return status_; }
+ void clearInactivityTimeout() {
+ inactivity_timeout_.setTimeout(-1);
+ }
+
+ void resetInactivityTimeout() {
+ inactivity_timeout_.setTimeout(kInactivityTimeoutMsec);
+ }
+
+ bool shouldExpire() {
+ // Substreams should always have a positive time until timeout. A
+ // timeout value of 0 indicates that the timer has expired, while a
+ // negative timeout (normally meaning no timeout) is used by some of
+ // the core code to implement a mark and sweep pattern for cleaning
+ // out no longer relevant substreams.
+ return (inactivity_timeout_.msecTillTimeout() <= 0);
+ }
+
protected:
virtual ~Substream();
@@ -228,8 +245,10 @@ class AAH_RXPlayer : public MediaPlayerInterface {
uint32_t aux_data_expected_size_;
sp<AAH_DecoderPump> decoder_;
+ Timeout inactivity_timeout_;
- static int64_t kAboutToUnderflowThreshold;
+ static const int64_t kAboutToUnderflowThreshold;
+ static const int kInactivityTimeoutMsec;
DISALLOW_EVIL_CONSTRUCTORS(Substream);
};
@@ -247,7 +266,8 @@ class AAH_RXPlayer : public MediaPlayerInterface {
void processRingBuffer();
void processCommandPacket(PacketBuffer* pb);
bool processGaps();
- int computeNextGapRetransmitTimeout();
+ void setGapStatus(GapStatus status);
+ void cleanoutExpiredSubstreams();
void fetchAudioFlinger();
PipeEvent wakeup_work_thread_evt_;
@@ -268,7 +288,9 @@ class AAH_RXPlayer : public MediaPlayerInterface {
SeqNoGap current_gap_;
GapStatus current_gap_status_;
- uint64_t next_retrans_req_time_;
+ Timeout next_retrans_req_timeout_;
+
+ Timeout ss_cleanout_timeout_;
RXRingBuffer ring_buffer_;
SubstreamVec substreams_;
@@ -282,15 +304,14 @@ class AAH_RXPlayer : public MediaPlayerInterface {
static const uint32_t kRetransRequestMagic;
static const uint32_t kFastStartRequestMagic;
static const uint32_t kRetransNAKMagic;
- static const uint32_t kGapRerequestTimeoutUSec;
- static const uint32_t kFastStartTimeoutUSec;
- static const uint32_t kRTPActivityTimeoutUSec;
+ static const uint32_t kGapRerequestTimeoutMsec;
+ static const uint32_t kFastStartTimeoutMsec;
+ static const uint32_t kRTPActivityTimeoutMsec;
+ static const uint32_t kSSCleanoutTimeoutMsec;
static const uint32_t INVOKE_GET_MASTER_VOLUME = 3;
static const uint32_t INVOKE_SET_MASTER_VOLUME = 4;
- static uint64_t monotonicUSecNow();
-
DISALLOW_EVIL_CONSTRUCTORS(AAH_RXPlayer);
};
diff --git a/media/libaah_rtp/aah_rx_player_core.cpp b/media/libaah_rtp/aah_rx_player_core.cpp
index 616df0fb3684..33a548a0a3a8 100644
--- a/media/libaah_rtp/aah_rx_player_core.cpp
+++ b/media/libaah_rtp/aah_rx_player_core.cpp
@@ -37,9 +37,10 @@ const uint32_t AAH_RXPlayer::kRetransNAKMagic =
FOURCC('T','n','a','k');
const uint32_t AAH_RXPlayer::kFastStartRequestMagic =
FOURCC('T','f','s','t');
-const uint32_t AAH_RXPlayer::kGapRerequestTimeoutUSec = 75000;
-const uint32_t AAH_RXPlayer::kFastStartTimeoutUSec = 800000;
-const uint32_t AAH_RXPlayer::kRTPActivityTimeoutUSec = 10000000;
+const uint32_t AAH_RXPlayer::kGapRerequestTimeoutMsec = 75;
+const uint32_t AAH_RXPlayer::kFastStartTimeoutMsec = 800;
+const uint32_t AAH_RXPlayer::kRTPActivityTimeoutMsec = 10000;
+const uint32_t AAH_RXPlayer::kSSCleanoutTimeoutMsec = 1000;
static inline int16_t fetchInt16(uint8_t* data) {
return static_cast<int16_t>(U16_AT(data));
@@ -53,20 +54,10 @@ static inline int64_t fetchInt64(uint8_t* data) {
return static_cast<int64_t>(U64_AT(data));
}
-uint64_t AAH_RXPlayer::monotonicUSecNow() {
- struct timespec now;
- int res = clock_gettime(CLOCK_MONOTONIC, &now);
- CHECK(res >= 0);
-
- uint64_t ret = static_cast<uint64_t>(now.tv_sec) * 1000000;
- ret += now.tv_nsec / 1000;
-
- return ret;
-}
-
status_t AAH_RXPlayer::startWorkThread() {
status_t res;
stopWorkThread();
+ ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec);
res = thread_wrapper_->run("TRX_Player", PRIORITY_AUDIO);
if (res != OK) {
@@ -125,7 +116,7 @@ void AAH_RXPlayer::resetPipeline() {
substreams_.clear();
- current_gap_status_ = kGS_NoGap;
+ setGapStatus(kGS_NoGap);
}
bool AAH_RXPlayer::setupSocket() {
@@ -231,25 +222,26 @@ bool AAH_RXPlayer::threadLoop() {
while (!thread_wrapper_->exitPending()) {
// Step 1: Wait until there is something to do.
- int gap_timeout = computeNextGapRetransmitTimeout();
+ int gap_timeout = next_retrans_req_timeout_.msecTillTimeout();
int ring_timeout = ring_buffer_.computeInactivityTimeout();
+ int ss_cleanout_timeout = ss_cleanout_timeout_.msecTillTimeout();
int timeout = -1;
if (!ring_timeout) {
LOGW("RTP inactivity timeout reached, resetting pipeline.");
resetPipeline();
- timeout = gap_timeout;
- } else {
- if (gap_timeout < 0) {
- timeout = ring_timeout;
- } else if (ring_timeout < 0) {
- timeout = gap_timeout;
- } else {
- timeout = (gap_timeout < ring_timeout) ? gap_timeout
- : ring_timeout;
- }
+ continue;
+ }
+
+ if (!ss_cleanout_timeout) {
+ cleanoutExpiredSubstreams();
+ continue;
}
+ timeout = minTimeout(gap_timeout, timeout);
+ timeout = minTimeout(ring_timeout, timeout);
+ timeout = minTimeout(ss_cleanout_timeout, timeout);
+
if ((0 != timeout) && (!process_more_right_now)) {
// Set up the events to wait on. Start with the wakeup pipe.
memset(&poll_fds, 0, sizeof(poll_fds));
@@ -564,7 +556,7 @@ void AAH_RXPlayer::processRingBuffer() {
}
}
- // Is this a command packet? If so, its not necessarily associate
+ // Is this a command packet? If so, its not necessarily associated
// with one particular substream. Just give it to the command
// packet handler and then move on.
if (4 == payload_type) {
@@ -620,7 +612,7 @@ void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) {
}
uint8_t trtp_version = data[12];
- uint8_t trtp_flags = data[13] & 0xF;
+ uint8_t trtp_flags = data[13] & 0xF;
if (1 != trtp_version) {
LOGV("Dropping packet, bad trtp version %hhu", trtp_version);
@@ -643,32 +635,85 @@ void AAH_RXPlayer::processCommandPacket(PacketBuffer* pb) {
return;
}
+ bool do_cleanup_pass = false;
uint16_t command_id = U16_AT(data + offset);
+ offset += 2;
switch (command_id) {
case TRTPControlPacket::kCommandNop:
+ // Note: NOPs are frequently used to carry timestamp transformation
+ // updates. If there was a timestamp transform attached to this
+ // payload, it was already taken care of by processRX.
break;
case TRTPControlPacket::kCommandEOS:
+ // TODO need to differentiate between flush and EOS. Substreams
+ // which have hit EOS need a chance to drain before being destroyed.
+
case TRTPControlPacket::kCommandFlush: {
- uint16_t program_id = (U32_AT(data + 8) >> 5) & 0x1F;
+ uint8_t program_id = (U32_AT(data + 8) >> 5) & 0x1F;
LOGI("*** %s flushing program_id=%d",
__PRETTY_FUNCTION__, program_id);
- Vector<uint32_t> substreams_to_remove;
+ // Flag any programs with the given program ID for cleanup.
for (size_t i = 0; i < substreams_.size(); ++i) {
- sp<Substream> iter = substreams_.valueAt(i);
- if (iter->getProgramID() == program_id) {
- iter->shutdown();
- substreams_to_remove.add(iter->getSSRC());
+ const sp<Substream>& stream = substreams_.valueAt(i);
+ if (stream->getProgramID() == program_id) {
+ stream->clearInactivityTimeout();
}
}
- for (size_t i = 0; i < substreams_to_remove.size(); ++i) {
- substreams_.removeItem(substreams_to_remove[i]);
+ // Make sure we do our cleanup pass at the end of this.
+ do_cleanup_pass = true;
+ } break;
+
+ case TRTPControlPacket::kCommandAPU: {
+ // Active program update packet. Go over all of our substreams and
+ // either reset the inactivity timer for the substreams listed in
+ // this update packet, or clear the inactivity timer for the
+ // substreams not listed in this update packet. A cleared
+ // inactivity timer will flag a substream for deletion in the
+ // cleanup pass at the end of this function.
+
+ // The packet must contain at least the 1 byte numActivePrograms
+ // field.
+ if (amt < offset + 1) {
+ return;
}
+ uint8_t numActivePrograms = data[offset++];
+
+ // If the payload is not long enough to contain the list it promises
+ // to have, just skip it.
+ if (amt < (offset + numActivePrograms)) {
+ return;
+ }
+
+ // Clear all inactivity timers.
+ for (size_t i = 0; i < substreams_.size(); ++i) {
+ const sp<Substream>& stream = substreams_.valueAt(i);
+ stream->clearInactivityTimeout();
+ }
+
+ // Now go over the list of active programs and reset the inactivity
+ // timers for those streams which are currently in the active
+ // program update packet.
+ for (uint8_t j = 0; j < numActivePrograms; ++j) {
+ uint8_t pid = (data[offset + j] & 0x1F);
+ for (size_t i = 0; i < substreams_.size(); ++i) {
+ const sp<Substream>& stream = substreams_.valueAt(i);
+ if (stream->getProgramID() == pid) {
+ stream->resetInactivityTimeout();
+ }
+ }
+ }
+
+ // Make sure we do our cleanup pass at the end of this.
+ do_cleanup_pass = true;
} break;
}
+
+ if (do_cleanup_pass)
+ cleanoutExpiredSubstreams();
}
bool AAH_RXPlayer::processGaps() {
@@ -705,18 +750,18 @@ bool AAH_RXPlayer::processGaps() {
// this gap and move on.
if (!send_retransmit_request &&
(kGS_NoGap != current_gap_status_) &&
- (0 == computeNextGapRetransmitTimeout())) {
-
+ (0 == next_retrans_req_timeout_.msecTillTimeout())) {
// If out current gap is the fast-start gap, don't bother to skip it
// because substreams look like the are about to underflow.
if ((kGS_FastStartGap != gap_status) ||
(current_gap_.end_seq_ != gap.end_seq_)) {
+
for (size_t i = 0; i < substreams_.size(); ++i) {
if (substreams_.valueAt(i)->isAboutToUnderflow()) {
- LOGV("About to underflow, giving up on gap [%hu, %hu]",
+ LOGI("About to underflow, giving up on gap [%hu, %hu]",
gap.start_seq_, gap.end_seq_);
ring_buffer_.processNAK();
- current_gap_status_ = kGS_NoGap;
+ setGapStatus(kGS_NoGap);
return true;
}
}
@@ -727,7 +772,7 @@ bool AAH_RXPlayer::processGaps() {
send_retransmit_request = true;
}
} else {
- current_gap_status_ = kGS_NoGap;
+ setGapStatus(kGS_NoGap);
}
if (send_retransmit_request) {
@@ -738,7 +783,7 @@ bool AAH_RXPlayer::processGaps() {
(current_gap_.end_seq_ == gap.end_seq_)) {
LOGV("Fast start is taking forever; giving up.");
ring_buffer_.processNAK();
- current_gap_status_ = kGS_NoGap;
+ setGapStatus(kGS_NoGap);
return true;
}
@@ -777,32 +822,53 @@ bool AAH_RXPlayer::processGaps() {
// Update the current gap info.
current_gap_ = gap;
- current_gap_status_ = gap_status;
- next_retrans_req_time_ = monotonicUSecNow() +
- ((kGS_FastStartGap == current_gap_status_)
- ? kFastStartTimeoutUSec
- : kGapRerequestTimeoutUSec);
+ setGapStatus(gap_status);
}
return false;
}
-// Compute when its time to send the next gap retransmission in milliseconds.
-// Returns < 0 for an infinite timeout (no gap) and 0 if its time to retransmit
-// right now.
-int AAH_RXPlayer::computeNextGapRetransmitTimeout() {
- if (kGS_NoGap == current_gap_status_) {
- return -1;
- }
+void AAH_RXPlayer::setGapStatus(GapStatus status) {
+ current_gap_status_ = status;
+
+ switch(current_gap_status_) {
+ case kGS_NormalGap:
+ next_retrans_req_timeout_.setTimeout(kGapRerequestTimeoutMsec);
+ break;
- int64_t timeout_delta = next_retrans_req_time_ - monotonicUSecNow();
+ case kGS_FastStartGap:
+ next_retrans_req_timeout_.setTimeout(kFastStartTimeoutMsec);
+ break;
- timeout_delta /= 1000;
- if (timeout_delta <= 0) {
- return 0;
+ case kGS_NoGap:
+ default:
+ next_retrans_req_timeout_.setTimeout(-1);
+ break;
}
+}
+
+void AAH_RXPlayer::cleanoutExpiredSubstreams() {
+ static const size_t kMaxPerPass = 32;
+ uint32_t to_remove[kMaxPerPass];
+ size_t cnt, i;
+
+ do {
+ for (i = 0, cnt = 0;
+ (i < substreams_.size()) && (cnt < kMaxPerPass);
+ ++i) {
+ const sp<Substream>& stream = substreams_.valueAt(i);
+ if (stream->shouldExpire()) {
+ to_remove[cnt++] = stream->getSSRC();
+ }
+ }
+
+ for (i = 0; i < cnt; ++i) {
+ LOGI("Purging substream with SSRC 0x%08x", to_remove[i]);
+ substreams_.removeItem(to_remove[i]);
+ }
+ } while (cnt >= kMaxPerPass);
- return static_cast<uint32_t>(timeout_delta);
+ ss_cleanout_timeout_.setTimeout(kSSCleanoutTimeoutMsec);
}
} // namespace android
diff --git a/media/libaah_rtp/aah_rx_player_ring_buffer.cpp b/media/libaah_rtp/aah_rx_player_ring_buffer.cpp
index 22467f79e8d1..b59185060c4b 100644
--- a/media/libaah_rtp/aah_rx_player_ring_buffer.cpp
+++ b/media/libaah_rtp/aah_rx_player_ring_buffer.cpp
@@ -53,7 +53,7 @@ void AAH_RXPlayer::RXRingBuffer::reset() {
rd_seq_known_ = false;
waiting_for_fast_start_ = true;
fetched_first_packet_ = false;
- rtp_activity_timeout_valid_ = false;
+ rtp_activity_timeout_.setTimeout(-1);
}
bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf,
@@ -62,8 +62,7 @@ bool AAH_RXPlayer::RXRingBuffer::pushBuffer(PacketBuffer* buf,
CHECK(NULL != ring_);
CHECK(NULL != buf);
- rtp_activity_timeout_valid_ = true;
- rtp_activity_timeout_ = monotonicUSecNow() + kRTPActivityTimeoutUSec;
+ rtp_activity_timeout_.setTimeout(kRTPActivityTimeoutMsec);
// If the ring buffer is totally reset (we have never received a single
// payload) then we don't know the rd sequence number and this should be
@@ -328,17 +327,7 @@ void AAH_RXPlayer::RXRingBuffer::processNAK(SeqNoGap* nak) {
int AAH_RXPlayer::RXRingBuffer::computeInactivityTimeout() {
AutoMutex lock(&lock_);
-
- if (!rtp_activity_timeout_valid_) {
- return -1;
- }
-
- uint64_t now = monotonicUSecNow();
- if (rtp_activity_timeout_ <= now) {
- return 0;
- }
-
- return (rtp_activity_timeout_ - now) / 1000;
+ return rtp_activity_timeout_.msecTillTimeout();
}
AAH_RXPlayer::PacketBuffer*
diff --git a/media/libaah_rtp/aah_rx_player_substream.cpp b/media/libaah_rtp/aah_rx_player_substream.cpp
index 3e3a95c47620..c72215b0d3eb 100644
--- a/media/libaah_rtp/aah_rx_player_substream.cpp
+++ b/media/libaah_rtp/aah_rx_player_substream.cpp
@@ -33,8 +33,9 @@
namespace android {
-int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold =
+const int64_t AAH_RXPlayer::Substream::kAboutToUnderflowThreshold =
50ull * 1000;
+const int AAH_RXPlayer::Substream::kInactivityTimeoutMsec = 10000;
AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) {
ssrc_ = ssrc;
@@ -54,6 +55,7 @@ AAH_RXPlayer::Substream::Substream(uint32_t ssrc, OMXClient& omx) {
// cleanupBufferInProgress will reset most of the internal state variables.
// Just need to make sure that buffer_in_progress_ is NULL before calling.
cleanupBufferInProgress();
+ resetInactivityTimeout();
}
AAH_RXPlayer::Substream::~Substream() {
@@ -108,6 +110,8 @@ void AAH_RXPlayer::Substream::processPayloadStart(uint8_t* buf,
return;
}
+ resetInactivityTimeout();
+
// Do we have a buffer in progress already? If so, abort the buffer. In
// theory, this should never happen. If there were a discontinutity in the
// stream, the discon in the seq_nos at the RTP level should have already
@@ -362,6 +366,8 @@ void AAH_RXPlayer::Substream::processPayloadCont(uint8_t* buf,
return;
}
+ resetInactivityTimeout();
+
if (NULL == buffer_in_progress_) {
LOGV("TRTP Receiver skipping payload continuation; no buffer currently"
" in progress.");
diff --git a/media/libaah_rtp/aah_tx_group.cpp b/media/libaah_rtp/aah_tx_group.cpp
index 22dd0260a994..2e3df0a73d5a 100644
--- a/media/libaah_rtp/aah_tx_group.cpp
+++ b/media/libaah_rtp/aah_tx_group.cpp
@@ -29,6 +29,7 @@
#include "aah_tx_group.h"
#include "aah_tx_player.h"
+#include "utils.h"
//#define DROP_PACKET_TEST
#ifdef DROP_PACKET_TEST
@@ -101,15 +102,17 @@ static ssize_t droptest_recvfrom(int sockfd, void *buf,
namespace android {
const int AAH_TXGroup::kRetryTrimIntervalMsec = 100;
-const int AAH_TXGroup::kHeartbeatIntervalMsec = 1000;
+const int AAH_TXGroup::kHeartbeatIntervalMsec = 500;
const int AAH_TXGroup::kTXGroupLingerTimeMsec = 10000;
const int AAH_TXGroup::kUnicastClientTimeoutMsec = 5000;
const size_t AAH_TXGroup::kRetryBufferCapacity = 100;
-const size_t AAH_TXGroup::kMaxUnicastTargets = 16;
+const size_t AAH_TXGroup::kMaxAllowedUnicastTargets = 16;
const size_t AAH_TXGroup::kInitialUnicastTargetCapacity = 4;
-const size_t AAH_TXGroup::kMaxAllowedTXGroups = 16;
+const size_t AAH_TXGroup::kMaxAllowedTXGroups = 8;
const size_t AAH_TXGroup::kInitialActiveTXGroupsCapacity = 4;
+const size_t AAH_TXGroup::kMaxAllowedPlayerClients = 4;
+const size_t AAH_TXGroup::kInitialPlayerClientCapacity = 2;
const uint32_t AAH_TXGroup::kCNC_RetryRequestID = 'Treq';
const uint32_t AAH_TXGroup::kCNC_FastStartRequestID = 'Tfst';
@@ -124,33 +127,24 @@ sp<AAH_TXGroup::CmdAndControlRXer> AAH_TXGroup::mCmdAndControlRXer;
uint32_t AAH_TXGroup::sNextEpoch;
bool AAH_TXGroup::sNextEpochValid = false;
-static inline bool matchSockaddrs(const struct sockaddr_in* a,
- const struct sockaddr_in* b) {
- CHECK(NULL != a);
- CHECK(NULL != b);
- return ((a->sin_family == b->sin_family) &&
- (a->sin_addr.s_addr == b->sin_addr.s_addr) &&
- (a->sin_port == b->sin_port));
-}
-
AAH_TXGroup::AAH_TXGroup()
: mRetryBuffer(kRetryBufferCapacity)
{
// Initialize members with no constructor to sensible defaults.
- mClientRefCount = 0;
mTRTPSeqNumber = 0;
mNextProgramID = 1;
mEpoch = getNextEpoch();
mMulticastTargetValid = false;
mSocket = -1;
mCmdAndControlPort = 0;
- mClientRefCount = 0;
mUnicastTargets.setCapacity(kInitialUnicastTargetCapacity);
+ mActiveClients.setCapacity(kInitialPlayerClientCapacity);
+ mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec);
}
AAH_TXGroup::~AAH_TXGroup() {
- CHECK(0 == mClientRefCount);
+ CHECK(mActiveClients.size() == 0);
if (mSocket >= 0) {
::close(mSocket);
@@ -237,7 +231,8 @@ bailout:
return ret_val;
}
-sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) {
+sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port,
+ const sp<AAH_TXPlayer>& client) {
sp<AAH_TXGroup> ret_val;
// If port is non-zero, we are creating a new group. Otherwise, we are
@@ -250,7 +245,13 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) {
for (size_t i = 0; i < sActiveTXGroups.size(); ++i) {
if (port == sActiveTXGroups[i]->getCmdAndControlPort()) {
ret_val = sActiveTXGroups[i];
- ret_val->addClientReference();
+
+ if (!ret_val->registerClient(client)) {
+ // No need to log an error, registerClient has already done
+ // so for us.
+ ret_val = NULL;
+ }
+
break;
}
}
@@ -312,6 +313,13 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) {
}
}
+ // Register the client with the newly created group.
+ if (!ret_val->registerClient(client)) {
+ // No need to log an error, registerClient has already done so
+ // for us.
+ goto bailout;
+ }
+
// Make sure we are at least at minimum capacity in the
// ActiveTXGroups vector.
if (sActiveTXGroups.capacity() < kInitialActiveTXGroupsCapacity) {
@@ -321,11 +329,10 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(uint16_t port) {
// Add ourselves to the list of active TXGroups.
if (sActiveTXGroups.add(ret_val) < 0) {
LOGE("Failed to add new TX Group to Active Group list");
+ ret_val->unregisterClient(client);
goto bailout;
}
- ret_val->addClientReference();
-
LOGI("Created TX Group with C&C Port %hu. %d/%d groups now"
" active.", ret_val->getCmdAndControlPort(),
sActiveTXGroups.size(), kMaxAllowedTXGroups);
@@ -342,7 +349,8 @@ bailout:
return sp<AAH_TXGroup>(NULL);
}
-sp<AAH_TXGroup> AAH_TXGroup::getGroup(const struct sockaddr_in* target) {
+sp<AAH_TXGroup> AAH_TXGroup::getGroup(const struct sockaddr_in* target,
+ const sp<AAH_TXPlayer>& client) {
// Hold the static lock while we search for a TX Group which has the
// multicast target passed to us.
Mutex::Autolock lock(sLock);
@@ -359,32 +367,71 @@ sp<AAH_TXGroup> AAH_TXGroup::getGroup(const struct sockaddr_in* target) {
}
}
- if (ret_val != NULL)
- ret_val->addClientReference();
+ if (ret_val != NULL) {
+ if (!ret_val->registerClient(client)) {
+ // No need to log an error, registerClient has already done so for
+ // us.
+ ret_val = NULL;
+ }
+ }
return ret_val;
}
-void AAH_TXGroup::dropClientReference() {
+void AAH_TXGroup::unregisterClient(const sp<AAH_TXPlayer>& client) {
Mutex::Autolock lock(mLock);
- CHECK(mClientRefCount > 0);
- --mClientRefCount;
- if (!mClientRefCount) {
+ LOGI("TXPlayer leaving TXGroup listening on C&C port %hu",
+ mCmdAndControlPort);
+
+ bool found_it = false;
+ for (size_t i = 0; i < mActiveClients.size(); ++i) {
+ if (mActiveClients[i].get() == client.get()) {
+ found_it = true;
+ mActiveClients.removeAt(i);
+ break;
+ }
+ }
+ CHECK(found_it);
+
+ if (!mActiveClients.size()) {
mCleanupTimeout.setTimeout(kTXGroupLingerTimeMsec);
}
}
-void AAH_TXGroup::addClientReference() {
+bool AAH_TXGroup::registerClient(const sp<AAH_TXPlayer>& client) {
+ // ASSERT holding sLock
Mutex::Autolock lock(mLock);
- ++mClientRefCount;
+
+ CHECK(client != NULL);
+
+ // Check the client limit.
+ if (mActiveClients.size() >= kMaxAllowedPlayerClients) {
+ LOGE("Cannot register new client with C&C group listening on port %hu."
+ " %d/%d clients are already active", mCmdAndControlPort,
+ mActiveClients.size(), kMaxAllowedPlayerClients);
+ return false;
+ }
+
+ // Try to add the client to the list.
+ if (mActiveClients.add(client) < 0) {
+ LOGE("Failed to register new client with C&C group listening on port"
+ " %hu. %d/%d clients are currently active", mCmdAndControlPort,
+ mActiveClients.size(), kMaxAllowedPlayerClients);
+ return false;
+ }
+
+ // Assign our new client's program ID, cancel the cleanup timeout and get
+ // out.
+ client->setProgramID(getNewProgramID());
mCleanupTimeout.setTimeout(-1);
+ return true;
}
bool AAH_TXGroup::shouldExpire() {
Mutex::Autolock lock(mLock);
- if (mClientRefCount) {
+ if (mActiveClients.size()) {
return false;
}
@@ -395,9 +442,12 @@ bool AAH_TXGroup::shouldExpire() {
return true;
}
-uint16_t AAH_TXGroup::getNewProgramID() {
- int tmp = android_atomic_inc(&mNextProgramID);
- return static_cast<uint16_t>(tmp & 0xFFFF);
+uint8_t AAH_TXGroup::getNewProgramID() {
+ uint8_t tmp;
+ do {
+ tmp = static_cast<uint8_t>(android_atomic_inc(&mNextProgramID) & 0x1F);
+ } while(!tmp);
+ return tmp;
}
status_t AAH_TXGroup::sendPacket(const sp<TRTPPacket>& packet) {
@@ -449,13 +499,10 @@ status_t AAH_TXGroup::sendPacket_l(const sp<TRTPPacket>& packet) {
LOGI("TXGroup on port %hu removing client at %d.%d.%d.%d:%hu due to"
" timeout. Now serving %d/%d unicast clients.",
mCmdAndControlPort, IP_PRINTF_HELPER(addr), port,
- mUnicastTargets.size(), kMaxUnicastTargets);
+ mUnicastTargets.size(), kMaxAllowedUnicastTargets);
}
}
- // reset our heartbeat timer.
- mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec);
-
// Done; see comments sendToTarget discussing error handling behavior
return OK;
}
@@ -562,19 +609,22 @@ void AAH_TXGroup::sendHeartbeatIfNeeded() {
Mutex::Autolock lock(mLock);
if (!mHeartbeatTimeout.msecTillTimeout()) {
- sp<TRTPControlPacket> packet = new TRTPControlPacket();
+ sp<TRTPActiveProgramUpdatePacket> packet =
+ new TRTPActiveProgramUpdatePacket();
if (packet != NULL) {
- packet->setCommandID(TRTPControlPacket::kCommandNop);
+ for (size_t i = 0; i < mActiveClients.size(); ++i) {
+ packet->pushProgramID(mActiveClients[i]->getProgramID());
+ }
- // Note: the act of calling sendPacket will reset our heartbeat
- // timer.
sendPacket_l(packet);
} else {
LOGE("Failed to allocate TRTP packet for heartbeat on TX Group with"
" C&C port %hu", mCmdAndControlPort);
- mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec);
}
+
+ // reset our heartbeat timer.
+ mHeartbeatTimeout.setTimeout(kHeartbeatIntervalMsec);
}
}
@@ -828,7 +878,7 @@ void AAH_TXGroup::handleJoinGroup(const struct sockaddr_in* src_addr) {
// Looks like we have a new client. Check to see if we have room to add it
// before proceeding. If not, send a NAK back so it knows to signal an
// error to its application level.
- if (mUnicastTargets.size() >= kMaxUnicastTargets) {
+ if (mUnicastTargets.size() >= kMaxAllowedUnicastTargets) {
uint32_t nak_payload = htonl(kCNC_NakJoinGroupID);
if (sendto(mSocket, &nak_payload, sizeof(nak_payload),
@@ -865,7 +915,7 @@ void AAH_TXGroup::handleJoinGroup(const struct sockaddr_in* src_addr) {
LOGI("TXGroup on port %hu added new client at %d.%d.%d.%d:%hu. "
"Now serving %d/%d unicast clients.",
mCmdAndControlPort, IP_PRINTF_HELPER(addr), port,
- mUnicastTargets.size(), kMaxUnicastTargets);
+ mUnicastTargets.size(), kMaxAllowedUnicastTargets);
}
void AAH_TXGroup::handleLeaveGroup(const struct sockaddr_in* src_addr) {
@@ -888,41 +938,13 @@ void AAH_TXGroup::handleLeaveGroup(const struct sockaddr_in* src_addr) {
LOGI("TXGroup on port %hu removing client at %d.%d.%d.%d:%hu due to"
" leave request. Now serving %d/%d unicast clients.",
mCmdAndControlPort, IP_PRINTF_HELPER(addr), port,
- mUnicastTargets.size(), kMaxUnicastTargets);
+ mUnicastTargets.size(), kMaxAllowedUnicastTargets);
return;
}
}
}
-void AAH_TXGroup::Timeout::setTimeout(int msec) {
- if (msec < 0) {
- mSystemEndTime = 0;
- return;
- }
-
- mSystemEndTime = systemTime() + (static_cast<nsecs_t>(msec) * 1000000);
-}
-
-int AAH_TXGroup::Timeout::msecTillTimeout(nsecs_t nowTime) {
- if (!mSystemEndTime) {
- return -1;
- }
-
- if (mSystemEndTime < nowTime) {
- return 0;
- }
-
- nsecs_t delta = mSystemEndTime - nowTime;
- delta += 999999;
- delta /= 1000000;
- if (delta > 0x7FFFFFFF) {
- return 0x7FFFFFFF;
- }
-
- return static_cast<int>(delta);
-}
-
AAH_TXGroup::CmdAndControlRXer::CmdAndControlRXer() {
mWakeupEventFD = -1;
}
@@ -1003,25 +1025,16 @@ bool AAH_TXGroup::CmdAndControlRXer::threadLoop() {
// Check the heartbeat timeout for this group.
tmp = txGroups[i]->mHeartbeatTimeout.msecTillTimeout(now);
- if (static_cast<unsigned int>(tmp) <
- static_cast<unsigned int>(nextTimeout)) {
- nextTimeout = tmp;
- }
+ nextTimeout = minTimeout(nextTimeout, tmp);
// Check the cleanup timeout for this group.
tmp = txGroups[i]->mCleanupTimeout.msecTillTimeout(now);
- if (static_cast<unsigned int>(tmp) <
- static_cast<unsigned int>(nextTimeout)) {
- nextTimeout = tmp;
- }
+ nextTimeout = minTimeout(nextTimeout, tmp);
}
// Take into account the common trim timeout.
tmp = mTrimRetryTimeout.msecTillTimeout(now);
- if (static_cast<unsigned int>(tmp) <
- static_cast<unsigned int>(nextTimeout)) {
- nextTimeout = tmp;
- }
+ nextTimeout = minTimeout(nextTimeout, tmp);
}
// Step 3: OK - time to wait for there to be something to do. Release our
diff --git a/media/libaah_rtp/aah_tx_group.h b/media/libaah_rtp/aah_tx_group.h
index 4736ea0473d9..276e81933f4b 100644
--- a/media/libaah_rtp/aah_tx_group.h
+++ b/media/libaah_rtp/aah_tx_group.h
@@ -26,12 +26,15 @@
#include <utils/Vector.h>
#include "aah_tx_packet.h"
+#include "utils.h"
#define IP_PRINTF_HELPER(a) ((a >> 24) & 0xFF), ((a >> 16) & 0xFF), \
((a >> 8) & 0xFF), (a & 0xFF)
namespace android {
+class AAH_TXPlayer;
+
template <typename T> class CircularBuffer {
public:
CircularBuffer(size_t capacity);
@@ -69,26 +72,36 @@ class AAH_TXGroup : public virtual RefBase {
// Obtain the instance of the TXGroup whose command and control socket is
// currently listening on the specified port. Alternatively, if port is 0,
// create a new TXGroup with an ephemerally bound command and control port.
- static sp<AAH_TXGroup> getGroup(uint16_t port);
+ static sp<AAH_TXGroup> getGroup(uint16_t port,
+ const sp<AAH_TXPlayer>& client);
// Obtain the instance of the TXGroup whose multicast transmit target is
// currently set to target, or NULL if no such group exists. To create a
// new transmit group with a new multicast target address, call getGroup(0)
// followed by setMulticastTXTarget.
- static sp<AAH_TXGroup> getGroup(const struct sockaddr_in* target);
-
- // AAH_TXGroups successfully obtained using calls to getGroup will have a
- // client reference placed on them on behalf of the caller. When the caller
- // is finished using the group, they must call dropClientReference to remove
- // this reference. Once call client references have been released from a TX
- // Group, the group will linger in the system for a short period of time
- // before finally expiring and being cleaned up by the command and control
- // thread.
- //
+ static sp<AAH_TXGroup> getGroup(const struct sockaddr_in* target,
+ const sp<AAH_TXPlayer>& client);
+
+ // AAH_TXGroups successfully obtained using calls to getGroup will hold a
+ // reference back to the client passed to getGroup. When the client is
+ // finished using the group, it must call unregisterClient to release this
+ // reference.
+ //
+ // While there exists active clients of transmit group, the TX group will
+ // periodically send heartbeat messages to receiver clients containing the
+ // program IDs of the currently active TX Player clients so that receivers
+ // have a chance to clean up orphaned programs in the case where all EOS
+ // messages got dropped on the way to the receiver.
+ //
+ // Once all clients references have been released from a TX Group, the group
+ // will linger in the system for a short period of time before finally
+ // expiring and being cleaned up by the command and control thread.
+ //
// TODO : someday, expose the AAH_TXGroup as a top level object in the
// android MediaAPIs so that applications may explicitly manage TX group
// lifecycles instead of relying on this timeout/cleanup mechanism.
- void dropClientReference();
+ void unregisterClient(const sp<AAH_TXPlayer>& client);
+
// Fetch the UDP port on which this TXGroup is listening for command and
// control messages. No need to hold any locks for this, the port is
@@ -96,9 +109,6 @@ class AAH_TXGroup : public virtual RefBase {
// afterwards.
uint16_t getCmdAndControlPort() const { return mCmdAndControlPort; }
- // Used by players to obtain a new program ID for this retransmission group.
- uint16_t getNewProgramID();
-
// Assign a TRTP sequence number to the supplied packet and send it to all
// registered clients. Then place the packet into the RetryBuffer to
// service future client retry requests.
@@ -114,30 +124,6 @@ class AAH_TXGroup : public virtual RefBase {
~AAH_TXGroup();
private:
- // Definition of a helper class used to track things like when we need to
- // transmit a heartbeat, or when we will need to wake up and trim the retry
- // buffers.
- class Timeout {
- public:
- Timeout() : mSystemEndTime(0) { }
-
- // Set a timeout which should occur msec milliseconds from now.
- // Negative values will cancel any current timeout;
- void setTimeout(int msec);
-
- // Return the number of milliseconds until the timeout occurs, or -1 if
- // no timeout is scheduled.
- int msecTillTimeout(nsecs_t nowTime);
- int msecTillTimeout() { return msecTillTimeout(systemTime()); }
-
- private:
- // The systemTime() at which the timeout will be complete, or 0 if no
- // timeout is currently scheduled.
- nsecs_t mSystemEndTime;
-
- DISALLOW_EVIL_CONSTRUCTORS(Timeout);
- };
-
// Definition of the singleton command and control receiver who will handle
// requests from RX clients. Requests include things like unicast group
// management as well as retransmission requests.
@@ -228,8 +214,11 @@ class AAH_TXGroup : public virtual RefBase {
const uint8_t* payload,
size_t length);
- // Add a client ref to this TX Group and reset its cleanup timer.
- void addClientReference();
+ // Register a player client to this TX Group and reset its cleanup timer.
+ bool registerClient(const sp<AAH_TXPlayer>& client);
+
+ // Obtain a new program ID for a newly registered client.
+ uint8_t getNewProgramID();
// Test used by the C&C thread to see if its time to expire and cleanup a
// client.
@@ -263,12 +252,12 @@ class AAH_TXGroup : public virtual RefBase {
// Lock we use to serialize access to instance variables.
Mutex mLock;
-
+
// The list of packets we hold for servicing retry requests.
RetryBuffer mRetryBuffer;
- // The number of TX Player clients currently using this TX group.
- uint32_t mClientRefCount;
+ // The current set of active TX Player clients using this TX group.
+ Vector< sp<AAH_TXPlayer> > mActiveClients;
// The sequence number to assign to the next transmitted TRTP packet.
uint16_t mTRTPSeqNumber;
@@ -322,10 +311,12 @@ class AAH_TXGroup : public virtual RefBase {
static const int kUnicastClientTimeoutMsec;
static const size_t kRetryBufferCapacity;
- static const size_t kMaxUnicastTargets;
+ static const size_t kMaxAllowedUnicastTargets;
static const size_t kInitialUnicastTargetCapacity;
static const size_t kMaxAllowedTXGroups;
static const size_t kInitialActiveTXGroupsCapacity;
+ static const size_t kMaxAllowedPlayerClients;
+ static const size_t kInitialPlayerClientCapacity;
static const uint32_t kCNC_RetryRequestID;
static const uint32_t kCNC_FastStartRequestID;
diff --git a/media/libaah_rtp/aah_tx_packet.cpp b/media/libaah_rtp/aah_tx_packet.cpp
index c7ad3e0d2c96..d41ba8c1703c 100644
--- a/media/libaah_rtp/aah_tx_packet.cpp
+++ b/media/libaah_rtp/aah_tx_packet.cpp
@@ -74,7 +74,7 @@ void TRTPPacket::setEpoch(uint32_t val) {
}
}
-void TRTPPacket::setProgramID(uint16_t val) {
+void TRTPPacket::setProgramID(uint8_t val) {
CHECK(!mIsPacked);
mProgramID = val;
}
@@ -341,4 +341,44 @@ bool TRTPControlPacket::pack() {
return true;
}
+bool TRTPActiveProgramUpdatePacket::pushProgramID(uint8_t id) {
+ if (mProgramIDCnt >= kMaxProgramIDs)
+ return false;
+
+ mProgramIDs[mProgramIDCnt++] = id;
+ return true;
+}
+
+bool TRTPActiveProgramUpdatePacket::pack() {
+ if (mIsPacked) {
+ return false;
+ }
+
+ // Active program update packets contain a 2-byte command ID, 1 byte of
+ // length, and length bytes of program IDs.
+ int packetLen = kRTPHeaderLen +
+ TRTPHeaderLen() +
+ mProgramIDCnt +
+ 3;
+
+ mPacket = new uint8_t[packetLen];
+ if (!mPacket) {
+ return false;
+ }
+
+ mPacketLen = packetLen;
+
+ uint8_t* cur = mPacket;
+
+ writeTRTPHeader(cur, true, packetLen);
+ writeU16(cur, mCommandID);
+ writeU8(cur, mProgramIDCnt);
+ for (uint8_t i = 0; i < mProgramIDCnt; ++i) {
+ writeU8(cur, mProgramIDs[i]);
+ }
+
+ mIsPacked = true;
+ return true;
+}
+
} // namespace android
diff --git a/media/libaah_rtp/aah_tx_packet.h b/media/libaah_rtp/aah_tx_packet.h
index b9f0fe6d2dea..670bf1afcac4 100644
--- a/media/libaah_rtp/aah_tx_packet.h
+++ b/media/libaah_rtp/aah_tx_packet.h
@@ -62,7 +62,7 @@ class TRTPPacket : public RefBase {
int64_t getPTS() const;
void setEpoch(uint32_t val);
- void setProgramID(uint16_t val);
+ void setProgramID(uint8_t val);
void setSubstreamID(uint16_t val);
void setClockTransform(const LinearTransform& trans);
@@ -103,7 +103,7 @@ class TRTPPacket : public RefBase {
bool mPTSValid;
int64_t mPTS;
uint32_t mEpoch;
- uint16_t mProgramID;
+ uint8_t mProgramID;
uint16_t mSubstreamID;
LinearTransform mClockTranform;
bool mClockTranformValid;
@@ -177,16 +177,37 @@ class TRTPControlPacket : public TRTPPacket {
kCommandNop = 1,
kCommandFlush = 2,
kCommandEOS = 3,
+ kCommandAPU = 4,
};
void setCommandID(TRTPCommandID val);
virtual bool pack();
- private:
+ protected:
+ explicit TRTPControlPacket(TRTPCommandID commandID)
+ : TRTPPacket(kHeaderTypeControl)
+ , mCommandID(commandID) {}
+
TRTPCommandID mCommandID;
};
+class TRTPActiveProgramUpdatePacket : public TRTPControlPacket {
+ public:
+ TRTPActiveProgramUpdatePacket()
+ : TRTPControlPacket(kCommandAPU)
+ , mProgramIDCnt(0) {}
+
+ virtual bool pack();
+ bool pushProgramID(uint8_t id);
+
+ private:
+ static const uint8_t kMaxProgramIDs = 31;
+
+ uint8_t mProgramIDCnt;
+ uint8_t mProgramIDs[kMaxProgramIDs];
+};
+
} // namespace android
#endif // __AAH_TX_PLAYER_H__
diff --git a/media/libaah_rtp/aah_tx_player.cpp b/media/libaah_rtp/aah_tx_player.cpp
index c11e5024946b..476687ae39b7 100644
--- a/media/libaah_rtp/aah_tx_player.cpp
+++ b/media/libaah_rtp/aah_tx_player.cpp
@@ -34,6 +34,7 @@
#include "aah_tx_packet.h"
#include "aah_tx_player.h"
+#include "utils.h"
namespace android {
@@ -52,6 +53,7 @@ static const int64_t kAAHBufferTimeUs = 1000000LL;
const int64_t AAH_TXPlayer::kAAHRetryKeepAroundTimeNs =
kAAHBufferTimeUs * 1100;
+const int AAH_TXPlayer::kPauseTSUpdateResendTimeoutMsec = 250;
const int32_t AAH_TXPlayer::kInvokeGetCNCPort = 0xB33977;
sp<MediaPlayerBase> createAAH_TXPlayer() {
@@ -521,6 +523,7 @@ status_t AAH_TXPlayer::play_l() {
status_t AAH_TXPlayer::stop() {
status_t ret = pause();
+ mPauseTSUpdateResendTimeout.setTimeout(-1);
sendEOS_l();
return ret;
}
@@ -586,11 +589,22 @@ void AAH_TXPlayer::updateClockTransform_l(bool pause) {
mCurrentClockTransform.a_to_b_denom = pause ? 0 : 1;
// send a packet announcing the new transform
+ sendTSUpdateNop_l();
+
+ // if we are paused, schedule a periodic resend of the TS update, JiC the
+ // receiveing client misses it.
+ if (mPlayRateIsPaused) {
+ mPauseTSUpdateResendTimeout.setTimeout(kPauseTSUpdateResendTimeoutMsec);
+ } else {
+ mPauseTSUpdateResendTimeout.setTimeout(-1);
+ }
+}
+
+void AAH_TXPlayer::sendEOS_l() {
if (mAAH_TXGroup != NULL) {
sp<TRTPControlPacket> packet = new TRTPControlPacket();
if (packet != NULL) {
- packet->setClockTransform(mCurrentClockTransform);
- packet->setCommandID(TRTPControlPacket::kCommandNop);
+ packet->setCommandID(TRTPControlPacket::kCommandEOS);
sendPacket_l(packet);
} else {
LOGD("Failed to allocate TRTP packet at %s:%d", __FILE__, __LINE__);
@@ -598,11 +612,12 @@ void AAH_TXPlayer::updateClockTransform_l(bool pause) {
}
}
-void AAH_TXPlayer::sendEOS_l() {
- if (mAAH_TXGroup != NULL) {
+void AAH_TXPlayer::sendTSUpdateNop_l() {
+ if ((mAAH_TXGroup != NULL) && mCurrentClockTransformValid) {
sp<TRTPControlPacket> packet = new TRTPControlPacket();
if (packet != NULL) {
- packet->setCommandID(TRTPControlPacket::kCommandEOS);
+ packet->setClockTransform(mCurrentClockTransform);
+ packet->setCommandID(TRTPControlPacket::kCommandFlush);
sendPacket_l(packet);
} else {
LOGD("Failed to allocate TRTP packet at %s:%d", __FILE__, __LINE__);
@@ -754,20 +769,20 @@ void AAH_TXPlayer::reset_l() {
mIsSeeking = false;
mSeekTimeUs = 0;
+ mPauseTSUpdateResendTimeout.setTimeout(-1);
+
mUri.setTo("");
mUriHeaders.clear();
mFileSource.clear();
mBitrate = -1;
- mProgramID = 0;
if (mAAH_TXGroup != NULL) {
- LOGI("TXPlayer leaving TXGroup listening on C&C port %hu",
- mAAH_TXGroup->getCmdAndControlPort());
- mAAH_TXGroup->dropClientReference();
+ mAAH_TXGroup->unregisterClient(sp<AAH_TXPlayer>(this));
mAAH_TXGroup = NULL;
}
+ mProgramID = 0;
mLastQueuedMediaTimePTSValid = false;
mCurrentClockTransformValid = false;
@@ -873,6 +888,7 @@ status_t AAH_TXPlayer::setRetransmitEndpoint(
uint32_t addr = ntohl(endpoint->sin_addr.s_addr);
uint16_t port = ntohs(endpoint->sin_port);
+ sp<AAH_TXPlayer> thiz(this);
if ((addr & 0xF0000000) == 0xE0000000) {
// Starting in multicast mode? We need to have a specified port to
// multicast to, so sanity check that first. Then search for an
@@ -884,10 +900,11 @@ status_t AAH_TXPlayer::setRetransmitEndpoint(
return BAD_VALUE;
}
- mAAH_TXGroup = AAH_TXGroup::getGroup(endpoint);
+ mAAH_TXGroup = AAH_TXGroup::getGroup(endpoint, thiz);
if (mAAH_TXGroup == NULL) {
// No pre-existing group. Make a new one.
- mAAH_TXGroup = AAH_TXGroup::getGroup(static_cast<uint16_t>(0));
+ mAAH_TXGroup = AAH_TXGroup::getGroup(static_cast<uint16_t>(0),
+ thiz);
// Still no group? Thats bad. We probably have exceeded our limit
// on the number of simultaneous TX groups.
@@ -907,7 +924,7 @@ status_t AAH_TXPlayer::setRetransmitEndpoint(
} else if (addr == INADDR_ANY) {
// Starting in unicast mode. A port of 0 means we need to create a new
// group, a non-zero port means that we want to join an existing one.
- mAAH_TXGroup = AAH_TXGroup::getGroup(port);
+ mAAH_TXGroup = AAH_TXGroup::getGroup(port, thiz);
if (mAAH_TXGroup == NULL) {
if (port) {
@@ -929,7 +946,7 @@ status_t AAH_TXPlayer::setRetransmitEndpoint(
}
CHECK(mAAH_TXGroup != NULL);
- mProgramID = mAAH_TXGroup->getNewProgramID();
+ CHECK(mProgramID != 0);
return OK;
}
@@ -1111,6 +1128,8 @@ void AAH_TXPlayer::onPumpAudio() {
// of good options here. For now, signal an error up to the app level
// and shut down the transmission pump.
int64_t commonTimeNow;
+ int64_t mediaTimeNow;
+ bool mediaTimeNowValid = false;
if (OK != mCCHelper.getCommonTime(&commonTimeNow)) {
// Failed to get common time; either the service is down or common
// time is not synced. Raise an error and shutdown the player.
@@ -1121,20 +1140,34 @@ void AAH_TXPlayer::onPumpAudio() {
break;
}
- if (mCurrentClockTransformValid && mLastQueuedMediaTimePTSValid) {
- int64_t mediaTimeNow;
- bool conversionResult = mCurrentClockTransform.doReverseTransform(
- commonTimeNow,
- &mediaTimeNow);
- CHECK(conversionResult);
+ if (mCurrentClockTransformValid) {
+ mediaTimeNowValid = mCurrentClockTransform.doReverseTransform(
+ commonTimeNow,
+ &mediaTimeNow);
+ CHECK(mediaTimeNowValid);
+ }
- if ((mediaTimeNow +
- kAAHBufferTimeUs -
- mLastQueuedMediaTimePTS) <= 0) {
- break;
+ // Has our pause-timestamp-update timer fired? If so, take appropriate
+ // action.
+ if (!mPauseTSUpdateResendTimeout.msecTillTimeout()) {
+ if (mPlayRateIsPaused) {
+ // Send the update and schedule the next update.
+ sendTSUpdateNop_l();
+ mPauseTSUpdateResendTimeout.setTimeout(
+ kPauseTSUpdateResendTimeoutMsec);
+ } else {
+ // Not paused; cancel the timer so it does not bug us anymore.
+ mPauseTSUpdateResendTimeout.setTimeout(-1);
}
}
+ // Stop if we have reached our buffer threshold.
+ if (mediaTimeNowValid &&
+ mLastQueuedMediaTimePTSValid &&
+ (mediaTimeNow + kAAHBufferTimeUs - mLastQueuedMediaTimePTS) <= 0) {
+ break;
+ }
+
MediaSource::ReadOptions options;
if (mIsSeeking) {
options.setSeekTo(mSeekTimeUs);
diff --git a/media/libaah_rtp/aah_tx_player.h b/media/libaah_rtp/aah_tx_player.h
index 210e8ff22e0a..05dcd73322f3 100644
--- a/media/libaah_rtp/aah_tx_player.h
+++ b/media/libaah_rtp/aah_tx_player.h
@@ -66,6 +66,9 @@ class AAH_TXPlayer : public MediaPlayerHWInterface {
virtual status_t setRetransmitEndpoint(const struct sockaddr_in*
endpoint);
+ void setProgramID(uint8_t programID) { mProgramID = programID; }
+ uint8_t getProgramID() const { return mProgramID; }
+
static const int64_t kAAHRetryKeepAroundTimeNs;
static const int32_t kInvokeGetCNCPort;
@@ -103,6 +106,7 @@ class AAH_TXPlayer : public MediaPlayerHWInterface {
status_t seekTo_l(int64_t timeUs);
void updateClockTransform_l(bool pause);
void sendEOS_l();
+ void sendTSUpdateNop_l();
void cancelPlayerEvents(bool keepBufferingGoing = false);
void reset_l();
void notifyListener_l(int msg, int ext1 = 0, int ext2 = 0);
@@ -139,6 +143,8 @@ class AAH_TXPlayer : public MediaPlayerHWInterface {
bool mIsSeeking;
int64_t mSeekTimeUs;
+ Timeout mPauseTSUpdateResendTimeout;
+
sp<TimedEventQueue::Event> mPumpAudioEvent;
bool mPumpAudioEventPending;
@@ -162,9 +168,11 @@ class AAH_TXPlayer : public MediaPlayerHWInterface {
bool mPlayRateIsPaused;
CCHelper mCCHelper;
- uint16_t mProgramID;
+ uint8_t mProgramID;
uint8_t mTRTPVolume;
+ static const int kPauseTSUpdateResendTimeoutMsec;
+
DISALLOW_EVIL_CONSTRUCTORS(AAH_TXPlayer);
};
diff --git a/media/libaah_rtp/utils.cpp b/media/libaah_rtp/utils.cpp
new file mode 100644
index 000000000000..3ed259944cf1
--- /dev/null
+++ b/media/libaah_rtp/utils.cpp
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2012 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils.h"
+
+namespace android {
+
+void Timeout::setTimeout(int msec) {
+ if (msec < 0) {
+ mSystemEndTime = 0;
+ return;
+ }
+
+ mSystemEndTime = systemTime() + (static_cast<nsecs_t>(msec) * 1000000);
+}
+
+int Timeout::msecTillTimeout(nsecs_t nowTime) {
+ if (!mSystemEndTime) {
+ return -1;
+ }
+
+ if (mSystemEndTime < nowTime) {
+ return 0;
+ }
+
+ nsecs_t delta = mSystemEndTime - nowTime;
+ delta += 999999;
+ delta /= 1000000;
+ if (delta > 0x7FFFFFFF) {
+ return 0x7FFFFFFF;
+ }
+
+ return static_cast<int>(delta);
+}
+
+} // namespace android
diff --git a/media/libaah_rtp/utils.h b/media/libaah_rtp/utils.h
new file mode 100644
index 000000000000..1e3d326612f6
--- /dev/null
+++ b/media/libaah_rtp/utils.h
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2012 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __UTILS_H__
+#define __UTILS_H__
+
+#include <netinet/in.h>
+
+#include <media/stagefright/foundation/ABase.h>
+#include <utils/Timers.h>
+
+namespace android {
+
+// Definition of a helper class used to track things like when we need to
+// transmit a heartbeat, or when we will need to wake up and trim the retry
+// buffers.
+class Timeout {
+ public:
+ Timeout() : mSystemEndTime(0) { }
+
+ // Set a timeout which should occur msec milliseconds from now.
+ // Negative values will cancel any current timeout;
+ void setTimeout(int msec);
+
+ // Return the number of milliseconds until the timeout occurs, or -1 if
+ // no timeout is scheduled.
+ int msecTillTimeout(nsecs_t nowTime);
+ int msecTillTimeout() { return msecTillTimeout(systemTime()); }
+
+ private:
+ // The systemTime() at which the timeout will be complete, or 0 if no
+ // timeout is currently scheduled.
+ nsecs_t mSystemEndTime;
+
+ DISALLOW_EVIL_CONSTRUCTORS(Timeout);
+};
+
+inline bool matchSockaddrs(const struct sockaddr_in* a,
+ const struct sockaddr_in* b) {
+ return ((a->sin_family == b->sin_family) &&
+ (a->sin_addr.s_addr == b->sin_addr.s_addr) &&
+ (a->sin_port == b->sin_port));
+}
+
+// Return the minimum timeout between a and b where timeouts less than 0 are
+// considered to be infinite
+inline int minTimeout(int a, int b) {
+ if (a < 0) {
+ return b;
+ }
+
+ if (b < 0) {
+ return a;
+ }
+
+ return ((a < b) ? a : b);
+}
+
+} // namespace android
+
+#endif // __UTILS_H__