Restructure FilesystemCopierAction to use ping-pong buffers.
This allows some parallel processing -- hashes are calculated in parallel with
reads, and reads proceed in parallel with writes. The performance benefit at
this stage seems marginal. The major reason for the change is to reuse the
FilesystemCopierAction for applied update verification -- hopefully speeding it
up and making AU responsive to D-Bus signals during that stage. That would be a
separate CL though.
BUG=9140
TEST=unit tests, tested update on the device
Change-Id: I9d07ebbf4d536362c0a955addcaa1d48d5e9b115
Review URL: http://codereview.chromium.org/5603001
diff --git a/filesystem_copier_action.cc b/filesystem_copier_action.cc
index 1452b4d..f3f71bd 100755
--- a/filesystem_copier_action.cc
+++ b/filesystem_copier_action.cc
@@ -32,9 +32,32 @@
namespace chromeos_update_engine {
namespace {
-const off_t kCopyFileBufferSize = 2 * 1024 * 1024;
+const off_t kCopyFileBufferSize = 512 * 1024;
} // namespace {}
+FilesystemCopierAction::FilesystemCopierAction(
+ bool copying_kernel_install_path)
+ : copying_kernel_install_path_(copying_kernel_install_path),
+ src_stream_(NULL),
+ dst_stream_(NULL),
+ read_done_(false),
+ failed_(false),
+ cancelled_(false),
+ filesystem_size_(kint64max) {
+ // A lot of code works on the implicit assumption that processing is done on
+ // exactly 2 ping-pong buffers.
+ COMPILE_ASSERT(arraysize(buffer_) == 2 &&
+ arraysize(buffer_state_) == 2 &&
+ arraysize(buffer_valid_size_) == 2 &&
+ arraysize(canceller_) == 2,
+ ping_pong_buffers_not_two);
+ for (int i = 0; i < 2; ++i) {
+ buffer_state_[i] = kBufferStateEmpty;
+ buffer_valid_size_[i] = 0;
+ canceller_[i] = NULL;
+ }
+}
+
void FilesystemCopierAction::PerformAction() {
// Will tell the ActionProcessor we've failed if we return.
ScopedActionCompleter abort_action_completer(processor_, this);
@@ -84,35 +107,35 @@
src_stream_ = g_unix_input_stream_new(src_fd, TRUE);
dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE);
- buffer_.resize(kCopyFileBufferSize);
+ for (int i = 0; i < 2; i++) {
+ buffer_[i].resize(kCopyFileBufferSize);
+ canceller_[i] = g_cancellable_new();
+ }
- // Set up the first read
- canceller_ = g_cancellable_new();
-
- g_input_stream_read_async(src_stream_,
- &buffer_[0],
- GetBytesToRead(),
- G_PRIORITY_DEFAULT,
- canceller_,
- &FilesystemCopierAction::StaticAsyncReadyCallback,
- this);
- read_in_flight_ = true;
+ // Start the first read.
+ SpawnAsyncActions();
abort_action_completer.set_should_complete(false);
}
void FilesystemCopierAction::TerminateProcessing() {
- if (canceller_) {
- g_cancellable_cancel(canceller_);
+ for (int i = 0; i < 2; i++) {
+ if (canceller_[i]) {
+ g_cancellable_cancel(canceller_[i]);
+ }
}
}
-void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) {
+void FilesystemCopierAction::Cleanup(bool success) {
+ for (int i = 0; i < 2; i++) {
+ g_object_unref(canceller_[i]);
+ canceller_[i] = NULL;
+ }
g_object_unref(src_stream_);
src_stream_ = NULL;
g_object_unref(dst_stream_);
dst_stream_ = NULL;
- if (was_cancelled)
+ if (cancelled_)
return;
if (success && HasOutputPipe())
SetOutputObject(install_plan_);
@@ -121,85 +144,141 @@
success ? kActionCodeSuccess : kActionCodeError);
}
-void FilesystemCopierAction::AsyncReadyCallback(GObject *source_object,
- GAsyncResult *res) {
+void FilesystemCopierAction::AsyncReadReadyCallback(GObject *source_object,
+ GAsyncResult *res) {
+ int index = buffer_state_[0] == kBufferStateReading ? 0 : 1;
+ CHECK(buffer_state_[index] == kBufferStateReading);
+
GError* error = NULL;
- CHECK(canceller_);
- bool was_cancelled = g_cancellable_is_cancelled(canceller_) == TRUE;
- g_object_unref(canceller_);
- canceller_ = NULL;
+ CHECK(canceller_[index]);
+ cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
- if (read_in_flight_) {
- ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error);
- if (bytes_read < 0) {
- LOG(ERROR) << "Read failed:" << utils::GetGErrorMessage(error);
- Cleanup(false, was_cancelled);
- return;
+ ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error);
+ if (bytes_read < 0) {
+ LOG(ERROR) << "Read failed: " << utils::GetGErrorMessage(error);
+ failed_ = true;
+ buffer_state_[index] = kBufferStateEmpty;
+ } else if (bytes_read == 0) {
+ read_done_ = true;
+ buffer_state_[index] = kBufferStateEmpty;
+ } else {
+ buffer_valid_size_[index] = bytes_read;
+ buffer_state_[index] = kBufferStateFull;
+ }
+
+ if (bytes_read > 0) {
+ filesystem_size_ -= bytes_read;
+ }
+
+ SpawnAsyncActions();
+
+ if (bytes_read > 0) {
+ if (!hasher_.Update(buffer_[index].data(), bytes_read)) {
+ LOG(ERROR) << "Unable to update the hash.";
+ failed_ = true;
}
+ }
+}
- if (bytes_read == 0) {
- // We're done!
- if (!hasher_.Finalize()) {
- LOG(ERROR) << "Unable to finalize the hash.";
- Cleanup(false, was_cancelled);
- return;
- }
+void FilesystemCopierAction::StaticAsyncReadReadyCallback(
+ GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data) {
+ reinterpret_cast<FilesystemCopierAction*>(user_data)->
+ AsyncReadReadyCallback(source_object, res);
+}
+
+void FilesystemCopierAction::AsyncWriteReadyCallback(GObject *source_object,
+ GAsyncResult *res) {
+ int index = buffer_state_[0] == kBufferStateWriting ? 0 : 1;
+ CHECK(buffer_state_[index] == kBufferStateWriting);
+ buffer_state_[index] = kBufferStateEmpty;
+
+ GError* error = NULL;
+ CHECK(canceller_[index]);
+ cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
+
+ ssize_t bytes_written = g_output_stream_write_finish(dst_stream_,
+ res,
+ &error);
+ if (bytes_written < static_cast<ssize_t>(buffer_valid_size_[index])) {
+ if (bytes_written < 0) {
+ LOG(ERROR) << "Write failed: " << utils::GetGErrorMessage(error);
+ } else {
+ LOG(ERROR) << "Write was short: wrote " << bytes_written
+ << " but expected to write " << buffer_valid_size_[index];
+ }
+ failed_ = true;
+ }
+
+ SpawnAsyncActions();
+}
+
+void FilesystemCopierAction::StaticAsyncWriteReadyCallback(
+ GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data) {
+ reinterpret_cast<FilesystemCopierAction*>(user_data)->
+ AsyncWriteReadyCallback(source_object, res);
+}
+
+void FilesystemCopierAction::SpawnAsyncActions() {
+ bool reading = false;
+ bool writing = false;
+ for (int i = 0; i < 2; i++) {
+ if (buffer_state_[i] == kBufferStateReading) {
+ reading = true;
+ }
+ if (buffer_state_[i] == kBufferStateWriting) {
+ writing = true;
+ }
+ }
+ if (failed_ || cancelled_) {
+ if (!reading && !writing) {
+ Cleanup(false);
+ }
+ return;
+ }
+ for (int i = 0; i < 2; i++) {
+ if (!reading && !read_done_ && buffer_state_[i] == kBufferStateEmpty) {
+ g_input_stream_read_async(
+ src_stream_,
+ buffer_[i].data(),
+ GetBytesToRead(),
+ G_PRIORITY_DEFAULT,
+ canceller_[i],
+ &FilesystemCopierAction::StaticAsyncReadReadyCallback,
+ this);
+ reading = true;
+ buffer_state_[i] = kBufferStateReading;
+ } else if (!writing && buffer_state_[i] == kBufferStateFull) {
+ g_output_stream_write_async(
+ dst_stream_,
+ buffer_[i].data(),
+ buffer_valid_size_[i],
+ G_PRIORITY_DEFAULT,
+ canceller_[i],
+ &FilesystemCopierAction::StaticAsyncWriteReadyCallback,
+ this);
+ writing = true;
+ buffer_state_[i] = kBufferStateWriting;
+ }
+ }
+ if (!reading && !writing) {
+ // We're done!
+ if (hasher_.Finalize()) {
LOG(INFO) << "hash: " << hasher_.hash();
if (copying_kernel_install_path_) {
install_plan_.current_kernel_hash = hasher_.raw_hash();
} else {
install_plan_.current_rootfs_hash = hasher_.raw_hash();
}
- Cleanup(true, was_cancelled);
- return;
- }
- if (!hasher_.Update(buffer_.data(), bytes_read)) {
- LOG(ERROR) << "Unable to update the hash.";
- Cleanup(false, was_cancelled);
- return;
- }
- filesystem_size_ -= bytes_read;
-
- // Kick off a write
- read_in_flight_ = false;
- buffer_valid_size_ = bytes_read;
- canceller_ = g_cancellable_new();
- g_output_stream_write_async(
- dst_stream_,
- &buffer_[0],
- bytes_read,
- G_PRIORITY_DEFAULT,
- canceller_,
- &FilesystemCopierAction::StaticAsyncReadyCallback,
- this);
- return;
- }
-
- ssize_t bytes_written = g_output_stream_write_finish(dst_stream_,
- res,
- &error);
- if (bytes_written < static_cast<ssize_t>(buffer_valid_size_)) {
- if (bytes_written < 0) {
- LOG(ERROR) << "Write failed:" << utils::GetGErrorMessage(error);
+ Cleanup(true);
} else {
- LOG(ERROR) << "Write was short: wrote " << bytes_written
- << " but expected to write " << buffer_valid_size_;
+ LOG(ERROR) << "Unable to finalize the hash.";
+ Cleanup(false);
}
- Cleanup(false, was_cancelled);
- return;
}
-
- // Kick off a read
- read_in_flight_ = true;
- canceller_ = g_cancellable_new();
- g_input_stream_read_async(
- src_stream_,
- &buffer_[0],
- GetBytesToRead(),
- G_PRIORITY_DEFAULT,
- canceller_,
- &FilesystemCopierAction::StaticAsyncReadyCallback,
- this);
}
void FilesystemCopierAction::DetermineFilesystemSize(int fd) {
@@ -214,7 +293,7 @@
}
int64_t FilesystemCopierAction::GetBytesToRead() {
- return std::min(static_cast<int64_t>(buffer_.size()), filesystem_size_);
+ return std::min(static_cast<int64_t>(buffer_[0].size()), filesystem_size_);
}
} // namespace chromeos_update_engine