bufferpool2: handle transfer to closed connection
Handle transfer to closed connection properly. Also avoid spinlocking
during invalidation.
Bug: 139506073
Test: atest CtsMediaTestCases -- --module-arg CtsMediaTestCases:size:small
Merged-In: I9037ab0bde48a51b2b97158ecd0086dac84f8b26
Change-Id: I9037ab0bde48a51b2b97158ecd0086dac84f8b26
diff --git a/media/bufferpool/1.0/AccessorImpl.cpp b/media/bufferpool/1.0/AccessorImpl.cpp
index fa17f15..a5366f6 100644
--- a/media/bufferpool/1.0/AccessorImpl.cpp
+++ b/media/bufferpool/1.0/AccessorImpl.cpp
@@ -151,6 +151,7 @@
newConnection->initialize(accessor, id);
*connection = newConnection;
*pConnectionId = id;
+ mBufferPool.mConnectionIds.insert(id);
++sSeqId;
}
}
@@ -305,7 +306,12 @@
found->second->mSenderValidated = true;
return true;
}
- // TODO: verify there is target connection Id
+ if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
+ // N.B: it could be fake or receive connection already closed.
+ ALOGD("bufferpool %p receiver connection %lld is no longer valid",
+ this, (long long)message.targetConnectionId);
+ return false;
+ }
mStats.onBufferSent();
mTransactions.insert(std::make_pair(
message.transactionId,
@@ -450,6 +456,7 @@
}
}
}
+ mConnectionIds.erase(connectionId);
return true;
}
diff --git a/media/bufferpool/1.0/AccessorImpl.h b/media/bufferpool/1.0/AccessorImpl.h
index c04dbf3..84cb685 100644
--- a/media/bufferpool/1.0/AccessorImpl.h
+++ b/media/bufferpool/1.0/AccessorImpl.h
@@ -94,6 +94,7 @@
std::map<BufferId, std::unique_ptr<InternalBuffer>> mBuffers;
std::set<BufferId> mFreeBuffers;
+ std::set<ConnectionId> mConnectionIds;
/// Buffer pool statistics which tracks allocation and transfer statistics.
struct Stats {
diff --git a/media/bufferpool/2.0/AccessorImpl.cpp b/media/bufferpool/2.0/AccessorImpl.cpp
index 94cf006..cacd465 100644
--- a/media/bufferpool/2.0/AccessorImpl.cpp
+++ b/media/bufferpool/2.0/AccessorImpl.cpp
@@ -163,6 +163,7 @@
*connection = newConnection;
*pConnectionId = id;
*pMsgId = mBufferPool.mInvalidation.mInvalidationId;
+ mBufferPool.mConnectionIds.insert(id);
mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
mBufferPool.mInvalidation.onConnect(id, observer);
++sSeqId;
@@ -474,7 +475,12 @@
found->second->mSenderValidated = true;
return true;
}
- // TODO: verify there is target connection Id
+ if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
+ // N.B: it could be fake or receive connection already closed.
+ ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
+ this, (long long)message.targetConnectionId);
+ return false;
+ }
mStats.onBufferSent();
mTransactions.insert(std::make_pair(
message.transactionId,
@@ -644,6 +650,7 @@
}
}
}
+ mConnectionIds.erase(connectionId);
return true;
}
@@ -774,11 +781,19 @@
std::mutex &mutex,
std::condition_variable &cv,
bool &ready) {
+ constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
+ constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
+ constexpr useconds_t MAX_SLEEP_US = 10000;
+ uint32_t numSpin = 0;
+ useconds_t sleepUs = 1;
+
while(true) {
std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
{
std::unique_lock<std::mutex> lock(mutex);
if (!ready) {
+ numSpin = 0;
+ sleepUs = 1;
cv.wait(lock);
}
copied.insert(accessors.begin(), accessors.end());
@@ -800,9 +815,20 @@
if (accessors.size() == 0) {
ready = false;
} else {
- // prevent draining cpu.
+ // TODO Use an efficient way to wait over FMQ.
+ // N.B. Since there is not a efficient way to wait over FMQ,
+ // polling over the FMQ is the current way to prevent draining
+ // CPU.
lock.unlock();
- std::this_thread::yield();
+ ++numSpin;
+ if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
+ sleepUs < MAX_SLEEP_US) {
+ sleepUs *= 10;
+ }
+ if (numSpin % NUM_SPIN_TO_LOG == 0) {
+ ALOGW("invalidator thread spinning");
+ }
+ ::usleep(sleepUs);
}
}
}
diff --git a/media/bufferpool/2.0/AccessorImpl.h b/media/bufferpool/2.0/AccessorImpl.h
index eea72b9..807e0f1 100644
--- a/media/bufferpool/2.0/AccessorImpl.h
+++ b/media/bufferpool/2.0/AccessorImpl.h
@@ -111,6 +111,7 @@
std::map<BufferId, std::unique_ptr<InternalBuffer>> mBuffers;
std::set<BufferId> mFreeBuffers;
+ std::set<ConnectionId> mConnectionIds;
struct Invalidation {
static std::atomic<std::uint32_t> sInvSeqId;