AU: Execute postinst asynchronously so that the D-Bus service is not blocked.

This CL also cleans up Subprocess memory handling a bit and extends it to
capture the output of asynchronous subprocesses. Also adds a scoped temp mount
unmounter utility class.

BUG=8937
TEST=unit tests, tested on the device -- making sure no D-Bus calls timeout
during postinstall.

Change-Id: I219dda3dc98d875ff05050f1a32ffcc925db1d53

Review URL: http://codereview.chromium.org/4690006
diff --git a/subprocess.cc b/subprocess.cc
index af5fb75..9c19367 100755
--- a/subprocess.cc
+++ b/subprocess.cc
@@ -13,26 +13,55 @@
 #include "base/string_util.h"
 
 using std::string;
+using std::tr1::shared_ptr;
 using std::vector;
 
 namespace chromeos_update_engine {
 
 void Subprocess::GChildExitedCallback(GPid pid, gint status, gpointer data) {
-  COMPILE_ASSERT(sizeof(guint) == sizeof(uint32_t),
-                 guint_uint32_size_mismatch);
-  guint* tag = reinterpret_cast<guint*>(data);
-  const SubprocessCallbackRecord& record = Get().callback_records_[*tag];
-  if (record.callback)
-    record.callback(status, record.callback_data);
+  SubprocessRecord* record = reinterpret_cast<SubprocessRecord*>(data);
+
+  // Make sure we read any remaining process output. Then close the pipe.
+  GStdoutWatchCallback(record->gioout, G_IO_IN, &record->stdout);
+  int fd = g_io_channel_unix_get_fd(record->gioout);
+  g_source_remove(record->gioout_tag);
+  g_io_channel_unref(record->gioout);
+  close(fd);
+
   g_spawn_close_pid(pid);
-  Get().callback_records_.erase(*tag);
-  delete tag;
+  if (status) {
+    LOG(INFO) << "Subprocess status: " << status;
+  }
+  if (!record->stdout.empty()) {
+    LOG(INFO) << "Subprocess output:\n" << record->stdout;
+  }
+  if (record->callback) {
+    record->callback(status, record->stdout, record->callback_data);
+  }
+  Get().subprocess_records_.erase(record->tag);
 }
 
 void Subprocess::GRedirectStderrToStdout(gpointer user_data) {
   dup2(1, 2);
 }
 
+gboolean Subprocess::GStdoutWatchCallback(GIOChannel* source,
+                                          GIOCondition condition,
+                                          gpointer data) {
+  string* stdout = reinterpret_cast<string*>(data);
+  char buf[1024];
+  gsize bytes_read;
+  while (g_io_channel_read_chars(source,
+                                 buf,
+                                 arraysize(buf),
+                                 &bytes_read,
+                                 NULL) == G_IO_STATUS_NORMAL &&
+         bytes_read > 0) {
+    stdout->append(buf, bytes_read);
+  }
+  return TRUE;  // Keep the callback source. It's freed in GChildExitedCallback.
+}
+
 namespace {
 void FreeArgv(char** argv) {
   for (int i = 0; argv[i]; i++) {
@@ -106,33 +135,47 @@
   }
   ScopedFreeArgPointer argp_free(argp);
 
-  SubprocessCallbackRecord callback_record;
-  callback_record.callback = callback;
-  callback_record.callback_data = p;
-
-  bool success = g_spawn_async(NULL,  // working directory
-                               argv.get(),
-                               argp,
-                               G_SPAWN_DO_NOT_REAP_CHILD,  // flags
-                               NULL,  // child setup function
-                               NULL,  // child setup data pointer
-                               &child_pid,
-                               &err);
+  shared_ptr<SubprocessRecord> record(new SubprocessRecord);
+  record->callback = callback;
+  record->callback_data = p;
+  gint stdout_fd = -1;
+  bool success = g_spawn_async_with_pipes(
+      NULL,  // working directory
+      argv.get(),
+      argp,
+      G_SPAWN_DO_NOT_REAP_CHILD,  // flags
+      GRedirectStderrToStdout,  // child setup function
+      NULL,  // child setup data pointer
+      &child_pid,
+      NULL,
+      &stdout_fd,
+      NULL,
+      &err);
   FreeArgv(argv.get());
   if (!success) {
     LOG(ERROR) << "g_spawn_async failed";
     return 0;
   }
-  guint* tag = new guint;
-  *tag = g_child_watch_add(child_pid, GChildExitedCallback, tag);
-  callback_records_[*tag] = callback_record;
-  return *tag;
+  record->tag =
+      g_child_watch_add(child_pid, GChildExitedCallback, record.get());
+  subprocess_records_[record->tag] = record;
+
+  // Capture the subprocess output.
+  record->gioout = g_io_channel_unix_new(stdout_fd);
+  g_io_channel_set_encoding(record->gioout, NULL, NULL);
+  LOG_IF(WARNING,
+         g_io_channel_set_flags(record->gioout, G_IO_FLAG_NONBLOCK, NULL) !=
+         G_IO_STATUS_NORMAL) << "Unable to set non-blocking I/O mode.";
+  record->gioout_tag = g_io_add_watch(
+      record->gioout,
+      static_cast<GIOCondition>(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),
+      GStdoutWatchCallback,
+      &record->stdout);
+  return record->tag;
 }
 
 void Subprocess::CancelExec(uint32_t tag) {
-  if (callback_records_[tag].callback) {
-    callback_records_[tag].callback = NULL;
-  }
+  subprocess_records_[tag]->callback = NULL;
 }
 
 bool Subprocess::SynchronousExecFlags(const std::vector<std::string>& cmd,
@@ -178,6 +221,16 @@
   return success;
 }
 
+bool Subprocess::SubprocessInFlight() {
+  for (std::map<int, shared_ptr<SubprocessRecord> >::iterator it =
+           subprocess_records_.begin();
+       it != subprocess_records_.end(); ++it) {
+    if (it->second->callback)
+      return true;
+  }
+  return false;
+}
+
 Subprocess* Subprocess::subprocess_singleton_ = NULL;
 
 }  // namespace chromeos_update_engine