logd: rework logic for LogTimeEntry

LogTimeEntry's lifecycle is spread out in various locations.  It
further seems incomplete as there is logic that assumes that its
associated thread can exit while the underlying LogTimeEntry remains
valid, however it doesn't appear that that is actually a supported
situation.

This change simplifies this logic to have only one valid state for a
LogTimeEntry: it must have its thread running and be present in
LastLogTimes.  A LogTimeEntry will never be placed into LastLogTimes
unless its thread is running and its thread will remove its associated
LogTimeEntry from LastLogTimes before it has exited.

This admittedly breaks situations where a blocking socket gets issued
multiple commands with different pid filters, tail lines, etc,
however, I'm reasonably sure that these situations were already
broken.  A check is added to close the socket in this case.

Test: multiple logcat instances work, logd.reader.per's are cleaned up
Change-Id: Ibe8651e7d530c5e9a8d6ce3150cd247982887cbe
diff --git a/logd/FlushCommand.cpp b/logd/FlushCommand.cpp
index 658e079..bd17555 100644
--- a/logd/FlushCommand.cpp
+++ b/logd/FlushCommand.cpp
@@ -42,7 +42,7 @@
     LogTimeEntry::wrlock();
     LastLogTimes::iterator it = times.begin();
     while (it != times.end()) {
-        entry = (*it);
+        entry = it->get();
         if (entry->mClient == client) {
             if (!entry->isWatchingMultiple(mLogMask)) {
                 LogTimeEntry::unlock();
@@ -63,31 +63,12 @@
                 }
             }
             entry->triggerReader_Locked();
-            if (entry->runningReader_Locked()) {
-                LogTimeEntry::unlock();
-                return;
-            }
-            entry->incRef_Locked();
-            break;
+            LogTimeEntry::unlock();
+            return;
         }
         it++;
     }
 
-    if (it == times.end()) {
-        // Create LogTimeEntry in notifyNewLog() ?
-        if (mTail == (unsigned long)-1) {
-            LogTimeEntry::unlock();
-            return;
-        }
-        entry = new LogTimeEntry(mReader, client, mNonBlock, mTail, mLogMask,
-                                 mPid, mStart, mTimeout);
-        times.push_front(entry);
-    }
-
-    client->incRef();
-
-    // release client and entry reference counts once done
-    entry->startReader_Locked();
     LogTimeEntry::unlock();
 }
 
diff --git a/logd/FlushCommand.h b/logd/FlushCommand.h
index 543dfc3..ceaf393 100644
--- a/logd/FlushCommand.h
+++ b/logd/FlushCommand.h
@@ -27,36 +27,11 @@
 
 class FlushCommand : public SocketClientCommand {
     LogReader& mReader;
-    bool mNonBlock;
-    unsigned long mTail;
     log_mask_t mLogMask;
-    pid_t mPid;
-    log_time mStart;
-    uint64_t mTimeout;
 
    public:
-    // for opening a reader
-    explicit FlushCommand(LogReader& reader, bool nonBlock, unsigned long tail,
-                          log_mask_t logMask, pid_t pid, log_time start,
-                          uint64_t timeout)
-        : mReader(reader),
-          mNonBlock(nonBlock),
-          mTail(tail),
-          mLogMask(logMask),
-          mPid(pid),
-          mStart(start),
-          mTimeout((start != log_time::EPOCH) ? timeout : 0) {
-    }
-
-    // for notification of an update
     explicit FlushCommand(LogReader& reader, log_mask_t logMask)
-        : mReader(reader),
-          mNonBlock(false),
-          mTail(-1),
-          mLogMask(logMask),
-          mPid(0),
-          mStart(log_time::EPOCH),
-          mTimeout(0) {
+        : mReader(reader), mLogMask(logMask) {
     }
 
     virtual void runSocketCommand(SocketClient* client);
diff --git a/logd/LogBuffer.cpp b/logd/LogBuffer.cpp
index fd1b8b2..fbdbf79 100644
--- a/logd/LogBuffer.cpp
+++ b/logd/LogBuffer.cpp
@@ -105,10 +105,8 @@
 
     LastLogTimes::iterator times = mTimes.begin();
     while (times != mTimes.end()) {
-        LogTimeEntry* entry = (*times);
-        if (entry->owned_Locked()) {
-            entry->triggerReader_Locked();
-        }
+        LogTimeEntry* entry = times->get();
+        entry->triggerReader_Locked();
         times++;
     }
 
@@ -409,17 +407,15 @@
 
         LastLogTimes::iterator times = mTimes.begin();
         while (times != mTimes.end()) {
-            LogTimeEntry* entry = (*times);
-            if (entry->owned_Locked()) {
-                if (!entry->mNonBlock) {
-                    end_always = true;
-                    break;
-                }
-                // it passing mEnd is blocked by the following checks.
-                if (!end_set || (end <= entry->mEnd)) {
-                    end = entry->mEnd;
-                    end_set = true;
-                }
+            LogTimeEntry* entry = times->get();
+            if (!entry->mNonBlock) {
+                end_always = true;
+                break;
+            }
+            // it passing mEnd is blocked by the following checks.
+            if (!end_set || (end <= entry->mEnd)) {
+                end = entry->mEnd;
+                end_set = true;
             }
             times++;
         }
@@ -710,8 +706,8 @@
     // Region locked?
     LastLogTimes::iterator times = mTimes.begin();
     while (times != mTimes.end()) {
-        LogTimeEntry* entry = (*times);
-        if (entry->owned_Locked() && entry->isWatching(id) &&
+        LogTimeEntry* entry = times->get();
+        if (entry->isWatching(id) &&
             (!oldest || (oldest->mStart > entry->mStart) ||
              ((oldest->mStart == entry->mStart) &&
               (entry->mTimeout.tv_sec || entry->mTimeout.tv_nsec)))) {
@@ -1052,9 +1048,9 @@
                 LogTimeEntry::wrlock();
                 LastLogTimes::iterator times = mTimes.begin();
                 while (times != mTimes.end()) {
-                    LogTimeEntry* entry = (*times);
+                    LogTimeEntry* entry = times->get();
                     // Killer punch
-                    if (entry->owned_Locked() && entry->isWatching(id)) {
+                    if (entry->isWatching(id)) {
                         entry->release_Locked();
                     }
                     times++;
diff --git a/logd/LogReader.cpp b/logd/LogReader.cpp
index 2b6556d..13c7af3 100644
--- a/logd/LogReader.cpp
+++ b/logd/LogReader.cpp
@@ -41,6 +41,7 @@
     runOnEachSocket(&command);
 }
 
+// Note returning false will release the SocketClient instance.
 bool LogReader::onDataAvailable(SocketClient* cli) {
     static bool name_set;
     if (!name_set) {
@@ -57,6 +58,18 @@
     }
     buffer[len] = '\0';
 
+    // Clients are only allowed to send one command, disconnect them if they
+    // send another.
+    LogTimeEntry::wrlock();
+    for (const auto& entry : mLogbuf.mTimes) {
+        if (entry->mClient == cli) {
+            entry->release_Locked();
+            LogTimeEntry::unlock();
+            return false;
+        }
+    }
+    LogTimeEntry::unlock();
+
     unsigned long tail = 0;
     static const char _tail[] = " tail=";
     char* cp = strstr(buffer, _tail);
@@ -199,14 +212,25 @@
         cli->getUid(), cli->getGid(), cli->getPid(), nonBlock ? 'n' : 'b', tail,
         logMask, (int)pid, sequence.nsec(), timeout);
 
-    FlushCommand command(*this, nonBlock, tail, logMask, pid, sequence, timeout);
+    LogTimeEntry::wrlock();
+    auto entry = std::make_unique<LogTimeEntry>(
+        *this, cli, nonBlock, tail, logMask, pid, sequence, timeout);
+    if (!entry->startReader_Locked()) {
+        LogTimeEntry::unlock();
+        return false;
+    }
+
+    // release client and entry reference counts once done
+    cli->incRef();
+    mLogbuf.mTimes.emplace_front(std::move(entry));
 
     // Set acceptable upper limit to wait for slow reader processing b/27242723
     struct timeval t = { LOGD_SNDTIMEO, 0 };
     setsockopt(cli->getSocket(), SOL_SOCKET, SO_SNDTIMEO, (const char*)&t,
                sizeof(t));
 
-    command.runSocketCommand(cli);
+    LogTimeEntry::unlock();
+
     return true;
 }
 
@@ -215,9 +239,8 @@
     LogTimeEntry::wrlock();
     LastLogTimes::iterator it = times.begin();
     while (it != times.end()) {
-        LogTimeEntry* entry = (*it);
+        LogTimeEntry* entry = it->get();
         if (entry->mClient == cli) {
-            times.erase(it);
             entry->release_Locked();
             break;
         }
diff --git a/logd/LogTimes.cpp b/logd/LogTimes.cpp
index 7a6f84b..1715501 100644
--- a/logd/LogTimes.cpp
+++ b/logd/LogTimes.cpp
@@ -30,11 +30,7 @@
 LogTimeEntry::LogTimeEntry(LogReader& reader, SocketClient* client,
                            bool nonBlock, unsigned long tail, log_mask_t logMask,
                            pid_t pid, log_time start, uint64_t timeout)
-    : mRefCount(1),
-      mRelease(false),
-      mError(false),
-      threadRunning(false),
-      leadingDropped(false),
+    : leadingDropped(false),
       mReader(reader),
       mLogMask(logMask),
       mPid(pid),
@@ -52,65 +48,21 @@
     cleanSkip_Locked();
 }
 
-void LogTimeEntry::startReader_Locked(void) {
+bool LogTimeEntry::startReader_Locked() {
     pthread_attr_t attr;
 
-    threadRunning = true;
-
     if (!pthread_attr_init(&attr)) {
         if (!pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) {
             if (!pthread_create(&mThread, &attr, LogTimeEntry::threadStart,
                                 this)) {
                 pthread_attr_destroy(&attr);
-                return;
+                return true;
             }
         }
         pthread_attr_destroy(&attr);
     }
-    threadRunning = false;
-    if (mClient) {
-        mClient->decRef();
-    }
-    decRef_Locked();
-}
 
-void LogTimeEntry::threadStop(void* obj) {
-    LogTimeEntry* me = reinterpret_cast<LogTimeEntry*>(obj);
-
-    wrlock();
-
-    if (me->mNonBlock) {
-        me->error_Locked();
-    }
-
-    SocketClient* client = me->mClient;
-
-    if (me->isError_Locked()) {
-        LogReader& reader = me->mReader;
-        LastLogTimes& times = reader.logbuf().mTimes;
-
-        LastLogTimes::iterator it = times.begin();
-        while (it != times.end()) {
-            if (*it == me) {
-                times.erase(it);
-                me->release_nodelete_Locked();
-                break;
-            }
-            it++;
-        }
-
-        me->mClient = nullptr;
-        reader.release(client);
-    }
-
-    if (client) {
-        client->decRef();
-    }
-
-    me->threadRunning = false;
-    me->decRef_Locked();
-
-    unlock();
+    return false;
 }
 
 void* LogTimeEntry::threadStart(void* obj) {
@@ -118,13 +70,7 @@
 
     LogTimeEntry* me = reinterpret_cast<LogTimeEntry*>(obj);
 
-    pthread_cleanup_push(threadStop, obj);
-
     SocketClient* client = me->mClient;
-    if (!client) {
-        me->error();
-        return nullptr;
-    }
 
     LogBuffer& logbuf = me->mReader.logbuf();
 
@@ -137,14 +83,14 @@
 
     log_time start = me->mStart;
 
-    while (me->threadRunning && !me->isError_Locked()) {
+    while (!me->mRelease) {
         if (me->mTimeout.tv_sec || me->mTimeout.tv_nsec) {
             if (pthread_cond_timedwait(&me->threadTriggeredCondition,
                                        &timesLock, &me->mTimeout) == ETIMEDOUT) {
                 me->mTimeout.tv_sec = 0;
                 me->mTimeout.tv_nsec = 0;
             }
-            if (!me->threadRunning || me->isError_Locked()) {
+            if (me->mRelease) {
                 break;
             }
         }
@@ -162,13 +108,12 @@
         wrlock();
 
         if (start == LogBufferElement::FLUSH_ERROR) {
-            me->error_Locked();
             break;
         }
 
         me->mStart = start + log_time(0, 1);
 
-        if (me->mNonBlock || !me->threadRunning || me->isError_Locked()) {
+        if (me->mNonBlock || me->mRelease) {
             break;
         }
 
@@ -179,9 +124,21 @@
         }
     }
 
-    unlock();
+    LogReader& reader = me->mReader;
+    reader.release(client);
 
-    pthread_cleanup_pop(true);
+    client->decRef();
+
+    LastLogTimes& times = reader.logbuf().mTimes;
+    auto it =
+        std::find_if(times.begin(), times.end(),
+                     [&me](const auto& other) { return other.get() == me; });
+
+    if (it != times.end()) {
+        times.erase(it);
+    }
+
+    unlock();
 
     return nullptr;
 }
@@ -247,10 +204,6 @@
         goto skip;
     }
 
-    if (me->isError_Locked()) {
-        goto stop;
-    }
-
     if (!me->mTail) {
         goto ok;
     }
diff --git a/logd/LogTimes.h b/logd/LogTimes.h
index 76d016c..f4e165f 100644
--- a/logd/LogTimes.h
+++ b/logd/LogTimes.h
@@ -22,6 +22,7 @@
 #include <time.h>
 
 #include <list>
+#include <memory>
 
 #include <log/log.h>
 #include <sysutils/SocketClient.h>
@@ -33,16 +34,12 @@
 
 class LogTimeEntry {
     static pthread_mutex_t timesLock;
-    unsigned int mRefCount;
-    bool mRelease;
-    bool mError;
-    bool threadRunning;
+    bool mRelease = false;
     bool leadingDropped;
     pthread_cond_t threadTriggeredCondition;
     pthread_t mThread;
     LogReader& mReader;
     static void* threadStart(void* me);
-    static void threadStop(void* me);
     const log_mask_t mLogMask;
     const pid_t mPid;
     unsigned int skipAhead[LOG_ID_MAX];
@@ -73,11 +70,8 @@
         pthread_mutex_unlock(&timesLock);
     }
 
-    void startReader_Locked(void);
+    bool startReader_Locked();
 
-    bool runningReader_Locked(void) const {
-        return threadRunning || mRelease || mError || mNonBlock;
-    }
     void triggerReader_Locked(void) {
         pthread_cond_signal(&threadTriggeredCondition);
     }
@@ -87,54 +81,11 @@
     }
     void cleanSkip_Locked(void);
 
-    // These called after LogTimeEntry removed from list, lock implicitly held
-    void release_nodelete_Locked(void) {
-        mRelease = true;
-        pthread_cond_signal(&threadTriggeredCondition);
-        // assumes caller code path will call decRef_Locked()
-    }
-
     void release_Locked(void) {
         mRelease = true;
         pthread_cond_signal(&threadTriggeredCondition);
-        if (mRefCount || threadRunning) {
-            return;
-        }
-        // No one else is holding a reference to this
-        delete this;
     }
 
-    // Called to mark socket in jeopardy
-    void error_Locked(void) {
-        mError = true;
-    }
-    void error(void) {
-        wrlock();
-        error_Locked();
-        unlock();
-    }
-
-    bool isError_Locked(void) const {
-        return mRelease || mError;
-    }
-
-    // Mark Used
-    //  Locking implied, grabbed when protection around loop iteration
-    void incRef_Locked(void) {
-        ++mRefCount;
-    }
-
-    bool owned_Locked(void) const {
-        return mRefCount != 0;
-    }
-
-    void decRef_Locked(void) {
-        if ((mRefCount && --mRefCount) || !mRelease || threadRunning) {
-            return;
-        }
-        // No one else is holding a reference to this
-        delete this;
-    }
     bool isWatching(log_id_t id) const {
         return mLogMask & (1 << id);
     }
@@ -146,6 +97,6 @@
     static int FilterSecondPass(const LogBufferElement* element, void* me);
 };
 
-typedef std::list<LogTimeEntry*> LastLogTimes;
+typedef std::list<std::unique_ptr<LogTimeEntry>> LastLogTimes;
 
 #endif  // _LOGD_LOG_TIMES_H__