Improve the ability to cancel an in-flight request by using an
interrupt, via SIGCONT, to force the read or write system call to
return prematurely.

Reviewed by:	grehan
This commit is contained in:
Tycho Nightingale
2014-11-04 01:06:33 +00:00
parent 2f1217877e
commit ae45750d6c
+146 -24
View File
@@ -43,9 +43,13 @@ __FBSDID("$FreeBSD$");
#include <string.h> #include <string.h>
#include <pthread.h> #include <pthread.h>
#include <pthread_np.h> #include <pthread_np.h>
#include <signal.h>
#include <unistd.h> #include <unistd.h>
#include <machine/atomic.h>
#include "bhyverun.h" #include "bhyverun.h"
#include "mevent.h"
#include "block_if.h" #include "block_if.h"
#define BLOCKIF_SIG 0xb109b109 #define BLOCKIF_SIG 0xb109b109
@@ -60,7 +64,9 @@ enum blockop {
enum blockstat { enum blockstat {
BST_FREE, BST_FREE,
BST_INUSE BST_PEND,
BST_BUSY,
BST_DONE
}; };
struct blockif_elem { struct blockif_elem {
@@ -68,6 +74,7 @@ struct blockif_elem {
struct blockif_req *be_req; struct blockif_req *be_req;
enum blockop be_op; enum blockop be_op;
enum blockstat be_status; enum blockstat be_status;
pthread_t be_tid;
}; };
struct blockif_ctxt { struct blockif_ctxt {
@@ -81,13 +88,25 @@ struct blockif_ctxt {
pthread_cond_t bc_cond; pthread_cond_t bc_cond;
int bc_closing; int bc_closing;
/* Request elements and free/inuse queues */ /* Request elements and free/pending/busy queues */
TAILQ_HEAD(, blockif_elem) bc_freeq; TAILQ_HEAD(, blockif_elem) bc_freeq;
TAILQ_HEAD(, blockif_elem) bc_inuseq; TAILQ_HEAD(, blockif_elem) bc_pendq;
TAILQ_HEAD(, blockif_elem) bc_busyq;
u_int bc_req_count; u_int bc_req_count;
struct blockif_elem bc_reqs[BLOCKIF_MAXREQ]; struct blockif_elem bc_reqs[BLOCKIF_MAXREQ];
}; };
/* Guard passed to pthread_once() in blockif_open() so blockif_init() runs once. */
static pthread_once_t blockif_once = PTHREAD_ONCE_INIT;
/*
 * Wait record used when cancelling an in-flight request.  blockif_cancel()
 * builds one on its stack, links it onto the list headed by
 * blockif_bse_head, sends SIGCONT to the worker thread and then sleeps on
 * bse_cond until blockif_sigcont_handler() clears bse_pending.
 */
struct blockif_sig_elem {
/* Held while bse_pending is cleared by the handler or waited on. */
pthread_mutex_t bse_mtx;
/* Signalled by the handler after it clears bse_pending. */
pthread_cond_t bse_cond;
/* Set to 1 before the element is queued; set to 0 by the handler. */
int bse_pending;
/* Next element in the singly-linked list (the head at insertion time). */
struct blockif_sig_elem *bse_next;
};
/*
 * Head of the list of outstanding wait records.  In the code visible here
 * it is pushed to and popped from using atomic_cmpset_ptr() retry loops
 * rather than under a mutex.
 */
static struct blockif_sig_elem *blockif_bse_head;
static int static int
blockif_enqueue(struct blockif_ctxt *bc, struct blockif_req *breq, blockif_enqueue(struct blockif_ctxt *bc, struct blockif_req *breq,
enum blockop op) enum blockop op)
@@ -101,10 +120,10 @@ blockif_enqueue(struct blockif_ctxt *bc, struct blockif_req *breq,
assert(be->be_status == BST_FREE); assert(be->be_status == BST_FREE);
TAILQ_REMOVE(&bc->bc_freeq, be, be_link); TAILQ_REMOVE(&bc->bc_freeq, be, be_link);
be->be_status = BST_INUSE; be->be_status = BST_PEND;
be->be_req = breq; be->be_req = breq;
be->be_op = op; be->be_op = op;
TAILQ_INSERT_TAIL(&bc->bc_inuseq, be, be_link); TAILQ_INSERT_TAIL(&bc->bc_pendq, be, be_link);
bc->bc_req_count++; bc->bc_req_count++;
@@ -112,26 +131,38 @@ blockif_enqueue(struct blockif_ctxt *bc, struct blockif_req *breq,
} }
static int static int
blockif_dequeue(struct blockif_ctxt *bc, struct blockif_elem *el) blockif_dequeue(struct blockif_ctxt *bc, struct blockif_elem **bep)
{ {
struct blockif_elem *be; struct blockif_elem *be;
if (bc->bc_req_count == 0) if (bc->bc_req_count == 0)
return (ENOENT); return (ENOENT);
be = TAILQ_FIRST(&bc->bc_inuseq); be = TAILQ_FIRST(&bc->bc_pendq);
assert(be != NULL); assert(be != NULL);
assert(be->be_status == BST_INUSE); assert(be->be_status == BST_PEND);
*el = *be; TAILQ_REMOVE(&bc->bc_pendq, be, be_link);
be->be_status = BST_BUSY;
be->be_tid = bc->bc_btid;
TAILQ_INSERT_TAIL(&bc->bc_busyq, be, be_link);
TAILQ_REMOVE(&bc->bc_inuseq, be, be_link); *bep = be;
return (0);
}
static void
blockif_complete(struct blockif_ctxt *bc, struct blockif_elem *be)
{
assert(be->be_status == BST_DONE);
TAILQ_REMOVE(&bc->bc_busyq, be, be_link);
be->be_tid = 0;
be->be_status = BST_FREE; be->be_status = BST_FREE;
be->be_req = NULL; be->be_req = NULL;
TAILQ_INSERT_TAIL(&bc->bc_freeq, be, be_link); TAILQ_INSERT_TAIL(&bc->bc_freeq, be, be_link);
bc->bc_req_count--;
return (0); bc->bc_req_count--;
} }
static void static void
@@ -163,6 +194,8 @@ blockif_proc(struct blockif_ctxt *bc, struct blockif_elem *be)
break; break;
} }
be->be_status = BST_DONE;
(*br->br_callback)(br, err); (*br->br_callback)(br, err);
} }
@@ -170,16 +203,17 @@ static void *
blockif_thr(void *arg) blockif_thr(void *arg)
{ {
struct blockif_ctxt *bc; struct blockif_ctxt *bc;
struct blockif_elem req; struct blockif_elem *be;
bc = arg; bc = arg;
for (;;) { for (;;) {
pthread_mutex_lock(&bc->bc_mtx); pthread_mutex_lock(&bc->bc_mtx);
while (!blockif_dequeue(bc, &req)) { while (!blockif_dequeue(bc, &be)) {
pthread_mutex_unlock(&bc->bc_mtx); pthread_mutex_unlock(&bc->bc_mtx);
blockif_proc(bc, &req); blockif_proc(bc, be);
pthread_mutex_lock(&bc->bc_mtx); pthread_mutex_lock(&bc->bc_mtx);
blockif_complete(bc, be);
} }
pthread_cond_wait(&bc->bc_cond, &bc->bc_mtx); pthread_cond_wait(&bc->bc_cond, &bc->bc_mtx);
pthread_mutex_unlock(&bc->bc_mtx); pthread_mutex_unlock(&bc->bc_mtx);
@@ -195,6 +229,38 @@ blockif_thr(void *arg)
return (NULL); return (NULL);
} }
/*
 * mevent callback registered for SIGCONT by blockif_init().
 *
 * Drains the global list of wait records: each record is detached from
 * blockif_bse_head with a compare-and-set, its bse_pending flag is cleared
 * under bse_mtx, and its condition variable is signalled.  Returns once
 * the list is observed empty.  None of the three arguments is used.
 */
static void
blockif_sigcont_handler(int signal, enum ev_type type, void *arg)
{
	struct blockif_sig_elem *elem;

	/*
	 * Every queued record is serviced, including ones that were queued
	 * on behalf of a different thread.
	 */
	for (;;) {
		elem = blockif_bse_head;
		if (elem == NULL)
			break;

		/* Head changed since it was read: start over with a fresh read. */
		if (!atomic_cmpset_ptr((uintptr_t *)&blockif_bse_head,
		    (uintptr_t)elem, (uintptr_t)elem->bse_next))
			continue;

		/* Record is now detached; release whoever is waiting on it. */
		pthread_mutex_lock(&elem->bse_mtx);
		elem->bse_pending = 0;
		pthread_cond_signal(&elem->bse_cond);
		pthread_mutex_unlock(&elem->bse_mtx);
	}
}
/*
 * One-time setup for request cancellation; invoked through pthread_once()
 * from blockif_open().  Registers blockif_sigcont_handler() with mevent as
 * the EVF_SIGNAL callback for SIGCONT, then sets the process disposition
 * of SIGCONT to SIG_IGN.
 *
 * NOTE(review): presumably the signal is ignored so it is only consumed
 * through the mevent registration -- confirm against the mevent code.
 * NOTE(review): the return value of mevent_add() is not checked here.
 */
static void
blockif_init(void)
{
mevent_add(SIGCONT, EVF_SIGNAL, blockif_sigcont_handler, NULL);
(void) signal(SIGCONT, SIG_IGN);
}
struct blockif_ctxt * struct blockif_ctxt *
blockif_open(const char *optstr, const char *ident) blockif_open(const char *optstr, const char *ident)
{ {
@@ -206,6 +272,8 @@ blockif_open(const char *optstr, const char *ident)
int extra, fd, i, sectsz; int extra, fd, i, sectsz;
int nocache, sync, ro; int nocache, sync, ro;
pthread_once(&blockif_once, blockif_init);
nocache = 0; nocache = 0;
sync = 0; sync = 0;
ro = 0; ro = 0;
@@ -280,7 +348,8 @@ blockif_open(const char *optstr, const char *ident)
pthread_mutex_init(&bc->bc_mtx, NULL); pthread_mutex_init(&bc->bc_mtx, NULL);
pthread_cond_init(&bc->bc_cond, NULL); pthread_cond_init(&bc->bc_cond, NULL);
TAILQ_INIT(&bc->bc_freeq); TAILQ_INIT(&bc->bc_freeq);
TAILQ_INIT(&bc->bc_inuseq); TAILQ_INIT(&bc->bc_pendq);
TAILQ_INIT(&bc->bc_busyq);
bc->bc_req_count = 0; bc->bc_req_count = 0;
for (i = 0; i < BLOCKIF_MAXREQ; i++) { for (i = 0; i < BLOCKIF_MAXREQ; i++) {
bc->bc_reqs[i].be_status = BST_FREE; bc->bc_reqs[i].be_status = BST_FREE;
@@ -357,23 +426,76 @@ blockif_cancel(struct blockif_ctxt *bc, struct blockif_req *breq)
assert(bc->bc_magic == BLOCKIF_SIG); assert(bc->bc_magic == BLOCKIF_SIG);
pthread_mutex_lock(&bc->bc_mtx); pthread_mutex_lock(&bc->bc_mtx);
TAILQ_FOREACH(be, &bc->bc_inuseq, be_link) { /*
* Check pending requests.
*/
TAILQ_FOREACH(be, &bc->bc_pendq, be_link) {
if (be->be_req == breq)
break;
}
if (be != NULL) {
/*
* Found it.
*/
TAILQ_REMOVE(&bc->bc_pendq, be, be_link);
be->be_status = BST_FREE;
be->be_req = NULL;
TAILQ_INSERT_TAIL(&bc->bc_freeq, be, be_link);
bc->bc_req_count--;
pthread_mutex_unlock(&bc->bc_mtx);
return (0);
}
/*
* Check in-flight requests.
*/
TAILQ_FOREACH(be, &bc->bc_busyq, be_link) {
if (be->be_req == breq) if (be->be_req == breq)
break; break;
} }
if (be == NULL) { if (be == NULL) {
/*
* Didn't find it.
*/
pthread_mutex_unlock(&bc->bc_mtx); pthread_mutex_unlock(&bc->bc_mtx);
return (EINVAL); return (EINVAL);
} }
TAILQ_REMOVE(&bc->bc_inuseq, be, be_link); /*
be->be_status = BST_FREE; * Interrupt the processing thread to force it to return
be->be_req = NULL; * prematurely via its normal callback path.
TAILQ_INSERT_TAIL(&bc->bc_freeq, be, be_link); */
bc->bc_req_count--; while (be->be_status == BST_BUSY) {
struct blockif_sig_elem bse, *old_head;
pthread_mutex_init(&bse.bse_mtx, NULL);
pthread_cond_init(&bse.bse_cond, NULL);
bse.bse_pending = 1;
do {
old_head = blockif_bse_head;
bse.bse_next = old_head;
} while (!atomic_cmpset_ptr((uintptr_t *)&blockif_bse_head,
(uintptr_t)old_head,
(uintptr_t)&bse));
pthread_kill(be->be_tid, SIGCONT);
pthread_mutex_lock(&bse.bse_mtx);
while (bse.bse_pending)
pthread_cond_wait(&bse.bse_cond, &bse.bse_mtx);
pthread_mutex_unlock(&bse.bse_mtx);
}
pthread_mutex_unlock(&bc->bc_mtx); pthread_mutex_unlock(&bc->bc_mtx);
return (0); /*
* The processing thread has been interrupted. Since it's not
* clear if the callback has been invoked yet, return EBUSY.
*/
return (EBUSY);
} }
int int