Skip to content

Commit

Permalink
PROTON-2748: Raw connection async close fix and tests. First part of …
Browse files Browse the repository at this point in the history
…pull 402
  • Loading branch information
Cliff Jansen committed Sep 12, 2023
1 parent 358bd82 commit cb637b7
Show file tree
Hide file tree
Showing 4 changed files with 499 additions and 51 deletions.
96 changes: 56 additions & 40 deletions c/src/proactor/epoll_raw_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ struct praw_connection_t {
struct addrinfo *ai; /* Current connect address */
bool connected;
bool disconnected;
bool batch_empty;
bool hup_detected;
bool read_check;
};

static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
Expand Down Expand Up @@ -318,10 +319,7 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) {
unlock(&rc->task.mutex);
if (waking) pni_raw_wake(raw);

pn_event_t *e = pni_raw_event_next(raw);
if (!e || pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED)
rc->batch_empty = true;
return e;
return pni_raw_event_next(raw);
}

task_t *pni_psocket_raw_task(psocket_t* ps) {
Expand Down Expand Up @@ -393,10 +391,10 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
if (rc->disconnected) {
pni_raw_connect_failed(&rc->raw_connection);
unlock(&rc->task.mutex);
rc->batch_empty = false;
return &rc->batch;
}
if (events & (EPOLLHUP | EPOLLERR)) {
// Continuation of praw_connection_maybe_connect_lh() logic.
// A wake can be the first event. Otherwise, wait for connection to complete.
bool event_pending = task_wake || pni_raw_wake_is_pending(&rc->raw_connection) || pn_collector_peek(rc->raw_connection.collector);
t->working = event_pending;
Expand All @@ -405,35 +403,46 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
}
if (events & EPOLLOUT)
praw_connection_connected_lh(rc);
unlock(&rc->task.mutex);
return &rc->batch;
}
unlock(&rc->task.mutex);

if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
if (events & EPOLLERR) {
// Read and write sides closed via RST. Tear down immediately.
int soerr;
socklen_t soerrlen = sizeof(soerr);
int ec = getsockopt(fd, SOL_SOCKET, SO_ERROR, &soerr, &soerrlen);
if (ec == 0 && soerr) {
psocket_error(rc, soerr, "async disconnect");
}
pni_raw_async_disconnect(&rc->raw_connection);
return &rc->batch;
}
if (events & EPOLLHUP) {
rc->hup_detected = true;
}

if (events & (EPOLLIN | EPOLLRDHUP) || rc->read_check) {
pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
rc->read_check = false;
}
if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error);
rc->batch_empty = false;
return &rc->batch;
}

void pni_raw_connection_done(praw_connection_t *rc) {
bool notify = false;
bool ready = false;
bool have_event = false;

// If !batch_empty, can't be sure state machine up to date, so reschedule task if necessary.
if (!rc->batch_empty) {
if (pn_collector_peek(rc->raw_connection.collector))
have_event = true;
else {
pn_event_t *e = pni_raw_batch_next(&rc->batch);
// State machine up to date.
if (e) {
have_event = true;
// Sole event. Can put back without order issues.
// Edge case, performance not important.
pn_collector_put(rc->raw_connection.collector, pn_event_class(e), pn_event_context(e), pn_event_type(e));
}
}
}
pn_raw_connection_t *raw = &rc->raw_connection;
int fd = rc->psocket.epoll_io.fd;

// Try write
if (pni_raw_can_write(raw)) pni_raw_write(raw, fd, snd, set_error);
pni_raw_process_shutdown(raw, fd, shutr, shutw);

// Update state machine and check for possible pending event.
bool have_event = pni_raw_batch_has_events(raw);

lock(&rc->task.mutex);
pn_proactor_t *p = rc->task.proactor;
Expand All @@ -442,24 +451,31 @@ void pni_raw_connection_done(praw_connection_t *rc) {
// The task may be in the ready state even if we've got no raw connection
// wakes outstanding because we dealt with it already in pni_raw_batch_next()
notify = (pni_task_wake_pending(&rc->task) || have_event) && schedule(&rc->task);
ready = rc->task.ready;
ready = rc->task.ready; // No need to poll. Already scheduled.
unlock(&rc->task.mutex);

pn_raw_connection_t *raw = &rc->raw_connection;
int fd = rc->psocket.epoll_io.fd;
pni_raw_process_shutdown(raw, fd, shutr, shutw);
int wanted =
(pni_raw_can_read(raw) ? EPOLLIN : 0) |
(pni_raw_can_write(raw) ? EPOLLOUT : 0);
if (wanted) {
rc->psocket.epoll_io.wanted = wanted;
rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error
bool finished_disconnect = raw->state==conn_fini && !ready && !raw->disconnectpending;
if (finished_disconnect) {
// If we're closed and we've sent the disconnect then close
pni_raw_finalize(raw);
praw_connection_cleanup(rc);
} else if (ready) {
// Already scheduled to run. Skip poll. Remember if we want a read.
rc->read_check = pni_raw_can_read(raw);
} else if (!rc->connected) {
// Connect logic has already armed the socket.
} else {
bool finished_disconnect = raw->state==conn_fini && !ready && !raw->disconnectpending;
if (finished_disconnect) {
// If we're closed and we've sent the disconnect then close
pni_raw_finalize(raw);
praw_connection_cleanup(rc);
// Must poll for IO.
int wanted =
(pni_raw_can_read(raw) ? (EPOLLIN | EPOLLRDHUP) : 0) |
(pni_raw_can_write(raw) ? EPOLLOUT : 0);

// wanted == 0 implies we block until either application wake() or EPOLLHUP | EPOLLERR.
// If wanted == 0 and hup_detected, blocking not possible, so skip arming until
// application provides read buffers.
if (wanted || !rc->hup_detected) {
rc->psocket.epoll_io.wanted = wanted;
rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error
}
}

Expand Down
2 changes: 2 additions & 0 deletions c/src/proactor/raw_connection-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,11 @@ void pni_raw_write_close(pn_raw_connection_t *conn);
void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int));
void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int (*shutdown_rd)(int), int (*shutdown_wr)(int));
void pni_raw_async_disconnect(pn_raw_connection_t *conn);
bool pni_raw_can_read(pn_raw_connection_t *conn);
bool pni_raw_can_write(pn_raw_connection_t *conn);
pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn);
bool pni_raw_batch_has_events(pn_raw_connection_t *conn);
void pni_raw_initialize(pn_raw_connection_t *conn);
void pni_raw_finalize(pn_raw_connection_t *conn);

Expand Down
35 changes: 31 additions & 4 deletions c/src/proactor/raw_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -669,12 +669,14 @@ bool pni_raw_can_write(pn_raw_connection_t *conn) {
return !pni_raw_wdrained(conn) && conn->wbuffer_first_towrite;
}

pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
bool pni_raw_batch_has_events(pn_raw_connection_t *conn) {
// If collector empty, advance state machine as necessary and generate next event.
// Return true if at least one event is available.
assert(conn);
do {
pn_event_t *event = pn_collector_next(conn->collector);
pn_event_t *event = pn_collector_peek(conn->collector);
if (event) {
return pni_log_event(conn, event);
return true;
} else if (conn->connectpending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED);
conn->connectpending = false;
Expand Down Expand Up @@ -716,11 +718,20 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
conn->rrequestedbuffers = true;
} else {
return NULL;
return false;
}
} while (true);
}

pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
if (pni_raw_batch_has_events(conn)) {
pn_event_t* event = pn_collector_next(conn->collector);
assert(event);
return pni_log_event(conn, event);
}
return NULL;
}

void pni_raw_read_close(pn_raw_connection_t *conn) {
// If already fully closed nothing to do
if (pni_raw_rwclosed(conn)) {
Expand Down Expand Up @@ -781,6 +792,22 @@ void pni_raw_close(pn_raw_connection_t *conn) {
}
}

void pni_raw_async_disconnect(pn_raw_connection_t *conn) {
if (pni_raw_rwclosed(conn))
return;

if (!pni_raw_rclosed(conn)) {
conn->state = pni_raw_new_state(conn, conn_read_closed);
conn->rclosedpending = true;
}
if (!pni_raw_wclosed(conn)) {
pni_raw_release_buffers(conn);
conn->state = pni_raw_new_state(conn, conn_write_closed);
conn->wclosedpending = true;
}
pni_raw_disconnect(conn);
}

bool pn_raw_connection_is_read_closed(pn_raw_connection_t *conn) {
assert(conn);
return pni_raw_rclosed(conn);
Expand Down
Loading

0 comments on commit cb637b7

Please sign in to comment.