Cache session/[{cursor, config}] for reuse and spawn threads when needed. #9

Merged
gburd merged 30 commits from gsb-ctx-cache into master 2013-07-03 12:31:15 +00:00
3 changed files with 59 additions and 55 deletions
Showing only changes of commit 0f180a6531 - Show all commits

View file

@ -46,8 +46,8 @@ struct wterl_ctx {
uint64_t sig; uint64_t sig;
size_t sig_len; size_t sig_len;
WT_SESSION *session; WT_SESSION *session;
const char *session_config;
uint32_t num_cursors; uint32_t num_cursors;
const char *session_config;
struct cursor_info { struct cursor_info {
const char *uri; const char *uri;
const char *config; const char *config;
@ -191,13 +191,16 @@ static inline uint32_t __log2(uint64_t x) {
static int static int
__ctx_cache_evict(WterlConnHandle *conn_handle) __ctx_cache_evict(WterlConnHandle *conn_handle)
{ {
uint32_t mean, log, num_evicted, i; uint32_t num_evicted = 0;
uint64_t now, elapsed; struct wterl_ctx *c;
struct wterl_ctx *c, *n;
if (conn_handle->cache_size < MAX_CACHE_SIZE) if (conn_handle->cache_size < MAX_CACHE_SIZE)
return 0; return 0;
#if 0 // TODO: fixme once stats work again
uint32_t mean, log, num_evicted, i;
uint64_t now, elapsed;
struct wterl_ctx *c, *n;
now = cpu_clock_ticks(); now = cpu_clock_ticks();
// Find the mean of the recorded times that items stayed in cache. // Find the mean of the recorded times that items stayed in cache.
@ -230,6 +233,16 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
} }
c = n; c = n;
} }
#else
c = STAILQ_FIRST(&conn_handle->cache);
if (c) {
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
DPRINTF("evicting: %llu", PRIuint64(c->sig));
c->session->close(c->session, NULL);
enif_free(c);
num_evicted++;
}
#endif
conn_handle->cache_size -= num_evicted; conn_handle->cache_size -= num_evicted;
return num_evicted; return num_evicted;
} }
@ -246,22 +259,20 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
static struct wterl_ctx * static struct wterl_ctx *
__ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
{ {
struct wterl_ctx *c, *n; struct wterl_ctx *c;
enif_mutex_lock(conn_handle->cache_mutex); enif_mutex_lock(conn_handle->cache_mutex);
c = STAILQ_FIRST(&conn_handle->cache); c = STAILQ_FIRST(&conn_handle->cache);
while (c != NULL) { while (c != NULL) {
n = STAILQ_NEXT(c, entries);
if (c->sig == sig) { // TODO: hash collisions *will* lead to SEGVs if (c->sig == sig) { // TODO: hash collisions *will* lead to SEGVs
// cache hit: // cache hit:
STAILQ_REMOVE_HEAD(&conn_handle->cache, entries); STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++; conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++;
conn_handle->histogram_count++; conn_handle->histogram_count++;
conn_handle->cache_size -= 1; conn_handle->cache_size -= 1;
break; break;
} else {
c = n;
} }
c = STAILQ_NEXT(c, entries);
} }
enif_mutex_unlock(conn_handle->cache_mutex); enif_mutex_unlock(conn_handle->cache_mutex);
return c; return c;
@ -369,7 +380,6 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
*ctx = NULL; *ctx = NULL;
do { do {
WMB_NEAR_CAS();
c = conn_handle->mru_ctx[worker_id]; c = conn_handle->mru_ctx[worker_id];
if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c) { if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c) {
if (c == 0) { if (c == 0) {
@ -457,7 +467,6 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx
cursor->reset(cursor); cursor->reset(cursor);
} }
WMB_NEAR_CAS();
c = conn_handle->mru_ctx[worker_id]; c = conn_handle->mru_ctx[worker_id];
if (CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c) { if (CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c) {
__ctx_cache_add(conn_handle, ctx); __ctx_cache_add(conn_handle, ctx);
@ -486,7 +495,6 @@ __close_all_sessions(WterlConnHandle *conn_handle)
// clear out the mru // clear out the mru
for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) { for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) {
do { do {
WMB_NEAR_CAS();
c = conn_handle->mru_ctx[worker_id]; c = conn_handle->mru_ctx[worker_id];
} while(CASPO(&conn_handle->mru_ctx[worker_id], c, 0) != c); } while(CASPO(&conn_handle->mru_ctx[worker_id], c, 0) != c);
@ -521,7 +529,6 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
// walk the mru first, look for open cursors on matching uri // walk the mru first, look for open cursors on matching uri
for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) { for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) {
WMB_NEAR_CAS();
c = conn_handle->mru_ctx[worker_id]; c = conn_handle->mru_ctx[worker_id];
if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c && c != 0) { if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c && c != 0) {
cnt = c->num_cursors; cnt = c->num_cursors;
@ -691,6 +698,7 @@ __strerror_term(ErlNifEnv* env, int rc)
and/or may be localized to any given language (i18n). Use the errno and/or may be localized to any given language (i18n). Use the errno
atom rather than the message when matching in Erlang. You've been atom rather than the message when matching in Erlang. You've been
warned. */ warned. */
DPRINTF("error: %s", erl_errno_id(rc));
return enif_make_tuple2(env, ATOM_ERROR, return enif_make_tuple2(env, ATOM_ERROR,
enif_make_tuple2(env, enif_make_tuple2(env,
enif_make_atom(env, erl_errno_id(rc)), enif_make_atom(env, erl_errno_id(rc)),
@ -1861,6 +1869,7 @@ ASYNC_NIF_DECL(
WT_CURSOR* cursor = args->cursor_handle->cursor; WT_CURSOR* cursor = args->cursor_handle->cursor;
ASYNC_NIF_REPLY(__cursor_value_ret(env, cursor, cursor->next(cursor))); ASYNC_NIF_REPLY(__cursor_value_ret(env, cursor, cursor->next(cursor)));
DPRINTF("env: %p cursor: %p", env, cursor);
}, },
{ // post { // post

View file

@ -52,9 +52,7 @@
-record(state, {table :: string(), -record(state, {table :: string(),
type :: string(), type :: string(),
connection :: wterl:connection(), connection :: wterl:connection()}).
is_empty_cursor :: wterl:cursor(),
status_cursor :: wterl:cursor()}).
-type state() :: #state{}. -type state() :: #state{}.
-type config() :: [{atom(), term()}]. -type config() :: [{atom(), term()}].
@ -135,15 +133,8 @@ start(Partition, Config) ->
end, end,
case wterl:create(Connection, Table, TableOpts) of case wterl:create(Connection, Table, TableOpts) of
ok -> ok ->
case establish_utility_cursors(Connection, Table) of
{ok, IsEmptyCursor, StatusCursor} ->
{ok, #state{table=Table, type=Type, {ok, #state{table=Table, type=Type,
connection=Connection, connection=Connection}};
is_empty_cursor=IsEmptyCursor,
status_cursor=StatusCursor}};
{error, Reason2} ->
{error, Reason2}
end;
{error, Reason3} -> {error, Reason3} ->
{error, Reason3} {error, Reason3}
end end
@ -329,18 +320,30 @@ drop(#state{connection=Connection, table=Table}=State) ->
%% @doc Returns true if this wterl backend contains any %% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false. %% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean(). -spec is_empty(state()) -> boolean().
is_empty(#state{is_empty_cursor=Cursor}) -> is_empty(#state{connection=Connection, table=Table}) ->
wterl:cursor_reset(Cursor), case wterl:cursor_open(Connection, Table) of
{ok, Cursor} ->
IsEmpty =
case wterl:cursor_next(Cursor) of case wterl:cursor_next(Cursor) of
not_found -> true; not_found ->
{error, {eperm, _}} -> false; % TODO: review/fix this logic true;
_ -> false {error, {eperm, _}} ->
false; % TODO: review/fix this logic
_ ->
false
end,
wterl:cursor_close(Cursor),
IsEmpty;
{error, Reason2} ->
{error, Reason2}
end. end.
%% @doc Get the status information for this wterl backend %% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}]. -spec status(state()) -> [{atom(), term()}].
status(#state{status_cursor=Cursor}) -> status(#state{connection=Connection, table=Table}) ->
wterl:cursor_reset(Cursor), case wterl:cursor_open(Connection, Table) of
{ok, Cursor} ->
TheStats =
case fetch_status(Cursor) of case fetch_status(Cursor) of
{ok, Stats} -> {ok, Stats} ->
Stats; Stats;
@ -348,6 +351,11 @@ status(#state{status_cursor=Cursor}) ->
{ok, []}; {ok, []};
_ -> _ ->
{ok, []} {ok, []}
end,
wterl:cursor_close(Cursor),
TheStats;
{error, Reason2} ->
{error, Reason2}
end. end.
%% @doc Register an asynchronous callback %% @doc Register an asynchronous callback
@ -373,20 +381,6 @@ max_sessions(Config) ->
false -> Est false -> Est
end. end.
%% @private
establish_utility_cursors(Connection, Table) ->
case wterl:cursor_open(Connection, Table) of
{ok, IsEmptyCursor} ->
case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
{ok, StatusCursor} ->
{ok, IsEmptyCursor, StatusCursor};
{error, Reason1} ->
{error, Reason1}
end;
{error, Reason2} ->
{error, Reason2}
end.
%% @private %% @private
establish_connection(Config, Type) -> establish_connection(Config, Type) ->
%% Get the data root directory %% Get the data root directory

View file

@ -95,6 +95,7 @@ nif_stub_error(Line) ->
-spec init() -> ok | {error, any()}. -spec init() -> ok | {error, any()}.
init() -> init() ->
Module <- [wterl, wterl_conn, wterl_app, wterl_sup, wterl_ets, riak_kv_wiredtiger_backend, temp_riak_kv_backend]],
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]), erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
[{wterl_vsn, "53307e8"}, [{wterl_vsn, "53307e8"},
{wiredtiger_vsn, "1.6.2-0-g07cb0a5"}]). {wiredtiger_vsn, "1.6.2-0-g07cb0a5"}]).