Replace bdberl_db with a merged bdberl API.

This commit is contained in:
Phillip Toland 2008-12-12 12:39:51 -06:00
parent e1bcd7aa52
commit 5f2b99554a
4 changed files with 309 additions and 90 deletions

View file

@ -1,7 +1,7 @@
{application, bdberl,
[{description, "Berkeley DB Erlang Driver"},
{vsn, "1"},
{modules, [ bdberl_port, bdberl_db ]},
{modules, [ bdberl_port, bdberl ]},
{registered, []},
{applications, [kernel,
stdlib]},

282
src/bdberl.erl Normal file
View file

@ -0,0 +1,282 @@
%% -------------------------------------------------------------------
%%
%% bdberl: Interface to BerkeleyDB
%% Copyright (c) 2008 The Hive. All rights reserved.
%%
%% -------------------------------------------------------------------
-module(bdberl).
-export([init/0,
open/2, open/3,
close/1, close/2,
txn_begin/0, txn_begin/1,
txn_commit/0, txn_commit/1, txn_abort/0,
get_cache_size/0, set_cache_size/3,
get_txn_timeout/0, set_txn_timeout/1,
transaction/1,
put/3, put/4,
get/2, get/3,
update/3]).
-include("bdberl.hrl").
init() ->
case erl_ddll:load_driver(code:priv_dir(bdberl), bdberl_drv) of
ok -> ok;
{error, permanent} -> ok % Means that the driver is already active
end,
Port = open_port({spawn, bdberl_drv}, [binary]),
erlang:put(bdb_port, Port),
ok.
open(Name, Type) ->
open(Name, Type, [create]).
open(Name, Type, Opts) ->
%% Map database type into an integer code
case Type of
btree -> TypeCode = ?DB_TYPE_BTREE;
hash -> TypeCode = ?DB_TYPE_HASH
end,
Flags = process_flags(lists:umerge(Opts, [auto_commit, threaded])),
Cmd = <<Flags:32/unsigned-native-integer, TypeCode:8/native-integer, (list_to_binary(Name))/bytes, 0:8/native-integer>>,
case erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd) of
<<?STATUS_OK:8, Db:32/native>> ->
{ok, Db};
<<?STATUS_ERROR:8, Errno:32/native>> ->
{error, Errno}
end.
close(Db) ->
close(Db, []).
close(Db, Opts) ->
Flags = process_flags(Opts),
Cmd = <<Db:32/native-integer, Flags:32/unsigned-native-integer>>,
case erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd) of
<<0:32/native-integer>> ->
{error, invalid_db};
<<1:32/native-integer>> ->
ok
end.
txn_begin() ->
txn_begin([]).
txn_begin(Opts) ->
Flags = process_flags(Opts),
Cmd = <<Flags:32/unsigned-native>>,
<<Result:32/native>> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd),
case decode_rc(Result) of
ok -> ok;
Error -> {error, {txn_begin, Error}}
end.
txn_commit() ->
txn_commit([]).
txn_commit(Opts) ->
Flags = process_flags(Opts),
Cmd = <<Flags:32/unsigned-native>>,
<<Result:32/native>> = erlang:port_control(get_port(), ?CMD_TXN_COMMIT, Cmd),
case decode_rc(Result) of
ok ->
receive
ok -> ok;
{error, Reason} -> {error, {txn_commit, decode_rc(Reason)}}
end;
Error ->
{error, {txn_commit, Error}}
end.
txn_abort() ->
<<Result:32/native>> = erlang:port_control(get_port(), ?CMD_TXN_ABORT, <<>>),
case decode_rc(Result) of
ok ->
receive
ok -> ok;
{error, Reason} -> {error, {txn_abort, decode_rc(Reason)}}
end;
Error ->
{error, {txn_abort, Error}}
end.
transaction(Fun) ->
txn_begin(),
try Fun() of
abort ->
txn_abort(),
{error, transaction_aborted};
Value ->
txn_commit(),
{ok, Value}
catch
_ : Reason ->
txn_abort(),
{error, {transaction_failed, Reason}}
end.
put(Db, Key, Value) ->
put(Db, Key, Value, []).
put(Db, Key, Value, Opts) ->
{KeyLen, KeyBin} = to_binary(Key),
{ValLen, ValBin} = to_binary(Value),
Flags = process_flags(Opts),
Cmd = <<Db:32/native, Flags:32/unsigned-native, KeyLen:32/native, KeyBin/bytes, ValLen:32/native, ValBin/bytes>>,
<<Result:32/native>> = erlang:port_control(get_port(), ?CMD_PUT, Cmd),
case decode_rc(Result) of
ok ->
receive
ok -> ok;
{error, Reason} -> {error, {put, decode_rc(Reason)}}
end;
Error ->
{error, {put, decode_rc(Error)}}
end.
get(Db, Key) ->
get(Db, Key, []).
get(Db, Key, Opts) ->
{KeyLen, KeyBin} = to_binary(Key),
Flags = process_flags(Opts),
Cmd = <<Db:32/native, Flags:32/unsigned-native, KeyLen:32/native, KeyBin/bytes>>,
<<Result:32/native>> = erlang:port_control(get_port(), ?CMD_GET, Cmd),
case decode_rc(Result) of
ok ->
receive
{ok, Bin} -> {ok, binary_to_term(Bin)};
not_found -> not_found;
{error, Reason} -> {error, {get, decode_rc(Reason)}}
end;
Error ->
{error, {get, decode_rc(Error)}}
end.
update(Db, Key, Fun) ->
F = fun() ->
{ok, Value} = get(Db, Key, [rmw]),
NewValue = Fun(Key, Value),
ok = put(Db, Key, NewValue),
NewValue
end,
transaction(F).
get_cache_size() ->
Cmd = <<?SYSP_CACHESIZE_GET:32/native>>,
<<Result:32/signed-native, Gbytes:32/native, Bytes:32/native, Ncaches:32/native>> =
erlang:port_control(get_port(), ?CMD_TUNE, Cmd),
case Result of
0 ->
{ok, Gbytes, Bytes, Ncaches};
_ ->
{error, Result}
end.
set_cache_size(Gbytes, Bytes, Ncaches) ->
Cmd = <<?SYSP_CACHESIZE_SET:32/native, Gbytes:32/native, Bytes:32/native, Ncaches:32/native>>,
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TUNE, Cmd),
case Result of
0 ->
ok;
_ ->
{error, Result}
end.
get_txn_timeout() ->
Cmd = <<?SYSP_TXN_TIMEOUT_GET:32/native>>,
<<Result:32/signed-native, Timeout:32/native>> = erlang:port_control(get_port(), ?CMD_TUNE, Cmd),
case Result of
0 ->
{ok, Timeout};
_ ->
{error, Result}
end.
set_txn_timeout(Timeout) ->
Cmd = <<?SYSP_TXN_TIMEOUT_SET:32/native, Timeout:32/native>>,
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TUNE, Cmd),
case Result of
0 ->
ok;
_ ->
{error, Result}
end.
%% ====================================================================
%% Internal functions
%% ====================================================================
get_port() ->
case erlang:get(bdb_port) of
undefined ->
ok = init(),
erlang:get(bdb_port);
Port ->
Port
end.
%%
%% Decode a integer return value into an atom representation
%%
decode_rc(?ERROR_NONE) -> ok;
decode_rc(?ERROR_ASYNC_PENDING) -> async_pending;
decode_rc(?ERROR_INVALID_DBREF) -> invalid_dbref;
decode_rc(?ERROR_NO_TXN) -> no_txn;
decode_rc(?ERROR_DB_LOCK_NOTGRANTED) -> lock_not_granted;
decode_rc(?ERROR_DB_LOCK_DEADLOCK) -> deadlock;
decode_rc(Rc) -> {unknown, Rc}.
%%
%% Convert a term into a binary, returning a tuple with the binary and the length of the binary
%%
to_binary(Term) ->
Bin = term_to_binary(Term),
{size(Bin), Bin}.
%%
%% Given an array of options, produce a single integer with the numeric values
%% of the options joined with binary OR
%%
process_flags([]) ->
0;
process_flags([Flag|Flags]) ->
flag_value(Flag) bor process_flags(Flags).
%%
%% Given an option as an atom, return the numeric value
%%
flag_value(Flag) ->
case Flag of
append -> ?DB_APPEND;
auto_commit -> ?DB_AUTO_COMMIT;
consume -> ?DB_CONSUME;
consume_wait -> ?DB_CONSUME_WAIT;
create -> ?DB_CREATE;
exclusive -> ?DB_EXCL;
get_both -> ?DB_GET_BOTH;
ignore_lease -> ?DB_IGNORE_LEASE;
multiple -> ?DB_MULTIPLE;
multiversion -> ?DB_MULTIVERSION;
no_duplicate -> ?DB_NODUPDATA;
no_mmap -> ?DB_NOMMAP;
no_overwrite -> ?DB_NOOVERWRITE;
no_sync -> ?DB_NOSYNC;
read_committed -> ?DB_READ_COMMITTED;
read_uncommitted -> ?DB_READ_UNCOMMITTED;
readonly -> ?DB_RDONLY;
rmw -> ?DB_RMW;
set_recno -> ?DB_SET_RECNO;
threaded -> ?DB_THREAD;
truncate -> ?DB_TRUNCATE;
txn_no_sync -> ?DB_TXN_NOSYNC;
txn_no_wait -> ?DB_TXN_NOWAIT;
txn_snapshot -> ?DB_TXN_SNAPSHOT;
txn_sync -> ?DB_TXN_SYNC;
txn_wait -> ?DB_TXN_WAIT;
txn_write_nosync -> ?DB_TXN_WRITE_NOSYNC
end.

View file

@ -1,68 +0,0 @@
%% -------------------------------------------------------------------
%%
%% bdberl: API Interface
%% Copyright (c) 2008 The Hive. All rights reserved.
%%
%% -------------------------------------------------------------------
-module(bdberl_db).
-export([open/3, open/4,
close/1, close/2,
put/3, put/4,
get/2, get/3,
transaction/2,
update/3]).
open(Port, Name, Type) ->
open(Port, Name, Type, [create]).
open(Port, Name, Type, Opts) ->
case bdberl_port:open_database(Port, Name, Type, Opts) of
{ok, DbRef} -> {ok, {db, Port, DbRef}};
{error, Reason} -> {error, Reason}
end.
close(Db) ->
close(Db, []).
close({db, Port, DbRef}, Opts) ->
bdberl_port:close_database(Port, DbRef, Opts).
put(Db, Key, Value) ->
put(Db, Key, Value, []).
put({db, Port, DbRef}, Key, Value, Opts) ->
bdberl_port:put(Port, DbRef, Key, Value, Opts).
get(Db, Key) ->
get(Db, Key, []).
get({db, Port, DbRef}, Key, Opts) ->
bdberl_port:get(Port, DbRef, Key, Opts).
transaction({db, Port, _DbRef}, Fun) ->
bdberl_port:txn_begin(Port),
try Fun() of
abort ->
bdberl_port:txn_abort(Port),
{error, transaction_aborted};
Value ->
bdberl_port:txn_commit(Port),
{ok, Value}
catch
_ : Reason ->
bdberl_port:txn_abort(Port),
{error, {transaction_failed, Reason}}
end.
update(Db, Key, Fun) ->
F = fun() ->
{ok, Value} = get(Db, Key, [rmw]),
NewValue = Fun(Key, Value),
ok = put(Db, Key, NewValue),
NewValue
end,
transaction(Db, F).

View file

@ -4,7 +4,7 @@
%% Copyright (c) 2008 The Hive. All rights reserved.
%%
%% -------------------------------------------------------------------
-module(db_api_SUITE).
-module(bdberl_SUITE).
-compile(export_all).
@ -20,14 +20,19 @@ all() ->
update_should_save_value_if_successful].
init_per_suite(Config) ->
ok = bdberl:init(),
Config.
end_per_suite(_Config) ->
ok.
init_per_testcase(_TestCase, Config) ->
{ok, Port} = bdberl_port:new(),
{ok, Db} = bdberl_db:open(Port, "api_test.db", btree, [create, exclusive]),
[{port, Port},{db, Db}|Config].
{ok, Db} = bdberl:open("api_test.db", btree, [create, exclusive]),
[{db, Db}|Config].
end_per_testcase(_TestCase, Config) ->
ok = bdberl_db:close(?config(db, Config)),
true = port_close(?config(port, Config)),
ok = bdberl:close(?config(db, Config)),
ok = file:delete("api_test.db").
@ -35,44 +40,44 @@ open_should_create_database_if_none_exists(_Config) ->
true = filelib:is_file("api_test.db").
get_should_fail_when_getting_a_nonexistant_record(Config) ->
not_found = bdberl_db:get(?config(db, Config), bad_key).
not_found = bdberl:get(?config(db, Config), bad_key).
get_should_return_a_value_when_getting_a_valid_record(Config) ->
Db = ?config(db, Config),
ok = bdberl_db:put(Db, mykey, avalue),
{ok, avalue} = bdberl_db:get(Db, mykey).
ok = bdberl:put(Db, mykey, avalue),
{ok, avalue} = bdberl:get(Db, mykey).
transaction_should_commit_on_success(Config) ->
Db = ?config(db, Config),
F = fun() -> bdberl_db:put(Db, mykey, avalue) end,
{ok, ok} = bdberl_db:transaction(Db, F),
{ok, avalue} = bdberl_db:get(Db, mykey).
F = fun() -> bdberl:put(Db, mykey, avalue) end,
{ok, ok} = bdberl:transaction(F),
{ok, avalue} = bdberl:get(Db, mykey).
transaction_should_abort_on_exception(Config) ->
Db = ?config(db, Config),
F = fun() ->
bdberl_db:put(Db, mykey, should_not_see_this),
bdberl:put(Db, mykey, should_not_see_this),
throw(testing)
end,
{error, {transaction_failed, testing}} = bdberl_db:transaction(Db, F),
not_found = bdberl_db:get(Db, mykey).
{error, {transaction_failed, testing}} = bdberl:transaction(F),
not_found = bdberl:get(Db, mykey).
transaction_should_abort_on_user_abort(Config) ->
Db = ?config(db, Config),
F = fun() ->
bdberl_db:put(Db, mykey, should_not_see_this),
bdberl:put(Db, mykey, should_not_see_this),
abort
end,
{error, transaction_aborted} = bdberl_db:transaction(Db, F),
not_found = bdberl_db:get(Db, mykey).
{error, transaction_aborted} = bdberl:transaction(F),
not_found = bdberl:get(Db, mykey).
update_should_save_value_if_successful(Config) ->
Db = ?config(db, Config),
ok = bdberl_db:put(Db, mykey, avalue),
ok = bdberl:put(Db, mykey, avalue),
F = fun(Key, Value) ->
mykey = Key,
@ -80,6 +85,6 @@ update_should_save_value_if_successful(Config) ->
newvalue
end,
{ok, newvalue} = bdberl_db:update(Db, mykey, F),
{ok, newvalue} = bdberl_db:get(Db, mykey).
{ok, newvalue} = bdberl:update(Db, mykey, F),
{ok, newvalue} = bdberl:get(Db, mykey).