WIP: use atoms to speed up hash lookup of shared cursors.

This commit is contained in:
Gregory Burd 2013-04-24 10:15:46 -04:00
parent 54c5158490
commit 8772d39663
3 changed files with 44 additions and 49 deletions

View file

@ -51,7 +51,7 @@ static ErlNifResourceType *wterl_conn_RESOURCE;
static ErlNifResourceType *wterl_cursor_RESOURCE; static ErlNifResourceType *wterl_cursor_RESOURCE;
/* Generators for 'cursors' a named, type-specific hash table functions. */ /* Generators for 'cursors' a named, type-specific hash table functions. */
KHASH_MAP_INIT_STR(cursors, WT_CURSOR*); KHASH_INIT(cursors, ERL_NIF_TERM, WT_CURSOR*, 1, kh_int_hash_func, enif_is_identical);
/** /**
* We will have exactly one (1) WterlCtx for each async worker thread. As * We will have exactly one (1) WterlCtx for each async worker thread. As
@ -305,11 +305,9 @@ __close_all_sessions(WterlConnHandle *conn_handle)
for (itr = kh_begin(h); itr != kh_end(h); ++itr) { for (itr = kh_begin(h); itr != kh_end(h); ++itr) {
if (kh_exist(h, itr)) { if (kh_exist(h, itr)) {
WT_CURSOR *cursor = kh_val(h, itr); WT_CURSOR *cursor = kh_val(h, itr);
char *key = (char *)kh_key(h, itr);
cursor->close(cursor); cursor->close(cursor);
kh_del(cursors, h, itr);
enif_free(key);
kh_value(h, itr) = NULL; kh_value(h, itr) = NULL;
kh_del(cursors, h, itr);
} }
} }
kh_destroy(cursors, h); kh_destroy(cursors, h);
@ -325,7 +323,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
* Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex) * Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex)
*/ */
void void
__close_cursors_on(WterlConnHandle *conn_handle, const char *uri) __close_cursors_on(WterlConnHandle *conn_handle, ERL_NIF_TERM key)
{ {
int i; int i;
@ -333,14 +331,12 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
WterlCtx *ctx = &conn_handle->contexts[i]; WterlCtx *ctx = &conn_handle->contexts[i];
if (ctx->session != NULL) { if (ctx->session != NULL) {
khash_t(cursors) *h = ctx->cursors; khash_t(cursors) *h = ctx->cursors;
khiter_t itr = kh_get(cursors, h, (char *)uri); khiter_t itr = kh_get(cursors, h, key);
if (itr != kh_end(h)) { if (itr != kh_end(h)) {
WT_CURSOR *cursor = kh_value(h, itr); WT_CURSOR *cursor = kh_value(h, itr);
char *key = (char *)kh_key(h, itr);
cursor->close(cursor); cursor->close(cursor);
kh_del(cursors, h, itr);
enif_free(key);
kh_value(h, itr) = NULL; kh_value(h, itr) = NULL;
kh_del(cursors, h, itr);
} }
} }
} }
@ -368,7 +364,7 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
} }
h = ctx->cursors; h = ctx->cursors;
itr = kh_get(cursors, h, (char *)uri); itr = kh_get(cursors, h, key);
if (itr != kh_end(h)) { if (itr != kh_end(h)) {
// key exists in hash table, retrieve it // key exists in hash table, retrieve it
*cursor = (WT_CURSOR*)kh_value(h, itr); *cursor = (WT_CURSOR*)kh_value(h, itr);
@ -570,8 +566,8 @@ ASYNC_NIF_DECL(
itr = kh_get(conns, h, conn); itr = kh_get(conns, h, conn);
if (itr != kh_end(h)) { if (itr != kh_end(h)) {
/* key exists in table (as expected) delete it */ /* key exists in table (as expected) delete it */
kh_del(conns, h, itr);
kh_value(h, itr) = NULL; kh_value(h, itr) = NULL;
kh_del(conns, h, itr);
} }
enif_mutex_unlock(args->priv->conns_mutex); enif_mutex_unlock(args->priv->conns_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex); enif_mutex_unlock(args->conn_handle->contexts_mutex);
@ -2175,11 +2171,9 @@ on_unload(ErlNifEnv *env, void *priv_data)
for (itr_cursors = kh_begin(h); itr_cursors != kh_end(h); ++itr_cursors) { for (itr_cursors = kh_begin(h); itr_cursors != kh_end(h); ++itr_cursors) {
if (kh_exist(h, itr_cursors)) { if (kh_exist(h, itr_cursors)) {
WT_CURSOR *cursor = kh_val(h, itr_cursors); WT_CURSOR *cursor = kh_val(h, itr_cursors);
char *key = (char *)kh_key(h, itr_cursors);
cursor->close(cursor); cursor->close(cursor);
kh_del(cursors, h, itr_cursors);
enif_free(key);
kh_value(h, itr_cursors) = NULL; kh_value(h, itr_cursors) = NULL;
kh_del(cursors, h, itr_cursors);
} }
} }
kh_destroy(cursors, h); kh_destroy(cursors, h);

View file

@ -50,7 +50,7 @@
%%-define(CAPABILITIES, [async_fold, indexes]). %%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]). -define(CAPABILITIES, [async_fold]).
-record(state, {table :: string(), -record(state, {table :: atom(), % Atoms aka int's are easier to hash
type :: string(), type :: string(),
connection :: wterl:connection(), connection :: wterl:connection(),
is_empty_cursor :: wterl:cursor(), is_empty_cursor :: wterl:cursor(),
@ -107,7 +107,7 @@ start(Partition, Config) ->
"lsm" "lsm"
end, end,
{ok, Connection} = establish_connection(Config, Type), {ok, Connection} = establish_connection(Config, Type),
Table = Type ++ ":" ++ integer_to_list(Partition), TableUri = Type ++ ":" ++ integer_to_list(Partition),
Compressor = Compressor =
case wterl:config_value(block_compressor, Config, "snappy") of case wterl:config_value(block_compressor, Config, "snappy") of
{block_compressor, "snappy"}=C -> [C]; {block_compressor, "snappy"}=C -> [C];
@ -133,7 +133,8 @@ start(Partition, Config) ->
"table" -> "table" ->
Compressor Compressor
end, end,
case wterl:create(Connection, Table, TableOpts) of Table = list_to_atom(TableUri),
case wterl:create(Connection, TableUri, TableOpts) of
ok -> ok ->
case establish_utility_cursors(Connection, Table) of case establish_utility_cursors(Connection, Table) of
{ok, IsEmptyCursor, StatusCursor} -> {ok, IsEmptyCursor, StatusCursor} ->

View file

@ -134,71 +134,71 @@ connection_close(ConnRef) ->
conn_close_nif(_AsyncRef, _ConnRef) -> conn_close_nif(_AsyncRef, _ConnRef) ->
?nif_stub. ?nif_stub.
-spec create(connection(), string()) -> ok | {error, term()}. -spec create(connection(), atom()) -> ok | {error, term()}.
-spec create(connection(), string(), config_list()) -> ok | {error, term()}. -spec create(connection(), atom(), config_list()) -> ok | {error, term()}.
create(Ref, Name) -> create(Ref, Name) ->
create(Ref, Name, []). create(Ref, Name, []).
create(Ref, Name, Config) -> create(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun create_nif/4, [Ref, Name, config_to_bin(Config)]). ?ASYNC_NIF_CALL(fun create_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec create_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. -spec create_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
create_nif(_AsyncNif, _Ref, _Name, _Config) -> create_nif(_AsyncNif, _Ref, _Name, _Config) ->
?nif_stub. ?nif_stub.
-spec drop(connection(), string()) -> ok | {error, term()}. -spec drop(connection(), atom()) -> ok | {error, term()}.
-spec drop(connection(), string(), config_list()) -> ok | {error, term()}. -spec drop(connection(), atom(), config_list()) -> ok | {error, term()}.
drop(Ref, Name) -> drop(Ref, Name) ->
drop(Ref, Name, [{force, true}]). drop(Ref, Name, [{force, true}]).
drop(Ref, Name, Config) -> drop(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]). ?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec drop_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. -spec drop_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
drop_nif(_AsyncRef, _Ref, _Name, _Config) -> drop_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub. ?nif_stub.
-spec delete(connection(), string(), key()) -> ok | {error, term()}. -spec delete(connection(), atom(), key()) -> ok | {error, term()}.
delete(Ref, Table, Key) -> delete(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun delete_nif/4, [Ref, Table, Key]). ?ASYNC_NIF_CALL(fun delete_nif/4, [Ref, Table, Key]).
-spec delete_nif(reference(), connection(), string(), key()) -> ok | {error, term()}. -spec delete_nif(reference(), connection(), atom(), key()) -> ok | {error, term()}.
delete_nif(_AsyncRef, _Ref, _Table, _Key) -> delete_nif(_AsyncRef, _Ref, _Table, _Key) ->
?nif_stub. ?nif_stub.
-spec get(connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}. -spec get(connection(), atom(), key()) -> {ok, value()} | not_found | {error, term()}.
get(Ref, Table, Key) -> get(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun get_nif/4, [Ref, Table, Key]). ?ASYNC_NIF_CALL(fun get_nif/4, [Ref, Table, Key]).
-spec get_nif(reference(), connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}. -spec get_nif(reference(), connection(), atom(), key()) -> {ok, value()} | not_found | {error, term()}.
get_nif(_AsyncRef, _Ref, _Table, _Key) -> get_nif(_AsyncRef, _Ref, _Table, _Key) ->
?nif_stub. ?nif_stub.
-spec put(connection(), string(), key(), value()) -> ok | {error, term()}. -spec put(connection(), atom(), key(), value()) -> ok | {error, term()}.
put(Ref, Table, Key, Value) -> put(Ref, Table, Key, Value) ->
?ASYNC_NIF_CALL(fun put_nif/5, [Ref, Table, Key, Value]). ?ASYNC_NIF_CALL(fun put_nif/5, [Ref, Table, Key, Value]).
-spec put_nif(reference(), connection(), string(), key(), value()) -> ok | {error, term()}. -spec put_nif(reference(), connection(), atom(), key(), value()) -> ok | {error, term()}.
put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) -> put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) ->
?nif_stub. ?nif_stub.
-spec rename(connection(), string(), string()) -> ok | {error, term()}. -spec rename(connection(), atom(), string()) -> ok | {error, term()}.
-spec rename(connection(), string(), string(), config_list()) -> ok | {error, term()}. -spec rename(connection(), atom(), string(), config_list()) -> ok | {error, term()}.
rename(Ref, OldName, NewName) -> rename(Ref, OldName, NewName) ->
rename(Ref, OldName, NewName, []). rename(Ref, OldName, NewName, []).
rename(Ref, OldName, NewName, Config) -> rename(Ref, OldName, NewName, Config) ->
?ASYNC_NIF_CALL(fun rename_nif/5, [Ref, OldName, NewName, config_to_bin(Config)]). ?ASYNC_NIF_CALL(fun rename_nif/5, [Ref, OldName, NewName, config_to_bin(Config)]).
-spec rename_nif(reference(), connection(), string(), string(), config()) -> ok | {error, term()}. -spec rename_nif(reference(), connection(), atom(), string(), config()) -> ok | {error, term()}.
rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) -> rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) ->
?nif_stub. ?nif_stub.
-spec salvage(connection(), string()) -> ok | {error, term()}. -spec salvage(connection(), atom()) -> ok | {error, term()}.
-spec salvage(connection(), string(), config_list()) -> ok | {error, term()}. -spec salvage(connection(), atom(), config_list()) -> ok | {error, term()}.
salvage(Ref, Name) -> salvage(Ref, Name) ->
salvage(Ref, Name, []). salvage(Ref, Name, []).
salvage(Ref, Name, Config) -> salvage(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun salvage_nif/4, [Ref, Name, config_to_bin(Config)]). ?ASYNC_NIF_CALL(fun salvage_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec salvage_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. -spec salvage_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
salvage_nif(_AsyncRef, _Ref, _Name, _Config) -> salvage_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub. ?nif_stub.
@ -213,10 +213,10 @@ checkpoint(Ref, Config) ->
checkpoint_nif(_AsyncRef, _Ref, _Config) -> checkpoint_nif(_AsyncRef, _Ref, _Config) ->
?nif_stub. ?nif_stub.
-spec truncate(connection(), string()) -> ok | {error, term()}. -spec truncate(connection(), atom()) -> ok | {error, term()}.
-spec truncate(connection(), string(), config_list()) -> ok | {error, term()}. -spec truncate(connection(), atom(), config_list()) -> ok | {error, term()}.
-spec truncate(connection(), string(), binary() | first, binary() | last) -> ok | {error, term()}. -spec truncate(connection(), atom(), binary() | first, binary() | last) -> ok | {error, term()}.
-spec truncate(connection(), string(), binary() | first, binary() | last, config()) -> ok | {error, term()}. -spec truncate(connection(), atom(), binary() | first, binary() | last, config()) -> ok | {error, term()}.
truncate(Ref, Name) -> truncate(Ref, Name) ->
truncate(Ref, Name, first, last, []). truncate(Ref, Name, first, last, []).
truncate(Ref, Name, Config) -> truncate(Ref, Name, Config) ->
@ -226,40 +226,40 @@ truncate(Ref, Name, Start, Stop) ->
truncate(Ref, Name, Start, Stop, Config) -> truncate(Ref, Name, Start, Stop, Config) ->
?ASYNC_NIF_CALL(fun truncate_nif/6, [Ref, Name, Start, Stop, config_to_bin(Config)]). ?ASYNC_NIF_CALL(fun truncate_nif/6, [Ref, Name, Start, Stop, config_to_bin(Config)]).
-spec truncate_nif(reference(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}. -spec truncate_nif(reference(), connection(), atom(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}.
truncate_nif(_AsyncRef, _Ref, _Name, _Start, _Stop, _Config) -> truncate_nif(_AsyncRef, _Ref, _Name, _Start, _Stop, _Config) ->
?nif_stub. ?nif_stub.
-spec upgrade(connection(), string()) -> ok | {error, term()}. -spec upgrade(connection(), atom()) -> ok | {error, term()}.
-spec upgrade(connection(), string(), config_list()) -> ok | {error, term()}. -spec upgrade(connection(), atom(), config_list()) -> ok | {error, term()}.
upgrade(Ref, Name) -> upgrade(Ref, Name) ->
upgrade(Ref, Name, []). upgrade(Ref, Name, []).
upgrade(Ref, Name, Config) -> upgrade(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun upgrade_nif/4, [Ref, Name, config_to_bin(Config)]). ?ASYNC_NIF_CALL(fun upgrade_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec upgrade_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. -spec upgrade_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
upgrade_nif(_AsyncRef, _Ref, _Name, _Config) -> upgrade_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub. ?nif_stub.
-spec verify(connection(), string()) -> ok | {error, term()}. -spec verify(connection(), atom()) -> ok | {error, term()}.
-spec verify(connection(), string(), config_list()) -> ok | {error, term()}. -spec verify(connection(), atom(), config_list()) -> ok | {error, term()}.
verify(Ref, Name) -> verify(Ref, Name) ->
verify(Ref, Name, []). verify(Ref, Name, []).
verify(Ref, Name, Config) -> verify(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun verify_nif/4, [Ref, Name, config_to_bin(Config)]). ?ASYNC_NIF_CALL(fun verify_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec verify_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. -spec verify_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
verify_nif(_AsyncRef, _Ref, _Name, _Config) -> verify_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub. ?nif_stub.
-spec cursor_open(connection(), string()) -> {ok, cursor()} | {error, term()}. -spec cursor_open(connection(), atom()) -> {ok, cursor()} | {error, term()}.
-spec cursor_open(connection(), string(), config_list()) -> {ok, cursor()} | {error, term()}. -spec cursor_open(connection(), atom(), config_list()) -> {ok, cursor()} | {error, term()}.
cursor_open(Ref, Table) -> cursor_open(Ref, Table) ->
cursor_open(Ref, Table, []). cursor_open(Ref, Table, []).
cursor_open(Ref, Table, Config) -> cursor_open(Ref, Table, Config) ->
?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]). ?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]).
-spec cursor_open_nif(reference(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}. -spec cursor_open_nif(reference(), connection(), atom(), config()) -> {ok, cursor()} | {error, term()}.
cursor_open_nif(_AsyncRef, _Ref, _Table, _Config) -> cursor_open_nif(_AsyncRef, _Ref, _Table, _Config) ->
?nif_stub. ?nif_stub.