diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 40c1e87..24b116e 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -50,9 +50,9 @@ %%-define(CAPABILITIES, [async_fold, indexes]). -define(CAPABILITIES, [async_fold]). --record(state, {conn :: reference(), +-record(state, {conn :: wterl:connection(), table :: string(), - session :: reference(), + session :: wterl:session(), partition :: integer()}). -type state() :: #state{}. @@ -109,9 +109,9 @@ start(Partition, Config) -> %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(#state{session=SRef}) -> +stop(#state{conn=ConnRef, session=SRef}) -> ok = wterl:session_close(SRef), - wterl_conn:close(). + wterl_conn:close(ConnRef). %% @doc Retrieve an object from the wterl backend -spec get(riak_object:bucket(), riak_object:key(), state()) -> @@ -246,7 +246,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> {ok, SRef} = wterl:session_open(ConnRef), {ok, Cursor} = wterl:cursor_open(SRef, Table), try - wterl:fold(Cursor, FoldFun, Acc) + wterl:fold_keys(Cursor, FoldFun, Acc) catch {break, AccFinal} -> AccFinal diff --git a/src/wterl.erl b/src/wterl.erl index b5e2d78..0e46123 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -20,8 +20,6 @@ %% %% ------------------------------------------------------------------- -module(wterl). - - -export([conn_open/2, conn_close/1, cursor_close/1, @@ -62,6 +60,16 @@ config_to_bin/1, fold_keys/3]). +-type config() :: binary(). +-type config_list() :: [{atom(), any()}]. +-opaque connection() :: reference(). +-opaque session() :: reference(). +-opaque cursor() :: reference(). +-type key() :: binary(). +-type value() :: binary(). + +-export_type([connection/0, session/0]). + -on_load(init/0). -define(nif_stub, nif_stub_error(?LINE)). @@ -74,6 +82,7 @@ nif_stub_error(Line) -> -include_lib("eunit/include/eunit.hrl"). -endif. +-spec init() -> ok | {error, any()}. init() -> PrivDir = case code:priv_dir(?MODULE) of {error, bad_name} -> @@ -85,109 +94,147 @@ init() -> end, erlang:load_nif(filename:join(PrivDir, ?MODULE), 0). +-spec conn_open(string(), config()) -> {ok, connection()} | {error, term()}. conn_open(_HomeDir, _Config) -> ?nif_stub. +-spec conn_close(connection()) -> ok | {error, term()}. conn_close(_ConnRef) -> ?nif_stub. +-spec session_open(connection()) -> {ok, reference()} | {error, term()}. session_open(_ConnRef) -> ?nif_stub. +-spec session_close(session()) -> ok | {error, term()}. session_close(_Ref) -> ?nif_stub. +-spec session_create(session(), string()) -> ok | {error, term()}. +-spec session_create(session(), string(), config()) -> ok | {error, term()}. session_create(Ref, Name) -> session_create(Ref, Name, ?EMPTY_CONFIG). session_create(_Ref, _Name, _Config) -> ?nif_stub. +-spec session_drop(session(), string()) -> ok | {error, term()}. +-spec session_drop(session(), string(), config()) -> ok | {error, term()}. session_drop(Ref, Name) -> session_drop(Ref, Name, ?EMPTY_CONFIG). session_drop(_Ref, _Name, _Config) -> ?nif_stub. +-spec session_delete(session(), string(), key()) -> ok | {error, term()}. session_delete(_Ref, _Table, _Key) -> ?nif_stub. +-spec session_get(session(), string(), key()) -> {ok, value()} | not_found | {error, term()}. session_get(_Ref, _Table, _Key) -> ?nif_stub. +-spec session_put(session(), string(), key(), value()) -> ok | {error, term()}. session_put(_Ref, _Table, _Key, _Value) -> ?nif_stub. +-spec session_rename(session(), string(), string()) -> ok | {error, term()}. +-spec session_rename(session(), string(), string(), config()) -> ok | {error, term()}. session_rename(Ref, OldName, NewName) -> session_rename(Ref, OldName, NewName, ?EMPTY_CONFIG). session_rename(_Ref, _OldName, _NewName, _Config) -> ?nif_stub. +-spec session_salvage(session(), string()) -> ok | {error, term()}. +-spec session_salvage(session(), string(), config()) -> ok | {error, term()}. session_salvage(Ref, Name) -> session_salvage(Ref, Name, ?EMPTY_CONFIG). session_salvage(_Ref, _Name, _Config) -> ?nif_stub. +-spec session_sync(session(), string()) -> ok | {error, term()}. +-spec session_sync(session(), string(), config()) -> ok | {error, term()}. session_sync(Ref, Name) -> session_sync(Ref, Name, ?EMPTY_CONFIG). session_sync(_Ref, _Name, _Config) -> ?nif_stub. +-spec session_truncate(session(), string()) -> ok | {error, term()}. +-spec session_truncate(session(), string(), config()) -> ok | {error, term()}. session_truncate(Ref, Name) -> session_truncate(Ref, Name, ?EMPTY_CONFIG). session_truncate(_Ref, _Name, _Config) -> ?nif_stub. +-spec session_upgrade(session(), string()) -> ok | {error, term()}. +-spec session_upgrade(session(), string(), config()) -> ok | {error, term()}. session_upgrade(Ref, Name) -> session_upgrade(Ref, Name, ?EMPTY_CONFIG). session_upgrade(_Ref, _Name, _Config) -> ?nif_stub. +-spec session_verify(session(), string()) -> ok | {error, term()}. +-spec session_verify(session(), string(), config()) -> ok | {error, term()}. session_verify(Ref, Name) -> session_verify(Ref, Name, ?EMPTY_CONFIG). session_verify(_Ref, _Name, _Config) -> ?nif_stub. +-spec cursor_open(session(), string()) -> {ok, cursor()} | {error, term()}. cursor_open(_Ref, _Table) -> ?nif_stub. +-spec cursor_close(cursor()) -> ok | {error, term()}. cursor_close(_Cursor) -> ?nif_stub. +-spec cursor_next(cursor()) -> {ok, key(), value()} | not_found | {error, term()}. cursor_next(_Cursor) -> ?nif_stub. +-spec cursor_next_key(cursor()) -> {ok, key()} | not_found | {error, term()}. cursor_next_key(_Cursor) -> ?nif_stub. +-spec cursor_next_value(cursor()) -> {ok, value()} | not_found | {error, term()}. cursor_next_value(_Cursor) -> ?nif_stub. +-spec cursor_prev(cursor()) -> {ok, key(), value()} | not_found | {error, term()}. cursor_prev(_Cursor) -> ?nif_stub. +-spec cursor_prev_key(cursor()) -> {ok, key()} | not_found | {error, term()}. cursor_prev_key(_Cursor) -> ?nif_stub. +-spec cursor_prev_value(cursor()) -> {ok, value()} | not_found | {error, term()}. cursor_prev_value(_Cursor) -> ?nif_stub. +-spec cursor_search(cursor(), key()) -> {ok, value()} | {error, term()}. cursor_search(_Cursor, _Key) -> ?nif_stub. +-spec cursor_search_near(cursor(), key()) -> {ok, value()} | {error, term()}. cursor_search_near(_Cursor, _Key) -> ?nif_stub. +-spec cursor_reset(cursor()) -> ok | {error, term()}. cursor_reset(_Cursor) -> ?nif_stub. +-spec cursor_insert(cursor(), key(), value()) -> ok | {error, term()}. cursor_insert(_Cursor, _Key, _Value) -> ?nif_stub. +-spec cursor_update(cursor(), key(), value()) -> ok | {error, term()}. cursor_update(_Cursor, _Key, _Value) -> ?nif_stub. +-spec cursor_remove(cursor(), key(), value()) -> ok | {error, term()}. cursor_remove(_Cursor, _Key, _Value) -> ?nif_stub. +-spec fold_keys(cursor(), fun(), any()) -> any(). fold_keys(Cursor, Fun, Acc0) -> fold_keys(Cursor, Fun, Acc0, cursor_next_key(Cursor)). fold_keys(_Cursor, _Fun, Acc, not_found) -> @@ -232,6 +279,7 @@ config_encode(bool, false) -> config_encode(_Type, _Value) -> invalid. +-spec config_to_bin(config_list()) -> config(). config_to_bin(Opts) -> config_to_bin(Opts, []). config_to_bin([], Acc) -> diff --git a/src/wterl_conn.erl b/src/wterl_conn.erl index 49c51a3..1db7b81 100644 --- a/src/wterl_conn.erl +++ b/src/wterl_conn.erl @@ -30,35 +30,41 @@ %% API -export([start_link/0, stop/0, - open/1, is_open/0, get/0, close/0]). + open/1, is_open/0, get/0, close/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {conn, - monitors}). +-record(state, {conn :: reference(), + monitors :: ets:tid()}). %% ==================================================================== %% API %% ==================================================================== +-spec start_link() -> {ok, pid()} | {error, term()}. start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec stop() -> ok. stop() -> gen_server:cast(?MODULE, stop). +-spec open(string()) -> {ok, wterl:connection()} | {error, term()}. open(Dir) -> gen_server:call(?MODULE, {open, Dir, self()}, infinity). +-spec is_open() -> boolean(). is_open() -> gen_server:call(?MODULE, is_open, infinity). +-spec get() -> {ok, reference()} | {error, term()}. get() -> gen_server:call(?MODULE, get, infinity). -close() -> +-spec close(wterl:connection()) -> ok. +close(_Conn) -> gen_server:call(?MODULE, {close, self()}, infinity). %% ==================================================================== @@ -99,7 +105,7 @@ handle_call({close, Caller}, _From, #state{conn=ConnRef, monitors=Monitors}=Stat true = ets:delete(Monitors, Monitor), NState = case ets:info(Monitors, size) of 0 -> - close(ConnRef), + do_close(ConnRef), ets:delete(Monitors), State#state{conn=undefined, monitors=undefined}; _ -> @@ -110,7 +116,7 @@ handle_call({close, Caller}, _From, #state{conn=ConnRef, monitors=Monitors}=Stat handle_cast(stop, #state{conn=undefined}=State) -> {noreply, State}; handle_cast(stop, #state{conn=ConnRef, monitors=Monitors}=State) -> - close(ConnRef), + do_close(ConnRef), ets:foldl(fun({Monitor, _}, _) -> true = erl:demonitor(Monitor, [flush]) end, true, Monitors), @@ -124,7 +130,7 @@ handle_info({'DOWN', Monitor, _, _, _}, #state{conn=ConnRef, monitors=Monitors}= true = ets:delete(Monitors, Monitor), case ets:info(Monitors, size) of 0 -> - close(ConnRef), + do_close(ConnRef), ets:delete(Monitors), State#state{conn=undefined, monitors=undefined}; _ -> @@ -138,7 +144,7 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, #state{conn = ConnRef}) -> - close(ConnRef), + do_close(ConnRef), ok. code_change(_OldVsn, State, _Extra) -> @@ -148,9 +154,9 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %% ==================================================================== -close(undefined) -> +do_close(undefined) -> ok; -close(ConnRef) -> +do_close(ConnRef) -> wterl:conn_close(ConnRef). @@ -182,18 +188,18 @@ simple_test_() -> end}]}. open_one() -> - {ok, _Ref} = open("test/wterl-backend"), + {ok, Ref} = open("test/wterl-backend"), true = is_open(), - close(), + close(Ref), false = is_open(), ok. open_and_wait(Pid) -> - {ok, _Ref} = open("test/wterl-backend"), + {ok, Ref} = open("test/wterl-backend"), Pid ! open, receive close -> - close(), + close(Ref), Pid ! close; exit -> exit(normal)