blob: 4e71d5fe6d6c76ae504ebb7abcf27083d3513410 [file] [log] [blame]
/*
* Copyright (C) 2010 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <inttypes.h>
//#define LOG_NDEBUG 0
#define LOG_TAG "NuCachedSource2"
#include <utils/Log.h>
#include <datasource/NuCachedSource2.h>
#include <datasource/HTTPBase.h>
#include <cutils/properties.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaErrors.h>
namespace android {
struct PageCache {
explicit PageCache(size_t pageSize);
~PageCache();
struct Page {
void *mData;
size_t mSize;
};
Page *acquirePage();
void releasePage(Page *page);
void appendPage(Page *page);
size_t releaseFromStart(size_t maxBytes);
size_t totalSize() const {
return mTotalSize;
}
void copy(size_t from, void *data, size_t size);
private:
size_t mPageSize;
size_t mTotalSize;
List<Page *> mActivePages;
List<Page *> mFreePages;
void freePages(List<Page *> *list);
DISALLOW_EVIL_CONSTRUCTORS(PageCache);
};
PageCache::PageCache(size_t pageSize)
: mPageSize(pageSize),
mTotalSize(0) {
}
PageCache::~PageCache() {
freePages(&mActivePages);
freePages(&mFreePages);
}
void PageCache::freePages(List<Page *> *list) {
List<Page *>::iterator it = list->begin();
while (it != list->end()) {
Page *page = *it;
free(page->mData);
delete page;
page = NULL;
++it;
}
}
PageCache::Page *PageCache::acquirePage() {
if (!mFreePages.empty()) {
List<Page *>::iterator it = mFreePages.begin();
Page *page = *it;
mFreePages.erase(it);
return page;
}
Page *page = new Page;
page->mData = malloc(mPageSize);
page->mSize = 0;
return page;
}
void PageCache::releasePage(Page *page) {
page->mSize = 0;
mFreePages.push_back(page);
}
void PageCache::appendPage(Page *page) {
mTotalSize += page->mSize;
mActivePages.push_back(page);
}
size_t PageCache::releaseFromStart(size_t maxBytes) {
size_t bytesReleased = 0;
while (maxBytes > 0 && !mActivePages.empty()) {
List<Page *>::iterator it = mActivePages.begin();
Page *page = *it;
if (maxBytes < page->mSize) {
break;
}
mActivePages.erase(it);
maxBytes -= page->mSize;
bytesReleased += page->mSize;
releasePage(page);
}
mTotalSize -= bytesReleased;
return bytesReleased;
}
void PageCache::copy(size_t from, void *data, size_t size) {
ALOGV("copy from %zu size %zu", from, size);
if (size == 0) {
return;
}
CHECK_LE(from + size, mTotalSize);
size_t offset = 0;
List<Page *>::iterator it = mActivePages.begin();
while (from >= offset + (*it)->mSize) {
offset += (*it)->mSize;
++it;
}
size_t delta = from - offset;
size_t avail = (*it)->mSize - delta;
if (avail >= size) {
memcpy(data, (const uint8_t *)(*it)->mData + delta, size);
return;
}
memcpy(data, (const uint8_t *)(*it)->mData + delta, avail);
++it;
data = (uint8_t *)data + avail;
size -= avail;
while (size > 0) {
size_t copy = (*it)->mSize;
if (copy > size) {
copy = size;
}
memcpy(data, (*it)->mData, copy);
data = (uint8_t *)data + copy;
size -= copy;
++it;
}
}
////////////////////////////////////////////////////////////////////////////////
NuCachedSource2::NuCachedSource2(
const sp<DataSource> &source,
const char *cacheConfig,
bool disconnectAtHighwatermark)
: mSource(source),
mReflector(new AHandlerReflector<NuCachedSource2>(this)),
mLooper(new ALooper),
mCache(new PageCache(kPageSize)),
mCacheOffset(0),
mFinalStatus(OK),
mLastAccessPos(0),
mFetching(true),
mDisconnecting(false),
mLastFetchTimeUs(-1),
mNumRetriesLeft(kMaxNumRetries),
mHighwaterThresholdBytes(kDefaultHighWaterThreshold),
mLowwaterThresholdBytes(kDefaultLowWaterThreshold),
mKeepAliveIntervalUs(kDefaultKeepAliveIntervalUs),
mDisconnectAtHighwatermark(disconnectAtHighwatermark) {
// We are NOT going to support disconnect-at-highwatermark indefinitely
// and we are not guaranteeing support for client-specified cache
// parameters. Both of these are temporary measures to solve a specific
// problem that will be solved in a better way going forward.
updateCacheParamsFromSystemProperty();
if (cacheConfig != NULL) {
updateCacheParamsFromString(cacheConfig);
}
if (mDisconnectAtHighwatermark) {
// Makes no sense to disconnect and do keep-alives...
mKeepAliveIntervalUs = 0;
}
mLooper->setName("NuCachedSource2");
mLooper->registerHandler(mReflector);
// Since it may not be obvious why our looper thread needs to be
// able to call into java since it doesn't appear to do so at all...
// IMediaHTTPConnection may be (and most likely is) implemented in JAVA
// and a local JAVA IBinder will call directly into JNI methods.
// So whenever we call DataSource::readAt it may end up in a call to
// IMediaHTTPConnection::readAt and therefore call back into JAVA.
mLooper->start(false /* runOnCallingThread */, true /* canCallJava */);
mName = String8::format("NuCachedSource2(%s)", mSource->toString().c_str());
}
NuCachedSource2::~NuCachedSource2() {
mLooper->stop();
mLooper->unregisterHandler(mReflector->id());
delete mCache;
mCache = NULL;
}
// static
sp<NuCachedSource2> NuCachedSource2::Create(
const sp<DataSource> &source,
const char *cacheConfig,
bool disconnectAtHighwatermark) {
sp<NuCachedSource2> instance = new NuCachedSource2(
source, cacheConfig, disconnectAtHighwatermark);
Mutex::Autolock autoLock(instance->mLock);
(new AMessage(kWhatFetchMore, instance->mReflector))->post();
return instance;
}
status_t NuCachedSource2::getEstimatedBandwidthKbps(int32_t *kbps) {
if (mSource->flags() & kIsHTTPBasedSource) {
HTTPBase* source = static_cast<HTTPBase *>(mSource.get());
return source->getEstimatedBandwidthKbps(kbps);
}
return ERROR_UNSUPPORTED;
}
void NuCachedSource2::close() {
disconnect();
}
void NuCachedSource2::disconnect() {
if (mSource->flags() & kIsHTTPBasedSource) {
ALOGV("disconnecting HTTPBasedSource");
{
Mutex::Autolock autoLock(mLock);
// set mDisconnecting to true, if a fetch returns after
// this, the source will be marked as EOS.
mDisconnecting = true;
// explicitly signal mCondition so that the pending readAt()
// will immediately return
mCondition.signal();
}
// explicitly disconnect from the source, to allow any
// pending reads to return more promptly
static_cast<HTTPBase *>(mSource.get())->disconnect();
}
}
status_t NuCachedSource2::setCacheStatCollectFreq(int32_t freqMs) {
if (mSource->flags() & kIsHTTPBasedSource) {
HTTPBase *source = static_cast<HTTPBase *>(mSource.get());
return source->setBandwidthStatCollectFreq(freqMs);
}
return ERROR_UNSUPPORTED;
}
status_t NuCachedSource2::initCheck() const {
return mSource->initCheck();
}
status_t NuCachedSource2::getSize(off64_t *size) {
return mSource->getSize(size);
}
uint32_t NuCachedSource2::flags() {
// Remove HTTP related flags since NuCachedSource2 is not HTTP-based.
uint32_t flags = mSource->flags() & ~(kWantsPrefetching | kIsHTTPBasedSource);
return (flags | kIsCachingDataSource);
}
void NuCachedSource2::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case kWhatFetchMore:
{
onFetch();
break;
}
case kWhatRead:
{
onRead(msg);
break;
}
default:
TRESPASS();
}
}
void NuCachedSource2::fetchInternal() {
ALOGV("fetchInternal");
bool reconnect = false;
{
Mutex::Autolock autoLock(mLock);
CHECK(mFinalStatus == OK || mNumRetriesLeft > 0);
if (mFinalStatus != OK) {
--mNumRetriesLeft;
reconnect = true;
}
}
if (reconnect) {
status_t err =
mSource->reconnectAtOffset(mCacheOffset + mCache->totalSize());
Mutex::Autolock autoLock(mLock);
if (mDisconnecting) {
mNumRetriesLeft = 0;
mFinalStatus = ERROR_END_OF_STREAM;
return;
} else if (err == ERROR_UNSUPPORTED || err == -EPIPE) {
// These are errors that are not likely to go away even if we
// retry, i.e. the server doesn't support range requests or similar.
mNumRetriesLeft = 0;
return;
} else if (err != OK) {
ALOGI("The attempt to reconnect failed, %d retries remaining",
mNumRetriesLeft);
return;
}
}
PageCache::Page *page = mCache->acquirePage();
ssize_t n = mSource->readAt(
mCacheOffset + mCache->totalSize(), page->mData, kPageSize);
Mutex::Autolock autoLock(mLock);
if (n == 0 || mDisconnecting) {
ALOGI("caching reached eos.");
mNumRetriesLeft = 0;
mFinalStatus = ERROR_END_OF_STREAM;
mCache->releasePage(page);
} else if (n < 0) {
mFinalStatus = n;
if (n == ERROR_UNSUPPORTED || n == -EPIPE) {
// These are errors that are not likely to go away even if we
// retry, i.e. the server doesn't support range requests or similar.
mNumRetriesLeft = 0;
}
ALOGE("source returned error %zd, %d retries left", n, mNumRetriesLeft);
mCache->releasePage(page);
} else {
if (mFinalStatus != OK) {
ALOGI("retrying a previously failed read succeeded.");
}
mNumRetriesLeft = kMaxNumRetries;
mFinalStatus = OK;
page->mSize = n;
mCache->appendPage(page);
}
}
void NuCachedSource2::onFetch() {
ALOGV("onFetch");
if (mFinalStatus != OK && mNumRetriesLeft == 0) {
ALOGV("EOS reached, done prefetching for now");
mFetching = false;
}
bool keepAlive =
!mFetching
&& mFinalStatus == OK
&& mKeepAliveIntervalUs > 0
&& ALooper::GetNowUs() >= mLastFetchTimeUs + mKeepAliveIntervalUs;
if (mFetching || keepAlive) {
if (keepAlive) {
ALOGI("Keep alive");
}
fetchInternal();
mLastFetchTimeUs = ALooper::GetNowUs();
if (mFetching && mCache->totalSize() >= mHighwaterThresholdBytes) {
ALOGI("Cache full, done prefetching for now");
mFetching = false;
if (mDisconnectAtHighwatermark
&& (mSource->flags() & DataSource::kIsHTTPBasedSource)) {
ALOGV("Disconnecting at high watermark");
static_cast<HTTPBase *>(mSource.get())->disconnect();
mFinalStatus = -EAGAIN;
}
}
} else {
Mutex::Autolock autoLock(mLock);
restartPrefetcherIfNecessary_l();
}
int64_t delayUs;
if (mFetching) {
if (mFinalStatus != OK && mNumRetriesLeft > 0) {
// We failed this time and will try again in 3 seconds.
delayUs = 3000000LL;
} else {
delayUs = 0;
}
} else {
delayUs = 100000LL;
}
(new AMessage(kWhatFetchMore, mReflector))->post(delayUs);
}
void NuCachedSource2::onRead(const sp<AMessage> &msg) {
ALOGV("onRead");
int64_t offset;
CHECK(msg->findInt64("offset", &offset));
void *data;
CHECK(msg->findPointer("data", &data));
size_t size;
CHECK(msg->findSize("size", &size));
ssize_t result = readInternal(offset, data, size);
if (result == -EAGAIN) {
msg->post(50000);
return;
}
Mutex::Autolock autoLock(mLock);
if (mDisconnecting) {
mCondition.signal();
return;
}
CHECK(mAsyncResult == NULL);
mAsyncResult = new AMessage;
mAsyncResult->setInt32("result", result);
mCondition.signal();
}
void NuCachedSource2::restartPrefetcherIfNecessary_l(
bool ignoreLowWaterThreshold, bool force) {
static const size_t kGrayArea = 1024 * 1024;
if (mFetching || (mFinalStatus != OK && mNumRetriesLeft == 0)) {
return;
}
if (!ignoreLowWaterThreshold && !force
&& mCacheOffset + mCache->totalSize() - mLastAccessPos
>= mLowwaterThresholdBytes) {
return;
}
size_t maxBytes = mLastAccessPos - mCacheOffset;
if (!force) {
if (maxBytes < kGrayArea) {
return;
}
maxBytes -= kGrayArea;
}
size_t actualBytes = mCache->releaseFromStart(maxBytes);
mCacheOffset += actualBytes;
ALOGI("restarting prefetcher, totalSize = %zu", mCache->totalSize());
mFetching = true;
}
ssize_t NuCachedSource2::readAt(off64_t offset, void *data, size_t size) {
Mutex::Autolock autoSerializer(mSerializer);
ALOGV("readAt offset %lld, size %zu", (long long)offset, size);
Mutex::Autolock autoLock(mLock);
if (mDisconnecting) {
return ERROR_END_OF_STREAM;
}
// If the request can be completely satisfied from the cache, do so.
if (offset >= mCacheOffset
&& offset + size <= mCacheOffset + mCache->totalSize()) {
size_t delta = offset - mCacheOffset;
mCache->copy(delta, data, size);
mLastAccessPos = offset + size;
return size;
}
sp<AMessage> msg = new AMessage(kWhatRead, mReflector);
msg->setInt64("offset", offset);
msg->setPointer("data", data);
msg->setSize("size", size);
CHECK(mAsyncResult == NULL);
msg->post();
while (mAsyncResult == NULL && !mDisconnecting) {
mCondition.wait(mLock);
}
if (mDisconnecting) {
mAsyncResult.clear();
return ERROR_END_OF_STREAM;
}
int32_t result;
CHECK(mAsyncResult->findInt32("result", &result));
mAsyncResult.clear();
if (result > 0) {
mLastAccessPos = offset + result;
}
return (ssize_t)result;
}
size_t NuCachedSource2::cachedSize() {
Mutex::Autolock autoLock(mLock);
return mCacheOffset + mCache->totalSize();
}
status_t NuCachedSource2::getAvailableSize(off64_t offset, off64_t *size) {
Mutex::Autolock autoLock(mLock);
status_t finalStatus = UNKNOWN_ERROR;
*size = approxDataRemaining_l(offset, &finalStatus);
return finalStatus;
}
size_t NuCachedSource2::approxDataRemaining(status_t *finalStatus) const {
Mutex::Autolock autoLock(mLock);
return approxDataRemaining_l(mLastAccessPos, finalStatus);
}
size_t NuCachedSource2::approxDataRemaining_l(off64_t offset, status_t *finalStatus) const {
*finalStatus = mFinalStatus;
if (mFinalStatus != OK && mNumRetriesLeft > 0) {
// Pretend that everything is fine until we're out of retries.
*finalStatus = OK;
}
offset = offset >= 0 ? offset : mLastAccessPos;
off64_t lastBytePosCached = mCacheOffset + mCache->totalSize();
if (offset < lastBytePosCached) {
return lastBytePosCached - offset;
}
return 0;
}
ssize_t NuCachedSource2::readInternal(off64_t offset, void *data, size_t size) {
CHECK_LE(size, (size_t)mHighwaterThresholdBytes);
ALOGV("readInternal offset %lld size %zu", (long long)offset, size);
Mutex::Autolock autoLock(mLock);
// If we're disconnecting, return EOS and don't access *data pointer.
// data could be on the stack of the caller to NuCachedSource2::readAt(),
// which may have exited already.
if (mDisconnecting) {
return ERROR_END_OF_STREAM;
}
if (!mFetching) {
mLastAccessPos = offset;
restartPrefetcherIfNecessary_l(
false, // ignoreLowWaterThreshold
true); // force
}
if (offset < mCacheOffset
|| offset >= (off64_t)(mCacheOffset + mCache->totalSize())) {
static const off64_t kPadding = 256 * 1024;
// In the presence of multiple decoded streams, once of them will
// trigger this seek request, the other one will request data "nearby"
// soon, adjust the seek position so that that subsequent request
// does not trigger another seek.
off64_t seekOffset = (offset > kPadding) ? offset - kPadding : 0;
seekInternal_l(seekOffset);
}
size_t delta = offset - mCacheOffset;
if (mFinalStatus != OK && mNumRetriesLeft == 0) {
if (delta >= mCache->totalSize()) {
return mFinalStatus;
}
size_t avail = mCache->totalSize() - delta;
if (avail > size) {
avail = size;
}
mCache->copy(delta, data, avail);
return avail;
}
if (offset + size <= mCacheOffset + mCache->totalSize()) {
mCache->copy(delta, data, size);
return size;
}
ALOGV("deferring read");
return -EAGAIN;
}
status_t NuCachedSource2::seekInternal_l(off64_t offset) {
mLastAccessPos = offset;
if (offset >= mCacheOffset
&& offset <= (off64_t)(mCacheOffset + mCache->totalSize())) {
return OK;
}
ALOGI("new range: offset= %lld", (long long)offset);
mCacheOffset = offset;
size_t totalSize = mCache->totalSize();
CHECK_EQ(mCache->releaseFromStart(totalSize), totalSize);
mNumRetriesLeft = kMaxNumRetries;
mFetching = true;
return OK;
}
void NuCachedSource2::resumeFetchingIfNecessary() {
Mutex::Autolock autoLock(mLock);
restartPrefetcherIfNecessary_l(true /* ignore low water threshold */);
}
String8 NuCachedSource2::getUri() {
return mSource->getUri();
}
String8 NuCachedSource2::getMIMEType() const {
return mSource->getMIMEType();
}
void NuCachedSource2::updateCacheParamsFromSystemProperty() {
char value[PROPERTY_VALUE_MAX];
if (!property_get("media.stagefright.cache-params", value, NULL)) {
return;
}
updateCacheParamsFromString(value);
}
void NuCachedSource2::updateCacheParamsFromString(const char *s) {
ssize_t lowwaterMarkKb, highwaterMarkKb;
int keepAliveSecs;
if (sscanf(s, "%zd/%zd/%d",
&lowwaterMarkKb, &highwaterMarkKb, &keepAliveSecs) != 3) {
ALOGE("Failed to parse cache parameters from '%s'.", s);
return;
}
if (lowwaterMarkKb >= 0) {
mLowwaterThresholdBytes = lowwaterMarkKb * 1024;
} else {
mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
}
if (highwaterMarkKb >= 0) {
mHighwaterThresholdBytes = highwaterMarkKb * 1024;
} else {
mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
}
if (mLowwaterThresholdBytes >= mHighwaterThresholdBytes) {
ALOGE("Illegal low/highwater marks specified, reverting to defaults.");
mLowwaterThresholdBytes = kDefaultLowWaterThreshold;
mHighwaterThresholdBytes = kDefaultHighWaterThreshold;
}
if (keepAliveSecs >= 0) {
mKeepAliveIntervalUs = keepAliveSecs * 1000000LL;
} else {
mKeepAliveIntervalUs = kDefaultKeepAliveIntervalUs;
}
ALOGV("lowwater = %zu bytes, highwater = %zu bytes, keepalive = %lld us",
mLowwaterThresholdBytes,
mHighwaterThresholdBytes,
(long long)mKeepAliveIntervalUs);
}
// static
void NuCachedSource2::RemoveCacheSpecificHeaders(
KeyedVector<String8, String8> *headers,
String8 *cacheConfig,
bool *disconnectAtHighwatermark) {
*cacheConfig = String8();
*disconnectAtHighwatermark = false;
if (headers == NULL) {
return;
}
ssize_t index;
if ((index = headers->indexOfKey(String8("x-cache-config"))) >= 0) {
*cacheConfig = headers->valueAt(index);
headers->removeItemsAt(index);
ALOGV("Using special cache config '%s'", cacheConfig->c_str());
}
if ((index = headers->indexOfKey(
String8("x-disconnect-at-highwatermark"))) >= 0) {
*disconnectAtHighwatermark = true;
headers->removeItemsAt(index);
ALOGV("Client requested disconnection at highwater mark");
}
}
} // namespace android