Added flags to the put function.

This commit is contained in:
Phillip Toland 2008-12-10 16:04:34 -06:00
parent eab107df7a
commit 7ec347c73e
2 changed files with 27 additions and 18 deletions

View file

@ -80,8 +80,9 @@ static hive_hash* G_DATABASES_NAMES;
#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L) #define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L)
#define DECODE_BYTE(_buf, _off) (_buf[_off]) #define DECODE_BYTE(_buf, _off) (_buf[_off])
#define DECODE_INT(_buf, _off) (*((int*)_buf+_off)) #define DECODE_INT(_buf, _off) *((int*)(_buf+(_off)))
#define DECODE_STRING(_buf, _off) (char*)(_buf+_off) #define DECODE_STRING(_buf, _off) (char*)(_buf+(_off))
#define DECODE_BLOB(_buf, _off) (void*)(_buf+(_off))
#define RETURN_BH(bh, outbuf) *outbuf = (char*)bh.bin; return bh.bin->orig_size; #define RETURN_BH(bh, outbuf) *outbuf = (char*)bh.bin; return bh.bin->orig_size;
@ -585,8 +586,11 @@ static int close_database(int dbref, unsigned flags, PortData* data)
static void do_async_put(void* arg) static void do_async_put(void* arg)
{ {
printf("do_async_put\n"); printf("do_async_put\n");
// Payload is: <<DbRef:32, Flags:32, KeyLen:32, Key:KeyLen, ValLen:32, Val:ValLen>>
AsyncData* adata = (AsyncData*)arg; AsyncData* adata = (AsyncData*)arg;
unsigned flags = DECODE_INT(adata->payload, 4);
// Setup DBTs // Setup DBTs
DBT key; DBT key;
DBT value; DBT value;
@ -594,16 +598,15 @@ static void do_async_put(void* arg)
memset(&value, '\0', sizeof(DBT)); memset(&value, '\0', sizeof(DBT));
// Parse payload into DBTs // Parse payload into DBTs
// Payload is: << DbRef:32, KeyLen:32, Key:KeyLen, ValLen:32, Val:ValLen>> key.size = DECODE_INT(adata->payload, 8);
key.size = *((int*)(adata->payload + 4)); key.data = DECODE_BLOB(adata->payload, 12);
key.data = (void*)(adata->payload + 8); value.size = DECODE_INT(adata->payload, 12 + key.size);
value.size = *((int*)(adata->payload + 8 + key.size)); value.data = DECODE_BLOB(adata->payload, 12 + key.size + 4);
value.data = (void*)(adata->payload + 8 + key.size + 4);
// Execute the actual put -- we'll process the result back in the driver_async_ready function // Execute the actual put -- we'll process the result back in the driver_async_ready function
// All databases are opened with AUTO_COMMIT, so if msg->port->txn is NULL, the put will still // All databases are opened with AUTO_COMMIT, so if msg->port->txn is NULL, the put will still
// be atomic // be atomic
adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, 0); adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, flags);
} }
static void do_async_put_free(void* arg) static void do_async_put_free(void* arg)

View file

@ -10,7 +10,7 @@
open_database/3, open_database/4, open_database/3, open_database/4,
close_database/2, close_database/2,
txn_begin/1, txn_commit/1, txn_abort/1, txn_begin/1, txn_commit/1, txn_abort/1,
put/4, put/4, put/5,
get/3]). get/3]).
-define(CMD_NONE, 0). -define(CMD_NONE, 0).
@ -36,7 +36,10 @@
-define(DB_THREAD, 16#00000004). -define(DB_THREAD, 16#00000004).
-define(DB_TRUNCATE, 16#00008000). -define(DB_TRUNCATE, 16#00008000).
-define(DB_NOSYNC, 21). -define(DB_APPEND, 2).
-define(DB_NODUPDATA, 19).
-define(DB_NOOVERWRITE, 20).
-define(DB_NOSYNC, 21).
-define(STATUS_OK, 0). -define(STATUS_OK, 0).
-define(STATUS_ERROR, 1). -define(STATUS_ERROR, 1).
@ -119,11 +122,14 @@ txn_abort(Port) ->
?ERROR_NO_TXN -> {error, no_txn} ?ERROR_NO_TXN -> {error, no_txn}
end. end.
put(Port, DbRef, Key, Value) -> put(Port, DbRef, Key, Value) ->
put(Port, DbRef, Key, Value, []).
put(Port, DbRef, Key, Value, Opts) ->
{KeyLen, KeyBin} = to_binary(Key), {KeyLen, KeyBin} = to_binary(Key),
{ValLen, ValBin} = to_binary(Value), {ValLen, ValBin} = to_binary(Value),
Cmd = <<DbRef:32/native, KeyLen:32/native, KeyBin/bytes, ValLen:32/native, ValBin/bytes>>, Flags = process_flags(Opts),
Cmd = <<DbRef:32/native, Flags:32/unsigned-native, KeyLen:32/native, KeyBin/bytes, ValLen:32/native, ValBin/bytes>>,
<<Result:32/native>> = erlang:port_control(Port, ?CMD_PUT, Cmd), <<Result:32/native>> = erlang:port_control(Port, ?CMD_PUT, Cmd),
case Result of case Result of
?ERROR_NONE -> ?ERROR_NONE ->
@ -150,27 +156,27 @@ get(Port, DbRef, Key) ->
?ERROR_ASYNC_PENDING -> {error, async_pending}; ?ERROR_ASYNC_PENDING -> {error, async_pending};
?ERROR_INVALID_DBREF -> {error, invalid_dbref} ?ERROR_INVALID_DBREF -> {error, invalid_dbref}
end. end.
to_binary(Term) -> to_binary(Term) ->
Bin = term_to_binary(Term), Bin = term_to_binary(Term),
{size(Bin), Bin}. {size(Bin), Bin}.
process_flags([Flag]) -> process_flags([]) ->
flag_value(Flag); 0;
process_flags([Flag|Flags]) -> process_flags([Flag|Flags]) ->
flag_value(Flag) bor process_flags(Flags). flag_value(Flag) bor process_flags(Flags).
flag_value(Flag) -> flag_value(Flag) ->
case Flag of case Flag of
append -> ?DB_APPEND;
auto_commit -> ?DB_AUTO_COMMIT; auto_commit -> ?DB_AUTO_COMMIT;
create -> ?DB_CREATE; create -> ?DB_CREATE;
exclusive -> ?DB_EXCL; exclusive -> ?DB_EXCL;
multiversion -> ?DB_MULTIVERSION; multiversion -> ?DB_MULTIVERSION;
no_duplicate -> ?DB_NODUPDATA;
no_mmap -> ?DB_NOMMAP; no_mmap -> ?DB_NOMMAP;
no_overwrite -> ?DB_NOOVERWRITE;
no_sync -> ?DB_NOSYNC; no_sync -> ?DB_NOSYNC;
readonly -> ?DB_RDONLY; readonly -> ?DB_RDONLY;
threaded -> ?DB_THREAD; threaded -> ?DB_THREAD;