WIP -- tests failing -- WIP

Fixed numerous bugs.  Pushed config_to_bin down to just before
calling the _nif() function, everywhere else config is a proplist.
This commit is contained in:
Gregory Burd 2013-04-07 09:21:47 -04:00
parent 2675aa501a
commit 5ac006630e
4 changed files with 100 additions and 112 deletions

View file

@ -149,7 +149,7 @@ async_nif_worker_fn(void *arg)
struct async_nif_state *async_nif = wi->async_nif;
unsigned int worker_id = wi->worker_id;
free(wi); // Allocated when starting the thread, now no longer needed.
free(arg); // Allocated when starting the thread, now no longer needed.
/*
* Workers are active while there is work on the queue to do and
@ -222,7 +222,7 @@ static void async_nif_unload(ErlNifEnv *env)
enif_mutex_unlock(async_nif->req_mutex);
/* Join for the now exiting worker threads. */
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; ++i) {
for (i = 0; i < async_nif->num_workers; ++i) {
void *exit_value = 0; /* Ignore this. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
@ -303,7 +303,7 @@ async_nif_load(void)
wi->worker = &async_nif->worker_entries[i];
wi->worker_id = i;
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
&async_nif_worker_fn, (void*)&wi, NULL) != 0) {
&async_nif_worker_fn, (void*)wi, NULL) != 0) {
async_nif->shutdown = 1;
enif_cond_broadcast(async_nif->cnd);
enif_mutex_unlock(async_nif->worker_mutex);

View file

@ -154,43 +154,34 @@ ASYNC_NIF_DECL(
if (!(argc == 3 &&
enif_get_string(env, argv[0], args->homedir, sizeof args->homedir, ERL_NIF_LATIN1) &&
(enif_is_binary(env, argv[1]) || argv[1] == 0) &&
(enif_is_binary(env, argv[2]) || argv[2] == 0))) {
enif_is_binary(env, argv[1]) &&
enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG();
}
if (argv[1])
args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
else
args->config = 0;
if (argv[2])
args->session_config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
else
args->session_config = 0;
args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
args->session_config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
},
{ // work
WT_CONNECTION *conn;
ErlNifBinary config;
ErlNifBinary session_config;
if (args->config) {
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
}
if (args->session_config) {
if (!enif_inspect_binary(env, args->session_config, &session_config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
}
int rc = wiredtiger_open(args->homedir, NULL, args->config ? (const char*)config.data : NULL, &conn);
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
if (!enif_inspect_binary(env, args->session_config, &session_config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
fprintf(stderr, "c: %s\ns: %s\n", (char *)config.data, (char *)session_config.data); fflush(stderr);
int rc = wiredtiger_open(args->homedir, NULL, config.data[0] != 0 ? (const char*)config.data : NULL, &conn);
if (rc == 0) {
WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle));
conn_handle->conn = conn;
if (args->session_config)
conn_handle->session_config = (const char *)strndup((const char *)session_config.data, session_config.size);
if (session_config.data[0] != 0)
conn_handle->session_config = (const char *)strndup((const char *)session_config.data, session_config.size); // TODO: test for NULL, handle OOM
else
conn_handle->session_config = NULL;
conn_handle->num_contexts = 0;
@ -544,19 +535,15 @@ ASYNC_NIF_DECL(
if (!(argc == 5 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) &&
(enif_is_binary(env, argv[2]) || argv[2] == 0) &&
(enif_is_binary(env, argv[3]) || argv[3] == 0) &&
(enif_is_binary(env, argv[2]) || enif_is_atom(env, argv[2])) &&
(enif_is_binary(env, argv[3]) || enif_is_atom(env, argv[3])) &&
enif_is_binary(env, argv[4]))) {
ASYNC_NIF_RETURN_BADARG();
}
if (argv[2] != 0)
if (!enif_is_atom(env, argv[2]))
args->start = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
else
args->start = 0;
if (argv[3] != 0)
if (!enif_is_atom(env, argv[3]))
args->stop = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]);
else
args->stop = 0;
args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[4]);
enif_keep_resource((void*)args->conn_handle);
},
@ -586,7 +573,7 @@ ASYNC_NIF_DECL(
return;
}
WT_CURSOR *start = NULL;
if (args->start) {
if (!enif_is_atom(env, args->start)) {
rc = session->open_cursor(session, args->uri, NULL, "raw", &start);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
@ -605,7 +592,7 @@ ASYNC_NIF_DECL(
return;
}
WT_CURSOR *stop = NULL;
if (args->stop) {
if (!enif_is_atom(env, args->stop)) {
rc = session->open_cursor(session, args->uri, NULL, "raw", &stop);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
@ -968,20 +955,15 @@ ASYNC_NIF_DECL(
enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG();
}
if (argv[2] != 0)
args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
else
args->config = 0;
args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
enif_keep_resource((void*)args->conn_handle);
},
{ // work
ErlNifBinary config;
if (args->config) {
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
/* We create a separate session here to ensure that operations are thread safe. */
@ -994,8 +976,7 @@ ASYNC_NIF_DECL(
}
WT_CURSOR* cursor;
char *c = args->config ? (char *)config.data : "overwrite,raw";
rc = session->open_cursor(session, args->uri, NULL, c, &cursor);
rc = session->open_cursor(session, args->uri, NULL, (config.data[0] != 0) ? (char *)config.data : "overwrite,raw", &cursor);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
@ -1633,7 +1614,7 @@ static ErlNifFunc nif_funcs[] =
{"checkpoint_nif", 3, wterl_checkpoint},
{"conn_close_nif", 2, wterl_conn_close},
{"conn_open_nif", 4, wterl_conn_open},
{"create_nif", 5, wterl_create},
{"create_nif", 4, wterl_create},
{"delete_nif", 4, wterl_delete},
{"drop_nif", 4, wterl_drop},
{"get_nif", 4, wterl_get},

View file

@ -93,9 +93,10 @@ start(Partition, Config) ->
end,
case AppStart of
ok ->
%% TODO: on failure to open a table try to verify, and then salvage it
%% if the cluster size > the n value
Table = "lsm:wt" ++ integer_to_list(Partition),
%% TODO: open, create, or open/verify
%% on failure to open a table try to verify, and then salvage it
%% if the cluster size > the n value
{ok, Connection} = establish_connection(Config),
case wterl:cursor_open(Connection, Table) of
{ok, IsEmptyCursor} ->

View file

@ -40,7 +40,7 @@
checkpoint/1,
checkpoint/2,
create/2,
create/4,
create/3,
delete/3,
drop/2,
drop/3,
@ -58,7 +58,6 @@
verify/2,
verify/3,
config_value/3,
config_to_bin/1,
priv_dir/0,
fold_keys/3,
fold/3]).
@ -114,16 +113,7 @@ connection_open(HomeDir, ConnectionConfig, SessionConfig) ->
conn_open(HomeDir, [{extensions, SoPaths}] ++ ConnectionConfig, SessionConfig).
-spec conn_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
conn_open(HomeDir, ConnectionConfig, SessionConfig)
when is_list(ConnectionConfig), length(ConnectionConfig) =:= 0,
is_list(SessionConfig), length(SessionConfig) =:= 0 ->
?ASYNC_NIF_CALL(fun conn_open_nif/4, [HomeDir, 0, 0]);
conn_open(HomeDir, ConnectionConfig, SessionConfig)
when is_list(ConnectionConfig),
is_list(SessionConfig), length(SessionConfig) =:= 0 ->
?ASYNC_NIF_CALL(fun conn_open_nif/4, [HomeDir, config_to_bin(ConnectionConfig), 0]);
conn_open(HomeDir, ConnectionConfig, SessionConfig)
when is_list(ConnectionConfig), is_list(SessionConfig) ->
conn_open(HomeDir, ConnectionConfig, SessionConfig) ->
?ASYNC_NIF_CALL(fun conn_open_nif/4, [HomeDir,
config_to_bin(ConnectionConfig),
config_to_bin(SessionConfig)]).
@ -141,22 +131,22 @@ conn_close_nif(_AsyncRef, _ConnRef) ->
?nif_stub.
-spec create(connection(), string()) -> ok | {error, term()}.
-spec create(connection(), string(), config(), config()) -> ok | {error, term()}.
-spec create(connection(), string(), config_list()) -> ok | {error, term()}.
create(Ref, Name) ->
create(Ref, Name, ?EMPTY_CONFIG, ?EMPTY_CONFIG).
create(Ref, Name, Config, SessionConfig) ->
?ASYNC_NIF_CALL(fun create_nif/5, [Ref, Name, Config, SessionConfig]).
create(Ref, Name, []).
create(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun create_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec create_nif(reference(), connection(), string(), config(), config()) -> ok | {error, term()}.
create_nif(_AsyncNif, _Ref, _Name, _Config, _SessionConfig) ->
-spec create_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
create_nif(_AsyncNif, _Ref, _Name, _Config) ->
?nif_stub.
-spec drop(connection(), string()) -> ok | {error, term()}.
-spec drop(connection(), string(), config()) -> ok | {error, term()}.
-spec drop(connection(), string(), config_list()) -> ok | {error, term()}.
drop(Ref, Name) ->
drop(Ref, Name, ?EMPTY_CONFIG).
drop(Ref, Name, []).
drop(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, Config]).
?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec drop_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
drop_nif(_AsyncRef, _Ref, _Name, _Config) ->
@ -187,82 +177,82 @@ put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) ->
?nif_stub.
-spec rename(connection(), string(), string()) -> ok | {error, term()}.
-spec rename(connection(), string(), string(), config()) -> ok | {error, term()}.
-spec rename(connection(), string(), string(), config_list()) -> ok | {error, term()}.
rename(Ref, OldName, NewName) ->
rename(Ref, OldName, NewName, ?EMPTY_CONFIG).
rename(Ref, OldName, NewName, []).
rename(Ref, OldName, NewName, Config) ->
?ASYNC_NIF_CALL(fun rename_nif/5, [Ref, OldName, NewName, Config]).
?ASYNC_NIF_CALL(fun rename_nif/5, [Ref, OldName, NewName, config_to_bin(Config)]).
-spec rename_nif(reference(), connection(), string(), string(), config()) -> ok | {error, term()}.
rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) ->
?nif_stub.
-spec salvage(connection(), string()) -> ok | {error, term()}.
-spec salvage(connection(), string(), config()) -> ok | {error, term()}.
-spec salvage(connection(), string(), config_list()) -> ok | {error, term()}.
salvage(Ref, Name) ->
salvage(Ref, Name, ?EMPTY_CONFIG).
salvage(Ref, Name, []).
salvage(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun salvage_nif/4, [Ref, Name, Config]).
?ASYNC_NIF_CALL(fun salvage_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec salvage_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
salvage_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec checkpoint(connection()) -> ok | {error, term()}.
-spec checkpoint(connection(), config()) -> ok | {error, term()}.
-spec checkpoint(connection(), config_list()) -> ok | {error, term()}.
checkpoint(_Ref) ->
checkpoint(_Ref, ?EMPTY_CONFIG).
checkpoint(_Ref, []).
checkpoint(Ref, Config) ->
?ASYNC_NIF_CALL(fun checkpoint_nif/3, [Ref, Config]).
?ASYNC_NIF_CALL(fun checkpoint_nif/3, [Ref, config_to_bin(Config)]).
-spec checkpoint_nif(reference(), connection(), config()) -> ok | {error, term()}.
checkpoint_nif(_AsyncRef, _Ref, _Config) ->
?nif_stub.
-spec truncate(connection(), string()) -> ok | {error, term()}.
-spec truncate(connection(), string(), config()) -> ok | {error, term()}.
truncate(Ref, Name, Config) ->
truncate(Ref, Name, 0, 0, Config).
-spec truncate(connection(), string(), cursor() | 0, cursor() | 0, config()) -> ok | {error, term()}.
-spec truncate(connection(), string(), config_list()) -> ok | {error, term()}.
-spec truncate(connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}.
truncate(Ref, Name) ->
truncate(Ref, Name, 0, 0, ?EMPTY_CONFIG).
truncate(Ref, Name, first, last, []).
truncate(Ref, Name, Config) ->
truncate(Ref, Name, first, last, Config).
truncate(Ref, Name, Start, Stop, Config) ->
?ASYNC_NIF_CALL(fun truncate_nif/6, [Ref, Name, Start, Stop, Config]).
?ASYNC_NIF_CALL(fun truncate_nif/6, [Ref, Name, Start, Stop, config_to_bin(Config)]).
-spec truncate_nif(reference(), connection(), string(), cursor() | 0, cursor() | 0, config()) -> ok | {error, term()}.
-spec truncate_nif(reference(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}.
truncate_nif(_AsyncRef, _Ref, _Name, _Start, _Stop, _Config) ->
?nif_stub.
-spec upgrade(connection(), string()) -> ok | {error, term()}.
-spec upgrade(connection(), string(), config()) -> ok | {error, term()}.
-spec upgrade(connection(), string(), config_list()) -> ok | {error, term()}.
upgrade(Ref, Name) ->
upgrade(Ref, Name, ?EMPTY_CONFIG).
upgrade(Ref, Name, []).
upgrade(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun upgrade_nif/4, [Ref, Name, Config]).
?ASYNC_NIF_CALL(fun upgrade_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec upgrade_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
upgrade_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec verify(connection(), string()) -> ok | {error, term()}.
-spec verify(connection(), string(), config()) -> ok | {error, term()}.
-spec verify(connection(), string(), config_list()) -> ok | {error, term()}.
verify(Ref, Name) ->
verify(Ref, Name, ?EMPTY_CONFIG).
verify(Ref, Name, []).
verify(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun verify_nif/4, [Ref, Name, Config]).
?ASYNC_NIF_CALL(fun verify_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec verify_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
verify_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec cursor_open(connection(), string()) -> {ok, cursor()} | {error, term()}.
-spec cursor_open(connection(), string(), config() | 0) -> {ok, cursor()} | {error, term()}.
-spec cursor_open(connection(), string(), config_list()) -> {ok, cursor()} | {error, term()}.
cursor_open(Ref, Table) ->
cursor_open(Ref, Table, 0).
cursor_open(Ref, Table, []).
cursor_open(Ref, Table, Config) ->
?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, Config]).
?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]).
-spec cursor_open_nif(reference(), connection(), string(), config() | 0) -> {ok, cursor()} | {error, term()}.
-spec cursor_open_nif(reference(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}.
cursor_open_nif(_AsyncRef, _Ref, _Table, _Config) ->
?nif_stub.
@ -514,20 +504,21 @@ open_test_conn(DataDir) ->
?assertMatch(true, lists:suffix("wterl/.eunit", CWD)),
?cmd("rm -rf "++DataDir),
?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))),
OpenConfig = config_to_bin([{create,true},{cache_size,"100MB"}]),
OpenConfig = [{create,true},{cache_size,"100MB"}],
{ok, ConnRef} = connection_open(DataDir, OpenConfig),
ConnRef.
open_test_table(ConnRef) ->
?assertMatch(ok, drop(ConnRef, "table:test", config_to_bin([{force,true}]))),
?assertMatch(ok, create(ConnRef, "table:test", config_to_bin([{block_compressor, "snappy"}]))),
open_test_table(ConnRef, "table", []).
open_test_table(ConnRef, Type) ->
open_test_table(ConnRef, Type, []).
open_test_table(ConnRef, Type, Opts) ->
?assertMatch(ok, create(ConnRef, Type ++ ":test", Opts)),
ConnRef.
conn_test() ->
ConnRef = open_test_conn(?TEST_DATA_DIR),
?assertMatch(ok, connection_close(ConnRef)).
session_test_() ->
conn_test_() ->
{setup,
fun() ->
open_test_conn(?TEST_DATA_DIR)
@ -537,9 +528,26 @@ session_test_() ->
end,
fun(ConnRef) ->
{inorder,
[{"create and drop a table",
[{"open and close a connection",
fun() ->
?assertMatch(ok, ok)
end},
{"create, verify, drop a table(btree)",
fun() ->
ConnRef = open_test_table(ConnRef),
?assertMatch(ok, verify(ConnRef, "table:test"))
?assertMatch(ok, drop(ConnRef, "table:test"))
end},
{"create, test verify, drop a table(lsm)",
fun() ->
ConnRef = open_test_table(ConnRef, "lsm"),
?assertMatch(ok, verify(ConnRef, "lsm:test"))
?assertMatch(ok, drop(ConnRef, "lsm:test"))
end},
{"create, verify, drop a table(btree, snappy)",
fun() ->
ConnRef = open_test_table(ConnRef, "table", [{block_compressor, "snappy"}]),
?assertMatch(ok, verify(ConnRef, "table:test"))
?assertMatch(ok, drop(ConnRef, "table:test"))
end}]}
end}.
@ -580,8 +588,7 @@ various_session_test_() ->
end},
{"session checkpoint",
fun() ->
Cfg = wterl:config_to_bin([{target, ["\"table:test\""]}]),
?assertMatch(ok, checkpoint(ConnRef, Cfg)),
?assertMatch(ok, checkpoint(ConnRef, [{target, ["\"table:test\""]}])),
?assertMatch({ok, <<"apple">>},
get(ConnRef, "table:test", <<"a">>))
end},
@ -775,8 +782,7 @@ prop_put_delete() ->
?assertMatch(true, lists:suffix("wterl/.eunit", CWD)),
?cmd("rm -rf "++DataDir),
ok = filelib:ensure_dir(filename:join(DataDir, "x")),
Cfg = wterl:config_to_bin([{create,true}]),
{ok, Conn} = wterl:connection_open(DataDir, Cfg),
{ok, Conn} = wterl:connection_open(DataDir, [{create,true}]),
try
wterl:create(ConnRef, Table),
Model = apply_kv_ops(Ops, ConnRef, Table, []),