Skip to content

Commit

Permalink
box: finish client fibers on shutdown
Browse files Browse the repository at this point in the history
In the process of graceful shutdown it is convenient to first finish
all client (non system) fibers. Otherwise we should be ready for any
subsystem to handle request from client fiber during or after subsystem
shutdown. This would make code more complex.

We first cancel client fibers and then wait for their finishing. The
fiber may not respond to cancel and hang which cause shutdown hang
but this is the approach we choose for iproto shutdown already.

Note that as a result of this approach application will panic if
it is shutdown during execution of initialization script (in
particular if this script is doing box.cfg).

There are changes in application/test to adopt to client fibers
shutdown:

- make code cancellable (only to pass existing tests, we did not
  investigate all the possible places that should be made such).

- make console stop sending echo to client before client fibers
  shutdown. Otherwise as console server fiber is client one we will send
  message that fiber is cancelled on shutdown which breaks a lot of
  existing tests. This approach is on par with iproto shutdown.

- some tests (7743, replication-luatest/shutdown, replication/anon,
  replication/force_recovery etc etc) test shutdown during execution of
  init script. Now panic is expected so change them accordingly.

- some tests (8530, errinj_vylog) use injection that block client
  fiber finishing. In that tests we don't need graceful shutdown so
  let's just kill tarantool instead.

- we change test in vinyl/errinj for tarantoolgh-3225. We don't really need
  to check when vinyl reader is blocked as it executes small tasks
  (we assume reading syscall will not hang). Also change test for
  vinyl dump shutdown by slowing dump down instead of blocking it
  entirely. This is required to finish in time client fibers in
  the test.

- other similar changes

Also we can drop code from replication shutdown which is required to
handle client requests during/after shutdown.

Part of tarantool#8423

NO_CHANGELOG=internal
NO_DOC=internal
  • Loading branch information
nshy authored and locker committed Jan 29, 2024
1 parent 216b624 commit bf62065
Show file tree
Hide file tree
Showing 44 changed files with 357 additions and 134 deletions.
12 changes: 12 additions & 0 deletions src/box/box.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4941,6 +4941,8 @@ bootstrap_from_master(struct replica *master)
try {
applier_resume_to_state(applier, APPLIER_READY,
TIMEOUT_INFINITY);
} catch (FiberIsCancelled *e) {
throw e;
} catch (...) {
return false;
}
Expand All @@ -4958,6 +4960,8 @@ bootstrap_from_master(struct replica *master)
try {
applier_resume_to_state(applier, APPLIER_FETCH_SNAPSHOT,
TIMEOUT_INFINITY);
} catch (FiberIsCancelled *e) {
throw e;
} catch (...) {
return false;
}
Expand Down Expand Up @@ -5926,6 +5930,14 @@ box_storage_shutdown()
if (!is_storage_initialized)
return;
iproto_shutdown();
/*
* Finish client fibers after iproto_shutdown otherwise new fibers
* can be started through new iproto requests. Also we should
* finish client fibers before other subsystems shutdown so that
* we won't need to handle requests from client fibers after/during
* subsystem shutdown.
*/
fiber_shutdown();
replication_shutdown();
}

Expand Down
8 changes: 8 additions & 0 deletions src/box/lua/console.lua
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,15 @@ local function client_handler(client, _peer)
state:print(string.format("%-63s\n%-63s\n",
"Tarantool ".. version.." (Lua console)",
"type 'help' for interactive help"))
local on_shutdown = function()
-- Fiber is going to be cancelled on shutdown. Do not report
-- cancel induced error to the peer.
client:close();
end
state.fiber = fiber.self()
box.ctl.on_shutdown(on_shutdown)
repl(state)
box.ctl.on_shutdown(nil, on_shutdown)
session_internal.run_on_disconnect()
end

Expand Down
11 changes: 9 additions & 2 deletions src/box/memtx_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1028,12 +1028,19 @@ checkpoint_f(va_list ap)
return -1;
}

struct mh_i32_t *temp_space_ids = mh_i32_new();
struct mh_i32_t *temp_space_ids;

say_info("saving snapshot `%s'", snap->filename);
ERROR_INJECT_SLEEP(ERRINJ_SNAP_WRITE_DELAY);
ERROR_INJECT_WHILE(ERRINJ_SNAP_WRITE_DELAY, {
fiber_sleep(0.001);
if (fiber_is_cancelled()) {
diag_set(FiberIsCancelled);
goto fail;
}
});
ERROR_INJECT(ERRINJ_SNAP_SKIP_ALL_ROWS, goto done);
struct space_read_view *space_rv;
temp_space_ids = mh_i32_new();
read_view_foreach_space(space_rv, &ckpt->rv) {
FiberGCChecker gc_check;
bool skip = false;
Expand Down
34 changes: 5 additions & 29 deletions src/box/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,6 @@ double replication_sync_timeout = 300.0; /* seconds */
bool replication_skip_conflict = false;
int replication_threads = 1;

/**
* Fiber executing replicaset_connect. NULL if the function
* is not being executed.
*/
static struct fiber *replication_connect_fiber;

/** Condition that replicaset_connect finished execution. */
static struct fiber_cond replication_connect_cond;

/** If set then replication shutdown is started. */
static bool replication_is_shutting_down;

bool cfg_replication_anon = true;
struct tt_uuid cfg_bootstrap_leader_uuid;
struct uri cfg_bootstrap_leader_uri;
Expand Down Expand Up @@ -231,7 +219,6 @@ replication_init(int num_threads)
diag_create(&replicaset.applier.diag);

replication_threads = num_threads;
fiber_cond_create(&replication_connect_cond);

/* The local instance is always part of the quorum. */
replicaset.healthy_count = 1;
Expand All @@ -242,12 +229,6 @@ replication_init(int num_threads)
void
replication_shutdown(void)
{
replication_is_shutting_down = true;
if (replication_connect_fiber != NULL)
fiber_cancel(replication_connect_fiber);
while (replication_connect_fiber != NULL)
fiber_cond_wait(&replication_connect_cond);

struct replica *replica;
rlist_foreach_entry(replica, &replicaset.anon, in_anon)
applier_stop(replica->applier);
Expand All @@ -263,7 +244,6 @@ replication_free(void)
diag_destroy(&replicaset.applier.diag);
trigger_destroy(&replicaset.on_ack);
trigger_destroy(&replicaset.on_relay_thread_start);
fiber_cond_destroy(&replication_connect_cond);
fiber_cond_destroy(&replicaset.applier.cond);
latch_destroy(&replicaset.applier.order_latch);
applier_free();
Expand Down Expand Up @@ -1072,9 +1052,6 @@ void
replicaset_connect(const struct uri_set *uris,
bool connect_quorum, bool keep_connect)
{
if (replication_is_shutting_down)
tnt_raise(ClientError, ER_SHUTDOWN);

if (uris->uri_count == 0) {
/* Cleanup the replica set. */
replicaset_update(NULL, 0, false);
Expand All @@ -1087,12 +1064,6 @@ replicaset_connect(const struct uri_set *uris,
tnt_raise(ClientError, ER_CFG, "replication",
"too many replicas");
}
assert(replication_connect_fiber == NULL);
replication_connect_fiber = fiber();
auto connect_fiber_guard = make_scoped_guard([&]{
replication_connect_fiber = NULL;
fiber_cond_signal(&replication_connect_cond);
});
int count = 0;
struct applier *appliers[VCLOCK_MAX] = {};
auto appliers_guard = make_scoped_guard([&]{
Expand Down Expand Up @@ -1342,6 +1313,11 @@ replicaset_sync(void)
say_info("replica set sync complete");
box_set_orphan(false);
}
/*
* If fiber is cancelled raise error here so that orphan status is
* correct.
*/
fiber_testcancel();
}

void
Expand Down
4 changes: 4 additions & 0 deletions src/box/vy_quota.c
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ vy_quota_use(struct vy_quota *q, enum vy_quota_consumer_type type,
diag_set(ClientError, ER_VY_QUOTA_TIMEOUT);
return -1;
}
if (fiber_is_cancelled()) {
diag_set(FiberIsCancelled);
return -1;
}

double wait_time = ev_monotonic_now(loop()) - wait_start;
if (wait_time > q->too_long_threshold) {
Expand Down
12 changes: 10 additions & 2 deletions src/box/vy_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -730,13 +730,20 @@ vy_scheduler_wait_checkpoint(struct vy_scheduler *scheduler)
/* A dump error occurred, abort checkpoint. */
struct error *e = diag_last_error(&scheduler->diag);
diag_set_error(diag_get(), e);
say_error("vinyl checkpoint failed: %s", e->errmsg);
return -1;
goto error;
}
fiber_cond_wait(&scheduler->dump_cond);
if (fiber_is_cancelled()) {
diag_set(FiberIsCancelled);
goto error;
}
}
say_info("vinyl checkpoint completed");
return 0;
error:
say_error("vinyl checkpoint failed: %s",
diag_last_error(diag_get())->errmsg);
return -1;
}

void
Expand Down Expand Up @@ -886,6 +893,7 @@ vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
struct vy_deferred_delete_batch *batch = container_of(cmsg,
struct vy_deferred_delete_batch, cmsg);
struct vy_task *task = batch->task;
fiber_set_system(fiber(), true);
/*
* Wait for memory quota if necessary before starting to
* process the batch (we can't yield between statements).
Expand Down
1 change: 0 additions & 1 deletion src/lib/core/errinj.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ struct errinj {
_(ERRINJ_IPROTO_TX_DELAY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_IPROTO_WRITE_ERROR_DELAY, ERRINJ_BOOL, {.bparam = false})\
_(ERRINJ_LOG_ROTATE, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_MAIN_MAKE_FILE_ON_RETURN, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_MEMTX_DELAY_GC, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_NETBOX_DISABLE_ID, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_NETBOX_FLIP_FEATURE, ERRINJ_INT, {.iparam = -1}) \
Expand Down
48 changes: 47 additions & 1 deletion src/lib/core/fiber.c
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,13 @@ fiber_loop(MAYBE_UNUSED void *data)
assert(f != fiber);
fiber_wakeup(f);
}
if (!(fiber->flags & FIBER_IS_SYSTEM)) {
assert(cord()->client_fiber_count > 0);
cord()->client_fiber_count--;
if (cord()->shutdown_fiber != NULL &&
cord()->client_fiber_count == 0)
fiber_wakeup(cord()->shutdown_fiber);
}
fiber_on_stop(fiber);
/* reset pending wakeups */
rlist_del(&fiber->state);
Expand Down Expand Up @@ -1590,6 +1597,8 @@ fiber_new_ex(const char *name, const struct fiber_attr *fiber_attr,
fiber_gc_checker_init(fiber);
cord->next_fid++;
assert(cord->next_fid > FIBER_ID_MAX_RESERVED);
if (!(fiber->flags & FIBER_IS_SYSTEM))
cord()->client_fiber_count++;

return fiber;

Expand Down Expand Up @@ -1849,7 +1858,7 @@ cord_create(struct cord *cord, const char *name)
cord->sched.name = NULL;
fiber_set_name(&cord->sched, "sched");
cord->fiber = &cord->sched;
cord->sched.flags = FIBER_IS_RUNNING;
cord->sched.flags = FIBER_IS_RUNNING | FIBER_IS_SYSTEM;
cord->sched.max_slice = zero_slice;
cord->max_slice = default_slice;

Expand Down Expand Up @@ -1884,6 +1893,8 @@ cord_create(struct cord *cord, const char *name)
cord->sched.stack_watermark = NULL;
#endif
signal_stack_init();
cord->shutdown_fiber = NULL;
cord->client_fiber_count = 0;
}

void
Expand Down Expand Up @@ -2339,3 +2350,38 @@ fiber_lua_state(struct fiber *f)
{
return f->storage.lua.stack;
}

void
fiber_set_system(struct fiber *f, bool yesno)
{
if (yesno) {
if (!(f->flags & FIBER_IS_SYSTEM)) {
f->flags |= FIBER_IS_SYSTEM;
assert(cord()->client_fiber_count > 0);
cord()->client_fiber_count--;
if (cord()->shutdown_fiber != NULL &&
cord()->client_fiber_count == 0)
fiber_wakeup(cord()->shutdown_fiber);
}
} else {
if (f->flags & FIBER_IS_SYSTEM) {
f->flags &= ~FIBER_IS_SYSTEM;
cord()->client_fiber_count++;
}
}
}

void
fiber_shutdown(void)
{
assert(cord()->shutdown_fiber == NULL);
struct fiber *fiber;
rlist_foreach_entry(fiber, &cord()->alive, link) {
if (!(fiber->flags & FIBER_IS_SYSTEM))
fiber_cancel(fiber);
}
cord()->shutdown_fiber = fiber();
while (cord()->client_fiber_count != 0)
fiber_yield();
cord()->shutdown_fiber = NULL;
}
12 changes: 12 additions & 0 deletions src/lib/core/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,10 @@ struct cord {
struct fiber *main_fiber;
/** An event triggered to cancel cord main fiber. */
ev_async cancel_event;
/** Number of alive client (non system) fibers. */
int client_fiber_count;
/** Fiber calling fiber_shutdown. NULL if there is no such. */
struct fiber *shutdown_fiber;
};

extern __thread struct cord *cord_ptr;
Expand Down Expand Up @@ -1243,6 +1247,14 @@ fiber_check_gc(void);
struct lua_State *
fiber_lua_state(struct fiber *f);

/** Change whether fiber is system or not. */
void
fiber_set_system(struct fiber *f, bool yesno);

/** Cancel all client (non system) fibers and wait until they finished. */
void
fiber_shutdown(void);

#if defined(__cplusplus)
} /* extern "C" */

Expand Down
14 changes: 13 additions & 1 deletion src/lib/core/fiber_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ fiber_pool_f(va_list ap)
f->caller->flags |= FIBER_IS_READY;
assert(f->caller->caller == &cord->sched);
}
fiber_set_system(fiber(), false);
cmsg_deliver(msg);
fiber_set_system(fiber(), true);
fiber_check_gc();
/*
* Normally fibers die after their function
Expand Down Expand Up @@ -131,7 +133,17 @@ fiber_pool_cb(ev_loop *loop, struct ev_watcher *watcher, int events)
f = rlist_shift_entry(&pool->idle, struct fiber, state);
fiber_call(f);
} else if (pool->size < pool->max_size) {
f = fiber_new(cord_name(cord()), fiber_pool_f);
/*
* We don't want fibers to be cancellable by client
* while they are in the pool. However system flag is
* reset during processing message from pool endpoint
* so that fiber is made cancellable back.
*
* If some message processing should not be cancellable
* by client then it can just set system flag during
* it's execution.
*/
f = fiber_new_system(cord_name(cord()), fiber_pool_f);
if (f == NULL) {
diag_log();
break;
Expand Down
12 changes: 12 additions & 0 deletions src/lua/fiber.c
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,17 @@ lbox_fiber_stall(struct lua_State *L)
return 0;
}

/** Make fiber system. Takes the fiber as a single argument. */
static int
lbox_fiber_set_system(struct lua_State *L)
{
if (lua_gettop(L) != 1)
luaL_error(L, "fiber.set_system(id): bad arguments");
struct fiber *fiber = lbox_checkfiber(L, 1);
fiber_set_system(fiber, true);
return 0;
}

/** Helper for fiber slice parsing. */
static struct fiber_slice
lbox_fiber_slice_parse(struct lua_State *L, int idx)
Expand Down Expand Up @@ -1018,6 +1029,7 @@ static const struct luaL_Reg fiberlib[] = {
{"extend_slice", lbox_fiber_extend_slice},
/* Internal functions, to hide in fiber.lua. */
{"stall", lbox_fiber_stall},
{"set_system", lbox_fiber_set_system},
{NULL, NULL}
};

Expand Down
Loading

0 comments on commit bf62065

Please sign in to comment.