Compare commits

...

1 commit

Author SHA1 Message Date
Gregory Burd
8772d39663 WIP: use atoms to speed up hash lookup of shared cursors. 2013-04-24 10:15:46 -04:00
3 changed files with 44 additions and 49 deletions

View file

@ -51,7 +51,7 @@ static ErlNifResourceType *wterl_conn_RESOURCE;
static ErlNifResourceType *wterl_cursor_RESOURCE;
/* Generators for 'cursors' a named, type-specific hash table functions. */
KHASH_MAP_INIT_STR(cursors, WT_CURSOR*);
KHASH_INIT(cursors, ERL_NIF_TERM, WT_CURSOR*, 1, kh_int_hash_func, enif_is_identical);
/**
* We will have exactly one (1) WterlCtx for each async worker thread. As
@ -305,11 +305,9 @@ __close_all_sessions(WterlConnHandle *conn_handle)
for (itr = kh_begin(h); itr != kh_end(h); ++itr) {
if (kh_exist(h, itr)) {
WT_CURSOR *cursor = kh_val(h, itr);
char *key = (char *)kh_key(h, itr);
cursor->close(cursor);
kh_del(cursors, h, itr);
enif_free(key);
kh_value(h, itr) = NULL;
kh_del(cursors, h, itr);
}
}
kh_destroy(cursors, h);
@ -325,7 +323,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
* Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex)
*/
void
__close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
__close_cursors_on(WterlConnHandle *conn_handle, ERL_NIF_TERM key)
{
int i;
@ -333,14 +331,12 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
WterlCtx *ctx = &conn_handle->contexts[i];
if (ctx->session != NULL) {
khash_t(cursors) *h = ctx->cursors;
khiter_t itr = kh_get(cursors, h, (char *)uri);
khiter_t itr = kh_get(cursors, h, key);
if (itr != kh_end(h)) {
WT_CURSOR *cursor = kh_value(h, itr);
char *key = (char *)kh_key(h, itr);
cursor->close(cursor);
kh_del(cursors, h, itr);
enif_free(key);
kh_value(h, itr) = NULL;
kh_del(cursors, h, itr);
}
}
}
@ -368,7 +364,7 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
}
h = ctx->cursors;
itr = kh_get(cursors, h, (char *)uri);
itr = kh_get(cursors, h, key);
if (itr != kh_end(h)) {
// key exists in hash table, retrieve it
*cursor = (WT_CURSOR*)kh_value(h, itr);
@ -570,8 +566,8 @@ ASYNC_NIF_DECL(
itr = kh_get(conns, h, conn);
if (itr != kh_end(h)) {
/* key exists in table (as expected) delete it */
kh_del(conns, h, itr);
kh_value(h, itr) = NULL;
kh_del(conns, h, itr);
}
enif_mutex_unlock(args->priv->conns_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
@ -2175,11 +2171,9 @@ on_unload(ErlNifEnv *env, void *priv_data)
for (itr_cursors = kh_begin(h); itr_cursors != kh_end(h); ++itr_cursors) {
if (kh_exist(h, itr_cursors)) {
WT_CURSOR *cursor = kh_val(h, itr_cursors);
char *key = (char *)kh_key(h, itr_cursors);
cursor->close(cursor);
kh_del(cursors, h, itr_cursors);
enif_free(key);
kh_value(h, itr_cursors) = NULL;
kh_del(cursors, h, itr_cursors);
}
}
kh_destroy(cursors, h);

View file

@ -50,7 +50,7 @@
%%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]).
-record(state, {table :: string(),
-record(state, {table :: atom(), % Atoms aka int's are easier to hash
type :: string(),
connection :: wterl:connection(),
is_empty_cursor :: wterl:cursor(),
@ -107,7 +107,7 @@ start(Partition, Config) ->
"lsm"
end,
{ok, Connection} = establish_connection(Config, Type),
Table = Type ++ ":" ++ integer_to_list(Partition),
TableUri = Type ++ ":" ++ integer_to_list(Partition),
Compressor =
case wterl:config_value(block_compressor, Config, "snappy") of
{block_compressor, "snappy"}=C -> [C];
@ -133,7 +133,8 @@ start(Partition, Config) ->
"table" ->
Compressor
end,
case wterl:create(Connection, Table, TableOpts) of
Table = list_to_atom(TableUri),
case wterl:create(Connection, TableUri, TableOpts) of
ok ->
case establish_utility_cursors(Connection, Table) of
{ok, IsEmptyCursor, StatusCursor} ->

View file

@ -134,71 +134,71 @@ connection_close(ConnRef) ->
conn_close_nif(_AsyncRef, _ConnRef) ->
?nif_stub.
-spec create(connection(), string()) -> ok | {error, term()}.
-spec create(connection(), string(), config_list()) -> ok | {error, term()}.
-spec create(connection(), atom()) -> ok | {error, term()}.
-spec create(connection(), atom(), config_list()) -> ok | {error, term()}.
create(Ref, Name) ->
create(Ref, Name, []).
create(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun create_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec create_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
-spec create_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
create_nif(_AsyncNif, _Ref, _Name, _Config) ->
?nif_stub.
-spec drop(connection(), string()) -> ok | {error, term()}.
-spec drop(connection(), string(), config_list()) -> ok | {error, term()}.
-spec drop(connection(), atom()) -> ok | {error, term()}.
-spec drop(connection(), atom(), config_list()) -> ok | {error, term()}.
drop(Ref, Name) ->
drop(Ref, Name, [{force, true}]).
drop(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec drop_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
-spec drop_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
drop_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec delete(connection(), string(), key()) -> ok | {error, term()}.
-spec delete(connection(), atom(), key()) -> ok | {error, term()}.
delete(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun delete_nif/4, [Ref, Table, Key]).
-spec delete_nif(reference(), connection(), string(), key()) -> ok | {error, term()}.
-spec delete_nif(reference(), connection(), atom(), key()) -> ok | {error, term()}.
delete_nif(_AsyncRef, _Ref, _Table, _Key) ->
?nif_stub.
-spec get(connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
-spec get(connection(), atom(), key()) -> {ok, value()} | not_found | {error, term()}.
get(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun get_nif/4, [Ref, Table, Key]).
-spec get_nif(reference(), connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
-spec get_nif(reference(), connection(), atom(), key()) -> {ok, value()} | not_found | {error, term()}.
get_nif(_AsyncRef, _Ref, _Table, _Key) ->
?nif_stub.
-spec put(connection(), string(), key(), value()) -> ok | {error, term()}.
-spec put(connection(), atom(), key(), value()) -> ok | {error, term()}.
put(Ref, Table, Key, Value) ->
?ASYNC_NIF_CALL(fun put_nif/5, [Ref, Table, Key, Value]).
-spec put_nif(reference(), connection(), string(), key(), value()) -> ok | {error, term()}.
-spec put_nif(reference(), connection(), atom(), key(), value()) -> ok | {error, term()}.
put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) ->
?nif_stub.
-spec rename(connection(), string(), string()) -> ok | {error, term()}.
-spec rename(connection(), string(), string(), config_list()) -> ok | {error, term()}.
-spec rename(connection(), atom(), string()) -> ok | {error, term()}.
-spec rename(connection(), atom(), string(), config_list()) -> ok | {error, term()}.
rename(Ref, OldName, NewName) ->
rename(Ref, OldName, NewName, []).
rename(Ref, OldName, NewName, Config) ->
?ASYNC_NIF_CALL(fun rename_nif/5, [Ref, OldName, NewName, config_to_bin(Config)]).
-spec rename_nif(reference(), connection(), string(), string(), config()) -> ok | {error, term()}.
-spec rename_nif(reference(), connection(), atom(), string(), config()) -> ok | {error, term()}.
rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) ->
?nif_stub.
-spec salvage(connection(), string()) -> ok | {error, term()}.
-spec salvage(connection(), string(), config_list()) -> ok | {error, term()}.
-spec salvage(connection(), atom()) -> ok | {error, term()}.
-spec salvage(connection(), atom(), config_list()) -> ok | {error, term()}.
salvage(Ref, Name) ->
salvage(Ref, Name, []).
salvage(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun salvage_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec salvage_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
-spec salvage_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
salvage_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
@ -213,10 +213,10 @@ checkpoint(Ref, Config) ->
checkpoint_nif(_AsyncRef, _Ref, _Config) ->
?nif_stub.
-spec truncate(connection(), string()) -> ok | {error, term()}.
-spec truncate(connection(), string(), config_list()) -> ok | {error, term()}.
-spec truncate(connection(), string(), binary() | first, binary() | last) -> ok | {error, term()}.
-spec truncate(connection(), string(), binary() | first, binary() | last, config()) -> ok | {error, term()}.
-spec truncate(connection(), atom()) -> ok | {error, term()}.
-spec truncate(connection(), atom(), config_list()) -> ok | {error, term()}.
-spec truncate(connection(), atom(), binary() | first, binary() | last) -> ok | {error, term()}.
-spec truncate(connection(), atom(), binary() | first, binary() | last, config()) -> ok | {error, term()}.
truncate(Ref, Name) ->
truncate(Ref, Name, first, last, []).
truncate(Ref, Name, Config) ->
@ -226,40 +226,40 @@ truncate(Ref, Name, Start, Stop) ->
truncate(Ref, Name, Start, Stop, Config) ->
?ASYNC_NIF_CALL(fun truncate_nif/6, [Ref, Name, Start, Stop, config_to_bin(Config)]).
-spec truncate_nif(reference(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}.
-spec truncate_nif(reference(), connection(), atom(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}.
truncate_nif(_AsyncRef, _Ref, _Name, _Start, _Stop, _Config) ->
?nif_stub.
-spec upgrade(connection(), string()) -> ok | {error, term()}.
-spec upgrade(connection(), string(), config_list()) -> ok | {error, term()}.
-spec upgrade(connection(), atom()) -> ok | {error, term()}.
-spec upgrade(connection(), atom(), config_list()) -> ok | {error, term()}.
upgrade(Ref, Name) ->
upgrade(Ref, Name, []).
upgrade(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun upgrade_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec upgrade_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
-spec upgrade_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
upgrade_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec verify(connection(), string()) -> ok | {error, term()}.
-spec verify(connection(), string(), config_list()) -> ok | {error, term()}.
-spec verify(connection(), atom()) -> ok | {error, term()}.
-spec verify(connection(), atom(), config_list()) -> ok | {error, term()}.
verify(Ref, Name) ->
verify(Ref, Name, []).
verify(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun verify_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec verify_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
-spec verify_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
verify_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec cursor_open(connection(), string()) -> {ok, cursor()} | {error, term()}.
-spec cursor_open(connection(), string(), config_list()) -> {ok, cursor()} | {error, term()}.
-spec cursor_open(connection(), atom()) -> {ok, cursor()} | {error, term()}.
-spec cursor_open(connection(), atom(), config_list()) -> {ok, cursor()} | {error, term()}.
cursor_open(Ref, Table) ->
cursor_open(Ref, Table, []).
cursor_open(Ref, Table, Config) ->
?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]).
-spec cursor_open_nif(reference(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}.
-spec cursor_open_nif(reference(), connection(), atom(), config()) -> {ok, cursor()} | {error, term()}.
cursor_open_nif(_AsyncRef, _Ref, _Table, _Config) ->
?nif_stub.