diff --git a/Makefile b/Makefile index 14d9196..e485507 100644 --- a/Makefile +++ b/Makefile @@ -5,12 +5,18 @@ REBAR= ./rebar ERL= /usr/bin/env erl ERLEXEC= ${ERL_ROOTDIR}/lib/erlang/erts-5.9.1/bin/erlexec DIALYZER= /usr/bin/env dialyzer - +ARCHIVETAG?= $(shell git describe --always --long --tags) +ARCHIVE?= $(shell basename $(CURDIR))-$(ARCHIVETAG) +WT_ARCHIVETAG?= $(shell cd c_src/wiredtiger-basho; git describe --always --long --tags) .PHONY: plt analyze all deps compile get-deps clean all: compile +archive: + @rm -f $(ARCHIVE).tar.gz + git archive --format=tar --prefix=$(ARCHIVE)/ $(ARCHIVETAG) | gzip >$(ARCHIVE).tar.gz + deps: get-deps get-deps: @@ -33,34 +39,52 @@ compile: c_src/wterl.o ebin/app_helper.beam @$(REBAR) compile clean: + @rm -f $(ARCHIVE).tar.gz @$(REBAR) clean +xref: + @$(REBAR) xref skip_deps=true + test: eunit -eunit: compile +eunit: compile-for-eunit @$(REBAR) eunit skip_deps=true -eunit_console: - @$(ERL) -pa .eunit deps/lager/ebin +eqc: compile-for-eqc + @$(REBAR) eqc skip_deps=true + +proper: compile-for-proper + @echo "rebar does not implement a 'proper' command" && false + +triq: compile-for-triq + @$(REBAR) triq skip_deps=true + +compile-for-eunit: + @$(REBAR) compile eunit compile_only=true + +compile-for-eqc: + @$(REBAR) -D QC -D QC_EQC compile eqc compile_only=true + +compile-for-eqcmini: + @$(REBAR) -D QC -D QC_EQCMINI compile eqc compile_only=true + +compile-for-proper: + @$(REBAR) -D QC -D QC_PROPER compile eqc compile_only=true + +compile-for-triq: + @$(REBAR) -D QC -D QC_TRIQ compile triq compile_only=true plt: compile @$(DIALYZER) --build_plt --output_plt .$(TARGET).plt -pa deps/lager/ebin --apps kernel stdlib analyze: compile - $(DIALYZER) --plt .$(TARGET).plt -pa deps/lager/ebin ebin + @$(DIALYZER) --plt .$(TARGET).plt -pa deps/lager/ebin ebin repl: - $(ERL) -pz deps/lager/ebin -pa ebin - -gdb-repl: - USE_GDB=1 $(ERL) -pz deps/lager/ebin -pa ebin + @$(ERL) -pa ebin -pz deps/lager/ebin eunit-repl: - $(ERL) -pz deps/lager/ebin -pa ebin -pa .eunit - -gdb-eunit-repl: - USE_GDB=1 $(ERL) -pa .eunit -pz deps/lager/ebin -pz ebin -exec 'cd(".eunit").' - + @$(ERL) -pa .eunit deps/lager/ebin ERL_TOP= /home/gburd/eng/otp_R15B01 CERL= ${ERL_TOP}/bin/cerl diff --git a/c_src/async_nif.h b/c_src/async_nif.h index a89262a..6f17495 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -32,6 +32,10 @@ extern "C" { #include "stats.h" // TODO: measure, measure... measure again #endif +#ifndef __UNUSED +#define __UNUSED(v) ((void)(v)) +#endif + #define ASYNC_NIF_MAX_WORKERS 128 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500 @@ -61,8 +65,12 @@ struct async_nif_state { #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ struct decl ## _args frame; \ - static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) work_block \ + static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \ + __UNUSED(worker_id); \ + do work_block while(0); \ + } \ static void fn_post_ ## decl (struct decl ## _args *args) { \ + __UNUSED(args); \ do post_block while(0); \ } \ static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \ @@ -80,7 +88,7 @@ struct async_nif_state { if (async_nif->shutdown) \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ - if (!(new_env = enif_alloc_env())) { \ + if (!(new_env = enif_alloc_env())) { /*TODO: cache, enif_clear();*/ \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "enomem")); \ } \ @@ -172,7 +180,7 @@ static ERL_NIF_TERM async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) { /* Identify the most appropriate worker for this request. */ - unsigned int qid = (hint != -1) ? hint : async_nif->next_q; + unsigned int qid = (hint >= 0) ? (unsigned int)hint : async_nif->next_q; struct async_nif_work_queue *q = NULL; do { q = &async_nif->queues[qid]; @@ -193,6 +201,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en } else { break; } + // TODO: at some point add in work sheading/stealing } while(1); /* We hold the queue's lock, and we've seletect a reasonable queue for this @@ -265,6 +274,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) unsigned int i; unsigned int num_queues = async_nif->num_queues; struct async_nif_work_queue *q = NULL; + __UNUSED(env); /* Signal the worker threads, stop what you're doing and exit. To ensure that we don't race with the enqueue() process we first @@ -296,6 +306,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) /* Worker threads are stopped, now toss anything left in the queue. */ struct async_nif_req_entry *req = NULL; fifo_q_foreach(reqs, q->reqs, req, { + enif_clear_env(req->env); enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), enif_make_atom(req->env, "shutdown"))); @@ -400,6 +411,7 @@ async_nif_load() static void async_nif_upgrade(ErlNifEnv *env) { + __UNUSED(env); // TODO: } diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 70163f5..686fda3 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -28,7 +28,7 @@ export BASEDIR="$PWD" which gmake 1>/dev/null 2>/dev/null && MAKE=gmake MAKE=${MAKE:-make} -export CFLAGS="$CFLAGS -g -I $BASEDIR/system/include" +export CFLAGS="$CFLAGS -I $BASEDIR/system/include" export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include" export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib" export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH" diff --git a/c_src/wterl.c b/c_src/wterl.c index 3eab666..2890f7c 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -35,6 +35,10 @@ # define dprint(s, ...) {} #endif +#ifndef __UNUSED +#define __UNUSED(v) ((void)(v)) +#endif + #include "wiredtiger.h" #include "async_nif.h" #include "khash.h" @@ -89,6 +93,22 @@ static ERL_NIF_TERM ATOM_OK; static ERL_NIF_TERM ATOM_NOT_FOUND; static ERL_NIF_TERM ATOM_FIRST; static ERL_NIF_TERM ATOM_LAST; +static ERL_NIF_TERM ATOM_MESSAGE; +static ERL_NIF_TERM ATOM_PROGRESS; +static ERL_NIF_TERM ATOM_WTERL_VSN; +static ERL_NIF_TERM ATOM_WIREDTIGER_VSN; +static ERL_NIF_TERM ATOM_MSG_PID; + +struct wterl_event_handlers { + WT_EVENT_HANDLER handlers; + ErlNifEnv *msg_env_error; + ErlNifMutex *error_mutex; + ErlNifEnv *msg_env_message; + ErlNifMutex *message_mutex; + ErlNifEnv *msg_env_progress; + ErlNifMutex *progress_mutex; + ErlNifPid to_pid; +}; /* Generators for 'conns' a named, type-specific hash table functions. */ KHASH_MAP_INIT_PTR(conns, WterlConnHandle*); @@ -97,12 +117,130 @@ struct wterl_priv_data { void *async_nif_priv; // Note: must be first element in struct ErlNifMutex *conns_mutex; khash_t(conns) *conns; + struct wterl_event_handlers eh; + char wterl_vsn[512]; + char wiredtiger_vsn[512]; }; /* Global init for async_nif. */ ASYNC_NIF_INIT(wterl); +/** + * Callback to handle error messages. + * + * Deliver error messages into Erlang to be logged via loger:error() + * or on failure, write message to stderr. + * + * error a WiredTiger, C99 or POSIX error code, which can + * be converted to a string using wiredtiger_strerror() + * message an error string + * -> 0 on success, a non-zero return may cause the WiredTiger + * function posting the event to fail, and may even cause + * operation or library failure. + */ +int +__wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message) +{ + struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler; + ErlNifEnv *msg_env; + ErlNifPid *to_pid; + int rc = 0; + + enif_mutex_lock(eh->error_mutex); + msg_env = eh->msg_env_error; + to_pid = &eh->to_pid; + if (msg_env) { + ERL_NIF_TERM msg = + enif_make_tuple2(msg_env, ATOM_ERROR, + enif_make_tuple2(msg_env, + enif_make_atom(msg_env, erl_errno_id(error)), + enif_make_string(msg_env, message, ERL_NIF_LATIN1))); + enif_clear_env(msg_env); + if (!enif_send(NULL, to_pid, msg_env, msg)) + fprintf(stderr, "[%d] %s\n", error, message); + } else { + rc = (fprintf(stderr, "[%d] %s\n", error, message) >= 0 ? 0 : EIO); + } + enif_mutex_unlock(eh->error_mutex); + return rc; +} + +/** + * Callback to handle informational messages. + * + * Deliver informational messages into Erlang to be logged via loger:info() + * or on failure, write message to stdout. + * + * message an informational string + * -> 0 on success, a non-zero return may cause the WiredTiger + * function posting the event to fail, and may even cause + * operation or library failure. + */ +int +__wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message) +{ + struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler; + ErlNifEnv *msg_env; + ErlNifPid *to_pid; + int rc = 0; + + enif_mutex_lock(eh->message_mutex); + msg_env = eh->msg_env_message; + to_pid = &eh->to_pid; + if (msg_env) { + ERL_NIF_TERM msg = + enif_make_tuple2(msg_env, ATOM_MESSAGE, + enif_make_string(msg_env, message, ERL_NIF_LATIN1)); + enif_clear_env(msg_env); + if (!enif_send(NULL, to_pid, msg_env, msg)) + fprintf(stderr, "%s\n", message); + } else { + rc = (printf("%s\n", message) >= 0 ? 0 : EIO); + } + enif_mutex_unlock(eh->message_mutex); + return rc; +} + +/** + * Callback to handle progress messages. + * + * Deliver progress messages into Erlang to be logged via loger:info() + * or on failure, written message to stdout. + * + * operation a string representation of the operation + * counter a progress counter [0..100] + * -> 0 on success, a non-zero return may cause the WiredTiger + * function posting the event to fail, and may even cause + * operation or library failure. + */ +int +__wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint64_t counter) +{ + struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler; + ErlNifEnv *msg_env; + ErlNifPid *to_pid; + int rc = 0; + + enif_mutex_lock(eh->progress_mutex); + msg_env = eh->msg_env_progress; + to_pid = &eh->to_pid; + if (msg_env) { + ERL_NIF_TERM msg = + enif_make_tuple2(msg_env, ATOM_PROGRESS, + enif_make_tuple2(msg_env, + enif_make_string(msg_env, operation, ERL_NIF_LATIN1), + enif_make_int64(msg_env, counter))); + enif_clear_env(msg_env); + if (!enif_send(NULL, to_pid, msg_env, msg)) + fprintf(stderr, "[%ld] %s\n", counter, operation); + } else { + rc = (printf("[%ld] %s\n", counter, operation) >= 0 ? 0 : EIO); + } + enif_mutex_unlock(eh->progress_mutex); + return rc; +} + /** * Open a WT_SESSION for the thread context 'ctx' to use, also init the * shared cursor hash table. @@ -119,7 +257,13 @@ __init_session_and_cursor_cache(WterlConnHandle *conn_handle, WterlCtx *ctx) ctx->session = NULL; return rc; } + ctx->cursors = kh_init(cursors); + if (!ctx->cursors) { + ctx->session->close(ctx->session, NULL); + ctx->session = NULL; + return ENOMEM; + } return 0; } @@ -256,6 +400,9 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char static void __release_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR *cursor) { + __UNUSED(conn_handle); + __UNUSED(worker_id); + __UNUSED(uri); cursor->reset(cursor); } @@ -327,7 +474,11 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } - int rc = wiredtiger_open(args->homedir, NULL, config.data[0] != 0 ? (const char*)config.data : NULL, &conn); + + int rc = wiredtiger_open(args->homedir, + (WT_EVENT_HANDLER*)&args->priv->eh.handlers, + config.data[0] != 0 ? (const char*)config.data : NULL, + &conn); if (rc == 0) { WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); if (!conn_handle) { @@ -847,14 +998,6 @@ ASYNC_NIF_DECL( item_start.data = start_key.data; item_start.size = start_key.size; start->set_key(start, item_start); - rc = start->search(start); - if (rc != 0) { - start->close(start); - session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; - } } if (!args->to_last) { @@ -890,15 +1033,6 @@ ASYNC_NIF_DECL( item_stop.data = stop_key.data; item_stop.size = stop_key.size; stop->set_key(stop, item_stop); - rc = stop->search(stop); - if (rc != 0) { - start->close(start); - stop->close(stop); - session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; - } } /* Always pass NULL for URI here because we always specify the range with the @@ -1889,6 +2023,37 @@ ASYNC_NIF_DECL( enif_release_resource((void*)args->cursor_handle); }); + +/** + * Called by wterl_event_handler to set the pid for message delivery. + */ +static ERL_NIF_TERM +wterl_set_event_handler_pid(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + struct wterl_priv_data *priv = enif_priv_data(env); + struct wterl_event_handlers *eh = &priv->eh; + + if (!(argc == 1 && enif_is_pid(env, argv[0]))) { + return enif_make_badarg(env); + } + if (enif_get_local_pid(env, argv[0], &eh->to_pid)) { + if (!eh->msg_env_message) + eh->msg_env_message = enif_alloc_env(); // TOOD: if (!eh->msg_env) { return ENOMEM; } + if (!eh->msg_env_error) + eh->msg_env_error = enif_alloc_env(); + if (!eh->msg_env_progress) + eh->msg_env_progress = enif_alloc_env(); + + eh->handlers.handle_error = __wterl_error_handler; + eh->handlers.handle_message = __wterl_message_handler; + eh->handlers.handle_progress = __wterl_progress_handler; + } else { + memset(&eh->to_pid, 0, sizeof(ErlNifPid)); + } + return ATOM_OK; +} + + /** * Called as this driver is loaded by the Erlang BEAM runtime triggered by the * module's on_load directive. @@ -1903,6 +2068,9 @@ ASYNC_NIF_DECL( static int on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { + int arity; + ERL_NIF_TERM head, tail; + const ERL_NIF_TERM* option; ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER; wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource", NULL, flags, NULL); @@ -1914,25 +2082,69 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) ATOM_NOT_FOUND = enif_make_atom(env, "not_found"); ATOM_FIRST = enif_make_atom(env, "first"); ATOM_LAST = enif_make_atom(env, "last"); + ATOM_MESSAGE = enif_make_atom(env, "message"); + ATOM_PROGRESS = enif_make_atom(env, "progress"); + ATOM_WTERL_VSN = enif_make_atom(env, "wterl_vsn"); + ATOM_WIREDTIGER_VSN = enif_make_atom(env, "wiredtiger_vsn"); + ATOM_MSG_PID = enif_make_atom(env, "message_pid"); struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data)); if (!priv) return ENOMEM; memset(priv, 0, sizeof(struct wterl_priv_data)); + priv->conns_mutex = enif_mutex_create(NULL); + priv->conns = kh_init(conns); + if (!priv->conns) { + enif_mutex_destroy(priv->conns_mutex); + enif_free(priv); + return ENOMEM; + } + + struct wterl_event_handlers *eh = &priv->eh; + eh->error_mutex = enif_mutex_create(NULL); + eh->message_mutex = enif_mutex_create(NULL); + eh->progress_mutex = enif_mutex_create(NULL); + + /* Process the load_info array of tuples, we expect: + [{wterl_vsn, "a version string"}, + {wiredtiger_vsn, "a version string"}]. */ + while (enif_get_list_cell(env, load_info, &head, &tail)) { + if (enif_get_tuple(env, head, &arity, &option)) { + if (arity == 2) { + if (enif_is_identical(option[0], ATOM_WTERL_VSN)) { + enif_get_string(env, option[1], priv->wterl_vsn, sizeof(priv->wterl_vsn), ERL_NIF_LATIN1); + } else if (enif_is_identical(option[0], ATOM_WIREDTIGER_VSN)) { + enif_get_string(env, option[1], priv->wiredtiger_vsn, sizeof(priv->wiredtiger_vsn), ERL_NIF_LATIN1); + } + } + } + load_info = tail; + } + /* Note: !!! the first element of our priv_data struct *must* be the pointer to the async_nif's private data which we set here. */ ASYNC_NIF_LOAD(wterl, priv->async_nif_priv); - - priv->conns_mutex = enif_mutex_create(NULL); - priv->conns = kh_init(conns); + if (!priv->async_nif_priv) { + kh_destroy(conns, priv->conns); + enif_mutex_destroy(priv->conns_mutex); + enif_free(priv); + return ENOMEM; + } *priv_data = priv; - return *priv_data ? 0 : ENOMEM; + + char msg[1024]; + snprintf(msg, 1024, "NIF on_load complete (wterl version: %s, wiredtiger version: %s)", priv->wterl_vsn, priv->wiredtiger_vsn); + __wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, msg); + return 0; } static int on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { + __UNUSED(env); + __UNUSED(priv_data); + __UNUSED(load_info); return 0; // TODO: implement } @@ -1996,6 +2208,20 @@ on_unload(ErlNifEnv *env, void *priv_data) } } + /* At this point all WiredTiger state and threads are free'd/stopped so there + is no chance that the event handler functions will be called so we can + be sure that there won't be a race on eh.msg_env in the callback functions. */ + struct wterl_event_handlers *eh = &priv->eh; + enif_mutex_destroy(eh->error_mutex); + enif_mutex_destroy(eh->message_mutex); + enif_mutex_destroy(eh->progress_mutex); + if (eh->msg_env_message) + enif_free_env(eh->msg_env_message); + if (eh->msg_env_error) + enif_free_env(eh->msg_env_error); + if (eh->msg_env_progress) + enif_free_env(eh->msg_env_progress); + kh_destroy(conns, h); enif_mutex_unlock(priv->conns_mutex); enif_mutex_destroy(priv->conns_mutex); @@ -2005,6 +2231,9 @@ on_unload(ErlNifEnv *env, void *priv_data) static int on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info) { + __UNUSED(priv_data); + __UNUSED(old_priv_data); + __UNUSED(load_info); ASYNC_NIF_UPGRADE(wterl, env); // TODO: implement return 0; } @@ -2041,6 +2270,7 @@ static ErlNifFunc nif_funcs[] = {"cursor_search_near_nif", 4, wterl_cursor_search_near}, {"cursor_search_nif", 4, wterl_cursor_search}, {"cursor_update_nif", 4, wterl_cursor_update}, + {"set_event_handler_pid", 1, wterl_set_event_handler_pid}, }; ERL_NIF_INIT(wterl, nif_funcs, &on_load, &on_reload, &on_upgrade, &on_unload); diff --git a/rebar.config b/rebar.config index 9a8dc93..36d44a6 100644 --- a/rebar.config +++ b/rebar.config @@ -1,7 +1,7 @@ %%-*- mode: erlang -*- %% ex: ft=erlang ts=4 sw=4 et -{require_otp_vsn, "R1[456]"}. +{require_otp_vsn, "R1[567]"}. {cover_enabled, true}. @@ -27,7 +27,7 @@ %strict_validation ]}. -{xref_checks, [undefined_function_calls]}. +{xref_checks, [undefined_function_calls, deprecated_function_calls]}. {deps, [ {lager, "2.*", {git, "git://github.com/basho/lager", {branch, "master"}}} @@ -36,7 +36,7 @@ {port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}. {port_env, [ - {"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"}, + {"DRV_CFLAGS", "$DRV_CFLAGS -fPIC -Wall -Wextra -Werror -I c_src/system/include"}, {"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lwiredtiger"} ]}. diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index f9f08ba..2711852 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -107,7 +107,7 @@ start(Partition, Config) -> "lsm" end, {ok, Connection} = establish_connection(Config, Type), - Table = Type ++ ":" ++ integer_to_list(Partition), + Table = Type ++ ":wt" ++ integer_to_list(Partition), Compressor = case wterl:config_value(block_compressor, Config, "snappy") of {block_compressor, "snappy"}=C -> [C]; @@ -151,8 +151,8 @@ start(Partition, Config) -> %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(_State) -> - ok. %% The connection is closed by wterl_conn:stop() +stop(#state{connection=Connection}) -> + wterl_conn:close(Connection). %% @doc Retrieve an object from the wterl backend -spec get(riak_object:bucket(), riak_object:key(), state()) -> @@ -416,10 +416,14 @@ establish_connection(Config, Type) -> wterl:config_value(session_max, Config, max_sessions(Config)), wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec - wterl:config_value(verbose, Config, [ - %"ckpt" "block", "shared_cache", "evictserver", "fileops", - %"hazard", "mutex", "read", "readserver", "reconcile", - %"salvage", "verify", "write", "evict", "lsm" + wterl:config_value(verbose, Config, [ "salvage", "verify" + % Note: for some unknown reason, if you add these additional + % verbose flags Erlang SEGV's "size_object: bad tag for 0x80" + % no idea why... yet... you've been warned. + + %"block", "shared_cache", "reconcile", "evict", "lsm", + %"fileops", "read", "write", "readserver", "evictserver", + %"hazard", "mutex", "ckpt" ]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec diff --git a/src/wterl.erl b/src/wterl.erl index ee672a9..1b3c3d7 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -67,6 +67,8 @@ fold_keys/3, fold/3]). +-export([set_event_handler_pid/1]). + -include("async_nif.hrl"). -ifdef(TEST). @@ -95,8 +97,7 @@ nif_stub_error(Line) -> -spec init() -> ok | {error, any()}. init() -> erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]), - [{wterl, "07061ed6e8252543c2f06b81a646eca6945cc558"}, - {wiredtiger, "6f7a4b961c744bfb21f0c21d4c28c2d162400f1b"}]). + [{wterl_vsn, "a1459ce"}, {wiredtiger_vsn, "1.5.2-2-g8f2685b"}]). -spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}. -spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}. @@ -508,6 +509,11 @@ config_to_bin([{Key, Value} | Rest], Acc) -> end. +-spec set_event_handler_pid(pid()) -> ok. +set_event_handler_pid(Pid) + when is_pid(Pid) -> + ?nif_stub. + %% =================================================================== %% EUnit tests @@ -862,7 +868,7 @@ prop_put_delete() -> DataDir = "test/wterl.putdelete.qc", Table = "table:eqc", {ok, CWD} = file:get_cwd(), - ?cmd("rm -rf "++DataDir), + rmdir(filename:join([CWD, DataDir])), % ?cmd("rm -rf " ++ filename:join([CWD, DataDir])), ok = filelib:ensure_dir(filename:join([DataDir, "x"])), {ok, ConnRef} = wterl:connection_open(DataDir, [{create,true}]), try diff --git a/src/wterl_conn.erl b/src/wterl_conn.erl index b7ead8b..9f62d46 100644 --- a/src/wterl_conn.erl +++ b/src/wterl_conn.erl @@ -84,14 +84,14 @@ init([]) -> handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{conn=undefined}=State) -> {Reply, NState} = - case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of - {ok, ConnRef}=OK -> - Monitor = erlang:monitor(process, Caller), - true = ets:insert(wterl_ets, {Monitor, Caller}), - {OK, State#state{conn = ConnRef}}; - Error -> - {Error, State} - end, + case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of + {ok, ConnRef}=OK -> + Monitor = erlang:monitor(process, Caller), + true = ets:insert(wterl_ets, {Monitor, Caller}), + {OK, State#state{conn = ConnRef}}; + Error -> + {Error, State} + end, {reply, Reply, NState}; handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) -> Monitor = erlang:monitor(process, Caller), diff --git a/src/wterl_event_handler.erl b/src/wterl_event_handler.erl new file mode 100644 index 0000000..71c357c --- /dev/null +++ b/src/wterl_event_handler.erl @@ -0,0 +1,111 @@ +%% ------------------------------------------------------------------- +%% +%% wterl: Erlang Wrapper for WiredTiger +%% +%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(wterl_event_handler). + +-behaviour(gen_server). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +%% API +-export([start_link/0, stop/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(PREFIX, "wiredtiger"). + +%% ==================================================================== +%% API +%% ==================================================================== + +-spec start_link() -> {ok, pid()} | {error, term()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec stop() -> ok. +stop() -> + gen_server:cast(?MODULE, stop). + +%% ==================================================================== +%% gen_server callbacks +%% ==================================================================== + +init([]) -> + wterl:set_event_handler_pid(self()), + {ok, []}. + +handle_call(_Msg, _From, State) -> + {reply, ok, State}. + +handle_cast(stop, State) -> + {stop, normal, State}; +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({error, {Errno, Message}}, State) -> + log(error, "~s: (~s) ~s", [?PREFIX, Errno, Message]), + {noreply, State}; +handle_info({message, Info}, State) -> + log(info, "~s: ~s", [?PREFIX, Info]), + {noreply, State}; +handle_info({progress, {Operation, Counter}}, State) -> + log(info, "~s: progress on ~s [~b]", [?PREFIX, Operation, Counter]), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% ==================================================================== +%% Internal functions +%% ==================================================================== + +%% @private +-spec log(error | info, string(), [any()]) -> ok. +log(Urgency, Format, Args) -> + case proplists:is_defined(lager, application:which_applications()) of + true -> + log(lager, Urgency, Format, Args); + false -> + log(stdio, Urgency, Format, Args) + end. + +-spec log(lager | stdio, error | info, string(), [any()]) -> ok. +log(lager, error, Format, Args) -> + lager:error(Format, Args); +log(lager, info, Format, Args) -> + lager:info(Format, Args); +log(stdio, _, Format, Args) -> + io:format(Format ++ "~n", Args). + +%% =================================================================== +%% EUnit tests +%% =================================================================== +-ifdef(TEST). +-endif. diff --git a/src/wterl_sup.erl b/src/wterl_sup.erl index 07bc6cd..8c544ae 100644 --- a/src/wterl_sup.erl +++ b/src/wterl_sup.erl @@ -46,4 +46,5 @@ start_link() -> init([]) -> {ok, {{one_for_one, 5, 10}, [?CHILD(wterl_ets, worker), - ?CHILD(wterl_conn, worker)]}}. + ?CHILD(wterl_conn, worker), + ?CHILD(wterl_event_handler, worker)]}}.