patch 7.4.1191
Problem: The channel feature isn't working yet.
Solution: Add the connect(), disconnect(), sendexpr() and sendraw()
functions. Add initial documentation. Add a demo server.
diff --git a/src/channel.c b/src/channel.c
index fc738c5..952123c 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -77,11 +77,11 @@
typedef struct readqueue queue_T;
typedef struct {
- sock_T ch_fd; /* the socket, -1 for a closed channel */
- int ch_idx; /* used by channel_poll_setup() */
- queue_T ch_head; /* dummy node, header for circular queue */
+ sock_T ch_fd; /* the socket, -1 for a closed channel */
+ int ch_idx; /* used by channel_poll_setup() */
+ queue_T ch_head; /* dummy node, header for circular queue */
- int ch_error; /* When TRUE an error was reported. Avoids giving
+ int ch_error; /* When TRUE an error was reported. Avoids giving
* pages full of error messages when the other side
* has exited, only mention the first error until the
* connection works again. */
@@ -89,13 +89,19 @@
XtInputId ch_inputHandler; /* Cookie for input */
#endif
#ifdef FEAT_GUI_GTK
- gint ch_inputHandler; /* Cookie for input */
+ gint ch_inputHandler; /* Cookie for input */
#endif
#ifdef FEAT_GUI_W32
- int ch_inputHandler; /* simply ret.value of WSAAsyncSelect() */
+ int ch_inputHandler; /* simply ret.value of WSAAsyncSelect() */
#endif
- void (*ch_close_cb)(void); /* callback invoked when channel is closed */
+ void (*ch_close_cb)(void); /* callback for when channel is closed */
+
+ char_u *ch_callback; /* function to call when a msg is not handled */
+ char_u *ch_req_callback; /* function to call for current request */
+ int ch_will_block; /* do not use callback right now */
+
+ int ch_json_mode;
} channel_T;
/*
@@ -190,7 +196,7 @@
channel->ch_inputHandler =
XtAppAddInput((XtAppContext)app_context, channel->ch_fd,
(XtPointer)(XtInputReadMask + XtInputExceptMask),
- messageFromNetbeans, (XtPointer)idx);
+ messageFromNetbeans, (XtPointer)(long)idx);
# else
# ifdef FEAT_GUI_GTK
/*
@@ -383,12 +389,152 @@
}
/*
+ * Set the json mode of channel "idx" to TRUE or FALSE.
+ */
+ void
+channel_set_json_mode(int idx, int json_mode)
+{
+ channels[idx].ch_json_mode = json_mode;
+}
+
+/*
+ * Set the callback for channel "idx".
+ */
+ void
+channel_set_callback(int idx, char_u *callback)
+{
+ vim_free(channels[idx].ch_callback);
+ channels[idx].ch_callback = vim_strsave(callback);
+}
+
+/*
+ * Set the callback for channel "idx" for the next response.
+ */
+ void
+channel_set_req_callback(int idx, char_u *callback)
+{
+ vim_free(channels[idx].ch_req_callback);
+ channels[idx].ch_req_callback = callback == NULL
+ ? NULL : vim_strsave(callback);
+}
+
+/*
+ * Set the flag that the callback for channel "idx" should not be used now.
+ */
+ void
+channel_will_block(int idx)
+{
+ channels[idx].ch_will_block = TRUE;
+}
+
+/*
+ * Decode JSON "msg", which must have the form "[nr, expr]".
+ * Put "expr" in "tv".
+ * Return OK or FAIL.
+ */
+ int
+channel_decode_json(char_u *msg, typval_T *tv)
+{
+ js_read_T reader;
+ typval_T listtv;
+
+ reader.js_buf = msg;
+ reader.js_eof = TRUE;
+ reader.js_used = 0;
+ json_decode(&reader, &listtv);
+ /* TODO: use the sequence number */
+ if (listtv.v_type == VAR_LIST
+ && listtv.vval.v_list->lv_len == 2
+ && listtv.vval.v_list->lv_first->li_tv.v_type == VAR_NUMBER)
+ {
+ /* Move the item from the list and then change the type to avoid the
+ * item being freed. */
+ *tv = listtv.vval.v_list->lv_last->li_tv;
+ listtv.vval.v_list->lv_last->li_tv.v_type = VAR_NUMBER;
+ list_unref(listtv.vval.v_list);
+ return OK;
+ }
+
+ /* give error message? */
+ clear_tv(&listtv);
+ return FAIL;
+}
+
+/*
+ * Invoke the "callback" on channel "idx".
+ */
+ static void
+invoke_callback(int idx, char_u *callback)
+{
+ typval_T argv[3];
+ typval_T rettv;
+ int dummy;
+ char_u *msg;
+ int ret = OK;
+
+ argv[0].v_type = VAR_NUMBER;
+ argv[0].vval.v_number = idx;
+
+ /* Concatenate everything into one buffer.
+ * TODO: only read what the callback will use.
+ * TODO: avoid multiple allocations. */
+ while (channel_collapse(idx) == OK)
+ ;
+ msg = channel_get(idx);
+
+ if (channels[idx].ch_json_mode)
+ ret = channel_decode_json(msg, &argv[1]);
+ else
+ {
+ argv[1].v_type = VAR_STRING;
+ argv[1].vval.v_string = msg;
+ }
+
+ if (ret == OK)
+ {
+ call_func(callback, (int)STRLEN(callback),
+ &rettv, 2, argv, 0L, 0L, &dummy, TRUE, NULL);
+ /* If an echo command was used the cursor needs to be put back where
+ * it belongs. */
+ setcursor();
+ cursor_on();
+ out_flush();
+ }
+ vim_free(msg);
+}
+
+/*
+ * Invoke a callback for channel "idx" if needed.
+ */
+ static void
+may_invoke_callback(int idx)
+{
+ if (channels[idx].ch_will_block)
+ return;
+ if (channel_peek(idx) == NULL)
+ return;
+
+ if (channels[idx].ch_req_callback != NULL)
+ {
+ /* invoke the one-time callback */
+ invoke_callback(idx, channels[idx].ch_req_callback);
+ channels[idx].ch_req_callback = NULL;
+ return;
+ }
+
+ if (channels[idx].ch_callback != NULL)
+ /* invoke the channel callback */
+ invoke_callback(idx, channels[idx].ch_callback);
+}
+
+/*
* Return TRUE when channel "idx" is open.
+ * Also returns FALSE or invalid "idx".
*/
int
channel_is_open(int idx)
{
- return channels[idx].ch_fd >= 0;
+ return idx >= 0 && idx < channel_count && channels[idx].ch_fd >= 0;
}
/*
@@ -407,6 +553,8 @@
#ifdef FEAT_GUI
channel_gui_unregister(idx);
#endif
+ vim_free(channel->ch_callback);
+ channel->ch_callback = NULL;
}
}
@@ -551,7 +699,57 @@
#define MAXMSGSIZE 4096
/*
- * Read from channel "idx". The data is put in the read queue.
+ * Check for reading from "fd" with "timeout" msec.
+ * Return FAIL when there is nothing to read.
+ */
+ static int
+channel_wait(int fd, int timeout)
+{
+#ifdef HAVE_SELECT
+ struct timeval tval;
+ fd_set rfds;
+ int ret;
+
+ FD_ZERO(&rfds);
+ FD_SET(fd, &rfds);
+ tval.tv_sec = timeout / 1000;
+ tval.tv_usec = (timeout % 1000) * 1000;
+ for (;;)
+ {
+ ret = select(fd + 1, &rfds, NULL, NULL, &tval);
+# ifdef EINTR
+ if (ret == -1 && errno == EINTR)
+ continue;
+# endif
+ if (ret <= 0)
+ return FAIL;
+ break;
+ }
+#else
+ struct pollfd fds;
+
+ fds.fd = fd;
+ fds.events = POLLIN;
+ if (poll(&fds, 1, timeout) <= 0)
+ return FAIL;
+#endif
+ return OK;
+}
+
+/*
+ * Return a unique ID to be used in a message.
+ */
+ int
+channel_get_id()
+{
+ static int next_id = 1;
+
+ return next_id++;
+}
+
+/*
+ * Read from channel "idx" for as long as there is something to read.
+ * The data is put in the read queue.
*/
void
channel_read(int idx)
@@ -559,14 +757,6 @@
static char_u *buf = NULL;
int len = 0;
int readlen = 0;
-#ifdef HAVE_SELECT
- struct timeval tval;
- fd_set rfds;
-#else
-# ifdef HAVE_POLL
- struct pollfd fds;
-# endif
-#endif
channel_T *channel = &channels[idx];
if (channel->ch_fd < 0)
@@ -588,21 +778,8 @@
* MAXMSGSIZE long. */
for (;;)
{
-#ifdef HAVE_SELECT
- FD_ZERO(&rfds);
- FD_SET(channel->ch_fd, &rfds);
- tval.tv_sec = 0;
- tval.tv_usec = 0;
- if (select(channel->ch_fd + 1, &rfds, NULL, NULL, &tval) <= 0)
+ if (channel_wait(channel->ch_fd, 0) == FAIL)
break;
-#else
-# ifdef HAVE_POLL
- fds.fd = channel->ch_fd;
- fds.events = POLLIN;
- if (poll(&fds, 1, 0) <= 0)
- break;
-# endif
-#endif
len = sock_read(channel->ch_fd, buf, MAXMSGSIZE);
if (len <= 0)
break; /* error or nothing more to read */
@@ -641,12 +818,44 @@
}
}
+ may_invoke_callback(idx);
+
#if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK)
if (CH_HAS_GUI && gtk_main_level() > 0)
gtk_main_quit();
#endif
}
+/*
+ * Read from channel "idx". Blocks until there is something to read or the
+ * timeout expires.
+ * Returns what was read in allocated memory.
+ * Returns NULL in case of error or timeout.
+ */
+ char_u *
+channel_read_block(int idx)
+{
+ if (channel_peek(idx) == NULL)
+ {
+ /* Wait for up to 2 seconds.
+ * TODO: use timeout set on the channel. */
+ if (channel_wait(channels[idx].ch_fd, 2000) == FAIL)
+ {
+ channels[idx].ch_will_block = FALSE;
+ return NULL;
+ }
+ channel_read(idx);
+ }
+
+ /* Concatenate everything into one buffer.
+ * TODO: avoid multiple allocations. */
+ while (channel_collapse(idx) == OK)
+ ;
+
+ channels[idx].ch_will_block = FALSE;
+ return channel_get(idx);
+}
+
# if defined(FEAT_GUI_W32) || defined(PROTO)
/*
* Lookup the channel index from the socket.
@@ -668,8 +877,9 @@
/*
* Write "buf" (NUL terminated string) to channel "idx".
* When "fun" is not NULL an error message might be given.
+ * Return FAIL or OK.
*/
- void
+ int
channel_send(int idx, char_u *buf, char *fun)
{
channel_T *channel = &channels[idx];
@@ -683,8 +893,10 @@
EMSG2("E630: %s(): write while not connected", fun);
}
channel->ch_error = TRUE;
+ return FAIL;
}
- else if (sock_write(channel->ch_fd, buf, len) != len)
+
+ if (sock_write(channel->ch_fd, buf, len) != len)
{
if (!channel->ch_error && fun != NULL)
{
@@ -692,9 +904,11 @@
EMSG2("E631: %s(): write failed", fun);
}
channel->ch_error = TRUE;
+ return FAIL;
}
- else
- channel->ch_error = FALSE;
+
+ channel->ch_error = FALSE;
+ return OK;
}
# if (defined(UNIX) && !defined(HAVE_SELECT)) || defined(PROTO)