diff --git a/c_src/wterl.c b/c_src/wterl.c index ffa24c6..824224d 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -46,8 +46,8 @@ struct wterl_ctx { uint64_t sig; size_t sig_len; WT_SESSION *session; - const char *session_config; uint32_t num_cursors; + const char *session_config; struct cursor_info { const char *uri; const char *config; @@ -127,7 +127,7 @@ __str_hash(uint32_t in, const char *p, size_t len) { uint32_t h = in; for (++p ; len > 0; ++p, --len) - h += (h << 5) + (h >> 27) + *p; + h += (h << 5) + (h >> 27) + *p; return h; } @@ -191,13 +191,16 @@ static inline uint32_t __log2(uint64_t x) { static int __ctx_cache_evict(WterlConnHandle *conn_handle) { - uint32_t mean, log, num_evicted, i; - uint64_t now, elapsed; - struct wterl_ctx *c, *n; + uint32_t num_evicted = 0; + struct wterl_ctx *c; if (conn_handle->cache_size < MAX_CACHE_SIZE) return 0; +#if 0 // TODO: fixme once stats work again + uint32_t mean, log, num_evicted, i; + uint64_t now, elapsed; + struct wterl_ctx *c, *n; now = cpu_clock_ticks(); // Find the mean of the recorded times that items stayed in cache. @@ -230,6 +233,16 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) } c = n; } +#else + c = STAILQ_FIRST(&conn_handle->cache); + if (c) { + STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); + DPRINTF("evicting: %llu", PRIuint64(c->sig)); + c->session->close(c->session, NULL); + enif_free(c); + num_evicted++; + } +#endif conn_handle->cache_size -= num_evicted; return num_evicted; } @@ -246,22 +259,20 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) static struct wterl_ctx * __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) { - struct wterl_ctx *c, *n; + struct wterl_ctx *c; enif_mutex_lock(conn_handle->cache_mutex); c = STAILQ_FIRST(&conn_handle->cache); while (c != NULL) { - n = STAILQ_NEXT(c, entries); if (c->sig == sig) { // TODO: hash collisions *will* lead to SEGVs // cache hit: - STAILQ_REMOVE_HEAD(&conn_handle->cache, entries); + STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++; conn_handle->histogram_count++; conn_handle->cache_size -= 1; break; - } else { - c = n; } + c = STAILQ_NEXT(c, entries); } enif_mutex_unlock(conn_handle->cache_mutex); return c; @@ -369,7 +380,6 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, *ctx = NULL; do { - WMB_NEAR_CAS(); c = conn_handle->mru_ctx[worker_id]; if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c) { if (c == 0) { @@ -457,7 +467,6 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx cursor->reset(cursor); } - WMB_NEAR_CAS(); c = conn_handle->mru_ctx[worker_id]; if (CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c) { __ctx_cache_add(conn_handle, ctx); @@ -486,7 +495,6 @@ __close_all_sessions(WterlConnHandle *conn_handle) // clear out the mru for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) { do { - WMB_NEAR_CAS(); c = conn_handle->mru_ctx[worker_id]; } while(CASPO(&conn_handle->mru_ctx[worker_id], c, 0) != c); @@ -521,7 +529,6 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri) // walk the mru first, look for open cursors on matching uri for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) { - WMB_NEAR_CAS(); c = conn_handle->mru_ctx[worker_id]; if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c && c != 0) { cnt = c->num_cursors; @@ -691,6 +698,7 @@ __strerror_term(ErlNifEnv* env, int rc) and/or may be localized to any given language (i18n). Use the errno atom rather than the message when matching in Erlang. You've been warned. */ + DPRINTF("error: %s", erl_errno_id(rc)); return enif_make_tuple2(env, ATOM_ERROR, enif_make_tuple2(env, enif_make_atom(env, erl_errno_id(rc)), @@ -1861,6 +1869,7 @@ ASYNC_NIF_DECL( WT_CURSOR* cursor = args->cursor_handle->cursor; ASYNC_NIF_REPLY(__cursor_value_ret(env, cursor, cursor->next(cursor))); + DPRINTF("env: %p cursor: %p", env, cursor); }, { // post diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 313da29..94cf8bb 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -52,9 +52,7 @@ -record(state, {table :: string(), type :: string(), - connection :: wterl:connection(), - is_empty_cursor :: wterl:cursor(), - status_cursor :: wterl:cursor()}). + connection :: wterl:connection()}). -type state() :: #state{}. -type config() :: [{atom(), term()}]. @@ -135,15 +133,8 @@ start(Partition, Config) -> end, case wterl:create(Connection, Table, TableOpts) of ok -> - case establish_utility_cursors(Connection, Table) of - {ok, IsEmptyCursor, StatusCursor} -> - {ok, #state{table=Table, type=Type, - connection=Connection, - is_empty_cursor=IsEmptyCursor, - status_cursor=StatusCursor}}; - {error, Reason2} -> - {error, Reason2} - end; + {ok, #state{table=Table, type=Type, + connection=Connection}}; {error, Reason3} -> {error, Reason3} end @@ -329,25 +320,42 @@ drop(#state{connection=Connection, table=Table}=State) -> %% @doc Returns true if this wterl backend contains any %% non-tombstone values; otherwise returns false. -spec is_empty(state()) -> boolean(). -is_empty(#state{is_empty_cursor=Cursor}) -> - wterl:cursor_reset(Cursor), - case wterl:cursor_next(Cursor) of - not_found -> true; - {error, {eperm, _}} -> false; % TODO: review/fix this logic - _ -> false +is_empty(#state{connection=Connection, table=Table}) -> + case wterl:cursor_open(Connection, Table) of + {ok, Cursor} -> + IsEmpty = + case wterl:cursor_next(Cursor) of + not_found -> + true; + {error, {eperm, _}} -> + false; % TODO: review/fix this logic + _ -> + false + end, + wterl:cursor_close(Cursor), + IsEmpty; + {error, Reason2} -> + {error, Reason2} end. %% @doc Get the status information for this wterl backend -spec status(state()) -> [{atom(), term()}]. -status(#state{status_cursor=Cursor}) -> - wterl:cursor_reset(Cursor), - case fetch_status(Cursor) of - {ok, Stats} -> - Stats; - {error, {eperm, _}} -> % TODO: review/fix this logic - {ok, []}; - _ -> - {ok, []} +status(#state{connection=Connection, table=Table}) -> + case wterl:cursor_open(Connection, Table) of + {ok, Cursor} -> + TheStats = + case fetch_status(Cursor) of + {ok, Stats} -> + Stats; + {error, {eperm, _}} -> % TODO: review/fix this logic + {ok, []}; + _ -> + {ok, []} + end, + wterl:cursor_close(Cursor), + TheStats; + {error, Reason2} -> + {error, Reason2} end. %% @doc Register an asynchronous callback @@ -373,20 +381,6 @@ max_sessions(Config) -> false -> Est end. -%% @private -establish_utility_cursors(Connection, Table) -> - case wterl:cursor_open(Connection, Table) of - {ok, IsEmptyCursor} -> - case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of - {ok, StatusCursor} -> - {ok, IsEmptyCursor, StatusCursor}; - {error, Reason1} -> - {error, Reason1} - end; - {error, Reason2} -> - {error, Reason2} - end. - %% @private establish_connection(Config, Type) -> %% Get the data root directory diff --git a/src/wterl.erl b/src/wterl.erl index 495be92..1da6d44 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -95,6 +95,7 @@ nif_stub_error(Line) -> -spec init() -> ok | {error, any()}. init() -> + Module <- [wterl, wterl_conn, wterl_app, wterl_sup, wterl_ets, riak_kv_wiredtiger_backend, temp_riak_kv_backend]], erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]), [{wterl_vsn, "53307e8"}, {wiredtiger_vsn, "1.6.2-0-g07cb0a5"}]).