| #include <pdx/channel_handle.h> |
| #include <private/dvr/consumer_queue_channel.h> |
| #include <private/dvr/producer_channel.h> |
| |
| using android::pdx::ErrorStatus; |
| using android::pdx::RemoteChannelHandle; |
| using android::pdx::Status; |
| using android::pdx::rpc::DispatchRemoteMethod; |
| using android::pdx::rpc::RemoteMethodError; |
| |
| namespace android { |
| namespace dvr { |
| |
| ConsumerQueueChannel::ConsumerQueueChannel( |
| BufferHubService* service, int buffer_id, int channel_id, |
| const std::shared_ptr<Channel>& producer, bool silent) |
| : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType), |
| producer_(producer), |
| capacity_(0), |
| silent_(silent) { |
| GetProducer()->AddConsumer(this); |
| } |
| |
| ConsumerQueueChannel::~ConsumerQueueChannel() { |
| ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d", |
| channel_id()); |
| |
| if (auto producer = GetProducer()) { |
| producer->RemoveConsumer(this); |
| } |
| } |
| |
| bool ConsumerQueueChannel::HandleMessage(Message& message) { |
| ATRACE_NAME("ConsumerQueueChannel::HandleMessage"); |
| auto producer = GetProducer(); |
| if (!producer) { |
| RemoteMethodError(message, EPIPE); |
| return true; |
| } |
| |
| switch (message.GetOp()) { |
| case BufferHubRPC::CreateConsumerQueue::Opcode: |
| DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>( |
| *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message); |
| return true; |
| |
| case BufferHubRPC::GetQueueInfo::Opcode: |
| DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>( |
| *producer, &ProducerQueueChannel::OnGetQueueInfo, message); |
| return true; |
| |
| case BufferHubRPC::ConsumerQueueImportBuffers::Opcode: |
| DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>( |
| *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message); |
| return true; |
| |
| default: |
| return false; |
| } |
| } |
| |
| std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer() |
| const { |
| return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock()); |
| } |
| |
| void ConsumerQueueChannel::HandleImpulse(Message& /* message */) { |
| ATRACE_NAME("ConsumerQueueChannel::HandleImpulse"); |
| } |
| |
| BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const { |
| BufferHubChannel::BufferInfo info; |
| if (auto producer = GetProducer()) { |
| // If producer has not hung up, copy most buffer info from the producer. |
| info = producer->GetBufferInfo(); |
| } |
| info.id = buffer_id(); |
| info.capacity = capacity_; |
| return info; |
| } |
| |
| void ConsumerQueueChannel::RegisterNewBuffer( |
| const std::shared_ptr<ProducerChannel>& producer_channel, |
| size_t producer_slot) { |
| ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d", |
| __FUNCTION__, buffer_id(), producer_channel->buffer_id(), |
| producer_slot, silent_); |
| // Only register buffers if the queue is not silent. |
| if (silent_) { |
| return; |
| } |
| |
| auto status = producer_channel->CreateConsumerStateMask(); |
| if (!status.ok()) { |
| ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__, |
| status.GetErrorMessage().c_str()); |
| return; |
| } |
| uint64_t consumer_state_mask = status.get(); |
| |
| pending_buffer_slots_.emplace(producer_channel, producer_slot, |
| consumer_state_mask); |
| // Signal the client that there is new buffer available. |
| SignalAvailable(); |
| } |
| |
| Status<std::vector<std::pair<RemoteChannelHandle, size_t>>> |
| ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) { |
| std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles; |
| ATRACE_NAME(__FUNCTION__); |
| ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__, |
| pending_buffer_slots_.size()); |
| |
| // Indicate this is a silent queue that will not import buffers. |
| if (silent_) |
| return ErrorStatus(EBADR); |
| |
| while (!pending_buffer_slots_.empty()) { |
| auto producer_channel = |
| pending_buffer_slots_.front().producer_channel.lock(); |
| size_t producer_slot = pending_buffer_slots_.front().producer_slot; |
| uint64_t consumer_state_mask = |
| pending_buffer_slots_.front().consumer_state_mask; |
| pending_buffer_slots_.pop(); |
| |
| // It's possible that the producer channel has expired. When this occurs, |
| // ignore the producer channel. |
| if (producer_channel == nullptr) { |
| ALOGW("%s: producer channel has already been expired.", __FUNCTION__); |
| continue; |
| } |
| |
| auto status = |
| producer_channel->CreateConsumer(message, consumer_state_mask); |
| |
| // If no buffers are imported successfully, clear available and return an |
| // error. Otherwise, return all consumer handles already imported |
| // successfully, but keep available bits on, so that the client can retry |
| // importing remaining consumer buffers. |
| if (!status) { |
| ALOGE("%s: Failed create consumer: %s", __FUNCTION__, |
| status.GetErrorMessage().c_str()); |
| if (buffer_handles.empty()) { |
| ClearAvailable(); |
| return status.error_status(); |
| } else { |
| return {std::move(buffer_handles)}; |
| } |
| } |
| |
| buffer_handles.emplace_back(status.take(), producer_slot); |
| } |
| |
| ClearAvailable(); |
| return {std::move(buffer_handles)}; |
| } |
| |
| void ConsumerQueueChannel::OnProducerClosed() { |
| ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d", |
| buffer_id()); |
| producer_.reset(); |
| Hangup(); |
| } |
| |
| } // namespace dvr |
| } // namespace android |