AU: Execute postinst asynchronously so that the D-Bus service is not blocked.

This CL also cleans up Subprocess memory handling a bit and extends it to
capture the output of asynchronous subprocesses. Also adds a scoped temp mount
unmounter utility class.

BUG=8937
TEST=unit tests, tested on the device -- making sure no D-Bus calls timeou
during postinstall.

Change-Id: I219dda3dc98d875ff05050f1a32ffcc925db1d53

Review URL: http://codereview.chromium.org/4690006
diff --git a/postinstall_runner_action.cc b/postinstall_runner_action.cc
index 9a84563..c6320e1 100644
--- a/postinstall_runner_action.cc
+++ b/postinstall_runner_action.cc
@@ -15,7 +15,7 @@
 using std::vector;
 
 namespace {
-const string kPostinstallScript("/postinst");
+const char kPostinstallScript[] = "/postinst";
 }
 
 void PostinstallRunnerAction::PerformAction() {
@@ -24,53 +24,61 @@
   const string install_device = install_plan.install_path;
   ScopedActionCompleter completer(processor_, this);
 
-  // Make mountpoint
-  string temp_dir;
+  // Make mountpoint.
   TEST_AND_RETURN(utils::MakeTempDirectory("/tmp/au_postint_mount.XXXXXX",
-                                           &temp_dir));
-  ScopedDirRemover temp_dir_remover(temp_dir);
+                                           &temp_rootfs_dir_));
+  ScopedDirRemover temp_dir_remover(temp_rootfs_dir_);
 
-  {
-    // Scope for the mount
-    unsigned long mountflags = MS_RDONLY;
-
-    int rc = mount(install_device.c_str(),
-                   temp_dir.c_str(),
-                   "ext4",
-                   mountflags,
-                   NULL);
-    if (rc < 0) {
-      LOG(INFO) << "Failed to mount install part as ext4. Trying ext3.";
-      rc = mount(install_device.c_str(),
-                 temp_dir.c_str(),
-                 "ext3",
+  unsigned long mountflags = MS_RDONLY;
+  int rc = mount(install_device.c_str(),
+                 temp_rootfs_dir_.c_str(),
+                 "ext4",
                  mountflags,
                  NULL);
-    }
-    if (rc < 0) {
-      LOG(ERROR) << "Unable to mount destination device " << install_device
-                 << " onto " << temp_dir;
-      return;
-    }
-    ScopedFilesystemUnmounter unmounter(temp_dir);
-
-    // run postinstall script
-    vector<string> command;
-    command.push_back(temp_dir + kPostinstallScript);
-    command.push_back(install_device);
-    rc = 0;
-    TEST_AND_RETURN(Subprocess::SynchronousExec(command, &rc));
-    bool success = (rc == 0);
-    if (!success) {
-      LOG(ERROR) << "Postinst command failed with code: " << rc;
-      return;
-    }
+  if (rc < 0) {
+    LOG(INFO) << "Failed to mount install part as ext4. Trying ext3.";
+    rc = mount(install_device.c_str(),
+               temp_rootfs_dir_.c_str(),
+               "ext3",
+               mountflags,
+               NULL);
+  }
+  if (rc < 0) {
+    LOG(ERROR) << "Unable to mount destination device " << install_device
+               << " onto " << temp_rootfs_dir_;
+    return;
   }
 
+  temp_dir_remover.set_should_remove(false);
+  completer.set_should_complete(false);
+
+  // Runs the postinstall script asynchronously to free up the main loop while
+  // it's running.
+  vector<string> command;
+  command.push_back(temp_rootfs_dir_ + kPostinstallScript);
+  command.push_back(install_device);
+  Subprocess::Get().Exec(command, StaticCompletePostinstall, this);
+}
+
+void PostinstallRunnerAction::CompletePostinstall(int return_code) {
+  ScopedActionCompleter completer(processor_, this);
+  ScopedTempUnmounter temp_unmounter(temp_rootfs_dir_);
+  if (return_code != 0) {
+    LOG(ERROR) << "Postinst command failed with code: " << return_code;
+    return;
+  }
   if (HasOutputPipe()) {
-    SetOutputObject(install_plan);
+    CHECK(HasInputObject());
+    SetOutputObject(GetInputObject());
   }
   completer.set_code(kActionCodeSuccess);
 }
 
+void PostinstallRunnerAction::StaticCompletePostinstall(int return_code,
+                                                        const string& output,
+                                                        void* p) {
+  reinterpret_cast<PostinstallRunnerAction*>(p)->CompletePostinstall(
+      return_code);
+}
+
 }  // namespace chromeos_update_engine
diff --git a/postinstall_runner_action.h b/postinstall_runner_action.h
index c2498e0..9a2ebcb 100644
--- a/postinstall_runner_action.h
+++ b/postinstall_runner_action.h
@@ -6,6 +6,7 @@
 #define CHROMEOS_PLATFORM_UPDATE_ENGINE_POSTINSTALL_RUNNER_ACTION_H__
 
 #include <string>
+
 #include "update_engine/action.h"
 #include "update_engine/install_plan.h"
 
@@ -35,8 +36,7 @@
       OutputObjectType;
   void PerformAction();
 
-  // This is a synchronous action, and thus TerminateProcessing() should
-  // never be called
+  // Note that there's no support for terminating this action currently.
   void TerminateProcessing() { CHECK(false); }
 
   // Debugging/logging
@@ -44,6 +44,14 @@
   std::string Type() const { return StaticType(); }
 
  private:
+  // Subprocess::Exec callback.
+  void CompletePostinstall(int return_code);
+  static void StaticCompletePostinstall(int return_code,
+                                        const std::string& output,
+                                        void* p);
+
+  std::string temp_rootfs_dir_;
+
   DISALLOW_COPY_AND_ASSIGN(PostinstallRunnerAction);
 };
 
diff --git a/postinstall_runner_action_unittest.cc b/postinstall_runner_action_unittest.cc
index 55fe79f..7f4e65c 100644
--- a/postinstall_runner_action_unittest.cc
+++ b/postinstall_runner_action_unittest.cc
@@ -17,6 +17,14 @@
 
 namespace chromeos_update_engine {
 
+namespace {
+gboolean StartProcessorInRunLoop(gpointer data) {
+  ActionProcessor *processor = reinterpret_cast<ActionProcessor*>(data);
+  processor->StartProcessing();
+  return FALSE;
+}
+}  // namespace
+
 class PostinstallRunnerActionTest : public ::testing::Test {
  public:
   void DoTest(bool do_losetup, bool do_err_script);
@@ -25,8 +33,14 @@
 class PostinstActionProcessorDelegate : public ActionProcessorDelegate {
  public:
   PostinstActionProcessorDelegate()
-      : code_(kActionCodeError),
+      : loop_(NULL),
+        code_(kActionCodeError),
         code_set_(false) {}
+  void ProcessingDone(const ActionProcessor* processor,
+                      ActionExitCode code) {
+    ASSERT_TRUE(loop_);
+    g_main_loop_quit(loop_);
+  }
   void ActionCompleted(ActionProcessor* processor,
                        AbstractAction* action,
                        ActionExitCode code) {
@@ -35,6 +49,7 @@
       code_set_ = true;
     }
   }
+  GMainLoop* loop_;
   ActionExitCode code_;
   bool code_set_;
 };
@@ -125,9 +140,13 @@
   processor.EnqueueAction(&runner_action);
   processor.EnqueueAction(&collector_action);
   processor.set_delegate(&delegate);
-  processor.StartProcessing();
-  ASSERT_FALSE(processor.IsRunning())
-      << "Update test to handle non-asynch actions";
+
+  GMainLoop* loop = g_main_loop_new(g_main_context_default(), FALSE);
+  delegate.loop_ = loop;
+  g_timeout_add(0, &StartProcessorInRunLoop, &processor);
+  g_main_loop_run(loop);
+  g_main_loop_unref(loop);
+  ASSERT_FALSE(processor.IsRunning());
 
   EXPECT_TRUE(delegate.code_set_);
   EXPECT_EQ(do_losetup && !do_err_script, delegate.code_ == kActionCodeSuccess);
diff --git a/subprocess.cc b/subprocess.cc
index af5fb75..9c19367 100755
--- a/subprocess.cc
+++ b/subprocess.cc
@@ -13,26 +13,55 @@
 #include "base/string_util.h"
 
 using std::string;
+using std::tr1::shared_ptr;
 using std::vector;
 
 namespace chromeos_update_engine {
 
 void Subprocess::GChildExitedCallback(GPid pid, gint status, gpointer data) {
-  COMPILE_ASSERT(sizeof(guint) == sizeof(uint32_t),
-                 guint_uint32_size_mismatch);
-  guint* tag = reinterpret_cast<guint*>(data);
-  const SubprocessCallbackRecord& record = Get().callback_records_[*tag];
-  if (record.callback)
-    record.callback(status, record.callback_data);
+  SubprocessRecord* record = reinterpret_cast<SubprocessRecord*>(data);
+
+  // Make sure we read any remaining process output. Then close the pipe.
+  GStdoutWatchCallback(record->gioout, G_IO_IN, &record->stdout);
+  int fd = g_io_channel_unix_get_fd(record->gioout);
+  g_source_remove(record->gioout_tag);
+  g_io_channel_unref(record->gioout);
+  close(fd);
+
   g_spawn_close_pid(pid);
-  Get().callback_records_.erase(*tag);
-  delete tag;
+  if (status) {
+    LOG(INFO) << "Subprocess status: " << status;
+  }
+  if (!record->stdout.empty()) {
+    LOG(INFO) << "Subprocess output:\n" << record->stdout;
+  }
+  if (record->callback) {
+    record->callback(status, record->stdout, record->callback_data);
+  }
+  Get().subprocess_records_.erase(record->tag);
 }
 
 void Subprocess::GRedirectStderrToStdout(gpointer user_data) {
   dup2(1, 2);
 }
 
+gboolean Subprocess::GStdoutWatchCallback(GIOChannel* source,
+                                          GIOCondition condition,
+                                          gpointer data) {
+  string* stdout = reinterpret_cast<string*>(data);
+  char buf[1024];
+  gsize bytes_read;
+  while (g_io_channel_read_chars(source,
+                                 buf,
+                                 arraysize(buf),
+                                 &bytes_read,
+                                 NULL) == G_IO_STATUS_NORMAL &&
+         bytes_read > 0) {
+    stdout->append(buf, bytes_read);
+  }
+  return TRUE;  // Keep the callback source. It's freed in GChilExitedCallback.
+}
+
 namespace {
 void FreeArgv(char** argv) {
   for (int i = 0; argv[i]; i++) {
@@ -106,33 +135,47 @@
   }
   ScopedFreeArgPointer argp_free(argp);
 
-  SubprocessCallbackRecord callback_record;
-  callback_record.callback = callback;
-  callback_record.callback_data = p;
-
-  bool success = g_spawn_async(NULL,  // working directory
-                               argv.get(),
-                               argp,
-                               G_SPAWN_DO_NOT_REAP_CHILD,  // flags
-                               NULL,  // child setup function
-                               NULL,  // child setup data pointer
-                               &child_pid,
-                               &err);
+  shared_ptr<SubprocessRecord> record(new SubprocessRecord);
+  record->callback = callback;
+  record->callback_data = p;
+  gint stdout_fd = -1;
+  bool success = g_spawn_async_with_pipes(
+      NULL,  // working directory
+      argv.get(),
+      argp,
+      G_SPAWN_DO_NOT_REAP_CHILD,  // flags
+      GRedirectStderrToStdout,  // child setup function
+      NULL,  // child setup data pointer
+      &child_pid,
+      NULL,
+      &stdout_fd,
+      NULL,
+      &err);
   FreeArgv(argv.get());
   if (!success) {
     LOG(ERROR) << "g_spawn_async failed";
     return 0;
   }
-  guint* tag = new guint;
-  *tag = g_child_watch_add(child_pid, GChildExitedCallback, tag);
-  callback_records_[*tag] = callback_record;
-  return *tag;
+  record->tag =
+      g_child_watch_add(child_pid, GChildExitedCallback, record.get());
+  subprocess_records_[record->tag] = record;
+
+  // Capture the subprocess output.
+  record->gioout = g_io_channel_unix_new(stdout_fd);
+  g_io_channel_set_encoding(record->gioout, NULL, NULL);
+  LOG_IF(WARNING,
+         g_io_channel_set_flags(record->gioout, G_IO_FLAG_NONBLOCK, NULL) !=
+         G_IO_STATUS_NORMAL) << "Unable to set non-blocking I/O mode.";
+  record->gioout_tag = g_io_add_watch(
+      record->gioout,
+      static_cast<GIOCondition>(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),
+      GStdoutWatchCallback,
+      &record->stdout);
+  return record->tag;
 }
 
 void Subprocess::CancelExec(uint32_t tag) {
-  if (callback_records_[tag].callback) {
-    callback_records_[tag].callback = NULL;
-  }
+  subprocess_records_[tag]->callback = NULL;
 }
 
 bool Subprocess::SynchronousExecFlags(const std::vector<std::string>& cmd,
@@ -178,6 +221,16 @@
   return success;
 }
 
+bool Subprocess::SubprocessInFlight() {
+  for (std::map<int, shared_ptr<SubprocessRecord> >::iterator it =
+           subprocess_records_.begin();
+       it != subprocess_records_.end(); ++it) {
+    if (it->second->callback)
+      return true;
+  }
+  return false;
+}
+
 Subprocess* Subprocess::subprocess_singleton_ = NULL;
 
 }  // namespace chromeos_update_engine
diff --git a/subprocess.h b/subprocess.h
index bf1e7ea..3492e79 100644
--- a/subprocess.h
+++ b/subprocess.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
@@ -7,8 +7,11 @@
 
 #include <map>
 #include <string>
+#include <tr1/memory>
 #include <vector>
+
 #include <glib.h>
+
 #include "base/basictypes.h"
 #include "base/logging.h"
 
@@ -21,12 +24,14 @@
 
 class Subprocess {
  public:
+  typedef void(*ExecCallback)(int return_code,
+                              const std::string& output,
+                              void *p);
+
   static void Init() {
     CHECK(!subprocess_singleton_);
     subprocess_singleton_ = new Subprocess;
   }
-   
-  typedef void(*ExecCallback)(int return_code, void *p);
 
   // Returns a tag > 0 on success.
   uint32_t Exec(const std::vector<std::string>& cmd,
@@ -49,20 +54,27 @@
   static Subprocess& Get() {
     return *subprocess_singleton_;
   }
-  
+
   // Returns true iff there is at least one subprocess we're waiting on.
-  bool SubprocessInFlight() {
-    for (std::map<int, SubprocessCallbackRecord>::iterator it =
-             callback_records_.begin();
-         it != callback_records_.end(); ++it) {
-      if (it->second.callback)
-        return true;
-    }
-    return false;
-  }
+  bool SubprocessInFlight();
+
  private:
-  // The global instance
-  static Subprocess* subprocess_singleton_;
+  struct SubprocessRecord {
+    SubprocessRecord()
+        : tag(0),
+          callback(NULL),
+          callback_data(NULL),
+          gioout(NULL),
+          gioout_tag(0) {}
+    uint32_t tag;
+    ExecCallback callback;
+    void* callback_data;
+    GIOChannel* gioout;
+    guint gioout_tag;
+    std::string stdout;
+  };
+
+  Subprocess() {}
 
   // Callback for when any subprocess terminates. This calls the user
   // requested callback.
@@ -72,14 +84,19 @@
   // stdout.
   static void GRedirectStderrToStdout(gpointer user_data);
 
-  struct SubprocessCallbackRecord {
-    ExecCallback callback;
-    void* callback_data;
-  };
+  // Callback which runs whenever there is input available on the subprocess
+  // stdout pipe.
+  static gboolean GStdoutWatchCallback(GIOChannel* source,
+                                       GIOCondition condition,
+                                       gpointer data);
 
-  std::map<int, SubprocessCallbackRecord> callback_records_;
+  // The global instance.
+  static Subprocess* subprocess_singleton_;
 
-  Subprocess() {}
+  // A map from the asynchronous subprocess tag (see Exec) to the subprocess
+  // record structure for all active asynchronous subprocesses.
+  std::map<int, std::tr1::shared_ptr<SubprocessRecord> > subprocess_records_;
+
   DISALLOW_COPY_AND_ASSIGN(Subprocess);
 };
 
diff --git a/subprocess_unittest.cc b/subprocess_unittest.cc
index 679f85c..314e0ae 100644
--- a/subprocess_unittest.cc
+++ b/subprocess_unittest.cc
@@ -27,18 +27,35 @@
 namespace {
 const int kLocalHttpPort = 8088;
 
-void Callback(int return_code, void *p) {
+void Callback(int return_code, const string& output, void *p) {
   EXPECT_EQ(256, return_code);
   GMainLoop* loop = reinterpret_cast<GMainLoop*>(p);
   g_main_loop_quit(loop);
 }
 
+void CallbackEcho(int return_code, const string& output, void *p) {
+  EXPECT_EQ(0, return_code);
+  EXPECT_NE(string::npos, output.find("this is stdout"));
+  EXPECT_NE(string::npos, output.find("this is stderr"));
+  GMainLoop* loop = reinterpret_cast<GMainLoop*>(p);
+  g_main_loop_quit(loop);
+}
+
 gboolean LaunchFalseInMainLoop(gpointer data) {
   vector<string> cmd;
   cmd.push_back("/bin/false");
   Subprocess::Get().Exec(cmd, Callback, data);
   return FALSE;
 }
+
+gboolean LaunchEchoInMainLoop(gpointer data) {
+  vector<string> cmd;
+  cmd.push_back("/bin/sh");
+  cmd.push_back("-c");
+  cmd.push_back("echo this is stdout; echo this is stderr > /dev/stderr");
+  Subprocess::Get().Exec(cmd, CallbackEcho, data);
+  return FALSE;
+}
 }  // namespace {}
 
 TEST(SubprocessTest, SimpleTest) {
@@ -48,8 +65,15 @@
   g_main_loop_unref(loop);
 }
 
+TEST(SubprocessTest, EchoTest) {
+  GMainLoop *loop = g_main_loop_new(g_main_context_default(), FALSE);
+  g_timeout_add(0, &LaunchEchoInMainLoop, loop);
+  g_main_loop_run(loop);
+  g_main_loop_unref(loop);
+}
+
 namespace {
-void CallbackBad(int return_code, void *p) {
+void CallbackBad(int return_code, const string& output, void *p) {
   CHECK(false) << "should never be called.";
 }
 
diff --git a/utils.h b/utils.h
index 57fb3bb..208abe8 100644
--- a/utils.h
+++ b/utils.h
@@ -237,12 +237,17 @@
 class ScopedFilesystemUnmounter {
  public:
   explicit ScopedFilesystemUnmounter(const std::string& mountpoint)
-      : mountpoint_(mountpoint) {}
+      : mountpoint_(mountpoint),
+        should_unmount_(true) {}
   ~ScopedFilesystemUnmounter() {
-    utils::UnmountFilesystem(mountpoint_);
+    if (should_unmount_) {
+      utils::UnmountFilesystem(mountpoint_);
+    }
   }
+  void set_should_unmount(bool unmount) { should_unmount_ = unmount; }
  private:
   const std::string mountpoint_;
+  bool should_unmount_;
   DISALLOW_COPY_AND_ASSIGN(ScopedFilesystemUnmounter);
 };
 
@@ -250,15 +255,13 @@
 class ScopedFdCloser {
  public:
   explicit ScopedFdCloser(int* fd) : fd_(fd), should_close_(true) {}
-  void set_should_close(bool should_close) { should_close_ = should_close; }
   ~ScopedFdCloser() {
-    if (!should_close_)
-      return;
-    if (fd_ && (*fd_ >= 0)) {
+    if (should_close_ && fd_ && (*fd_ >= 0)) {
       close(*fd_);
       *fd_ = -1;
     }
   }
+  void set_should_close(bool should_close) { should_close_ = should_close; }
  private:
   int* fd_;
   bool should_close_;
@@ -283,14 +286,35 @@
 // Utility class to delete an empty directory when it goes out of scope.
 class ScopedDirRemover {
  public:
-  explicit ScopedDirRemover(const std::string& path) : path_(path) {}
+  explicit ScopedDirRemover(const std::string& path)
+      : path_(path),
+        should_remove_(true) {}
   ~ScopedDirRemover() {
-    if (rmdir(path_.c_str()) < 0)
+    if (should_remove_ && (rmdir(path_.c_str()) < 0)) {
       PLOG(ERROR) << "Unable to remove dir " << path_;
+    }
+  }
+  void set_should_remove(bool should_remove) { should_remove_ = should_remove; }
+
+ protected:
+  const std::string path_;
+
+ private:
+  bool should_remove_;
+  DISALLOW_COPY_AND_ASSIGN(ScopedDirRemover);
+};
+
+// Utility class to unmount a filesystem mounted on a temporary directory and
+// delete the temporary directory when it goes out of scope.
+class ScopedTempUnmounter : public ScopedDirRemover {
+ public:
+  explicit ScopedTempUnmounter(const std::string& path) :
+      ScopedDirRemover(path) {}
+  ~ScopedTempUnmounter() {
+    utils::UnmountFilesystem(path_);
   }
  private:
-  const std::string path_;
-  DISALLOW_COPY_AND_ASSIGN(ScopedDirRemover);
+  DISALLOW_COPY_AND_ASSIGN(ScopedTempUnmounter);
 };
 
 // A little object to call ActionComplete on the ActionProcessor when