diff options
| -rw-r--r-- | core/java/android/os/CombinedMessageQueue/MessageQueue.java | 2508 |
1 files changed, 2508 insertions, 0 deletions
diff --git a/core/java/android/os/CombinedMessageQueue/MessageQueue.java b/core/java/android/os/CombinedMessageQueue/MessageQueue.java new file mode 100644 index 000000000000..3e5ac6ff5a4a --- /dev/null +++ b/core/java/android/os/CombinedMessageQueue/MessageQueue.java @@ -0,0 +1,2508 @@ +/* + * Copyright (C) 2006 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package android.os; + +import android.annotation.IntDef; +import android.annotation.NonNull; +import android.annotation.TestApi; +import android.compat.annotation.UnsupportedAppUsage; +import android.os.Process; +import android.ravenwood.annotation.RavenwoodKeepWholeClass; +import android.ravenwood.annotation.RavenwoodRedirect; +import android.ravenwood.annotation.RavenwoodRedirectionClass; +import android.util.Log; +import android.util.Printer; +import android.util.SparseArray; +import android.util.proto.ProtoOutputStream; + +import dalvik.annotation.optimization.NeverCompile; + +import java.io.FileDescriptor; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Low-level class holding the list of messages to be dispatched by a + * {@link Looper}. Messages are not added directly to a MessageQueue, + * but rather through {@link Handler} objects associated with the Looper. + * + * <p>You can retrieve the MessageQueue for the current thread with + * {@link Looper#myQueue() Looper.myQueue()}. + */ +@RavenwoodKeepWholeClass +@RavenwoodRedirectionClass("MessageQueue_host") +public final class MessageQueue { + private static final String TAG_L = "LegacyMessageQueue"; + private static final String TAG_C = "ConcurrentMessageQueue"; + private static final boolean DEBUG = false; + private static final boolean TRACE = false; + + // True if the message queue can be quit. + @UnsupportedAppUsage + private final boolean mQuitAllowed; + + @UnsupportedAppUsage + @SuppressWarnings("unused") + private long mPtr; // used by native code + + @UnsupportedAppUsage + Message mMessages; + private Message mLast; + @UnsupportedAppUsage + private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>(); + private SparseArray<FileDescriptorRecord> mFileDescriptorRecords; + private IdleHandler[] mPendingIdleHandlers; + private boolean mLegacyQuitting; + + // Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout. + private boolean mLegacyBlocked; + + // Tracks the number of async message. We use this in enqueueMessage() to avoid searching the + // queue for async messages when inserting a message at the tail. + private int mLegacyAsyncMessageCount; + + // The next barrier token. + // Barriers are indicated by messages with a null target whose arg1 field carries the token. + @UnsupportedAppUsage + private int mLegacyNextBarrierToken; + + /* + * Select between two implementations of message queue. The legacy implementation is used + * by default as it provides maximum compatibility with applications and tests that + * reach into MessageQueue via the mMessages field. The concurrent implemmentation is used for + * system processes and provides a higher level of concurrency and higher enqueue throughput + * than the legacy implementation. + */ + private static boolean sForceConcurrent = false; + + @RavenwoodRedirect + private native static long nativeInit(); + @RavenwoodRedirect + private native static void nativeDestroy(long ptr); + @UnsupportedAppUsage + @RavenwoodRedirect + private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/ + @RavenwoodRedirect + private native static void nativeWake(long ptr); + @RavenwoodRedirect + private native static boolean nativeIsPolling(long ptr); + @RavenwoodRedirect + private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events); + + MessageQueue(boolean quitAllowed) { + if (!sForceConcurrent) { + sForceConcurrent = Process.myUid() < Process.FIRST_APPLICATION_UID; + } + mQuitAllowed = quitAllowed; + mPtr = nativeInit(); + } + + @Override + protected void finalize() throws Throwable { + try { + dispose(); + } finally { + super.finalize(); + } + } + + // Disposes of the underlying message queue. + // Must only be called on the looper thread or the finalizer. + private void dispose() { + if (mPtr != 0) { + nativeDestroy(mPtr); + mPtr = 0; + } + } + + private class MatchDeliverableMessages extends MessageCompare { + @Override + public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r, + long when) { + if (m.when <= when) { + return true; + } + return false; + } + } + private final MatchDeliverableMessages mMatchDeliverableMessages = + new MatchDeliverableMessages(); + /** + * Returns true if the looper has no pending messages which are due to be processed. + * + * <p>This method is safe to call from any thread. + * + * @return True if the looper is idle. + */ + public boolean isIdle() { + if (sForceConcurrent) { + final long now = SystemClock.uptimeMillis(); + + if (stackHasMessages(null, 0, null, null, now, mMatchDeliverableMessages, false)) { + return false; + } + + MessageNode msgNode = null; + MessageNode asyncMsgNode = null; + + if (!mPriorityQueue.isEmpty()) { + try { + msgNode = mPriorityQueue.first(); + } catch (NoSuchElementException e) { } + } + + if (!mAsyncPriorityQueue.isEmpty()) { + try { + asyncMsgNode = mAsyncPriorityQueue.first(); + } catch (NoSuchElementException e) { } + } + + if ((msgNode != null && msgNode.getWhen() <= now) + || (asyncMsgNode != null && asyncMsgNode.getWhen() <= now)) { + return false; + } + + return true; + } else { + synchronized (this) { + final long now = SystemClock.uptimeMillis(); + return mMessages == null || now < mMessages.when; + } + } + } + + /** + * Add a new {@link IdleHandler} to this message queue. This may be + * removed automatically for you by returning false from + * {@link IdleHandler#queueIdle IdleHandler.queueIdle()} when it is + * invoked, or explicitly removing it with {@link #removeIdleHandler}. + * + * <p>This method is safe to call from any thread. + * + * @param handler The IdleHandler to be added. + */ + public void addIdleHandler(@NonNull IdleHandler handler) { + if (handler == null) { + throw new NullPointerException("Can't add a null IdleHandler"); + } + if (sForceConcurrent) { + synchronized (mIdleHandlersLock) { + mIdleHandlers.add(handler); + } + } else { + synchronized (this) { + mIdleHandlers.add(handler); + } + } + } + + /** + * Remove an {@link IdleHandler} from the queue that was previously added + * with {@link #addIdleHandler}. If the given object is not currently + * in the idle list, nothing is done. + * + * <p>This method is safe to call from any thread. + * + * @param handler The IdleHandler to be removed. + */ + public void removeIdleHandler(@NonNull IdleHandler handler) { + if (sForceConcurrent) { + synchronized (mIdleHandlersLock) { + mIdleHandlers.remove(handler); + } + } else { + synchronized (this) { + mIdleHandlers.remove(handler); + } + } + } + + /** + * Returns whether this looper's thread is currently polling for more work to do. + * This is a good signal that the loop is still alive rather than being stuck + * handling a callback. Note that this method is intrinsically racy, since the + * state of the loop can change before you get the result back. + * + * <p>This method is safe to call from any thread. + * + * @return True if the looper is currently polling for events. + * @hide + */ + public boolean isPolling() { + if (sForceConcurrent) { + // If the loop is quitting then it must not be idling. + // We can assume mPtr != 0 when sQuitting is false. + return !((boolean) sQuitting.getVolatile(this)) && nativeIsPolling(mPtr); + } else { + synchronized (this) { + return isPollingLocked(); + } + } + } + + private boolean isPollingLocked() { + // If the loop is quitting then it must not be idling. + // We can assume mPtr != 0 when mLegacyQuitting is false. + return !mLegacyQuitting && nativeIsPolling(mPtr); + } + + /** + * Adds a file descriptor listener to receive notification when file descriptor + * related events occur. + * <p> + * If the file descriptor has already been registered, the specified events + * and listener will replace any that were previously associated with it. + * It is not possible to set more than one listener per file descriptor. + * </p><p> + * It is important to always unregister the listener when the file descriptor + * is no longer of use. + * </p> + * + * @param fd The file descriptor for which a listener will be registered. + * @param events The set of events to receive: a combination of the + * {@link OnFileDescriptorEventListener#EVENT_INPUT}, + * {@link OnFileDescriptorEventListener#EVENT_OUTPUT}, and + * {@link OnFileDescriptorEventListener#EVENT_ERROR} event masks. If the requested + * set of events is zero, then the listener is unregistered. + * @param listener The listener to invoke when file descriptor events occur. + * + * @see OnFileDescriptorEventListener + * @see #removeOnFileDescriptorEventListener + */ + @android.ravenwood.annotation.RavenwoodThrow(blockedBy = android.os.ParcelFileDescriptor.class) + public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd, + @OnFileDescriptorEventListener.Events int events, + @NonNull OnFileDescriptorEventListener listener) { + if (fd == null) { + throw new IllegalArgumentException("fd must not be null"); + } + if (listener == null) { + throw new IllegalArgumentException("listener must not be null"); + } + + if (sForceConcurrent) { + synchronized (mFileDescriptorRecordsLock) { + updateOnFileDescriptorEventListenerLocked(fd, events, listener); + } + } else { + synchronized (this) { + updateOnFileDescriptorEventListenerLocked(fd, events, listener); + } + } + } + + /** + * Removes a file descriptor listener. + * <p> + * This method does nothing if no listener has been registered for the + * specified file descriptor. + * </p> + * + * @param fd The file descriptor whose listener will be unregistered. + * + * @see OnFileDescriptorEventListener + * @see #addOnFileDescriptorEventListener + */ + @android.ravenwood.annotation.RavenwoodThrow(blockedBy = android.os.ParcelFileDescriptor.class) + public void removeOnFileDescriptorEventListener(@NonNull FileDescriptor fd) { + if (fd == null) { + throw new IllegalArgumentException("fd must not be null"); + } + if (sForceConcurrent) { + synchronized (mFileDescriptorRecordsLock) { + updateOnFileDescriptorEventListenerLocked(fd, 0, null); + } + } else { + synchronized (this) { + updateOnFileDescriptorEventListenerLocked(fd, 0, null); + } + } + } + + @android.ravenwood.annotation.RavenwoodThrow(blockedBy = android.os.ParcelFileDescriptor.class) + private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events, + OnFileDescriptorEventListener listener) { + final int fdNum = fd.getInt$(); + + int index = -1; + FileDescriptorRecord record = null; + if (mFileDescriptorRecords != null) { + index = mFileDescriptorRecords.indexOfKey(fdNum); + if (index >= 0) { + record = mFileDescriptorRecords.valueAt(index); + if (record != null && record.mEvents == events) { + return; + } + } + } + + if (events != 0) { + events |= OnFileDescriptorEventListener.EVENT_ERROR; + if (record == null) { + if (mFileDescriptorRecords == null) { + mFileDescriptorRecords = new SparseArray<FileDescriptorRecord>(); + } + record = new FileDescriptorRecord(fd, events, listener); + mFileDescriptorRecords.put(fdNum, record); + } else { + record.mListener = listener; + record.mEvents = events; + record.mSeq += 1; + } + nativeSetFileDescriptorEvents(mPtr, fdNum, events); + } else if (record != null) { + record.mEvents = 0; + mFileDescriptorRecords.removeAt(index); + nativeSetFileDescriptorEvents(mPtr, fdNum, 0); + } + } + + // Called from native code. + @UnsupportedAppUsage(maxTargetSdk = Build.VERSION_CODES.R, trackingBug = 170729553) + private int dispatchEvents(int fd, int events) { + // Get the file descriptor record and any state that might change. + final FileDescriptorRecord record; + final int oldWatchedEvents; + final OnFileDescriptorEventListener listener; + final int seq; + if (sForceConcurrent) { + synchronized (mFileDescriptorRecordsLock) { + record = mFileDescriptorRecords.get(fd); + if (record == null) { + return 0; // spurious, no listener registered + } + + oldWatchedEvents = record.mEvents; + events &= oldWatchedEvents; // filter events based on current watched set + if (events == 0) { + return oldWatchedEvents; // spurious, watched events changed + } + + listener = record.mListener; + seq = record.mSeq; + } + } else { + synchronized (this) { + record = mFileDescriptorRecords.get(fd); + if (record == null) { + return 0; // spurious, no listener registered + } + + oldWatchedEvents = record.mEvents; + events &= oldWatchedEvents; // filter events based on current watched set + if (events == 0) { + return oldWatchedEvents; // spurious, watched events changed + } + + listener = record.mListener; + seq = record.mSeq; + } + } + // Invoke the listener outside of the lock. + int newWatchedEvents = listener.onFileDescriptorEvents( + record.mDescriptor, events); + if (newWatchedEvents != 0) { + newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR; + } + + // Update the file descriptor record if the listener changed the set of + // events to watch and the listener itself hasn't been updated since. + if (newWatchedEvents != oldWatchedEvents) { + synchronized (this) { + int index = mFileDescriptorRecords.indexOfKey(fd); + if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record + && record.mSeq == seq) { + record.mEvents = newWatchedEvents; + if (newWatchedEvents == 0) { + mFileDescriptorRecords.removeAt(index); + } + } + } + } + + // Return the new set of events to watch for native code to take care of. + return newWatchedEvents; + } + + private static final AtomicLong mMessagesDelivered = new AtomicLong(); + + /* This is only read/written from the Looper thread. For use with Concurrent MQ */ + private int mNextPollTimeoutMillis; + private boolean mMessageDirectlyQueued; + private Message nextMessage() { + int i = 0; + + while (true) { + if (DEBUG) { + Log.d(TAG_C, "nextMessage loop #" + i); + i++; + } + + mDrainingLock.lock(); + mNextIsDrainingStack = true; + mDrainingLock.unlock(); + + /* + * Set our state to active, drain any items from the stack into our priority queues + */ + StackNode oldTop; + oldTop = swapAndSetStackStateActive(); + drainStack(oldTop); + + mDrainingLock.lock(); + mNextIsDrainingStack = false; + mDrainCompleted.signalAll(); + mDrainingLock.unlock(); + + /* + * The objective of this next block of code is to: + * - find a message to return (if any is ready) + * - find a next message we would like to return, after scheduling. + * - we make our scheduling decision based on this next message (if it exists). + * + * We have two queues to juggle and the presence of barriers throws an additional + * wrench into our plans. + * + * The last wrinkle is that remove() may delete items from underneath us. If we hit + * that case, we simply restart the loop. + */ + + /* Get the first node from each queue */ + Iterator<MessageNode> queueIter = mPriorityQueue.iterator(); + MessageNode msgNode = iterateNext(queueIter); + Iterator<MessageNode> asyncQueueIter = mAsyncPriorityQueue.iterator(); + MessageNode asyncMsgNode = iterateNext(asyncQueueIter); + + if (DEBUG) { + if (msgNode != null) { + Message msg = msgNode.mMessage; + Log.d(TAG_C, "Next found node what: " + msg.what + " when: " + msg.when + + " seq: " + msgNode.mInsertSeq + "barrier: " + + msgNode.isBarrier() + " now: " + SystemClock.uptimeMillis()); + } + if (asyncMsgNode != null) { + Message msg = asyncMsgNode.mMessage; + Log.d(TAG_C, "Next found async node what: " + msg.what + " when: " + msg.when + + " seq: " + asyncMsgNode.mInsertSeq + "barrier: " + + asyncMsgNode.isBarrier() + " now: " + + SystemClock.uptimeMillis()); + } + } + + /* + * the node which we will return, null if none are ready + */ + MessageNode found = null; + /* + * The node from which we will determine our next wakeup time. + * Null indicates there is no next message ready. If we found a node, + * we can leave this null as Looper will call us again after delivering + * the message. + */ + MessageNode next = null; + + long now = SystemClock.uptimeMillis(); + /* + * If we have a barrier we should return the async node (if it exists and is ready) + */ + if (msgNode != null && msgNode.isBarrier()) { + if (asyncMsgNode != null && now >= asyncMsgNode.getWhen()) { + found = asyncMsgNode; + } else { + next = asyncMsgNode; + } + } else { /* No barrier. */ + MessageNode earliest; + /* + * If we have two messages, pick the earliest option from either queue. + * Otherwise grab whichever node is non-null. If both are null we'll fall through. + */ + earliest = pickEarliestNode(msgNode, asyncMsgNode); + + if (earliest != null) { + if (now >= earliest.getWhen()) { + found = earliest; + } else { + next = earliest; + } + } + } + + if (DEBUG) { + if (found != null) { + Message msg = found.mMessage; + Log.d(TAG_C, " Will deliver node what: " + msg.what + " when: " + msg.when + + " seq: " + found.mInsertSeq + " barrier: " + found.isBarrier() + + " async: " + found.isAsync() + " now: " + + SystemClock.uptimeMillis()); + } else { + Log.d(TAG_C, "No node to deliver"); + } + if (next != null) { + Message msg = next.mMessage; + Log.d(TAG_C, "Next node what: " + msg.what + " when: " + msg.when + " seq: " + + next.mInsertSeq + " barrier: " + next.isBarrier() + " async: " + + next.isAsync() + + " now: " + SystemClock.uptimeMillis()); + } else { + Log.d(TAG_C, "No next node"); + } + } + + /* + * If we have a found message, we will get called again so there's no need to set state. + * In that case we can leave our state as ACTIVE. + * + * Otherwise we should determine how to park the thread. + */ + StateNode nextOp = sStackStateActive; + if (found == null) { + if (next == null) { + /* No message to deliver, sleep indefinitely */ + mNextPollTimeoutMillis = -1; + nextOp = sStackStateParked; + if (DEBUG) { + Log.d(TAG_C, "nextMessage next state is StackStateParked"); + } + } else { + /* Message not ready, or we found one to deliver already, set a timeout */ + long nextMessageWhen = next.getWhen(); + if (nextMessageWhen > now) { + mNextPollTimeoutMillis = (int) Math.min(nextMessageWhen - now, + Integer.MAX_VALUE); + } else { + mNextPollTimeoutMillis = 0; + } + + mStackStateTimedPark.mWhenToWake = now + mNextPollTimeoutMillis; + nextOp = mStackStateTimedPark; + if (DEBUG) { + Log.d(TAG_C, "nextMessage next state is StackStateTimedParked timeout ms " + + mNextPollTimeoutMillis + " mWhenToWake: " + + mStackStateTimedPark.mWhenToWake + " now " + now); + } + } + } + + /* + * Try to swap our state from Active back to Park or TimedPark. If we raced with + * enqueue, loop back around to pick up any new items. + */ + if (sState.compareAndSet(this, sStackStateActive, nextOp)) { + mMessageCounts.clearCounts(); + if (found != null) { + if (!removeFromPriorityQueue(found)) { + /* + * RemoveMessages() might be able to pull messages out from under us + * However we can detect that here and just loop around if it happens. + */ + continue; + } + + if (TRACE) { + Trace.setCounter("MQ.Delivered", mMessagesDelivered.incrementAndGet()); + } + return found.mMessage; + } + return null; + } + } + } + + private Message nextConcurrent() { + final long ptr = mPtr; + if (ptr == 0) { + return null; + } + + mNextPollTimeoutMillis = 0; + int pendingIdleHandlerCount = -1; // -1 only during first iteration + while (true) { + if (mNextPollTimeoutMillis != 0) { + Binder.flushPendingCommands(); + } + + mMessageDirectlyQueued = false; + nativePollOnce(ptr, mNextPollTimeoutMillis); + + Message msg = nextMessage(); + if (msg != null) { + msg.markInUse(); + return msg; + } + + if ((boolean) sQuitting.getVolatile(this)) { + return null; + } + + synchronized (mIdleHandlersLock) { + // If first time idle, then get the number of idlers to run. + // Idle handles only run if the queue is empty or if the first message + // in the queue (possibly a barrier) is due to be handled in the future. + if (pendingIdleHandlerCount < 0 + && isIdle()) { + pendingIdleHandlerCount = mIdleHandlers.size(); + } + if (pendingIdleHandlerCount <= 0) { + // No idle handlers to run. Loop and wait some more. + continue; + } + + if (mPendingIdleHandlers == null) { + mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; + } + mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); + } + + // Run the idle handlers. + // We only ever reach this code block during the first iteration. + for (int i = 0; i < pendingIdleHandlerCount; i++) { + final IdleHandler idler = mPendingIdleHandlers[i]; + mPendingIdleHandlers[i] = null; // release the reference to the handler + + boolean keep = false; + try { + keep = idler.queueIdle(); + } catch (Throwable t) { + Log.wtf(TAG_C, "IdleHandler threw exception", t); + } + + if (!keep) { + synchronized (mIdleHandlersLock) { + mIdleHandlers.remove(idler); + } + } + } + + // Reset the idle handler count to 0 so we do not run them again. + pendingIdleHandlerCount = 0; + + // While calling an idle handler, a new message could have been delivered + // so go back and look again for a pending message without waiting. + mNextPollTimeoutMillis = 0; + } + } + + @UnsupportedAppUsage + Message next() { + if (sForceConcurrent) { + return nextConcurrent(); + } + + // Return here if the message loop has already quit and been disposed. + // This can happen if the application tries to restart a looper after quit + // which is not supported. + final long ptr = mPtr; + if (ptr == 0) { + return null; + } + + int pendingIdleHandlerCount = -1; // -1 only during first iteration + int nextPollTimeoutMillis = 0; + for (;;) { + if (nextPollTimeoutMillis != 0) { + Binder.flushPendingCommands(); + } + + nativePollOnce(ptr, nextPollTimeoutMillis); + + synchronized (this) { + // Try to retrieve the next message. Return if found. + final long now = SystemClock.uptimeMillis(); + Message prevMsg = null; + Message msg = mMessages; + if (msg != null && msg.target == null) { + // Stalled by a barrier. Find the next asynchronous message in the queue. + do { + prevMsg = msg; + msg = msg.next; + } while (msg != null && !msg.isAsynchronous()); + } + if (msg != null) { + if (now < msg.when) { + // Next message is not ready. Set a timeout to wake up when it is ready. + nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); + } else { + // Got a message. + mLegacyBlocked = false; + if (prevMsg != null) { + prevMsg.next = msg.next; + if (prevMsg.next == null) { + mLast = prevMsg; + } + } else { + mMessages = msg.next; + if (msg.next == null) { + mLast = null; + } + } + msg.next = null; + if (DEBUG) Log.v(TAG_L, "Returning message: " + msg); + msg.markInUse(); + if (msg.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + if (TRACE) { + Trace.setCounter("MQ.Delivered", mMessagesDelivered.incrementAndGet()); + } + return msg; + } + } else { + // No more messages. + nextPollTimeoutMillis = -1; + } + + // Process the quit message now that all pending messages have been handled. + if (mLegacyQuitting) { + dispose(); + return null; + } + + // If first time idle, then get the number of idlers to run. + // Idle handles only run if the queue is empty or if the first message + // in the queue (possibly a barrier) is due to be handled in the future. + if (pendingIdleHandlerCount < 0 + && (mMessages == null || now < mMessages.when)) { + pendingIdleHandlerCount = mIdleHandlers.size(); + } + if (pendingIdleHandlerCount <= 0) { + // No idle handlers to run. Loop and wait some more. + mLegacyBlocked = true; + continue; + } + + if (mPendingIdleHandlers == null) { + mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; + } + mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); + } + + // Run the idle handlers. + // We only ever reach this code block during the first iteration. + for (int i = 0; i < pendingIdleHandlerCount; i++) { + final IdleHandler idler = mPendingIdleHandlers[i]; + mPendingIdleHandlers[i] = null; // release the reference to the handler + + boolean keep = false; + try { + keep = idler.queueIdle(); + } catch (Throwable t) { + Log.wtf(TAG_L, "IdleHandler threw exception", t); + } + + if (!keep) { + synchronized (this) { + mIdleHandlers.remove(idler); + } + } + } + + // Reset the idle handler count to 0 so we do not run them again. + pendingIdleHandlerCount = 0; + + // While calling an idle handler, a new message could have been delivered + // so go back and look again for a pending message without waiting. + nextPollTimeoutMillis = 0; + } + } + + void quit(boolean safe) { + if (!mQuitAllowed) { + throw new IllegalStateException("Main thread not allowed to quit."); + } + + if (sForceConcurrent) { + synchronized (mIdleHandlersLock) { + if (sQuitting.compareAndSet(this, false, true)) { + if (safe) { + removeAllFutureMessages(); + } else { + removeAllMessages(); + } + + // We can assume mPtr != 0 because sQuitting was previously false. + nativeWake(mPtr); + } + } + } else { + synchronized (this) { + if (mLegacyQuitting) { + return; + } + mLegacyQuitting = true; + + if (safe) { + removeAllFutureMessagesLocked(); + } else { + removeAllMessagesLocked(); + } + + // We can assume mPtr != 0 because mLegacyQuitting was previously false. + nativeWake(mPtr); + } + } + } + + /** + * Posts a synchronization barrier to the Looper's message queue. + * + * Message processing occurs as usual until the message queue encounters the + * synchronization barrier that has been posted. When the barrier is encountered, + * later synchronous messages in the queue are stalled (prevented from being executed) + * until the barrier is released by calling {@link #removeSyncBarrier} and specifying + * the token that identifies the synchronization barrier. + * + * This method is used to immediately postpone execution of all subsequently posted + * synchronous messages until a condition is met that releases the barrier. + * Asynchronous messages (see {@link Message#isAsynchronous} are exempt from the barrier + * and continue to be processed as usual. + * + * This call must be always matched by a call to {@link #removeSyncBarrier} with + * the same token to ensure that the message queue resumes normal operation. + * Otherwise the application will probably hang! + * + * @return A token that uniquely identifies the barrier. This token must be + * passed to {@link #removeSyncBarrier} to release the barrier. + * + * @hide + */ + @UnsupportedAppUsage + @TestApi + public int postSyncBarrier() { + return postSyncBarrier(SystemClock.uptimeMillis()); + } + + private int postSyncBarrier(long when) { + // Enqueue a new sync barrier token. + // We don't need to wake the queue because the purpose of a barrier is to stall it. + if (sForceConcurrent) { + final int token = mNextBarrierToken.getAndIncrement(); + final Message msg = Message.obtain(); + + msg.markInUse(); + msg.arg1 = token; + + if (!enqueueMessageUnchecked(msg, when)) { + Log.wtf(TAG_C, "Unexpected error while adding sync barrier!"); + return -1; + } + + return token; + } + + synchronized (this) { + final int token = mLegacyNextBarrierToken++; + final Message msg = Message.obtain(); + msg.markInUse(); + msg.when = when; + msg.arg1 = token; + + if (Flags.messageQueueTailTracking() && mLast != null && mLast.when <= when) { + /* Message goes to tail of list */ + mLast.next = msg; + mLast = msg; + msg.next = null; + return token; + } + + Message prev = null; + Message p = mMessages; + if (when != 0) { + while (p != null && p.when <= when) { + prev = p; + p = p.next; + } + } + + if (p == null) { + /* We reached the tail of the list, or list is empty. */ + mLast = msg; + } + + if (prev != null) { // invariant: p == prev.next + msg.next = p; + prev.next = msg; + } else { + msg.next = p; + mMessages = msg; + } + return token; + } + } + + private class MatchBarrierToken extends MessageCompare { + int mBarrierToken; + + MatchBarrierToken(int token) { + super(); + mBarrierToken = token; + } + + @Override + public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r, + long when) { + if (m.target == null && m.arg1 == mBarrierToken) { + return true; + } + return false; + } + } + + /** + * Removes a synchronization barrier. + * + * @param token The synchronization barrier token that was returned by + * {@link #postSyncBarrier}. + * + * @throws IllegalStateException if the barrier was not found. + * + * @hide + */ + @UnsupportedAppUsage + @TestApi + public void removeSyncBarrier(int token) { + // Remove a sync barrier token from the queue. + // If the queue is no longer stalled by a barrier then wake it. + if (sForceConcurrent) { + boolean removed; + MessageNode first; + final MatchBarrierToken matchBarrierToken = new MatchBarrierToken(token); + + try { + /* Retain the first element to see if we are currently stuck on a barrier. */ + first = mPriorityQueue.first(); + } catch (NoSuchElementException e) { + /* The queue is empty */ + first = null; + } + + removed = findOrRemoveMessages(null, 0, null, null, 0, matchBarrierToken, true); + if (removed && first != null) { + Message m = first.mMessage; + if (m.target == null && m.arg1 == token) { + /* Wake up next() in case it was sleeping on this barrier. */ + nativeWake(mPtr); + } + } else if (!removed) { + throw new IllegalStateException("The specified message queue synchronization " + + " barrier token has not been posted or has already been removed."); + } + return; + } + + synchronized (this) { + Message prev = null; + Message p = mMessages; + while (p != null && (p.target != null || p.arg1 != token)) { + prev = p; + p = p.next; + } + if (p == null) { + throw new IllegalStateException("The specified message queue synchronization " + + " barrier token has not been posted or has already been removed."); + } + final boolean needWake; + if (prev != null) { + prev.next = p.next; + if (prev.next == null) { + mLast = prev; + } + needWake = false; + } else { + mMessages = p.next; + if (mMessages == null) { + mLast = null; + } + needWake = mMessages == null || mMessages.target != null; + } + p.recycleUnchecked(); + + // If the loop is quitting then it is already awake. + // We can assume mPtr != 0 when mLegacyQuitting is false. + if (needWake && !mLegacyQuitting) { + nativeWake(mPtr); + } + } + } + + boolean enqueueMessage(Message msg, long when) { + if (msg.target == null) { + throw new IllegalArgumentException("Message must have a target."); + } + + if (sForceConcurrent) { + if (msg.isInUse()) { + throw new IllegalStateException(msg + " This message is already in use."); + } + + return enqueueMessageUnchecked(msg, when); + } + + synchronized (this) { + if (msg.isInUse()) { + throw new IllegalStateException(msg + " This message is already in use."); + } + + if (mLegacyQuitting) { + IllegalStateException e = new IllegalStateException( + msg.target + " sending message to a Handler on a dead thread"); + Log.w(TAG_L, e.getMessage(), e); + msg.recycle(); + return false; + } + + msg.markInUse(); + msg.when = when; + Message p = mMessages; + boolean needWake; + if (p == null || when == 0 || when < p.when) { + // New head, wake up the event queue if blocked. + msg.next = p; + mMessages = msg; + needWake = mLegacyBlocked; + if (p == null) { + mLast = mMessages; + } + } else { + // Message is to be inserted at tail or middle of queue. Usually we don't have to + // wake up the event queue unless there is a barrier at the head of the queue and + // the message is the earliest asynchronous message in the queue. + needWake = mLegacyBlocked && p.target == null && msg.isAsynchronous(); + + // For readability, we split this portion of the function into two blocks based on + // whether tail tracking is enabled. This has a minor implication for the case + // where tail tracking is disabled. See the comment below. + if (Flags.messageQueueTailTracking()) { + if (when >= mLast.when) { + needWake = needWake && mLegacyAsyncMessageCount == 0; + msg.next = null; + mLast.next = msg; + mLast = msg; + } else { + // Inserted within the middle of the queue. + Message prev; + for (;;) { + prev = p; + p = p.next; + if (p == null || when < p.when) { + break; + } + if (needWake && p.isAsynchronous()) { + needWake = false; + } + } + if (p == null) { + /* Inserting at tail of queue */ + mLast = msg; + } + msg.next = p; // invariant: p == prev.next + prev.next = msg; + } + } else { + Message prev; + for (;;) { + prev = p; + p = p.next; + if (p == null || when < p.when) { + break; + } + if (needWake && p.isAsynchronous()) { + needWake = false; + } + } + msg.next = p; // invariant: p == prev.next + prev.next = msg; + + /* + * If this block is executing then we have a build without tail tracking - + * specifically: Flags.messageQueueTailTracking() == false. This is determined + * at build time so the flag won't change on us during runtime. + * + * Since we don't want to pepper the code with extra checks, we only check + * for tail tracking when we might use mLast. Otherwise, we continue to update + * mLast as the tail of the list. + * + * In this case however we are not maintaining mLast correctly. Since we never + * use it, this is fine. However, we run the risk of leaking a reference. + * So set mLast to null in this case to avoid any Message leaks. The other + * sites will never use the value so we are safe against null pointer derefs. + */ + mLast = null; + } + } + + if (msg.isAsynchronous()) { + mLegacyAsyncMessageCount++; + } + + // We can assume mPtr != 0 because mLegacyQuitting is false. + if (needWake) { + nativeWake(mPtr); + } + } + return true; + } + + private static class MatchHandlerWhatAndObject extends MessageCompare { + @Override + public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r, + long when) { + if (m.target == h && m.what == what && (object == null || m.obj == object)) { + return true; + } + return false; + } + } + private final MatchHandlerWhatAndObject mMatchHandlerWhatAndObject = + new MatchHandlerWhatAndObject(); + boolean hasMessages(Handler h, int what, Object object) { + if (h == null) { + return false; + } + if (sForceConcurrent) { + return findOrRemoveMessages(h, what, object, null, 0, mMatchHandlerWhatAndObject, + false); + } + synchronized (this) { + Message p = mMessages; + while (p != null) { + if (p.target == h && p.what == what && (object == null || p.obj == object)) { + return true; + } + p = p.next; + } + return false; + } + } + + private static class MatchHandlerWhatAndObjectEquals extends MessageCompare { + @Override + public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r, + long when) { + if (m.target == h && m.what == what && (object == null || object.equals(m.obj))) { + return true; + } + return false; + } + } + private final MatchHandlerWhatAndObjectEquals mMatchHandlerWhatAndObjectEquals = + new MatchHandlerWhatAndObjectEquals(); + boolean hasEqualMessages(Handler h, int what, Object object) { + if (h == null) { + return false; + } + if (sForceConcurrent) { + return findOrRemoveMessages(h, what, object, null, 0, mMatchHandlerWhatAndObjectEquals, + false); + + } + synchronized (this) { + Message p = mMessages; + while (p != null) { + if (p.target == h && p.what == what && (object == null || object.equals(p.obj))) { + return true; + } + p = p.next; + } + return false; + } + } + + private static class MatchHandlerRunnableAndObject extends MessageCompare { + @Override + public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r, + long when) { + if (m.target == h && m.callback == r && (object == null || m.obj == object)) { + return true; + } + return false; + } + } + private final MatchHandlerRunnableAndObject mMatchHandlerRunnableAndObject = + new MatchHandlerRunnableAndObject(); + @UnsupportedAppUsage(maxTargetSdk = Build.VERSION_CODES.R, trackingBug = 170729553) + boolean hasMessages(Handler h, Runnable r, Object object) { + if (h == null) { + return false; + } + if (sForceConcurrent) { + return findOrRemoveMessages(h, -1, object, r, 0, mMatchHandlerRunnableAndObject, + false); + } + + synchronized (this) { + Message p = mMessages; + while (p != null) { + if (p.target == h && p.callback == r && (object == null || p.obj == object)) { + return true; + } + p = p.next; + } + return false; + } + } + + private static class MatchHandler extends MessageCompare { + @Override + public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r, + long when) { + if (m.target == h) { + return true; + } + return false; + } + } + private final MatchHandler mMatchHandler = new MatchHandler(); + boolean hasMessages(Handler h) { + if (h == null) { + return false; + } + if (sForceConcurrent) { + return findOrRemoveMessages(h, -1, null, null, 0, mMatchHandler, false); + } + synchronized (this) { + Message p = mMessages; + while (p != null) { + if (p.target == h) { + return true; + } + p = p.next; + } + return false; + } + } + + void removeMessages(Handler h, int what, Object object) { + if (h == null) { + return; + } + if (sForceConcurrent) { + findOrRemoveMessages(h, what, object, null, 0, mMatchHandlerWhatAndObject, true); + return; + } + synchronized (this) { + Message p = mMessages; + + // Remove all messages at front. + while (p != null && p.target == h && p.what == what + && (object == null || p.obj == object)) { + Message n = p.next; + mMessages = n; + if (p.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + p.recycleUnchecked(); + p = n; + } + + if (p == null) { + mLast = mMessages; + } + + // Remove all messages after front. + while (p != null) { + Message n = p.next; + if (n != null) { + if (n.target == h && n.what == what + && (object == null || n.obj == object)) { + Message nn = n.next; + if (n.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + n.recycleUnchecked(); + p.next = nn; + if (p.next == null) { + mLast = p; + } + continue; + } + } + p = n; + } + } + } + + void removeEqualMessages(Handler h, int what, Object object) { + if (h == null) { + return; + } + + if (sForceConcurrent) { + findOrRemoveMessages(h, what, object, null, 0, mMatchHandlerWhatAndObjectEquals, true); + return; + } + + synchronized (this) { + Message p = mMessages; + + // Remove all messages at front. + while (p != null && p.target == h && p.what == what + && (object == null || object.equals(p.obj))) { + Message n = p.next; + mMessages = n; + if (p.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + p.recycleUnchecked(); + p = n; + } + + if (p == null) { + mLast = mMessages; + } + + // Remove all messages after front. + while (p != null) { + Message n = p.next; + if (n != null) { + if (n.target == h && n.what == what + && (object == null || object.equals(n.obj))) { + Message nn = n.next; + if (n.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + n.recycleUnchecked(); + p.next = nn; + if (p.next == null) { + mLast = p; + } + continue; + } + } + p = n; + } + } + } + + void removeMessages(Handler h, Runnable r, Object object) { + if (h == null || r == null) { + return; + } + + if (sForceConcurrent) { + findOrRemoveMessages(h, -1, object, r, 0, mMatchHandlerRunnableAndObject, true); + return; + } + synchronized (this) { + Message p = mMessages; + + // Remove all messages at front. + while (p != null && p.target == h && p.callback == r + && (object == null || p.obj == object)) { + Message n = p.next; + mMessages = n; + if (p.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + p.recycleUnchecked(); + p = n; + } + + if (p == null) { + mLast = mMessages; + } + + // Remove all messages after front. + while (p != null) { + Message n = p.next; + if (n != null) { + if (n.target == h && n.callback == r + && (object == null || n.obj == object)) { + Message nn = n.next; + if (n.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + n.recycleUnchecked(); + p.next = nn; + if (p.next == null) { + mLast = p; + } + continue; + } + } + p = n; + } + } + } + + private static class MatchHandlerRunnableAndObjectEquals extends MessageCompare { + @Override + public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r, + long when) { + if (m.target == h && m.callback == r && (object == null || object.equals(m.obj))) { + return true; + } + return false; + } + } + private final MatchHandlerRunnableAndObjectEquals mMatchHandlerRunnableAndObjectEquals = + new MatchHandlerRunnableAndObjectEquals(); + void removeEqualMessages(Handler h, Runnable r, Object object) { + if (h == null || r == null) { + return; + } + + if (sForceConcurrent) { + findOrRemoveMessages(h, -1, object, r, 0, mMatchHandlerRunnableAndObjectEquals, true); + return; + } + synchronized (this) { + Message p = mMessages; + + // Remove all messages at front. + while (p != null && p.target == h && p.callback == r + && (object == null || object.equals(p.obj))) { + Message n = p.next; + mMessages = n; + if (p.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + p.recycleUnchecked(); + p = n; + } + + if (p == null) { + mLast = mMessages; + } + + // Remove all messages after front. + while (p != null) { + Message n = p.next; + if (n != null) { + if (n.target == h && n.callback == r + && (object == null || object.equals(n.obj))) { + Message nn = n.next; + if (n.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + n.recycleUnchecked(); + p.next = nn; + if (p.next == null) { + mLast = p; + } + continue; + } + } + p = n; + } + } + } + + private static class MatchHandlerAndObject extends MessageCompare { + @Override + public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r, + long when) { + if (m.target == h && (object == null || m.obj == object)) { + return true; + } + return false; + } + } + private final MatchHandlerAndObject mMatchHandlerAndObject = new MatchHandlerAndObject(); + void removeCallbacksAndMessages(Handler h, Object object) { + if (h == null) { + return; + } + + if (sForceConcurrent) { + findOrRemoveMessages(h, -1, object, null, 0, mMatchHandlerAndObject, true); + return; + } + synchronized (this) { + Message p = mMessages; + + // Remove all messages at front. + while (p != null && p.target == h + && (object == null || p.obj == object)) { + Message n = p.next; + mMessages = n; + if (p.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + p.recycleUnchecked(); + p = n; + } + + if (p == null) { + mLast = mMessages; + } + + // Remove all messages after front. + while (p != null) { + Message n = p.next; + if (n != null) { + if (n.target == h && (object == null || n.obj == object)) { + Message nn = n.next; + if (n.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + n.recycleUnchecked(); + p.next = nn; + if (p.next == null) { + mLast = p; + } + continue; + } + } + p = n; + } + } + } + + private static class MatchHandlerAndObjectEquals extends MessageCompare { + @Override + public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r, + long when) { + if (m.target == h && (object == null || object.equals(m.obj))) { + return true; + } + return false; + } + } + private final MatchHandlerAndObjectEquals mMatchHandlerAndObjectEquals = + new MatchHandlerAndObjectEquals(); + void removeCallbacksAndEqualMessages(Handler h, Object object) { + if (h == null) { + return; + } + + if (sForceConcurrent) { + findOrRemoveMessages(h, -1, object, null, 0, mMatchHandlerAndObjectEquals, true); + return; + } + synchronized (this) { + Message p = mMessages; + + // Remove all messages at front. + while (p != null && p.target == h + && (object == null || object.equals(p.obj))) { + Message n = p.next; + mMessages = n; + if (p.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + p.recycleUnchecked(); + p = n; + } + + if (p == null) { + mLast = mMessages; + } + + // Remove all messages after front. + while (p != null) { + Message n = p.next; + if (n != null) { + if (n.target == h && (object == null || object.equals(n.obj))) { + Message nn = n.next; + if (n.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + n.recycleUnchecked(); + p.next = nn; + if (p.next == null) { + mLast = p; + } + continue; + } + } + p = n; + } + } + } + + private void removeAllMessagesLocked() { + Message p = mMessages; + while (p != null) { + Message n = p.next; + p.recycleUnchecked(); + p = n; + } + mMessages = null; + mLast = null; + mLegacyAsyncMessageCount = 0; + } + + private void removeAllFutureMessagesLocked() { + final long now = SystemClock.uptimeMillis(); + Message p = mMessages; + if (p != null) { + if (p.when > now) { + removeAllMessagesLocked(); + } else { + Message n; + for (;;) { + n = p.next; + if (n == null) { + return; + } + if (n.when > now) { + break; + } + p = n; + } + p.next = null; + mLast = p; + + do { + p = n; + n = p.next; + if (p.isAsynchronous()) { + mLegacyAsyncMessageCount--; + } + p.recycleUnchecked(); + } while (n != null); + } + } + } + + private static class MatchAllMessages extends MessageCompare { + @Override + public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r, + long when) { + return true; + } + } + private final MatchAllMessages mMatchAllMessages = new MatchAllMessages(); + private void removeAllMessages() { + findOrRemoveMessages(null, -1, null, null, 0, mMatchAllMessages, true); + } + + private static class MatchAllFutureMessages extends MessageCompare { + @Override + public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r, + long when) { + if (m.when > when) { + return true; + } + return false; + } + } + private final MatchAllFutureMessages mMatchAllFutureMessages = new MatchAllFutureMessages(); + private void removeAllFutureMessages() { + findOrRemoveMessages(null, -1, null, null, SystemClock.uptimeMillis(), + mMatchAllFutureMessages, true); + } + + @NeverCompile + private void printPriorityQueueNodes() { + Iterator<MessageNode> iterator = mPriorityQueue.iterator(); + + Log.d(TAG_C, "* Dump priority queue"); + while (iterator.hasNext()) { + MessageNode msgNode = iterator.next(); + Log.d(TAG_C, "** MessageNode what: " + msgNode.mMessage.what + " when " + + msgNode.mMessage.when + " seq: " + msgNode.mInsertSeq); + } + } + + @NeverCompile + private int dumpPriorityQueue(ConcurrentSkipListSet<MessageNode> queue, Printer pw, + String prefix, Handler h, int n) { + int count = 0; + long now = SystemClock.uptimeMillis(); + + for (MessageNode msgNode : queue) { + Message msg = msgNode.mMessage; + if (h == null || h == msg.target) { + pw.println(prefix + "Message " + (n + count) + ": " + msg.toString(now)); + } + count++; + } + return count; + } + + @NeverCompile + void dump(Printer pw, String prefix, Handler h) { + if (sForceConcurrent) { + long now = SystemClock.uptimeMillis(); + int n = 0; + + pw.println(prefix + "(MessageQueue is using Concurrent implementation)"); + + StackNode node = (StackNode) sState.getVolatile(this); + while (node != null) { + if (node.isMessageNode()) { + Message msg = ((MessageNode) node).mMessage; + if (h == null || h == msg.target) { + pw.println(prefix + "Message " + n + ": " + msg.toString(now)); + } + node = ((MessageNode) node).mNext; + } else { + pw.println(prefix + "State: " + node); + node = null; + } + n++; + } + + pw.println(prefix + "PriorityQueue Messages: "); + n += dumpPriorityQueue(mPriorityQueue, pw, prefix, h, n); + pw.println(prefix + "AsyncPriorityQueue Messages: "); + n += dumpPriorityQueue(mAsyncPriorityQueue, pw, prefix, h, n); + + pw.println(prefix + "(Total messages: " + n + ", polling=" + isPolling() + + ", quitting=" + (boolean) sQuitting.getVolatile(this) + ")"); + return; + } + + synchronized (this) { + pw.println(prefix + "(MessageQueue is using Legacy implementation)"); + long now = SystemClock.uptimeMillis(); + int n = 0; + for (Message msg = mMessages; msg != null; msg = msg.next) { + if (h == null || h == msg.target) { + pw.println(prefix + "Message " + n + ": " + msg.toString(now)); + } + n++; + } + pw.println(prefix + "(Total messages: " + n + ", polling=" + isPollingLocked() + + ", quitting=" + mLegacyQuitting + ")"); + } + } + + @NeverCompile + private int dumpPriorityQueue(ConcurrentSkipListSet<MessageNode> queue, + ProtoOutputStream proto) { + int count = 0; + + for (MessageNode msgNode : queue) { + Message msg = msgNode.mMessage; + msg.dumpDebug(proto, MessageQueueProto.MESSAGES); + count++; + } + return count; + } + + @NeverCompile + void dumpDebug(ProtoOutputStream proto, long fieldId) { + if (sForceConcurrent) { + final long messageQueueToken = proto.start(fieldId); + + StackNode node = (StackNode) sState.getVolatile(this); + while (node.isMessageNode()) { + Message msg = ((MessageNode) node).mMessage; + msg.dumpDebug(proto, MessageQueueProto.MESSAGES); + node = ((MessageNode) node).mNext; + } + + dumpPriorityQueue(mPriorityQueue, proto); + dumpPriorityQueue(mAsyncPriorityQueue, proto); + + proto.write(MessageQueueProto.IS_POLLING_LOCKED, isPolling()); + proto.write(MessageQueueProto.IS_QUITTING, (boolean) sQuitting.getVolatile(this)); + proto.end(messageQueueToken); + return; + } + + final long messageQueueToken = proto.start(fieldId); + synchronized (this) { + for (Message msg = mMessages; msg != null; msg = msg.next) { + msg.dumpDebug(proto, MessageQueueProto.MESSAGES); + } + proto.write(MessageQueueProto.IS_POLLING_LOCKED, isPollingLocked()); + proto.write(MessageQueueProto.IS_QUITTING, mLegacyQuitting); + } + proto.end(messageQueueToken); + } + + /** + * Callback interface for discovering when a thread is going to block + * waiting for more messages. + */ + public static interface IdleHandler { + /** + * Called when the message queue has run out of messages and will now + * wait for more. Return true to keep your idle handler active, false + * to have it removed. This may be called if there are still messages + * pending in the queue, but they are all scheduled to be dispatched + * after the current time. + */ + boolean queueIdle(); + } + + /** + * A listener which is invoked when file descriptor related events occur. + */ + public interface OnFileDescriptorEventListener { + /** + * File descriptor event: Indicates that the file descriptor is ready for input + * operations, such as reading. + * <p> + * The listener should read all available data from the file descriptor + * then return <code>true</code> to keep the listener active or <code>false</code> + * to remove the listener. + * </p><p> + * In the case of a socket, this event may be generated to indicate + * that there is at least one incoming connection that the listener + * should accept. + * </p><p> + * This event will only be generated if the {@link #EVENT_INPUT} event mask was + * specified when the listener was added. + * </p> + */ + public static final int EVENT_INPUT = 1 << 0; + + /** + * File descriptor event: Indicates that the file descriptor is ready for output + * operations, such as writing. + * <p> + * The listener should write as much data as it needs. If it could not + * write everything at once, then it should return <code>true</code> to + * keep the listener active. Otherwise, it should return <code>false</code> + * to remove the listener then re-register it later when it needs to write + * something else. + * </p><p> + * This event will only be generated if the {@link #EVENT_OUTPUT} event mask was + * specified when the listener was added. + * </p> + */ + public static final int EVENT_OUTPUT = 1 << 1; + + /** + * File descriptor event: Indicates that the file descriptor encountered a + * fatal error. + * <p> + * File descriptor errors can occur for various reasons. One common error + * is when the remote peer of a socket or pipe closes its end of the connection. + * </p><p> + * This event may be generated at any time regardless of whether the + * {@link #EVENT_ERROR} event mask was specified when the listener was added. + * </p> + */ + public static final int EVENT_ERROR = 1 << 2; + + /** @hide */ + @Retention(RetentionPolicy.SOURCE) + @IntDef(flag = true, prefix = { "EVENT_" }, value = { + EVENT_INPUT, + EVENT_OUTPUT, + EVENT_ERROR + }) + public @interface Events {} + + /** + * Called when a file descriptor receives events. + * + * @param fd The file descriptor. + * @param events The set of events that occurred: a combination of the + * {@link #EVENT_INPUT}, {@link #EVENT_OUTPUT}, and {@link #EVENT_ERROR} event masks. + * @return The new set of events to watch, or 0 to unregister the listener. + * + * @see #EVENT_INPUT + * @see #EVENT_OUTPUT + * @see #EVENT_ERROR + */ + @Events int onFileDescriptorEvents(@NonNull FileDescriptor fd, @Events int events); + } + + private static final class FileDescriptorRecord { + public final FileDescriptor mDescriptor; + public int mEvents; + public OnFileDescriptorEventListener mListener; + public int mSeq; + + public FileDescriptorRecord(FileDescriptor descriptor, + int events, OnFileDescriptorEventListener listener) { + mDescriptor = descriptor; + mEvents = events; + mListener = listener; + } + } + + /** + * ConcurrentMessageQueue specific classes methods and variables + */ + /* Helper to choose the correct queue to insert into. */ + private void insertIntoPriorityQueue(MessageNode msgNode) { + if (msgNode.isAsync()) { + mAsyncPriorityQueue.add(msgNode); + } else { + mPriorityQueue.add(msgNode); + } + } + + private boolean removeFromPriorityQueue(MessageNode msgNode) { + if (msgNode.isAsync()) { + return mAsyncPriorityQueue.remove(msgNode); + } else { + return mPriorityQueue.remove(msgNode); + } + } + + private MessageNode pickEarliestNode(MessageNode nodeA, MessageNode nodeB) { + if (nodeA != null && nodeB != null) { + if (nodeA.compareTo(nodeB) < 0) { + return nodeA; + } + return nodeB; + } + + return nodeA != null ? nodeA : nodeB; + } + + private MessageNode iterateNext(Iterator<MessageNode> iter) { + if (iter.hasNext()) { + try { + return iter.next(); + } catch (NoSuchElementException e) { + /* The queue is empty - this can happen if we race with remove */ + } + } + return null; + } + + /* Move any non-cancelled messages into the priority queue */ + private void drainStack(StackNode oldTop) { + while (oldTop.isMessageNode()) { + MessageNode oldTopMessageNode = (MessageNode) oldTop; + if (oldTopMessageNode.removeFromStack()) { + insertIntoPriorityQueue(oldTopMessageNode); + } + MessageNode inserted = oldTopMessageNode; + oldTop = oldTopMessageNode.mNext; + /* + * removeMessages can walk this list while we are consuming it. + * Set our next pointer to null *after* we add the message to our + * priority queue. This way removeMessages() will always find the + * message, either in our list or in the priority queue. + */ + inserted.mNext = null; + } + } + + /* Set the stack state to Active, return a list of nodes to walk. */ + private StackNode swapAndSetStackStateActive() { + while (true) { + /* Set stack state to Active, get node list to walk later */ + StackNode current = (StackNode) sState.getVolatile(this); + if (current == sStackStateActive + || sState.compareAndSet(this, current, sStackStateActive)) { + return current; + } + } + } + private StateNode getStateNode(StackNode node) { + if (node.isMessageNode()) { + return ((MessageNode) node).mBottomOfStack; + } + return (StateNode) node; + } + + private void waitForDrainCompleted() { + mDrainingLock.lock(); + while (mNextIsDrainingStack) { + mDrainCompleted.awaitUninterruptibly(); + } + mDrainingLock.unlock(); + } + + @IntDef(value = { + STACK_NODE_MESSAGE, + STACK_NODE_ACTIVE, + STACK_NODE_PARKED, + STACK_NODE_TIMEDPARK}) + @Retention(RetentionPolicy.SOURCE) + private @interface StackNodeType {} + + /* + * Stack node types. STACK_NODE_MESSAGE indicates a node containing a message. + * The other types indicate what state our Looper thread is in. The bottom of + * the stack is always a single state node. Message nodes are added on top. + */ + private static final int STACK_NODE_MESSAGE = 0; + /* + * Active state indicates that next() is processing messages + */ + private static final int STACK_NODE_ACTIVE = 1; + /* + * Parked state indicates that the Looper thread is sleeping indefinitely (nothing to deliver) + */ + private static final int STACK_NODE_PARKED = 2; + /* + * Timed Park state indicates that the Looper thread is sleeping, waiting for a message + * deadline + */ + private static final int STACK_NODE_TIMEDPARK = 3; + + /* Describes a node in the Treiber stack */ + static class StackNode { + @StackNodeType + private final int mType; + + StackNode(@StackNodeType int type) { + mType = type; + } + + @StackNodeType + final int getNodeType() { + return mType; + } + + final boolean isMessageNode() { + return mType == STACK_NODE_MESSAGE; + } + } + + static final class MessageNode extends StackNode implements Comparable<MessageNode> { + private final Message mMessage; + volatile StackNode mNext; + StateNode mBottomOfStack; + boolean mWokeUp; + final long mInsertSeq; + private static final VarHandle sRemovedFromStack; + private volatile boolean mRemovedFromStackValue; + static { + try { + MethodHandles.Lookup l = MethodHandles.lookup(); + sRemovedFromStack = l.findVarHandle(MessageQueue.MessageNode.class, + "mRemovedFromStackValue", boolean.class); + } catch (Exception e) { + Log.wtf(TAG_C, "VarHandle lookup failed with exception: " + e); + throw new ExceptionInInitializerError(e); + } + } + + MessageNode(@NonNull Message message, long insertSeq) { + super(STACK_NODE_MESSAGE); + mMessage = message; + mInsertSeq = insertSeq; + } + + long getWhen() { + return mMessage.when; + } + + boolean isRemovedFromStack() { + return mRemovedFromStackValue; + } + + boolean removeFromStack() { + return sRemovedFromStack.compareAndSet(this, false, true); + } + + boolean isAsync() { + return mMessage.isAsynchronous(); + } + + boolean isBarrier() { + return mMessage.target == null; + } + + @Override + public int compareTo(@NonNull MessageNode messageNode) { + Message other = messageNode.mMessage; + + int compared = Long.compare(mMessage.when, other.when); + if (compared == 0) { + compared = Long.compare(mInsertSeq, messageNode.mInsertSeq); + } + return compared; + } + } + + static class StateNode extends StackNode { + StateNode(int type) { + super(type); + } + } + + static final class TimedParkStateNode extends StateNode { + long mWhenToWake; + + TimedParkStateNode() { + super(STACK_NODE_TIMEDPARK); + } + } + + private static final StateNode sStackStateActive = new StateNode(STACK_NODE_ACTIVE); + private static final StateNode sStackStateParked = new StateNode(STACK_NODE_PARKED); + private final TimedParkStateNode mStackStateTimedPark = new TimedParkStateNode(); + + /* This is the top of our treiber stack. */ + private static final VarHandle sState; + static { + try { + MethodHandles.Lookup l = MethodHandles.lookup(); + sState = l.findVarHandle(MessageQueue.class, "mStateValue", + MessageQueue.StackNode.class); + } catch (Exception e) { + Log.wtf(TAG_C, "VarHandle lookup failed with exception: " + e); + throw new ExceptionInInitializerError(e); + } + } + + private volatile StackNode mStateValue = sStackStateParked; + private final ConcurrentSkipListSet<MessageNode> mPriorityQueue = + new ConcurrentSkipListSet<MessageNode>(); + private final ConcurrentSkipListSet<MessageNode> mAsyncPriorityQueue = + new ConcurrentSkipListSet<MessageNode>(); + + /* + * This helps us ensure that messages with the same timestamp are inserted in FIFO order. + * Increments on each insert, starting at 0. MessageNode.compareTo() will compare sequences + * when delivery timestamps are identical. + */ + private static final VarHandle sNextInsertSeq; + private volatile long mNextInsertSeqValue = 0; + /* + * The exception to the FIFO order rule is sendMessageAtFrontOfQueue(). + * Those messages must be in LIFO order. + * Decrements on each front of queue insert. + */ + private static final VarHandle sNextFrontInsertSeq; + private volatile long mNextFrontInsertSeqValue = -1; + static { + try { + MethodHandles.Lookup l = MethodHandles.lookup(); + sNextInsertSeq = l.findVarHandle(MessageQueue.class, "mNextInsertSeqValue", + long.class); + sNextFrontInsertSeq = l.findVarHandle(MessageQueue.class, "mNextFrontInsertSeqValue", + long.class); + } catch (Exception e) { + Log.wtf(TAG_C, "VarHandle lookup failed with exception: " + e); + throw new ExceptionInInitializerError(e); + } + + } + + /* + * Tracks the number of queued and cancelled messages in our stack. + * + * On item cancellation, determine whether to wake next() to flush tombstoned messages. + * We track queued and cancelled counts as two ints packed into a single long. + */ + private static final class MessageCounts { + private static VarHandle sCounts; + private volatile long mCountsValue = 0; + static { + try { + MethodHandles.Lookup l = MethodHandles.lookup(); + sCounts = l.findVarHandle(MessageQueue.MessageCounts.class, "mCountsValue", + long.class); + } catch (Exception e) { + Log.wtf(TAG_C, "VarHandle lookup failed with exception: " + e); + throw new ExceptionInInitializerError(e); + } + } + + /* We use a special value to indicate when next() has been woken for flush. */ + private static final long AWAKE = Long.MAX_VALUE; + /* + * Minimum number of messages in the stack which we need before we consider flushing + * tombstoned items. + */ + private static final int MESSAGE_FLUSH_THRESHOLD = 10; + + private static int numQueued(long val) { + return (int) (val >>> Integer.SIZE); + } + + private static int numCancelled(long val) { + return (int) val; + } + + private static long combineCounts(int queued, int cancelled) { + return ((long) queued << Integer.SIZE) | (long) cancelled; + } + + public void incrementQueued() { + while (true) { + long oldVal = mCountsValue; + int queued = numQueued(oldVal); + int cancelled = numCancelled(oldVal); + /* Use Math.max() to avoid overflow of queued count */ + long newVal = combineCounts(Math.max(queued + 1, queued), cancelled); + + /* Don't overwrite 'AWAKE' state */ + if (oldVal == AWAKE || sCounts.compareAndSet(this, oldVal, newVal)) { + break; + } + } + } + + public boolean incrementCancelled() { + while (true) { + long oldVal = mCountsValue; + if (oldVal == AWAKE) { + return false; + } + int queued = numQueued(oldVal); + int cancelled = numCancelled(oldVal); + boolean needsPurge = queued > MESSAGE_FLUSH_THRESHOLD + && (queued >> 1) < cancelled; + long newVal; + if (needsPurge) { + newVal = AWAKE; + } else { + newVal = combineCounts(queued, + Math.max(cancelled + 1, cancelled)); + } + + if (sCounts.compareAndSet(this, oldVal, newVal)) { + return needsPurge; + } + } + } + + public void clearCounts() { + mCountsValue = 0; + } + } + + private final MessageCounts mMessageCounts = new MessageCounts(); + + private final Object mIdleHandlersLock = new Object(); + private final Object mFileDescriptorRecordsLock = new Object(); + + private static final VarHandle sQuitting; + private boolean mQuittingValue = false; + static { + try { + MethodHandles.Lookup l = MethodHandles.lookup(); + sQuitting = l.findVarHandle(MessageQueue.class, "mQuittingValue", boolean.class); + } catch (Exception e) { + Log.wtf(TAG_C, "VarHandle lookup failed with exception: " + e); + throw new ExceptionInInitializerError(e); + } + } + + // The next barrier token. + // Barriers are indicated by messages with a null target whose arg1 field carries the token. + private final AtomicInteger mNextBarrierToken = new AtomicInteger(1); + + /* Protects mNextIsDrainingStack */ + private final ReentrantLock mDrainingLock = new ReentrantLock(); + private boolean mNextIsDrainingStack = false; + private final Condition mDrainCompleted = mDrainingLock.newCondition(); + + private boolean enqueueMessageUnchecked(@NonNull Message msg, long when) { + if ((boolean) sQuitting.getVolatile(this)) { + IllegalStateException e = new IllegalStateException( + msg.target + " sending message to a Handler on a dead thread"); + Log.w(TAG_C, e.getMessage(), e); + msg.recycleUnchecked(); + return false; + } + + long seq = when != 0 ? ((long) sNextInsertSeq.getAndAdd(this, 1L) + 1L) + : ((long) sNextFrontInsertSeq.getAndAdd(this, -1L) - 1L); + /* TODO: Add a MessageNode member to Message so we can avoid this allocation */ + MessageNode node = new MessageNode(msg, seq); + msg.when = when; + msg.markInUse(); + + if (DEBUG) { + Log.d(TAG_C, "Insert message what: " + msg.what + " when: " + msg.when + " seq: " + + node.mInsertSeq + " barrier: " + node.isBarrier() + " async: " + + node.isAsync() + " now: " + SystemClock.uptimeMillis()); + } + + final Looper myLooper = Looper.myLooper(); + /* If we are running on the looper thread we can add directly to the priority queue */ + if (myLooper != null && myLooper.getQueue() == this) { + node.removeFromStack(); + insertIntoPriorityQueue(node); + /* + * We still need to do this even though we are the current thread, + * otherwise next() may sleep indefinitely. + */ + if (!mMessageDirectlyQueued) { + mMessageDirectlyQueued = true; + nativeWake(mPtr); + } + return true; + } + + while (true) { + StackNode old = (StackNode) sState.getVolatile(this); + boolean wakeNeeded; + boolean inactive; + + node.mNext = old; + switch (old.getNodeType()) { + case STACK_NODE_ACTIVE: + /* + * The worker thread is currently active and will process any elements added to + * the stack before parking again. + */ + node.mBottomOfStack = (StateNode) old; + inactive = false; + node.mWokeUp = true; + wakeNeeded = false; + break; + + case STACK_NODE_PARKED: + node.mBottomOfStack = (StateNode) old; + inactive = true; + node.mWokeUp = true; + wakeNeeded = true; + break; + + case STACK_NODE_TIMEDPARK: + node.mBottomOfStack = (StateNode) old; + inactive = true; + wakeNeeded = mStackStateTimedPark.mWhenToWake >= node.getWhen(); + node.mWokeUp = wakeNeeded; + break; + + default: + MessageNode oldMessage = (MessageNode) old; + + node.mBottomOfStack = oldMessage.mBottomOfStack; + int bottomType = node.mBottomOfStack.getNodeType(); + inactive = bottomType >= STACK_NODE_PARKED; + wakeNeeded = (bottomType == STACK_NODE_TIMEDPARK + && mStackStateTimedPark.mWhenToWake >= node.getWhen() + && !oldMessage.mWokeUp); + node.mWokeUp = oldMessage.mWokeUp || wakeNeeded; + break; + } + if (sState.compareAndSet(this, old, node)) { + if (inactive) { + if (wakeNeeded) { + nativeWake(mPtr); + } else { + mMessageCounts.incrementQueued(); + } + } + return true; + } + } + } + + /* + * This class is used to find matches for hasMessages() and removeMessages() + */ + private abstract static class MessageCompare { + public abstract boolean compareMessage(Message m, Handler h, int what, Object object, + Runnable r, long when); + } + + private boolean stackHasMessages(Handler h, int what, Object object, Runnable r, long when, + MessageCompare compare, boolean removeMatches) { + boolean found = false; + StackNode top = (StackNode) sState.getVolatile(this); + StateNode bottom = getStateNode(top); + + /* + * If the top node is a state node, there are no reachable messages. + * If it's anything other than Active, we can quit as we know that next() is not + * consuming items. + * If the top node is Active then we know that next() is currently consuming items. + * In that case we should wait next() has drained the stack. + */ + if (top == bottom) { + if (bottom != sStackStateActive) { + return false; + } + waitForDrainCompleted(); + return false; + } + + /* + * We have messages that we may tombstone. Walk the stack until we hit the bottom or we + * hit a null pointer. + * If we hit the bottom, we are done. + * If we hit a null pointer, then the stack is being consumed by next() and we must cycle + * until the stack has been drained. + */ + MessageNode p = (MessageNode) top; + + while (true) { + if (compare.compareMessage(p.mMessage, h, what, object, r, when)) { + found = true; + if (DEBUG) { + Log.d(TAG_C, "stackHasMessages node matches"); + } + if (removeMatches) { + if (p.removeFromStack()) { + p.mMessage.recycleUnchecked(); + if (mMessageCounts.incrementCancelled()) { + nativeWake(mPtr); + } + } + } else { + return true; + } + } + + StackNode n = p.mNext; + if (n == null) { + /* Next() is walking the stack, we must re-sample */ + if (DEBUG) { + Log.d(TAG_C, "stackHasMessages next() is walking the stack, we must re-sample"); + } + waitForDrainCompleted(); + break; + } + if (!n.isMessageNode()) { + /* We reached the end of the stack */ + return found; + } + p = (MessageNode) n; + } + + return found; + } + + private boolean priorityQueueHasMessage(ConcurrentSkipListSet<MessageNode> queue, Handler h, + int what, Object object, Runnable r, long when, MessageCompare compare, + boolean removeMatches) { + Iterator<MessageNode> iterator = queue.iterator(); + boolean found = false; + + while (iterator.hasNext()) { + MessageNode msg = iterator.next(); + + if (compare.compareMessage(msg.mMessage, h, what, object, r, when)) { + if (removeMatches) { + found = true; + if (queue.remove(msg)) { + msg.mMessage.recycleUnchecked(); + } + } else { + return true; + } + } + } + return found; + } + + private boolean findOrRemoveMessages(Handler h, int what, Object object, Runnable r, long when, + MessageCompare compare, boolean removeMatches) { + boolean foundInStack, foundInQueue; + + foundInStack = stackHasMessages(h, what, object, r, when, compare, removeMatches); + foundInQueue = priorityQueueHasMessage(mPriorityQueue, h, what, object, r, when, compare, + removeMatches); + foundInQueue |= priorityQueueHasMessage(mAsyncPriorityQueue, h, what, object, r, when, + compare, removeMatches); + + return foundInStack || foundInQueue; + } + +} |