adb: add IOVector.
An IOVector is a collection of immutable reference counted blocks which
can have its head detached at an arbitrary index. This is extremely
useful for implementing packet-framed protocols like adb on top of a
stream protocol like TCP: a stream reader can read blocks, append them
to the end of the IOVector, and then pull packets off of the front.
This also lends itself naturally towards scatter/gather I/O, which will
enable us to read data from disk and send it across the wire with a
theoretical minimum number of copies in USB, and one extra copy over
TCP.
Since this is basically a generalization of std::deque<Range>, delete
Range and replace its uses with IOVector.
Test: adb_test
Test: wine adb_test.exe
Change-Id: I06561ad0bb25a3a51b378b61d257b5b04b41d9c4
diff --git a/adb/sockets.cpp b/adb/sockets.cpp
index 9a6dcbe..de3215d 100644
--- a/adb/sockets.cpp
+++ b/adb/sockets.cpp
@@ -113,14 +113,14 @@
};
static SocketFlushResult local_socket_flush_incoming(asocket* s) {
- while (!s->packet_queue.empty()) {
- Range& r = s->packet_queue.front();
-
- int rc = adb_write(s->fd, r.data(), r.size());
- if (rc == static_cast<int>(r.size())) {
- s->packet_queue.pop_front();
+ if (!s->packet_queue.empty()) {
+ std::vector<adb_iovec> iov = s->packet_queue.iovecs();
+ ssize_t rc = adb_writev(s->fd, iov.data(), iov.size());
+ if (rc > 0 && static_cast<size_t>(rc) == s->packet_queue.size()) {
+ s->packet_queue.clear();
} else if (rc > 0) {
- r.drop_front(rc);
+ // TODO: Implement a faster drop_front?
+ s->packet_queue.take_front(rc);
fdevent_add(s->fde, FDE_WRITE);
return SocketFlushResult::TryAgain;
} else if (rc == -1 && errno == EAGAIN) {
@@ -130,7 +130,6 @@
// We failed to write, but it's possible that we can still read from the socket.
// Give that a try before giving up.
s->has_write_error = true;
- break;
}
}
@@ -217,8 +216,7 @@
static int local_socket_enqueue(asocket* s, apacket::payload_type data) {
D("LS(%d): enqueue %zu", s->id, data.size());
- Range r(std::move(data));
- s->packet_queue.push_back(std::move(r));
+ s->packet_queue.append(std::move(data));
switch (local_socket_flush_incoming(s)) {
case SocketFlushResult::Destroyed:
return -1;
@@ -622,7 +620,7 @@
D("SS(%d): enqueue %zu", s->id, data.size());
if (s->smart_socket_data.empty()) {
- // TODO: Make this a BlockChain?
+ // TODO: Make this an IOVector?
s->smart_socket_data.assign(data.begin(), data.end());
} else {
std::copy(data.begin(), data.end(), std::back_inserter(s->smart_socket_data));