diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 9b85b15f1..c0e731066 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -50,7 +50,8 @@ struct praw_connection_t { struct addrinfo *ai; /* Current connect address */ bool connected; bool disconnected; - bool batch_empty; + bool hup_detected; + bool read_check; }; static void psocket_error(praw_connection_t *rc, int err, const char* msg) { @@ -318,10 +319,7 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) { unlock(&rc->task.mutex); if (waking) pni_raw_wake(raw); - pn_event_t *e = pni_raw_event_next(raw); - if (!e || pn_event_type(e) == PN_RAW_CONNECTION_DISCONNECTED) - rc->batch_empty = true; - return e; + return pni_raw_event_next(raw); } task_t *pni_psocket_raw_task(psocket_t* ps) { @@ -393,10 +391,10 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool if (rc->disconnected) { pni_raw_connect_failed(&rc->raw_connection); unlock(&rc->task.mutex); - rc->batch_empty = false; return &rc->batch; } if (events & (EPOLLHUP | EPOLLERR)) { + // Continuation of praw_connection_maybe_connect_lh() logic. // A wake can be the first event. Otherwise, wait for connection to complete. bool event_pending = task_wake || pni_raw_wake_is_pending(&rc->raw_connection) || pn_collector_peek(rc->raw_connection.collector); t->working = event_pending; @@ -405,35 +403,46 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool } if (events & EPOLLOUT) praw_connection_connected_lh(rc); + unlock(&rc->task.mutex); + return &rc->batch; } unlock(&rc->task.mutex); - if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error); + if (events & EPOLLERR) { + // Read and write sides closed via RST. Tear down immediately. + int soerr; + socklen_t soerrlen = sizeof(soerr); + int ec = getsockopt(fd, SOL_SOCKET, SO_ERROR, &soerr, &soerrlen); + if (ec == 0 && soerr) { + psocket_error(rc, soerr, "async disconnect"); + } + pni_raw_async_disconnect(&rc->raw_connection); + return &rc->batch; + } + if (events & EPOLLHUP) { + rc->hup_detected = true; + } + + if (events & (EPOLLIN | EPOLLRDHUP) || rc->read_check) { + pni_raw_read(&rc->raw_connection, fd, rcv, set_error); + rc->read_check = false; + } if (events & EPOLLOUT) pni_raw_write(&rc->raw_connection, fd, snd, set_error); - rc->batch_empty = false; return &rc->batch; } void pni_raw_connection_done(praw_connection_t *rc) { bool notify = false; bool ready = false; - bool have_event = false; - - // If !batch_empty, can't be sure state machine up to date, so reschedule task if necessary. - if (!rc->batch_empty) { - if (pn_collector_peek(rc->raw_connection.collector)) - have_event = true; - else { - pn_event_t *e = pni_raw_batch_next(&rc->batch); - // State machine up to date. - if (e) { - have_event = true; - // Sole event. Can put back without order issues. - // Edge case, performance not important. - pn_collector_put(rc->raw_connection.collector, pn_event_class(e), pn_event_context(e), pn_event_type(e)); - } - } - } + pn_raw_connection_t *raw = &rc->raw_connection; + int fd = rc->psocket.epoll_io.fd; + + // Try write + if (pni_raw_can_write(raw)) pni_raw_write(raw, fd, snd, set_error); + pni_raw_process_shutdown(raw, fd, shutr, shutw); + + // Update state machine and check for possible pending event. + bool have_event = pni_raw_batch_has_events(raw); lock(&rc->task.mutex); pn_proactor_t *p = rc->task.proactor; @@ -442,24 +451,31 @@ void pni_raw_connection_done(praw_connection_t *rc) { // The task may be in the ready state even if we've got no raw connection // wakes outstanding because we dealt with it already in pni_raw_batch_next() notify = (pni_task_wake_pending(&rc->task) || have_event) && schedule(&rc->task); - ready = rc->task.ready; + ready = rc->task.ready; // No need to poll. Already scheduled. unlock(&rc->task.mutex); - pn_raw_connection_t *raw = &rc->raw_connection; - int fd = rc->psocket.epoll_io.fd; - pni_raw_process_shutdown(raw, fd, shutr, shutw); - int wanted = - (pni_raw_can_read(raw) ? EPOLLIN : 0) | - (pni_raw_can_write(raw) ? EPOLLOUT : 0); - if (wanted) { - rc->psocket.epoll_io.wanted = wanted; - rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error + bool finished_disconnect = raw->state==conn_fini && !ready && !raw->disconnectpending; + if (finished_disconnect) { + // If we're closed and we've sent the disconnect then close + pni_raw_finalize(raw); + praw_connection_cleanup(rc); + } else if (ready) { + // Already scheduled to run. Skip poll. Remember if we want a read. + rc->read_check = pni_raw_can_read(raw); + } else if (!rc->connected) { + // Connect logic has already armed the socket. } else { - bool finished_disconnect = raw->state==conn_fini && !ready && !raw->disconnectpending; - if (finished_disconnect) { - // If we're closed and we've sent the disconnect then close - pni_raw_finalize(raw); - praw_connection_cleanup(rc); + // Must poll for IO. + int wanted = + (pni_raw_can_read(raw) ? (EPOLLIN | EPOLLRDHUP) : 0) | + (pni_raw_can_write(raw) ? EPOLLOUT : 0); + + // wanted == 0 implies we block until either application wake() or EPOLLHUP | EPOLLERR. + // If wanted == 0 and hup_detected, blocking not possible, so skip arming until + // application provides read buffers. + if (wanted || !rc->hup_detected) { + rc->psocket.epoll_io.wanted = wanted; + rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error } } diff --git a/c/src/proactor/raw_connection-internal.h b/c/src/proactor/raw_connection-internal.h index 47b0ea925..cdfd5a852 100644 --- a/c/src/proactor/raw_connection-internal.h +++ b/c/src/proactor/raw_connection-internal.h @@ -134,9 +134,11 @@ void pni_raw_write_close(pn_raw_connection_t *conn); void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int)); void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void (*set_error)(pn_raw_connection_t *, const char *, int)); void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int (*shutdown_rd)(int), int (*shutdown_wr)(int)); +void pni_raw_async_disconnect(pn_raw_connection_t *conn); bool pni_raw_can_read(pn_raw_connection_t *conn); bool pni_raw_can_write(pn_raw_connection_t *conn); pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn); +bool pni_raw_batch_has_events(pn_raw_connection_t *conn); void pni_raw_initialize(pn_raw_connection_t *conn); void pni_raw_finalize(pn_raw_connection_t *conn); diff --git a/c/src/proactor/raw_connection.c b/c/src/proactor/raw_connection.c index 3d8b976c6..89fbbd1dd 100644 --- a/c/src/proactor/raw_connection.c +++ b/c/src/proactor/raw_connection.c @@ -669,12 +669,14 @@ bool pni_raw_can_write(pn_raw_connection_t *conn) { return !pni_raw_wdrained(conn) && conn->wbuffer_first_towrite; } -pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) { +bool pni_raw_batch_has_events(pn_raw_connection_t *conn) { + // If collector empty, advance state machine as necessary and generate next event. + // Return true if at least one event is available. assert(conn); do { - pn_event_t *event = pn_collector_next(conn->collector); + pn_event_t *event = pn_collector_peek(conn->collector); if (event) { - return pni_log_event(conn, event); + return true; } else if (conn->connectpending) { pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED); conn->connectpending = false; @@ -716,11 +718,20 @@ pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) { pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_READ_BUFFERS); conn->rrequestedbuffers = true; } else { - return NULL; + return false; } } while (true); } +pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) { + if (pni_raw_batch_has_events(conn)) { + pn_event_t* event = pn_collector_next(conn->collector); + assert(event); + return pni_log_event(conn, event); + } + return NULL; +} + void pni_raw_read_close(pn_raw_connection_t *conn) { // If already fully closed nothing to do if (pni_raw_rwclosed(conn)) { @@ -781,6 +792,22 @@ void pni_raw_close(pn_raw_connection_t *conn) { } } +void pni_raw_async_disconnect(pn_raw_connection_t *conn) { + if (pni_raw_rwclosed(conn)) + return; + + if (!pni_raw_rclosed(conn)) { + conn->state = pni_raw_new_state(conn, conn_read_closed); + conn->rclosedpending = true; + } + if (!pni_raw_wclosed(conn)) { + pni_raw_release_buffers(conn); + conn->state = pni_raw_new_state(conn, conn_write_closed); + conn->wclosedpending = true; + } + pni_raw_disconnect(conn); +} + bool pn_raw_connection_is_read_closed(pn_raw_connection_t *conn) { assert(conn); return pni_raw_rclosed(conn); diff --git a/c/tests/raw_wake_test.cpp b/c/tests/raw_wake_test.cpp index 4a5dc23d3..80ddc2028 100644 --- a/c/tests/raw_wake_test.cpp +++ b/c/tests/raw_wake_test.cpp @@ -28,11 +28,22 @@ #include #include #include +#include #endif #include -// WAKE tests require a running proactor. +// Raw connection tests driven by a proactor. + +// These tests often cheat by directly calling API functions that +// would normally be called in an event callback for thread safety +// reasons. This can usually work because the proactors and API calls +// are all called from a single thread so there is no contention, but +// the raw connection may require a wake so that the state machine and +// polling mask can be updated. Note that wakes stop working around +// the time the raw connection thinks it is about to be fully closed, +// so close operations may need to be done in event callbacks to +// avoid wake uncertainty. #include "../src/proactor/proactor-internal.h" #include "./pn_test_proactor.hpp" @@ -45,14 +56,32 @@ namespace { class common_handler : public handler { bool close_on_wake_; + bool write_close_on_wake_; + bool stop_on_wake_; + bool abort_on_wake_; + int closed_read_count_; + int closed_write_count_; + int disconnect_count_; + bool disconnect_error_; pn_raw_connection_t *last_server_; + pn_raw_buffer_t write_buff_; public: - explicit common_handler() : close_on_wake_(false), last_server_(0) {} + explicit common_handler() : close_on_wake_(false), write_close_on_wake_(0), stop_on_wake_(false), + abort_on_wake_(false), closed_read_count_(0), closed_write_count_(0), + disconnect_count_(0), disconnect_error_(false), + last_server_(0), write_buff_({0}) {} void set_close_on_wake(bool b) { close_on_wake_ = b; } - + void set_write_close_on_wake(bool b) { write_close_on_wake_ = b; } + void set_stop_on_wake(bool b) { stop_on_wake_ = b; } + void set_abort_on_wake(bool b) { abort_on_wake_ = b; } + int closed_read_count() { return closed_read_count_; } + int closed_write_count() { return closed_write_count_; } + int disconnect_count() { return disconnect_count_; } + bool disconnect_error() { return disconnect_error_; } pn_raw_connection_t *last_server() { return last_server_; } + void set_write_on_wake(pn_raw_buffer_t *b) { write_buff_ = *b; } bool handle(pn_event_t *e) override { switch (pn_event_type(e)) { @@ -71,13 +100,41 @@ class common_handler : public handler { } break; case PN_RAW_CONNECTION_WAKE: { - if (close_on_wake_) { - pn_raw_connection_t *rc = pn_event_raw_connection(e); + if (abort_on_wake_) abort(); + pn_raw_connection_t *rc = pn_event_raw_connection(e); + + if (write_buff_.size) { + // Add the buff for writing before any close operation. + CHECK(pn_raw_connection_write_buffers(rc, &write_buff_, 1) == 1); + write_buff_.size = 0; + } + if (write_close_on_wake_) + pn_raw_connection_write_close(rc); + if (close_on_wake_) pn_raw_connection_close(rc); + return stop_on_wake_; + } break; + + case PN_RAW_CONNECTION_DISCONNECTED: { + disconnect_count_++; + pn_raw_connection_t *rc = pn_event_raw_connection(e); + pn_condition_t *cond = pn_raw_connection_condition(rc); + if (disconnect_count_ == 1 && pn_condition_is_set(cond)) { + const char *nm = pn_condition_get_name(cond); + const char *ds = pn_condition_get_description(cond); + if (nm && strlen(nm) > 0 && ds && strlen(ds) > 0) + disconnect_error_ = true; } - return true; + return false; } break; + case PN_RAW_CONNECTION_CLOSED_READ: + closed_read_count_++; + return false; + + case PN_RAW_CONNECTION_CLOSED_WRITE: + closed_write_count_++; + return false; default: return false; @@ -85,9 +142,127 @@ class common_handler : public handler { } }; +static const size_t buffsz = 128; + +// Basic test consisting of +// client is an OS socket. +// server is a pn_raw_connection_t with one shared read/write buffer. +// pn_listener_t used to put the two together. +struct basic_test { + common_handler h; + proactor p; + pn_listener_t *l; + int sockfd; // client + pn_raw_connection_t *server_rc; + char buff[buffsz]; + bool buff_in_use; + + basic_test() : h(), p(&h) { + l = p.listen(); + REQUIRE_RUN(p, PN_LISTENER_OPEN); + sockfd = socket(AF_INET, SOCK_STREAM, 0); + REQUIRE(sockfd >= 0); + struct sockaddr_in laddr; + memset(&laddr, 0, sizeof(laddr)); + laddr.sin_family = AF_INET; + laddr.sin_port = htons(atoi(pn_test::listening_port(l).c_str())); + laddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + connect(sockfd, (const struct sockaddr*) &laddr, sizeof(laddr)); + + REQUIRE_RUN(p, PN_LISTENER_ACCEPT); + server_rc = h.last_server(); + REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); + pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0}; + CHECK(pn_raw_connection_give_read_buffers(server_rc, &rb, 1) == 1); + buff_in_use = true; + + pn_raw_connection_wake(server_rc); + REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE); + CHECK(pn_proactor_get(p) == NULL); /* idle */ + } + + ~basic_test() { + pn_listener_close(l); + REQUIRE_RUN(p, PN_LISTENER_CLOSE); + REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); + if (sockfd >= 0) close(sockfd); + bool sanity = h.closed_read_count() == 1 && h.closed_write_count() == 1 && + h.disconnect_count() == 1; + REQUIRE(sanity == true); + } + + void socket_write_close() { + if (sockfd < 0) return; + shutdown(sockfd, SHUT_WR); + } + + void socket_graceful_close() { + if (sockfd < 0) return; + close(sockfd); + sockfd = -1; + } + + bool socket_hard_close() { + // RST (not FIN), hard/abort close + if (sockfd < 0) return false; + struct linger lngr; + lngr.l_onoff = 1; + lngr.l_linger = 0; + if (sockfd < 0) return false; + if (setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &lngr, sizeof(lngr)) == 0) { + if (close(sockfd) == 0) { + sockfd = -1; + return true; + } + } + return false; + } + + void drain_read_buffer() { + assert(buff_in_use); + send(sockfd, "FOO", 3, 0); + REQUIRE_RUN(p, PN_RAW_CONNECTION_READ); + pn_raw_buffer_t rb = {0}; + REQUIRE(pn_raw_connection_take_read_buffers(server_rc, &rb, 1) == 1); + REQUIRE(rb.size == 3); + buff_in_use = false; + } + + void give_read_buffer() { + assert(!buff_in_use); + pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0}; + CHECK(pn_raw_connection_give_read_buffers(server_rc, &rb, 1) == 1); + buff_in_use = true; + } + + void write_next_wake(const char *m) { + assert(!buff_in_use); + pn_raw_buffer_t rb = {0, buff, buffsz, 0, 0}; + size_t l = strlen(m); + assert(l < buffsz); + strcpy(rb.bytes, m); + rb.size = l; + h.set_write_on_wake(&rb); + } + + int drain_events() { + int ec = 0; + pn_event_batch_t *batch = NULL; + while (batch = pn_proactor_get(p.get())) { + pn_event_t *e; + while (e = pn_event_batch_next(batch)) { + ec++; + h.dispatch(e); + } + pn_proactor_done(p.get(), batch); + } + return ec; + } +}; } // namespace + // Test waking up a connection that is idle TEST_CASE("proactor_raw_connection_wake") { common_handler h; @@ -104,7 +279,7 @@ TEST_CASE("proactor_raw_connection_wake") { REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); CHECK(pn_proactor_get(p) == NULL); /* idle */ - pn_raw_connection_wake(rc); + pn_raw_connection_wake(rc); REQUIRE_RUN(p, PN_RAW_CONNECTION_WAKE); CHECK(pn_proactor_get(p) == NULL); /* idle */ @@ -119,3 +294,231 @@ TEST_CASE("proactor_raw_connection_wake") { REQUIRE_RUN(p, PN_LISTENER_CLOSE); REQUIRE_RUN(p, PN_PROACTOR_INACTIVE); } + +// Normal close +TEST_CASE("raw_connection_graceful_close") { + struct basic_test x; + x.socket_graceful_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == false); +} + +// HARD close +TEST_CASE("raw_connection_hardclose") { + struct basic_test x; + x.socket_hard_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == true); +} + +// HARD close, no read buffer +TEST_CASE("raw_connection_hardclose_nrb") { + struct basic_test x; + // Drain read buffer without replenishing + x.drain_read_buffer(); + x.drain_events(); + CHECK(pn_proactor_get(x.p) == NULL); /* idle */ + x.socket_hard_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == true); +} + +// HARD close after read close +TEST_CASE("raw_connection_readclose_then_hardclose") { + struct basic_test x; + x.socket_write_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + x.drain_events(); + REQUIRE(x.h.disconnect_count() == 0); + x.socket_hard_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == true); +} + +// HARD close after read close, no read buffer +TEST_CASE("raw_connection_readclose_then_hardclose_nrb") { + struct basic_test x; + // Drain read buffer without replenishing + x.drain_read_buffer(); + x.drain_events(); + CHECK(pn_proactor_get(x.p) == NULL); /* idle */ + // Shut of read side should be ignored with no read buffer. + x.socket_write_close(); + CHECK(pn_proactor_get(x.p) == NULL); /* still idle */ + + // Confirm raw connection shuts down, even with no read buffer + x.socket_hard_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == true); +} + +// Normal close on socket delays CLOSED_READ event until application makes read buffers available +TEST_CASE("raw_connection_delay_readclose") { + struct basic_test x; + x.drain_read_buffer(); + x.socket_graceful_close(); + x.drain_events(); + REQUIRE(x.h.closed_read_count() == 0); + + x.give_read_buffer(); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_READ); + REQUIRE(x.h.closed_read_count() == 1); + + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); +} + +TEST_CASE("raw_connection_rst_on_write") { + struct basic_test x; + x.drain_read_buffer(); + + // Send some data + x.write_next_wake("foo"); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WRITTEN); + pn_raw_buffer_t rb = {0}; + CHECK(pn_raw_connection_take_written_buffers(x.server_rc, &rb, 1) == 1); + char b[buffsz]; + REQUIRE(recv(x.sockfd, b, buffsz, 0) == 3); + + // Repeat, with closed peer socket. + x.socket_graceful_close(); + x.write_next_wake("bar"); + pn_raw_connection_wake(x.server_rc); + // Write or subsequent poll should fail EPIPE + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.disconnect_error() == true); +} + +// One sided close. No cooperation from peer. +TEST_CASE("raw_connection_full_close") { + struct basic_test x; + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); +} + +// As above. No read buffer. +TEST_CASE("raw_connection_full_close_nrb") { + struct basic_test x; + x.drain_read_buffer(); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); +} + +// One sided close, pending write. +TEST_CASE("raw_connection_close_wdrain") { + struct basic_test x; + x.drain_read_buffer(); + // write and then close on next wake + x.write_next_wake("fubar"); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + // Now check fubar made it + char b[buffsz]; + REQUIRE(recv(x.sockfd, b, buffsz, 0) == 5); + REQUIRE(strncmp("fubar", b, 5) == 0); +} + +// One sided write_close then close. +TEST_CASE("raw_connection_wclose_full_close") { + struct basic_test x; + x.h.set_write_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_WRITE); + x.drain_events(); + REQUIRE(x.h.closed_read_count() == 0); + + x.h.set_write_close_on_wake(false); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); +} + +TEST_CASE("raw_connection_wclose_full_close_nrb") { + struct basic_test x; + x.drain_read_buffer(); + x.h.set_write_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_CLOSED_WRITE); + x.drain_events(); + REQUIRE(x.h.closed_read_count() == 0); + + x.h.set_write_close_on_wake(false); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); +} + +TEST_CASE("raw_connection_wclose_full_close_wdrain") { + struct basic_test x; + x.drain_read_buffer(); + // write and then wclose then close on next wake + x.write_next_wake("bar"); + x.h.set_write_close_on_wake(true); + x.h.set_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_WAKE); + // No send/recv/close/shutdown activity from peer socket. + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + // Now check bar made it + char b[buffsz]; + REQUIRE(recv(x.sockfd, b, buffsz, 0) == 3); + REQUIRE(strncmp("bar", b, 3) == 0); +} + +// Half closes each direction. Raw connection then peer. +TEST_CASE("raw_connection_wclose_then_rclose") { + struct basic_test x; + x.h.set_write_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + x.drain_events(); + REQUIRE(x.h.closed_write_count() == 1); + REQUIRE(x.h.closed_read_count() == 0); + + char b[buffsz]; + REQUIRE(recv(x.sockfd, b, buffsz, 0) == 0); // EOF + x.socket_write_close(); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + REQUIRE(x.h.closed_read_count() == 1); +} + +// As above but peer first then raw connection. +TEST_CASE("raw_connection_rclose_then_wclose") { + struct basic_test x; + x.socket_write_close(); + x.drain_events(); + REQUIRE(x.h.closed_read_count() == 1); + REQUIRE(x.h.closed_write_count() == 0); + + x.h.set_write_close_on_wake(true); + pn_raw_connection_wake(x.server_rc); + REQUIRE_RUN(x.p, PN_RAW_CONNECTION_DISCONNECTED); + char b[buffsz]; + REQUIRE(recv(x.sockfd, b, buffsz, 0) == 0); // EOF + REQUIRE(x.h.closed_write_count() == 1); +} +