0
0
mirror of https://github.com/vim/vim.git synced 2025-07-26 11:04:33 -04:00

patch 8.0.0027

Problem:    A channel is closed when reading on stderr or stdout fails, but
            there may still be something to read on another part.
Solution:   Turn ch_to_be_closed into a bitfield. (Ozaki Kiichi)
This commit is contained in:
Bram Moolenaar 2016-10-09 17:28:01 +02:00
parent 9b45794818
commit dc0ccaee68
6 changed files with 161 additions and 135 deletions

View File

@ -54,7 +54,7 @@
# define fd_close(sd) close(sd) # define fd_close(sd) close(sd)
#endif #endif
static void channel_read(channel_T *channel, int part, char *func); static void channel_read(channel_T *channel, ch_part_T part, char *func);
/* Whether a redraw is needed for appending a line to a buffer. */ /* Whether a redraw is needed for appending a line to a buffer. */
static int channel_need_redraw = FALSE; static int channel_need_redraw = FALSE;
@ -309,7 +309,7 @@ static int next_ch_id = 0;
channel_T * channel_T *
add_channel(void) add_channel(void)
{ {
int part; ch_part_T part;
channel_T *channel = (channel_T *)alloc_clear((int)sizeof(channel_T)); channel_T *channel = (channel_T *)alloc_clear((int)sizeof(channel_T));
if (channel == NULL) if (channel == NULL)
@ -318,7 +318,7 @@ add_channel(void)
channel->ch_id = next_ch_id++; channel->ch_id = next_ch_id++;
ch_log(channel, "Created channel"); ch_log(channel, "Created channel");
for (part = PART_SOCK; part <= PART_IN; ++part) for (part = PART_SOCK; part < PART_COUNT; ++part)
{ {
channel->ch_part[part].ch_fd = INVALID_FD; channel->ch_part[part].ch_fd = INVALID_FD;
#ifdef FEAT_GUI_X11 #ifdef FEAT_GUI_X11
@ -421,9 +421,7 @@ channel_free(channel_T *channel)
if (!in_free_unref_items) if (!in_free_unref_items)
{ {
if (safe_to_invoke_callback == 0) if (safe_to_invoke_callback == 0)
{
channel->ch_to_be_freed = TRUE; channel->ch_to_be_freed = TRUE;
}
else else
{ {
channel_free_contents(channel); channel_free_contents(channel);
@ -511,7 +509,7 @@ free_unused_channels(int copyID, int mask)
channel_read_fd(int fd) channel_read_fd(int fd)
{ {
channel_T *channel; channel_T *channel;
int part; ch_part_T part;
channel = channel_fd2channel(fd, &part); channel = channel_fd2channel(fd, &part);
if (channel == NULL) if (channel == NULL)
@ -557,7 +555,7 @@ messageFromServer(gpointer clientData,
#endif #endif
static void static void
channel_gui_register_one(channel_T *channel, int part) channel_gui_register_one(channel_T *channel, ch_part_T part)
{ {
if (!CH_HAS_GUI) if (!CH_HAS_GUI)
return; return;
@ -627,7 +625,7 @@ channel_gui_register_all(void)
} }
static void static void
channel_gui_unregister_one(channel_T *channel, int part) channel_gui_unregister_one(channel_T *channel, ch_part_T part)
{ {
# ifdef FEAT_GUI_X11 # ifdef FEAT_GUI_X11
if (channel->ch_part[part].ch_inputHandler != (XtInputId)NULL) if (channel->ch_part[part].ch_inputHandler != (XtInputId)NULL)
@ -653,7 +651,7 @@ channel_gui_unregister_one(channel_T *channel, int part)
static void static void
channel_gui_unregister(channel_T *channel) channel_gui_unregister(channel_T *channel)
{ {
int part; ch_part_T part;
for (part = PART_SOCK; part < PART_IN; ++part) for (part = PART_SOCK; part < PART_IN; ++part)
channel_gui_unregister_one(channel, part); channel_gui_unregister_one(channel, part);
@ -928,6 +926,7 @@ channel_open(
channel->ch_nb_close_cb = nb_close_cb; channel->ch_nb_close_cb = nb_close_cb;
channel->ch_hostname = (char *)vim_strsave((char_u *)hostname); channel->ch_hostname = (char *)vim_strsave((char_u *)hostname);
channel->ch_port = port_in; channel->ch_port = port_in;
channel->ch_to_be_closed |= (1 << PART_SOCK);
#ifdef FEAT_GUI #ifdef FEAT_GUI
channel_gui_register_one(channel, PART_SOCK); channel_gui_register_one(channel, PART_SOCK);
@ -998,12 +997,19 @@ theend:
} }
static void static void
may_close_part(sock_T *fd) ch_close_part(channel_T *channel, ch_part_T part)
{ {
sock_T *fd = &channel->ch_part[part].ch_fd;
if (*fd != INVALID_FD) if (*fd != INVALID_FD)
{ {
if (part == PART_SOCK)
sock_close(*fd);
else
fd_close(*fd); fd_close(*fd);
*fd = INVALID_FD; *fd = INVALID_FD;
channel->ch_to_be_closed &= ~(1 << part);
} }
} }
@ -1012,7 +1018,7 @@ channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err)
{ {
if (in != INVALID_FD) if (in != INVALID_FD)
{ {
may_close_part(&channel->CH_IN_FD); ch_close_part(channel, PART_IN);
channel->CH_IN_FD = in; channel->CH_IN_FD = in;
} }
if (out != INVALID_FD) if (out != INVALID_FD)
@ -1020,8 +1026,9 @@ channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err)
# if defined(FEAT_GUI) # if defined(FEAT_GUI)
channel_gui_unregister_one(channel, PART_OUT); channel_gui_unregister_one(channel, PART_OUT);
# endif # endif
may_close_part(&channel->CH_OUT_FD); ch_close_part(channel, PART_OUT);
channel->CH_OUT_FD = out; channel->CH_OUT_FD = out;
channel->ch_to_be_closed |= (1 << PART_OUT);
# if defined(FEAT_GUI) # if defined(FEAT_GUI)
channel_gui_register_one(channel, PART_OUT); channel_gui_register_one(channel, PART_OUT);
# endif # endif
@ -1031,8 +1038,9 @@ channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err)
# if defined(FEAT_GUI) # if defined(FEAT_GUI)
channel_gui_unregister_one(channel, PART_ERR); channel_gui_unregister_one(channel, PART_ERR);
# endif # endif
may_close_part(&channel->CH_ERR_FD); ch_close_part(channel, PART_ERR);
channel->CH_ERR_FD = err; channel->CH_ERR_FD = err;
channel->ch_to_be_closed |= (1 << PART_ERR);
# if defined(FEAT_GUI) # if defined(FEAT_GUI)
channel_gui_register_one(channel, PART_ERR); channel_gui_register_one(channel, PART_ERR);
# endif # endif
@ -1151,10 +1159,10 @@ set_callback(
void void
channel_set_options(channel_T *channel, jobopt_T *opt) channel_set_options(channel_T *channel, jobopt_T *opt)
{ {
int part; ch_part_T part;
if (opt->jo_set & JO_MODE) if (opt->jo_set & JO_MODE)
for (part = PART_SOCK; part <= PART_IN; ++part) for (part = PART_SOCK; part < PART_COUNT; ++part)
channel->ch_part[part].ch_mode = opt->jo_mode; channel->ch_part[part].ch_mode = opt->jo_mode;
if (opt->jo_set & JO_IN_MODE) if (opt->jo_set & JO_IN_MODE)
channel->ch_part[PART_IN].ch_mode = opt->jo_in_mode; channel->ch_part[PART_IN].ch_mode = opt->jo_in_mode;
@ -1164,7 +1172,7 @@ channel_set_options(channel_T *channel, jobopt_T *opt)
channel->ch_part[PART_ERR].ch_mode = opt->jo_err_mode; channel->ch_part[PART_ERR].ch_mode = opt->jo_err_mode;
if (opt->jo_set & JO_TIMEOUT) if (opt->jo_set & JO_TIMEOUT)
for (part = PART_SOCK; part <= PART_IN; ++part) for (part = PART_SOCK; part < PART_COUNT; ++part)
channel->ch_part[part].ch_timeout = opt->jo_timeout; channel->ch_part[part].ch_timeout = opt->jo_timeout;
if (opt->jo_set & JO_OUT_TIMEOUT) if (opt->jo_set & JO_OUT_TIMEOUT)
channel->ch_part[PART_OUT].ch_timeout = opt->jo_out_timeout; channel->ch_part[PART_OUT].ch_timeout = opt->jo_out_timeout;
@ -1282,7 +1290,7 @@ channel_set_options(channel_T *channel, jobopt_T *opt)
void void
channel_set_req_callback( channel_set_req_callback(
channel_T *channel, channel_T *channel,
int part, ch_part_T part,
char_u *callback, char_u *callback,
partial_T *partial, partial_T *partial,
int id) int id)
@ -1448,7 +1456,7 @@ channel_write_in(channel_T *channel)
ch_log(channel, "Finished writing all lines to channel"); ch_log(channel, "Finished writing all lines to channel");
/* Close the pipe/socket, so that the other side gets EOF. */ /* Close the pipe/socket, so that the other side gets EOF. */
may_close_part(&channel->CH_IN_FD); ch_close_part(channel, PART_IN);
} }
else else
ch_logn(channel, "Still %d more lines to write", ch_logn(channel, "Still %d more lines to write",
@ -1462,10 +1470,10 @@ channel_write_in(channel_T *channel)
channel_buffer_free(buf_T *buf) channel_buffer_free(buf_T *buf)
{ {
channel_T *channel; channel_T *channel;
int part; ch_part_T part;
for (channel = first_channel; channel != NULL; channel = channel->ch_next) for (channel = first_channel; channel != NULL; channel = channel->ch_next)
for (part = PART_SOCK; part <= PART_IN; ++part) for (part = PART_SOCK; part < PART_COUNT; ++part)
{ {
chanpart_T *ch_part = &channel->ch_part[part]; chanpart_T *ch_part = &channel->ch_part[part];
@ -1574,7 +1582,7 @@ invoke_callback(channel_T *channel, char_u *callback, partial_T *partial,
* Returns NULL if there is nothing. * Returns NULL if there is nothing.
*/ */
readq_T * readq_T *
channel_peek(channel_T *channel, int part) channel_peek(channel_T *channel, ch_part_T part)
{ {
readq_T *head = &channel->ch_part[part].ch_head; readq_T *head = &channel->ch_part[part].ch_head;
@ -1604,7 +1612,7 @@ channel_first_nl(readq_T *node)
* Returns NULL if there is nothing. * Returns NULL if there is nothing.
*/ */
char_u * char_u *
channel_get(channel_T *channel, int part) channel_get(channel_T *channel, ch_part_T part)
{ {
readq_T *head = &channel->ch_part[part].ch_head; readq_T *head = &channel->ch_part[part].ch_head;
readq_T *node = head->rq_next; readq_T *node = head->rq_next;
@ -1628,7 +1636,7 @@ channel_get(channel_T *channel, int part)
* Replaces NUL bytes with NL. * Replaces NUL bytes with NL.
*/ */
static char_u * static char_u *
channel_get_all(channel_T *channel, int part) channel_get_all(channel_T *channel, ch_part_T part)
{ {
readq_T *head = &channel->ch_part[part].ch_head; readq_T *head = &channel->ch_part[part].ch_head;
readq_T *node = head->rq_next; readq_T *node = head->rq_next;
@ -1677,7 +1685,7 @@ channel_get_all(channel_T *channel, int part)
* Caller must check these bytes are available. * Caller must check these bytes are available.
*/ */
void void
channel_consume(channel_T *channel, int part, int len) channel_consume(channel_T *channel, ch_part_T part, int len)
{ {
readq_T *head = &channel->ch_part[part].ch_head; readq_T *head = &channel->ch_part[part].ch_head;
readq_T *node = head->rq_next; readq_T *node = head->rq_next;
@ -1693,7 +1701,7 @@ channel_consume(channel_T *channel, int part, int len)
* When "want_nl" is TRUE collapse more buffers until a NL is found. * When "want_nl" is TRUE collapse more buffers until a NL is found.
*/ */
int int
channel_collapse(channel_T *channel, int part, int want_nl) channel_collapse(channel_T *channel, ch_part_T part, int want_nl)
{ {
readq_T *head = &channel->ch_part[part].ch_head; readq_T *head = &channel->ch_part[part].ch_head;
readq_T *node = head->rq_next; readq_T *node = head->rq_next;
@ -1753,7 +1761,7 @@ channel_collapse(channel_T *channel, int part, int want_nl)
* Returns OK or FAIL. * Returns OK or FAIL.
*/ */
static int static int
channel_save(channel_T *channel, int part, char_u *buf, int len, channel_save(channel_T *channel, ch_part_T part, char_u *buf, int len,
int prepend, char *lead) int prepend, char *lead)
{ {
readq_T *node; readq_T *node;
@ -1828,7 +1836,7 @@ channel_save(channel_T *channel, int part, char_u *buf, int len,
channel_fill(js_read_T *reader) channel_fill(js_read_T *reader)
{ {
channel_T *channel = (channel_T *)reader->js_cookie; channel_T *channel = (channel_T *)reader->js_cookie;
int part = reader->js_cookie_arg; ch_part_T part = reader->js_cookie_arg;
char_u *next = channel_get(channel, part); char_u *next = channel_get(channel, part);
int unused; int unused;
int len; int len;
@ -1866,7 +1874,7 @@ channel_fill(js_read_T *reader)
* Return TRUE if there is more to read. * Return TRUE if there is more to read.
*/ */
static int static int
channel_parse_json(channel_T *channel, int part) channel_parse_json(channel_T *channel, ch_part_T part)
{ {
js_read_T reader; js_read_T reader;
typval_T listtv; typval_T listtv;
@ -2046,7 +2054,7 @@ remove_json_node(jsonq_T *head, jsonq_T *node)
* Return FAIL otherwise. * Return FAIL otherwise.
*/ */
static int static int
channel_get_json(channel_T *channel, int part, int id, typval_T **rettv) channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv)
{ {
jsonq_T *head = &channel->ch_part[part].ch_json_head; jsonq_T *head = &channel->ch_part[part].ch_json_head;
jsonq_T *item = head->jq_next; jsonq_T *item = head->jq_next;
@ -2080,7 +2088,7 @@ channel_get_json(channel_T *channel, int part, int id, typval_T **rettv)
* "argv[1]" etc. have further arguments, type is VAR_UNKNOWN if missing. * "argv[1]" etc. have further arguments, type is VAR_UNKNOWN if missing.
*/ */
static void static void
channel_exe_cmd(channel_T *channel, int part, typval_T *argv) channel_exe_cmd(channel_T *channel, ch_part_T part, typval_T *argv)
{ {
char_u *cmd = argv[0].vval.v_string; char_u *cmd = argv[0].vval.v_string;
char_u *arg; char_u *arg;
@ -2237,7 +2245,7 @@ invoke_one_time_callback(
} }
static void static void
append_to_buffer(buf_T *buffer, char_u *msg, channel_T *channel, int part) append_to_buffer(buf_T *buffer, char_u *msg, channel_T *channel, ch_part_T part)
{ {
buf_T *save_curbuf = curbuf; buf_T *save_curbuf = curbuf;
linenr_T lnum = buffer->b_ml.ml_line_count; linenr_T lnum = buffer->b_ml.ml_line_count;
@ -2332,7 +2340,7 @@ append_to_buffer(buf_T *buffer, char_u *msg, channel_T *channel, int part)
} }
static void static void
drop_messages(channel_T *channel, int part) drop_messages(channel_T *channel, ch_part_T part)
{ {
char_u *msg; char_u *msg;
@ -2349,7 +2357,7 @@ drop_messages(channel_T *channel, int part)
* Return TRUE when a message was handled, there might be another one. * Return TRUE when a message was handled, there might be another one.
*/ */
static int static int
may_invoke_callback(channel_T *channel, int part) may_invoke_callback(channel_T *channel, ch_part_T part)
{ {
char_u *msg = NULL; char_u *msg = NULL;
typval_T *listtv = NULL; typval_T *listtv = NULL;
@ -2596,7 +2604,7 @@ channel_is_open(channel_T *channel)
* Return TRUE if "channel" has JSON or other typeahead. * Return TRUE if "channel" has JSON or other typeahead.
*/ */
static int static int
channel_has_readahead(channel_T *channel, int part) channel_has_readahead(channel_T *channel, ch_part_T part)
{ {
ch_mode_T ch_mode = channel->ch_part[part].ch_mode; ch_mode_T ch_mode = channel->ch_part[part].ch_mode;
@ -2617,7 +2625,7 @@ channel_has_readahead(channel_T *channel, int part)
char * char *
channel_status(channel_T *channel, int req_part) channel_status(channel_T *channel, int req_part)
{ {
int part; ch_part_T part;
int has_readahead = FALSE; int has_readahead = FALSE;
if (channel == NULL) if (channel == NULL)
@ -2640,7 +2648,7 @@ channel_status(channel_T *channel, int req_part)
{ {
if (channel_is_open(channel)) if (channel_is_open(channel))
return "open"; return "open";
for (part = PART_SOCK; part <= PART_ERR; ++part) for (part = PART_SOCK; part < PART_IN; ++part)
if (channel_has_readahead(channel, part)) if (channel_has_readahead(channel, part))
{ {
has_readahead = TRUE; has_readahead = TRUE;
@ -2654,7 +2662,7 @@ channel_status(channel_T *channel, int req_part)
} }
static void static void
channel_part_info(channel_T *channel, dict_T *dict, char *name, int part) channel_part_info(channel_T *channel, dict_T *dict, char *name, ch_part_T part)
{ {
chanpart_T *chanpart = &channel->ch_part[part]; chanpart_T *chanpart = &channel->ch_part[part];
char namebuf[20]; /* longest is "sock_timeout" */ char namebuf[20]; /* longest is "sock_timeout" */
@ -2736,28 +2744,24 @@ channel_close(channel_T *channel, int invoke_close_cb)
channel_gui_unregister(channel); channel_gui_unregister(channel);
#endif #endif
if (channel->CH_SOCK_FD != INVALID_FD) ch_close_part(channel, PART_SOCK);
{ ch_close_part(channel, PART_IN);
sock_close(channel->CH_SOCK_FD); ch_close_part(channel, PART_OUT);
channel->CH_SOCK_FD = INVALID_FD; ch_close_part(channel, PART_ERR);
}
may_close_part(&channel->CH_IN_FD);
may_close_part(&channel->CH_OUT_FD);
may_close_part(&channel->CH_ERR_FD);
if (invoke_close_cb && channel->ch_close_cb != NULL) if (invoke_close_cb && channel->ch_close_cb != NULL)
{ {
typval_T argv[1]; typval_T argv[1];
typval_T rettv; typval_T rettv;
int dummy; int dummy;
int part; ch_part_T part;
/* Invoke callbacks before the close callback, since it's weird to /* Invoke callbacks before the close callback, since it's weird to
* first invoke the close callback. Increment the refcount to avoid * first invoke the close callback. Increment the refcount to avoid
* the channel being freed halfway. */ * the channel being freed halfway. */
++channel->ch_refcount; ++channel->ch_refcount;
ch_log(channel, "Invoking callbacks before closing"); ch_log(channel, "Invoking callbacks before closing");
for (part = PART_SOCK; part <= PART_ERR; ++part) for (part = PART_SOCK; part < PART_IN; ++part)
while (may_invoke_callback(channel, part)) while (may_invoke_callback(channel, part))
; ;
@ -2789,7 +2793,7 @@ channel_close(channel_T *channel, int invoke_close_cb)
} }
/* any remaining messages are useless now */ /* any remaining messages are useless now */
for (part = PART_SOCK; part <= PART_ERR; ++part) for (part = PART_SOCK; part < PART_IN; ++part)
drop_messages(channel, part); drop_messages(channel, part);
} }
@ -2802,14 +2806,14 @@ channel_close(channel_T *channel, int invoke_close_cb)
void void
channel_close_in(channel_T *channel) channel_close_in(channel_T *channel)
{ {
may_close_part(&channel->CH_IN_FD); ch_close_part(channel, PART_IN);
} }
/* /*
* Clear the read buffer on "channel"/"part". * Clear the read buffer on "channel"/"part".
*/ */
static void static void
channel_clear_one(channel_T *channel, int part) channel_clear_one(channel_T *channel, ch_part_T part)
{ {
jsonq_T *json_head = &channel->ch_part[part].ch_json_head; jsonq_T *json_head = &channel->ch_part[part].ch_json_head;
cbq_T *cb_head = &channel->ch_part[part].ch_cb_head; cbq_T *cb_head = &channel->ch_part[part].ch_cb_head;
@ -3043,11 +3047,20 @@ channel_wait(channel_T *channel, sock_T fd, int timeout)
} }
static void static void
channel_close_on_error(channel_T *channel, char *func) ch_close_part_on_error(
channel_T *channel, ch_part_T part, int is_err, char *func)
{ {
char msgbuf[80];
vim_snprintf(msgbuf, sizeof(msgbuf),
"%%s(): Read %s from ch_part[%d], closing",
(is_err ? "error" : "EOF"), part);
if (is_err)
/* Do not call emsg(), most likely the other end just exited. */ /* Do not call emsg(), most likely the other end just exited. */
ch_errors(channel, "%s(): Cannot read from channel, will close it soon", ch_errors(channel, msgbuf, func);
func); else
ch_logs(channel, msgbuf, func);
/* Queue a "DETACH" netbeans message in the command queue in order to /* Queue a "DETACH" netbeans message in the command queue in order to
* terminate the netbeans session later. Do not end the session here * terminate the netbeans session later. Do not end the session here
@ -3064,21 +3077,20 @@ channel_close_on_error(channel_T *channel, char *func)
channel_save(channel, PART_SOCK, (char_u *)DETACH_MSG_RAW, channel_save(channel, PART_SOCK, (char_u *)DETACH_MSG_RAW,
(int)STRLEN(DETACH_MSG_RAW), FALSE, "PUT "); (int)STRLEN(DETACH_MSG_RAW), FALSE, "PUT ");
/* When reading from stdout is not possible, assume the other side has /* When reading is not possible close this part of the channel. Don't
* died. Don't close the channel right away, it may be the wrong moment * close the channel yet, there may be something to read on another part. */
* to invoke callbacks. */ ch_close_part(channel, part);
channel->ch_to_be_closed = TRUE;
#ifdef FEAT_GUI #ifdef FEAT_GUI
/* Stop listening to GUI events right away. */ /* Stop listening to GUI events right away. */
channel_gui_unregister(channel); channel_gui_unregister_one(channel, part);
#endif #endif
} }
static void static void
channel_close_now(channel_T *channel) channel_close_now(channel_T *channel)
{ {
ch_log(channel, "Closing channel because of previous read error"); ch_log(channel, "Closing channel because all readable fds are closed");
channel_close(channel, TRUE); channel_close(channel, TRUE);
if (channel->ch_nb_close_cb != NULL) if (channel->ch_nb_close_cb != NULL)
(*channel->ch_nb_close_cb)(); (*channel->ch_nb_close_cb)();
@ -3090,7 +3102,7 @@ channel_close_now(channel_T *channel)
* The data is put in the read queue. No callbacks are invoked here. * The data is put in the read queue. No callbacks are invoked here.
*/ */
static void static void
channel_read(channel_T *channel, int part, char *func) channel_read(channel_T *channel, ch_part_T part, char *func)
{ {
static char_u *buf = NULL; static char_u *buf = NULL;
int len = 0; int len = 0;
@ -3098,14 +3110,11 @@ channel_read(channel_T *channel, int part, char *func)
sock_T fd; sock_T fd;
int use_socket = FALSE; int use_socket = FALSE;
/* If we detected a read error don't try reading again. */
if (channel->ch_to_be_closed)
return;
fd = channel->ch_part[part].ch_fd; fd = channel->ch_part[part].ch_fd;
if (fd == INVALID_FD) if (fd == INVALID_FD)
{ {
ch_error(channel, "channel_read() called while socket is closed"); ch_errors(channel, "channel_read() called while %s part is closed",
part_names[part]);
return; return;
} }
use_socket = fd == channel->CH_SOCK_FD; use_socket = fd == channel->CH_SOCK_FD;
@ -3141,7 +3150,7 @@ channel_read(channel_T *channel, int part, char *func)
/* Reading a disconnection (readlen == 0), or an error. */ /* Reading a disconnection (readlen == 0), or an error. */
if (readlen <= 0) if (readlen <= 0)
channel_close_on_error(channel, func); ch_close_part_on_error(channel, part, (len < 0), func);
#if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK) #if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK)
/* signal the main loop that there is something to read */ /* signal the main loop that there is something to read */
@ -3157,7 +3166,7 @@ channel_read(channel_T *channel, int part, char *func)
* Returns NULL in case of error or timeout. * Returns NULL in case of error or timeout.
*/ */
char_u * char_u *
channel_read_block(channel_T *channel, int part, int timeout) channel_read_block(channel_T *channel, ch_part_T part, int timeout)
{ {
char_u *buf; char_u *buf;
char_u *msg; char_u *msg;
@ -3237,7 +3246,7 @@ channel_read_block(channel_T *channel, int part, int timeout)
int int
channel_read_json_block( channel_read_json_block(
channel_T *channel, channel_T *channel,
int part, ch_part_T part,
int timeout_arg, int timeout_arg,
int id, int id,
typval_T **rettv) typval_T **rettv)
@ -3323,7 +3332,7 @@ channel_read_json_block(
common_channel_read(typval_T *argvars, typval_T *rettv, int raw) common_channel_read(typval_T *argvars, typval_T *rettv, int raw)
{ {
channel_T *channel; channel_T *channel;
int part = -1; ch_part_T part = PART_COUNT;
jobopt_T opt; jobopt_T opt;
int mode; int mode;
int timeout; int timeout;
@ -3344,7 +3353,7 @@ common_channel_read(typval_T *argvars, typval_T *rettv, int raw)
channel = get_channel_arg(&argvars[0], TRUE, TRUE, part); channel = get_channel_arg(&argvars[0], TRUE, TRUE, part);
if (channel != NULL) if (channel != NULL)
{ {
if (part < 0) if (part == PART_COUNT)
part = channel_part_read(channel); part = channel_part_read(channel);
mode = channel_get_mode(channel, part); mode = channel_get_mode(channel, part);
timeout = channel_get_timeout(channel, part); timeout = channel_get_timeout(channel, part);
@ -3382,10 +3391,10 @@ theend:
* Returns NULL when the socket isn't found. * Returns NULL when the socket isn't found.
*/ */
channel_T * channel_T *
channel_fd2channel(sock_T fd, int *partp) channel_fd2channel(sock_T fd, ch_part_T *partp)
{ {
channel_T *channel; channel_T *channel;
int part; ch_part_T part;
if (fd != INVALID_FD) if (fd != INVALID_FD)
for (channel = first_channel; channel != NULL; for (channel = first_channel; channel != NULL;
@ -3411,17 +3420,13 @@ channel_fd2channel(sock_T fd, int *partp)
channel_handle_events(void) channel_handle_events(void)
{ {
channel_T *channel; channel_T *channel;
int part; ch_part_T part;
sock_T fd; sock_T fd;
for (channel = first_channel; channel != NULL; channel = channel->ch_next) for (channel = first_channel; channel != NULL; channel = channel->ch_next)
{ {
/* If we detected a read error don't try reading again. */
if (channel->ch_to_be_closed)
continue;
/* check the socket and pipes */ /* check the socket and pipes */
for (part = PART_SOCK; part <= PART_ERR; ++part) for (part = PART_SOCK; part < PART_IN; ++part)
{ {
fd = channel->ch_part[part].ch_fd; fd = channel->ch_part[part].ch_fd;
if (fd != INVALID_FD) if (fd != INVALID_FD)
@ -3431,7 +3436,8 @@ channel_handle_events(void)
if (r == CW_READY) if (r == CW_READY)
channel_read(channel, part, "channel_handle_events"); channel_read(channel, part, "channel_handle_events");
else if (r == CW_ERROR) else if (r == CW_ERROR)
channel_close_on_error(channel, "channel_handle_events()"); ch_close_part_on_error(channel, part, TRUE,
"channel_handle_events");
} }
} }
} }
@ -3444,7 +3450,7 @@ channel_handle_events(void)
* Return FAIL or OK. * Return FAIL or OK.
*/ */
int int
channel_send(channel_T *channel, int part, char_u *buf, int len, char *fun) channel_send(channel_T *channel, ch_part_T part, char_u *buf, int len, char *fun)
{ {
int res; int res;
sock_T fd; sock_T fd;
@ -3496,7 +3502,7 @@ channel_send(channel_T *channel, int part, char_u *buf, int len, char *fun)
* Sets "part_read" to the read fd. * Sets "part_read" to the read fd.
* Otherwise returns NULL. * Otherwise returns NULL.
*/ */
channel_T * static channel_T *
send_common( send_common(
typval_T *argvars, typval_T *argvars,
char_u *text, char_u *text,
@ -3504,10 +3510,10 @@ send_common(
int eval, int eval,
jobopt_T *opt, jobopt_T *opt,
char *fun, char *fun,
int *part_read) ch_part_T *part_read)
{ {
channel_T *channel; channel_T *channel;
int part_send; ch_part_T part_send;
clear_job_options(opt); clear_job_options(opt);
channel = get_channel_arg(&argvars[0], TRUE, FALSE, 0); channel = get_channel_arg(&argvars[0], TRUE, FALSE, 0);
@ -3550,8 +3556,8 @@ ch_expr_common(typval_T *argvars, typval_T *rettv, int eval)
channel_T *channel; channel_T *channel;
int id; int id;
ch_mode_T ch_mode; ch_mode_T ch_mode;
int part_send; ch_part_T part_send;
int part_read; ch_part_T part_read;
jobopt_T opt; jobopt_T opt;
int timeout; int timeout;
@ -3610,7 +3616,7 @@ ch_raw_common(typval_T *argvars, typval_T *rettv, int eval)
char_u buf[NUMBUFLEN]; char_u buf[NUMBUFLEN];
char_u *text; char_u *text;
channel_T *channel; channel_T *channel;
int part_read; ch_part_T part_read;
jobopt_T opt; jobopt_T opt;
int timeout; int timeout;
@ -3644,7 +3650,7 @@ channel_poll_setup(int nfd_in, void *fds_in)
int nfd = nfd_in; int nfd = nfd_in;
channel_T *channel; channel_T *channel;
struct pollfd *fds = fds_in; struct pollfd *fds = fds_in;
int part; ch_part_T part;
for (channel = first_channel; channel != NULL; channel = channel->ch_next) for (channel = first_channel; channel != NULL; channel = channel->ch_next)
{ {
@ -3678,7 +3684,7 @@ channel_poll_check(int ret_in, void *fds_in)
int ret = ret_in; int ret = ret_in;
channel_T *channel; channel_T *channel;
struct pollfd *fds = fds_in; struct pollfd *fds = fds_in;
int part; ch_part_T part;
int idx; int idx;
chanpart_T *in_part; chanpart_T *in_part;
@ -3725,7 +3731,7 @@ channel_select_setup(int maxfd_in, void *rfds_in, void *wfds_in)
channel_T *channel; channel_T *channel;
fd_set *rfds = rfds_in; fd_set *rfds = rfds_in;
fd_set *wfds = wfds_in; fd_set *wfds = wfds_in;
int part; ch_part_T part;
for (channel = first_channel; channel != NULL; channel = channel->ch_next) for (channel = first_channel; channel != NULL; channel = channel->ch_next)
{ {
@ -3757,7 +3763,7 @@ channel_select_check(int ret_in, void *rfds_in, void *wfds_in)
channel_T *channel; channel_T *channel;
fd_set *rfds = rfds_in; fd_set *rfds = rfds_in;
fd_set *wfds = wfds_in; fd_set *wfds = wfds_in;
int part; ch_part_T part;
chanpart_T *in_part; chanpart_T *in_part;
for (channel = first_channel; channel != NULL; channel = channel->ch_next) for (channel = first_channel; channel != NULL; channel = channel->ch_next)
@ -3803,7 +3809,7 @@ channel_parse_messages(void)
channel_T *channel = first_channel; channel_T *channel = first_channel;
int ret = FALSE; int ret = FALSE;
int r; int r;
int part = PART_SOCK; ch_part_T part = PART_SOCK;
++safe_to_invoke_callback; ++safe_to_invoke_callback;
@ -3816,9 +3822,9 @@ channel_parse_messages(void)
} }
while (channel != NULL) while (channel != NULL)
{ {
if (channel->ch_to_be_closed) if (channel->ch_to_be_closed == 0)
{ {
channel->ch_to_be_closed = FALSE; channel->ch_to_be_closed = (1 << PART_COUNT);
channel_close_now(channel); channel_close_now(channel);
/* channel may have been freed, start over */ /* channel may have been freed, start over */
channel = first_channel; channel = first_channel;
@ -3899,7 +3905,7 @@ set_ref_in_channel(int copyID)
/* /*
* Return the "part" to write to for "channel". * Return the "part" to write to for "channel".
*/ */
int ch_part_T
channel_part_send(channel_T *channel) channel_part_send(channel_T *channel)
{ {
if (channel->CH_SOCK_FD == INVALID_FD) if (channel->CH_SOCK_FD == INVALID_FD)
@ -3910,7 +3916,7 @@ channel_part_send(channel_T *channel)
/* /*
* Return the default "part" to read from for "channel". * Return the default "part" to read from for "channel".
*/ */
int ch_part_T
channel_part_read(channel_T *channel) channel_part_read(channel_T *channel)
{ {
if (channel->CH_SOCK_FD == INVALID_FD) if (channel->CH_SOCK_FD == INVALID_FD)
@ -3923,7 +3929,7 @@ channel_part_read(channel_T *channel)
* If "channel" is invalid returns MODE_JSON. * If "channel" is invalid returns MODE_JSON.
*/ */
ch_mode_T ch_mode_T
channel_get_mode(channel_T *channel, int part) channel_get_mode(channel_T *channel, ch_part_T part)
{ {
if (channel == NULL) if (channel == NULL)
return MODE_JSON; return MODE_JSON;
@ -3934,7 +3940,7 @@ channel_get_mode(channel_T *channel, int part)
* Return the timeout of "channel"/"part" * Return the timeout of "channel"/"part"
*/ */
int int
channel_get_timeout(channel_T *channel, int part) channel_get_timeout(channel_T *channel, ch_part_T part)
{ {
return channel->ch_part[part].ch_timeout; return channel->ch_part[part].ch_timeout;
} }
@ -3962,7 +3968,7 @@ handle_mode(typval_T *item, jobopt_T *opt, ch_mode_T *modep, int jo)
} }
static int static int
handle_io(typval_T *item, int part, jobopt_T *opt) handle_io(typval_T *item, ch_part_T part, jobopt_T *opt)
{ {
char_u *val = get_tv_string(item); char_u *val = get_tv_string(item);
@ -4045,7 +4051,7 @@ get_job_options(typval_T *tv, jobopt_T *opt, int supported)
dict_T *dict; dict_T *dict;
int todo; int todo;
hashitem_T *hi; hashitem_T *hi;
int part; ch_part_T part;
opt->jo_set = 0; opt->jo_set = 0;
if (tv->v_type == VAR_UNKNOWN) if (tv->v_type == VAR_UNKNOWN)
@ -4343,10 +4349,10 @@ get_job_options(typval_T *tv, jobopt_T *opt, int supported)
* Returns NULL if the handle is invalid. * Returns NULL if the handle is invalid.
* When "check_open" is TRUE check that the channel can be used. * When "check_open" is TRUE check that the channel can be used.
* When "reading" is TRUE "check_open" considers typeahead useful. * When "reading" is TRUE "check_open" considers typeahead useful.
* "part" is used to check typeahead, when -1 use the default part. * "part" is used to check typeahead, when PART_COUNT use the default part.
*/ */
channel_T * channel_T *
get_channel_arg(typval_T *tv, int check_open, int reading, int part) get_channel_arg(typval_T *tv, int check_open, int reading, ch_part_T part)
{ {
channel_T *channel = NULL; channel_T *channel = NULL;
int has_readahead = FALSE; int has_readahead = FALSE;
@ -4367,7 +4373,7 @@ get_channel_arg(typval_T *tv, int check_open, int reading, int part)
} }
if (channel != NULL && reading) if (channel != NULL && reading)
has_readahead = channel_has_readahead(channel, has_readahead = channel_has_readahead(channel,
part >= 0 ? part : channel_part_read(channel)); part != PART_COUNT ? part : channel_part_read(channel));
if (check_open && (channel == NULL || (!channel_is_open(channel) if (check_open && (channel == NULL || (!channel_is_open(channel)
&& !(reading && has_readahead)))) && !(reading && has_readahead))))
@ -4659,7 +4665,7 @@ job_start(typval_T *argvars)
garray_T ga; garray_T ga;
#endif #endif
jobopt_T opt; jobopt_T opt;
int part; ch_part_T part;
job = job_alloc(); job = job_alloc();
if (job == NULL) if (job == NULL)
@ -4679,7 +4685,7 @@ job_start(typval_T *argvars)
goto theend; goto theend;
/* Check that when io is "file" that there is a file name. */ /* Check that when io is "file" that there is a file name. */
for (part = PART_OUT; part <= PART_IN; ++part) for (part = PART_OUT; part < PART_COUNT; ++part)
if ((opt.jo_set & (JO_OUT_IO << (part - PART_OUT))) if ((opt.jo_set & (JO_OUT_IO << (part - PART_OUT)))
&& opt.jo_io[part] == JIO_FILE && opt.jo_io[part] == JIO_FILE
&& (!(opt.jo_set & (JO_OUT_NAME << (part - PART_OUT))) && (!(opt.jo_set & (JO_OUT_NAME << (part - PART_OUT)))

View File

@ -5622,7 +5622,7 @@ set_ref_in_item(
else if (tv->v_type == VAR_CHANNEL) else if (tv->v_type == VAR_CHANNEL)
{ {
channel_T *ch =tv->vval.v_channel; channel_T *ch =tv->vval.v_channel;
int part; ch_part_T part;
typval_T dtv; typval_T dtv;
jsonq_T *jq; jsonq_T *jq;
cbq_T *cq; cbq_T *cq;
@ -5630,7 +5630,7 @@ set_ref_in_item(
if (ch != NULL && ch->ch_copyID != copyID) if (ch != NULL && ch->ch_copyID != copyID)
{ {
ch->ch_copyID = copyID; ch->ch_copyID = copyID;
for (part = PART_SOCK; part <= PART_IN; ++part) for (part = PART_SOCK; part < PART_COUNT; ++part)
{ {
for (jq = ch->ch_part[part].ch_json_head.jq_next; jq != NULL; for (jq = ch->ch_part[part].ch_json_head.jq_next; jq != NULL;
jq = jq->jq_next) jq = jq->jq_next)

View File

@ -14,15 +14,15 @@ channel_T *channel_open_func(typval_T *argvars);
void channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err); void channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err);
void channel_set_job(channel_T *channel, job_T *job, jobopt_T *options); void channel_set_job(channel_T *channel, job_T *job, jobopt_T *options);
void channel_set_options(channel_T *channel, jobopt_T *opt); void channel_set_options(channel_T *channel, jobopt_T *opt);
void channel_set_req_callback(channel_T *channel, int part, char_u *callback, partial_T *partial, int id); void channel_set_req_callback(channel_T *channel, ch_part_T part, char_u *callback, partial_T *partial, int id);
void channel_buffer_free(buf_T *buf); void channel_buffer_free(buf_T *buf);
void channel_write_any_lines(void); void channel_write_any_lines(void);
void channel_write_new_lines(buf_T *buf); void channel_write_new_lines(buf_T *buf);
readq_T *channel_peek(channel_T *channel, int part); readq_T *channel_peek(channel_T *channel, ch_part_T part);
char_u *channel_first_nl(readq_T *node); char_u *channel_first_nl(readq_T *node);
char_u *channel_get(channel_T *channel, int part); char_u *channel_get(channel_T *channel, ch_part_T part);
void channel_consume(channel_T *channel, int part, int len); void channel_consume(channel_T *channel, ch_part_T part, int len);
int channel_collapse(channel_T *channel, int part, int want_nl); int channel_collapse(channel_T *channel, ch_part_T part, int want_nl);
int channel_can_write_to(channel_T *channel); int channel_can_write_to(channel_T *channel);
int channel_is_open(channel_T *channel); int channel_is_open(channel_T *channel);
char *channel_status(channel_T *channel, int req_part); char *channel_status(channel_T *channel, int req_part);
@ -31,13 +31,12 @@ void channel_close(channel_T *channel, int invoke_close_cb);
void channel_close_in(channel_T *channel); void channel_close_in(channel_T *channel);
void channel_clear(channel_T *channel); void channel_clear(channel_T *channel);
void channel_free_all(void); void channel_free_all(void);
char_u *channel_read_block(channel_T *channel, int part, int timeout); char_u *channel_read_block(channel_T *channel, ch_part_T part, int timeout);
int channel_read_json_block(channel_T *channel, int part, int timeout_arg, int id, typval_T **rettv); int channel_read_json_block(channel_T *channel, ch_part_T part, int timeout_arg, int id, typval_T **rettv);
void common_channel_read(typval_T *argvars, typval_T *rettv, int raw); void common_channel_read(typval_T *argvars, typval_T *rettv, int raw);
channel_T *channel_fd2channel(sock_T fd, int *partp); channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp);
void channel_handle_events(void); void channel_handle_events(void);
int channel_send(channel_T *channel, int part, char_u *buf, int len, char *fun); int channel_send(channel_T *channel, ch_part_T part, char_u *buf, int len, char *fun);
channel_T *send_common(typval_T *argvars, char_u *text, int id, int eval, jobopt_T *opt, char *fun, int *part_read);
void ch_expr_common(typval_T *argvars, typval_T *rettv, int eval); void ch_expr_common(typval_T *argvars, typval_T *rettv, int eval);
void ch_raw_common(typval_T *argvars, typval_T *rettv, int eval); void ch_raw_common(typval_T *argvars, typval_T *rettv, int eval);
int channel_poll_setup(int nfd_in, void *fds_in); int channel_poll_setup(int nfd_in, void *fds_in);
@ -46,14 +45,14 @@ int channel_select_setup(int maxfd_in, void *rfds_in, void *wfds_in);
int channel_select_check(int ret_in, void *rfds_in, void *wfds_in); int channel_select_check(int ret_in, void *rfds_in, void *wfds_in);
int channel_parse_messages(void); int channel_parse_messages(void);
int set_ref_in_channel(int copyID); int set_ref_in_channel(int copyID);
int channel_part_send(channel_T *channel); ch_part_T channel_part_send(channel_T *channel);
int channel_part_read(channel_T *channel); ch_part_T channel_part_read(channel_T *channel);
ch_mode_T channel_get_mode(channel_T *channel, int part); ch_mode_T channel_get_mode(channel_T *channel, ch_part_T part);
int channel_get_timeout(channel_T *channel, int part); int channel_get_timeout(channel_T *channel, ch_part_T part);
void clear_job_options(jobopt_T *opt); void clear_job_options(jobopt_T *opt);
void free_job_options(jobopt_T *opt); void free_job_options(jobopt_T *opt);
int get_job_options(typval_T *tv, jobopt_T *opt, int supported); int get_job_options(typval_T *tv, jobopt_T *opt, int supported);
channel_T *get_channel_arg(typval_T *tv, int check_open, int reading, int part); channel_T *get_channel_arg(typval_T *tv, int check_open, int reading, ch_part_T part);
void job_free_all(void); void job_free_all(void);
int set_ref_in_job(int copyID); int set_ref_in_job(int copyID);
void job_unref(job_T *job); void job_unref(job_T *job);

View File

@ -1499,19 +1499,21 @@ typedef enum {
/* Ordering matters, it is used in for loops: IN is last, only SOCK/OUT/ERR /* Ordering matters, it is used in for loops: IN is last, only SOCK/OUT/ERR
* are polled. */ * are polled. */
#define PART_SOCK 0 typedef enum {
PART_SOCK = 0,
#define CH_SOCK_FD ch_part[PART_SOCK].ch_fd #define CH_SOCK_FD ch_part[PART_SOCK].ch_fd
#ifdef FEAT_JOB_CHANNEL #ifdef FEAT_JOB_CHANNEL
# define INVALID_FD (-1) PART_OUT,
# define PART_OUT 1
# define PART_ERR 2
# define PART_IN 3
# define CH_OUT_FD ch_part[PART_OUT].ch_fd # define CH_OUT_FD ch_part[PART_OUT].ch_fd
PART_ERR,
# define CH_ERR_FD ch_part[PART_ERR].ch_fd # define CH_ERR_FD ch_part[PART_ERR].ch_fd
PART_IN,
# define CH_IN_FD ch_part[PART_IN].ch_fd # define CH_IN_FD ch_part[PART_IN].ch_fd
#endif #endif
PART_COUNT
} ch_part_T;
#define INVALID_FD (-1)
/* The per-fd info for a channel. */ /* The per-fd info for a channel. */
typedef struct { typedef struct {
@ -1566,14 +1568,14 @@ struct channel_S {
int ch_id; /* ID of the channel */ int ch_id; /* ID of the channel */
int ch_last_msg_id; /* ID of the last message */ int ch_last_msg_id; /* ID of the last message */
chanpart_T ch_part[4]; /* info for socket, out, err and in */ chanpart_T ch_part[PART_COUNT]; /* info for socket, out, err and in */
char *ch_hostname; /* only for socket, allocated */ char *ch_hostname; /* only for socket, allocated */
int ch_port; /* only for socket */ int ch_port; /* only for socket */
int ch_to_be_closed; /* When TRUE reading or writing failed and int ch_to_be_closed; /* bitset of readable fds to be closed.
* the channel must be closed when it's safe * When all readable fds have been closed,
* to invoke callbacks. */ * set to (1 << PART_COUNT). */
int ch_to_be_freed; /* When TRUE channel must be freed when it's int ch_to_be_freed; /* When TRUE channel must be freed when it's
* safe to invoke callbacks. */ * safe to invoke callbacks. */
int ch_error; /* When TRUE an error was reported. Avoids int ch_error; /* When TRUE an error was reported. Avoids

View File

@ -1505,6 +1505,23 @@ func Test_read_nonl_line()
call assert_equal(3, g:linecount) call assert_equal(3, g:linecount)
endfunc endfunc
func Test_read_from_terminated_job()
if !has('job')
return
endif
let g:linecount = 0
if has('win32')
" workaround: 'shellescape' does improper escaping double quotes
let arg = 'import os,sys;os.close(1);sys.stderr.write(\"test\n\")'
else
let arg = 'import os,sys;os.close(1);sys.stderr.write("test\n")'
endif
call job_start([s:python, '-c', arg], {'callback': 'MyLineCountCb'})
call WaitFor('1 <= g:linecount')
call assert_equal(1, g:linecount)
endfunc
function Ch_test_close_lambda(port) function Ch_test_close_lambda(port)
let handle = ch_open('localhost:' . a:port, s:chopt) let handle = ch_open('localhost:' . a:port, s:chopt)
if ch_status(handle) == "fail" if ch_status(handle) == "fail"

View File

@ -764,6 +764,8 @@ static char *(features[]) =
static int included_patches[] = static int included_patches[] =
{ /* Add new patch number below this line */ { /* Add new patch number below this line */
/**/
27,
/**/ /**/
26, 26,
/**/ /**/