// blob: 5d7d4e9057a125b0d33f9b501c9b9e8246ef3d2c [file] [log] [blame]
#include <pdx/channel_handle.h>
#include <private/dvr/consumer_queue_channel.h>
#include <private/dvr/producer_channel.h>
using android::pdx::ErrorStatus;
using android::pdx::RemoteChannelHandle;
using android::pdx::Status;
using android::pdx::rpc::DispatchRemoteMethod;
using android::pdx::rpc::RemoteMethodError;
namespace android {
namespace dvr {
// Constructs a consumer queue channel bound to |producer| and registers it
// with that producer queue through AddConsumer().
//
// |service|, |buffer_id| and |channel_id| are forwarded to BufferHubChannel
// together with kConsumerQueueType. |producer| is stored in |producer_| and is
// later resolved through GetProducer(). When |silent| is true,
// RegisterNewBuffer() ignores new buffers and OnConsumerQueueImportBuffers()
// fails with EBADR.
ConsumerQueueChannel::ConsumerQueueChannel(
    BufferHubService* service, int buffer_id, int channel_id,
    const std::shared_ptr<Channel>& producer, bool silent)
    : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
      producer_(producer),
      capacity_(0),
      silent_(silent) {
  // GetProducer() yields an empty pointer when |producer| is null or already
  // gone. The destructor and HandleMessage() both tolerate that state, so do
  // the same here instead of dereferencing a null pointer.
  if (auto producer_queue = GetProducer()) {
    producer_queue->AddConsumer(this);
  } else {
    ALOGE(
        "ConsumerQueueChannel::ConsumerQueueChannel: producer is unavailable: "
        "channel_id=%d",
        channel_id);
  }
}
// Unregisters this consumer from its producer queue, provided the producer can
// still be resolved through GetProducer().
ConsumerQueueChannel::~ConsumerQueueChannel() {
  ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
           channel_id());

  const auto producer_queue = GetProducer();
  if (producer_queue != nullptr) {
    producer_queue->RemoveConsumer(this);
  }
}
// Dispatches an incoming |message| by opcode.
//
// CreateConsumerQueue and GetQueueInfo are forwarded to the producer queue;
// ConsumerQueueImportBuffers is handled by this channel. If the producer can no
// longer be resolved, the message is answered with EPIPE.
//
// Returns true when the message was consumed (including the EPIPE reply) and
// false for opcodes this channel does not recognize.
bool ConsumerQueueChannel::HandleMessage(Message& message) {
  ATRACE_NAME("ConsumerQueueChannel::HandleMessage");

  const auto producer_queue = GetProducer();
  if (producer_queue == nullptr) {
    RemoteMethodError(message, EPIPE);
    return true;
  }

  bool handled = true;
  switch (message.GetOp()) {
    case BufferHubRPC::CreateConsumerQueue::Opcode:
      DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
          *producer_queue, &ProducerQueueChannel::OnCreateConsumerQueue,
          message);
      break;

    case BufferHubRPC::GetQueueInfo::Opcode:
      DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
          *producer_queue, &ProducerQueueChannel::OnGetQueueInfo, message);
      break;

    case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
          *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
      break;

    default:
      handled = false;
      break;
  }
  return handled;
}
// Resolves |producer_| via lock() and returns the result viewed as a
// ProducerQueueChannel. The returned pointer is empty when lock() yields
// nothing, so callers must check it before use.
std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
    const {
  const auto channel = producer_.lock();
  return std::static_pointer_cast<ProducerQueueChannel>(channel);
}
// Impulse handler for this channel. The message argument is unused; the body
// only invokes ATRACE_NAME for this call.
void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
}
// Builds the BufferInfo describing this consumer queue.
//
// Starts from the producer queue's info when the producer can still be
// resolved, otherwise from a default-constructed BufferInfo. In both cases the
// id and capacity fields are overwritten with this channel's own values.
BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
  BufferHubChannel::BufferInfo result;

  const auto producer_queue = GetProducer();
  if (producer_queue != nullptr) {
    // Producer has not hung up: inherit most fields from it.
    result = producer_queue->GetBufferInfo();
  }

  result.capacity = capacity_;
  result.id = buffer_id();
  return result;
}
// Records a newly allocated producer buffer so the client can import it later.
//
// |producer_channel| is the buffer's producer and |producer_slot| its slot in
// the producer queue. Silent queues ignore the call. Otherwise a consumer state
// mask is requested from |producer_channel|; on success the
// (channel, slot, mask) triple is queued in |pending_buffer_slots_| and the
// channel is signalled as available. On failure the error is logged and
// nothing is queued.
void ConsumerQueueChannel::RegisterNewBuffer(
    const std::shared_ptr<ProducerChannel>& producer_channel,
    size_t producer_slot) {
  ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d",
           __FUNCTION__, buffer_id(), producer_channel->buffer_id(),
           producer_slot, silent_);

  // Silent queues never register buffers.
  if (silent_) return;

  auto mask_status = producer_channel->CreateConsumerStateMask();
  if (!mask_status.ok()) {
    ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__,
          mask_status.GetErrorMessage().c_str());
    return;
  }

  const uint64_t state_mask = mask_status.get();
  pending_buffer_slots_.emplace(producer_channel, producer_slot, state_mask);

  // Let the client know a new buffer is waiting to be imported.
  SignalAvailable();
}
// Drains |pending_buffer_slots_|, creating a consumer for every pending buffer
// whose producer channel is still alive.
//
// Returns the list of (consumer handle, producer slot) pairs created during
// this call. Error cases:
//   - Silent queue: returns EBADR without touching the pending list.
//   - CreateConsumer() fails before anything was imported: clears the
//     available state and returns that error.
//   - CreateConsumer() fails after at least one import: returns the handles
//     gathered so far and leaves the available state set so the client can
//     call again for the entries still queued.
//
// NOTE(review): each entry is popped before CreateConsumer() is attempted, so
// the entry that failed is not among those a later call will retry.
Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
  ATRACE_NAME(__FUNCTION__);
  ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__,
           pending_buffer_slots_.size());

  // A silent queue never imports buffers; report that to the caller.
  if (silent_) {
    return ErrorStatus(EBADR);
  }

  std::vector<std::pair<RemoteChannelHandle, size_t>> imported;
  while (!pending_buffer_slots_.empty()) {
    // Copy everything needed out of the front entry, then drop it.
    auto buffer_producer =
        pending_buffer_slots_.front().producer_channel.lock();
    const size_t slot = pending_buffer_slots_.front().producer_slot;
    const uint64_t state_mask =
        pending_buffer_slots_.front().consumer_state_mask;
    pending_buffer_slots_.pop();

    // The producer channel may have expired in the meantime; skip it.
    if (!buffer_producer) {
      ALOGW("%s: producer channel has already been expired.", __FUNCTION__);
      continue;
    }

    auto consumer_status = buffer_producer->CreateConsumer(message, state_mask);
    if (!consumer_status) {
      ALOGE("%s: Failed create consumer: %s", __FUNCTION__,
            consumer_status.GetErrorMessage().c_str());

      // Partial success: hand back what was imported and keep the available
      // bits on so the client can come back for the rest.
      if (!imported.empty()) {
        return {std::move(imported)};
      }

      // Nothing imported at all: clear available and surface the error.
      ClearAvailable();
      return consumer_status.error_status();
    }

    imported.emplace_back(consumer_status.take(), slot);
  }

  ClearAvailable();
  return {std::move(imported)};
}
// Notification that the producer queue has closed (presumably invoked by the
// producer side -- confirm against the caller). Drops this channel's reference
// to the producer and hangs up the channel.
void ConsumerQueueChannel::OnProducerClosed() {
ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
buffer_id());
// Once |producer_| is reset, GetProducer() no longer resolves a producer, so
// HandleMessage() answers subsequent messages with EPIPE.
producer_.reset();
Hangup();
}
} // namespace dvr
} // namespace android