Renaming from wterl to wt (less redundant and more meaningful name)

and working on a shared single cache for all vnodes.
This commit is contained in:
Gregory Burd 2013-03-10 21:42:31 -04:00
parent 57917f8bc6
commit 83dfc9e396
16 changed files with 970 additions and 862 deletions

View file

@ -1,4 +1,4 @@
TARGET= wterl TARGET= wt
REBAR= ./rebar REBAR= ./rebar
#REBAR= /usr/bin/env rebar #REBAR= /usr/bin/env rebar
@ -18,15 +18,15 @@ get-deps:
update-deps: update-deps:
@$(REBAR) update-deps @$(REBAR) update-deps
c_src/wterl.o: c_src/wt.o:
touch c_src/wterl.c touch c_src/wt.c
ebin/app_helper.beam: ebin/app_helper.beam:
@echo You need to: @echo You need to:
@echo cp ../riak/deps/riak_core/ebin/app_helper.beam ebin @echo cp ../riak/deps/riak_core/ebin/app_helper.beam ebin
@/bin/false @/bin/false
compile: c_src/wterl.o ebin/app_helper.beam compile: c_src/wt.o ebin/app_helper.beam
@$(REBAR) compile @$(REBAR) compile
clean: clean:

View file

@ -1,19 +1,19 @@
`wterl` is an Erlang interface to the WiredTiger database, and is written `wt` is an Erlang interface to the WiredTiger database, and is written
to support a Riak storage backend that uses WiredTiger. to support a Riak storage backend that uses WiredTiger.
This backend currently supports only key-value storage and retrieval. This backend currently supports only key-value storage and retrieval.
Remaining work includes: Remaining work includes:
* The `wterl:session_create` function currently returns an error under * The `wt:session_create` function currently returns an error under
certain circumstances, so we currently ignore its return value. certain circumstances, so we currently ignore its return value.
* The `riak_kv_wterl_backend` module is currently designed to rely on the * The `riak_kv_wt_backend` module is currently designed to rely on the
fact that it runs in just a single Erlang scheduler thread, which is fact that it runs in just a single Erlang scheduler thread, which is
necessary because WiredTiger doesn't allow a session to be used necessary because WiredTiger doesn't allow a session to be used
concurrently by different threads. If the KV node design ever changes to concurrently by different threads. If the KV node design ever changes to
involve concurrency across scheduler threads, this current design will no involve concurrency across scheduler threads, this current design will no
longer work correctly. longer work correctly.
* Currently the `riak_kv_wterl_backend` module is stored in this * Currently the `riak_kv_wt_backend` module is stored in this
repository, but it really belongs in the `riak_kv` repository. repository, but it really belongs in the `riak_kv` repository.
* There are currently some stability issues with WiredTiger that can * There are currently some stability issues with WiredTiger that can
sometimes cause errors when restarting KV nodes with non-empty WiredTiger sometimes cause errors when restarting KV nodes with non-empty WiredTiger
@ -25,8 +25,8 @@ under development but are not yet available.
Deploying Deploying
--------- ---------
You can deploy `wterl` into a Riak devrel cluster using the `enable-wterl` You can deploy `wt` into a Riak devrel cluster using the `enable-wt`
script. Clone the `riak` repo, change your working directory to it, and script. Clone the `riak` repo, change your working directory to it, and
then execute the `enable-wterl` script. It adds `wterl` as a dependency, then execute the `enable-wt` script. It adds `wt` as a dependency,
runs `make all devrel`, and then modifies the configuration settings of the runs `make all devrel`, and then modifies the configuration settings of the
resulting dev nodes to use the WiredTiger storage backend. resulting dev nodes to use the WiredTiger storage backend.

View file

@ -18,6 +18,7 @@ case "$1" in
tar -xjf wiredtiger-$WT_VSN.tar.bz2 tar -xjf wiredtiger-$WT_VSN.tar.bz2
# --enable-snappy --enable-bzip2 --enable-lz4 \
(cd wiredtiger-$WT_VSN/build_posix && \ (cd wiredtiger-$WT_VSN/build_posix && \
../configure --with-pic \ ../configure --with-pic \
--prefix=$BASEDIR/system && \ --prefix=$BASEDIR/system && \

672
c_src/wt.c Normal file
View file

@ -0,0 +1,672 @@
// -------------------------------------------------------------------
//
// wt: Erlang Wrapper for WiredTiger
//
// Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
//
// This file is provided to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// -------------------------------------------------------------------
#include "erl_nif.h"
#include "erl_driver.h"
#include <stdio.h>
#include <string.h>
#include "wiredtiger.h"
static ErlNifResourceType* wt_conn_RESOURCE;
static ErlNifResourceType* wt_session_RESOURCE;
static ErlNifResourceType* wt_cursor_RESOURCE;
typedef struct {
WT_CONNECTION* conn;
} WtConnHandle;
typedef struct {
WT_SESSION* session;
} WtSessionHandle;
typedef struct {
WT_CURSOR* cursor;
} WtCursorHandle;
typedef char Uri[128]; // object names
// Atoms (initialized in on_load)
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
typedef ERL_NIF_TERM (*CursorRetFun)(ErlNifEnv* env, WT_CURSOR* cursor, int rc);
// Prototypes
static ERL_NIF_TERM wt_conn_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_conn_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_insert(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_key_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc);
static ERL_NIF_TERM wt_cursor_kv_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc);
static ERL_NIF_TERM wt_cursor_next(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_next_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_next_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_np_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[],
CursorRetFun cursor_ret_fun, int next);
static ERL_NIF_TERM wt_cursor_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_prev(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_prev_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_prev_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_remove(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_reset(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_search(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_search_near(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_search_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int near);
static ERL_NIF_TERM wt_cursor_update(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_cursor_value_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc);
static ERL_NIF_TERM wt_session_checkpoint(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_create(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_drop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_rename(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_salvage(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_truncate(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_upgrade(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wt_session_verify(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ErlNifFunc nif_funcs[] =
{
{"conn_close", 1, wt_conn_close},
{"conn_open", 2, wt_conn_open},
{"cursor_close", 1, wt_cursor_close},
{"cursor_insert", 3, wt_cursor_insert},
{"cursor_next", 1, wt_cursor_next},
{"cursor_next_key", 1, wt_cursor_next_key},
{"cursor_next_value", 1, wt_cursor_next_value},
{"cursor_open", 2, wt_cursor_open},
{"cursor_prev", 1, wt_cursor_prev},
{"cursor_prev_key", 1, wt_cursor_prev_key},
{"cursor_prev_value", 1, wt_cursor_prev_value},
{"cursor_remove", 3, wt_cursor_remove},
{"cursor_reset", 1, wt_cursor_reset},
{"cursor_search", 2, wt_cursor_search},
{"cursor_search_near", 2, wt_cursor_search_near},
{"cursor_update", 3, wt_cursor_update},
{"session_checkpoint", 2, wt_session_checkpoint},
{"session_close", 1, wt_session_close},
{"session_create", 3, wt_session_create},
{"session_delete", 3, wt_session_delete},
{"session_drop", 3, wt_session_drop},
{"session_get", 3, wt_session_get},
{"session_open", 2, wt_session_open},
{"session_put", 4, wt_session_put},
{"session_rename", 4, wt_session_rename},
{"session_salvage", 3, wt_session_salvage},
{"session_truncate", 3, wt_session_truncate},
{"session_upgrade", 3, wt_session_upgrade},
{"session_verify", 3, wt_session_verify},
};
static inline ERL_NIF_TERM wt_strerror(ErlNifEnv* env, int rc)
{
return rc == WT_NOTFOUND ?
enif_make_atom(env, "not_found") :
enif_make_tuple2(env, ATOM_ERROR,
enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1));
}
static ERL_NIF_TERM wt_conn_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
ErlNifBinary config;
char homedir[4096];
if (enif_get_string(env, argv[0], homedir, sizeof homedir, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[1], &config))
{
WT_CONNECTION* conn;
int rc = wiredtiger_open(homedir, NULL, (const char*)config.data, &conn);
if (rc == 0)
{
WtConnHandle* conn_handle = enif_alloc_resource(wt_conn_RESOURCE, sizeof(WtConnHandle));
conn_handle->conn = conn;
ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
enif_release_resource(conn_handle);
return enif_make_tuple2(env, ATOM_OK, result);
}
else
{
return wt_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_conn_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WtConnHandle* conn_handle;
if (enif_get_resource(env, argv[0], wt_conn_RESOURCE, (void**)&conn_handle))
{
WT_CONNECTION* conn = conn_handle->conn;
int rc = conn->close(conn, NULL);
return rc == 0 ? ATOM_OK : wt_strerror(env, rc);
}
return enif_make_badarg(env);
}
#define WT_OP_CREATE 1
#define WT_OP_DROP 2
#define WT_OP_SALVAGE 3
#define WT_OP_TRUNCATE 4
#define WT_OP_UPGRADE 5
#define WT_OP_VERIFY 6
static inline ERL_NIF_TERM wt_session_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int op)
{
WtSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wt_session_RESOURCE, (void**)&session_handle))
{
WT_SESSION* session = session_handle->session;
int rc;
Uri uri;
ErlNifBinary config;
if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[2], &config))
{
switch (op)
{
case WT_OP_CREATE:
rc = session->create(session, uri, (const char*)config.data);
break;
case WT_OP_DROP:
rc = session->drop(session, uri, (const char*)config.data);
break;
case WT_OP_SALVAGE:
rc = session->salvage(session, uri, (const char*)config.data);
break;
case WT_OP_TRUNCATE:
// Ignore the cursor start/stop form of truncation for now,
// support only the full file truncation.
rc = session->truncate(session, uri, NULL, NULL, (const char*)config.data);
break;
case WT_OP_UPGRADE:
rc = session->upgrade(session, uri, (const char*)config.data);
break;
case WT_OP_VERIFY:
default:
rc = session->verify(session, uri, (const char*)config.data);
break;
}
return rc == 0 ? ATOM_OK : wt_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_session_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WtConnHandle* conn_handle;
ErlNifBinary config;
if (enif_get_resource(env, argv[0], wt_conn_RESOURCE, (void**)&conn_handle) &&
enif_inspect_binary(env, argv[1], &config))
{
WT_CONNECTION* conn = conn_handle->conn;
WT_SESSION* session;
int rc = conn->open_session(conn, NULL, (const char *)config.data, &session);
if (rc == 0)
{
WtSessionHandle* session_handle =
enif_alloc_resource(wt_session_RESOURCE, sizeof(WtSessionHandle));
session_handle->session = session;
ERL_NIF_TERM result = enif_make_resource(env, session_handle);
enif_keep_resource(conn_handle);
enif_release_resource(session_handle);
return enif_make_tuple2(env, ATOM_OK, result);
}
else
{
return wt_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_session_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WtSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wt_session_RESOURCE, (void**)&session_handle))
{
WT_SESSION* session = session_handle->session;
int rc = session->close(session, NULL);
return rc == 0 ? ATOM_OK : wt_strerror(env, rc);
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_session_create(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_session_worker(env, argc, argv, WT_OP_CREATE);
}
static ERL_NIF_TERM wt_session_drop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_session_worker(env, argc, argv, WT_OP_DROP);
}
static ERL_NIF_TERM wt_session_rename(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WtSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wt_session_RESOURCE, (void**)&session_handle))
{
ErlNifBinary config;
Uri oldname, newname;
if (enif_get_string(env, argv[1], oldname, sizeof oldname, ERL_NIF_LATIN1) &&
enif_get_string(env, argv[2], newname, sizeof newname, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[3], &config))
{
WT_SESSION* session = session_handle->session;
int rc = session->rename(session, oldname, newname, (const char*)config.data);
return rc == 0 ? ATOM_OK : wt_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_session_salvage(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_session_worker(env, argc, argv, WT_OP_SALVAGE);
}
static ERL_NIF_TERM wt_session_checkpoint(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WtSessionHandle* session_handle;
ErlNifBinary config;
if (enif_get_resource(env, argv[0], wt_session_RESOURCE, (void**)&session_handle) &&
enif_inspect_binary(env, argv[1], &config))
{
WT_SESSION* session = session_handle->session;
int rc = session->checkpoint(session, (const char*)config.data);
return rc == 0 ? ATOM_OK : wt_strerror(env, rc);
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_session_truncate(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_session_worker(env, argc, argv, WT_OP_TRUNCATE);
}
static ERL_NIF_TERM wt_session_upgrade(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_session_worker(env, argc, argv, WT_OP_UPGRADE);
}
static ERL_NIF_TERM wt_session_verify(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_session_worker(env, argc, argv, WT_OP_VERIFY);
}
static ERL_NIF_TERM wt_session_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WtSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wt_session_RESOURCE, (void**)&session_handle))
{
Uri uri;
ErlNifBinary key;
if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[2], &key))
{
WT_SESSION* session = session_handle->session;
WT_CURSOR* cursor;
int rc = session->open_cursor(session, uri, NULL, "raw", &cursor);
if (rc != 0)
{
return wt_strerror(env, rc);
}
WT_ITEM raw_key;
raw_key.data = key.data;
raw_key.size = key.size;
cursor->set_key(cursor, &raw_key);
rc = cursor->remove(cursor);
cursor->close(cursor);
return rc == 0 ? ATOM_OK : wt_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_session_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WtSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wt_session_RESOURCE, (void**)&session_handle))
{
Uri uri;
ErlNifBinary key;
if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[2], &key))
{
WT_SESSION* session = session_handle->session;
WT_CURSOR* cursor;
int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor);
if (rc != 0)
{
return wt_strerror(env, rc);
}
WT_ITEM raw_key, raw_value;
raw_key.data = key.data;
raw_key.size = key.size;
cursor->set_key(cursor, &raw_key);
rc = cursor->search(cursor);
if (rc == 0)
{
rc = cursor->get_value(cursor, &raw_value);
if (rc == 0)
{
ERL_NIF_TERM value;
unsigned char* bin = enif_make_new_binary(env, raw_value.size, &value);
memcpy(bin, raw_value.data, raw_value.size);
cursor->close(cursor);
return enif_make_tuple2(env, ATOM_OK, value);
}
}
cursor->close(cursor);
return wt_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_session_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WtSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wt_session_RESOURCE, (void**)&session_handle))
{
Uri uri;
ErlNifBinary key, value;
if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[2], &key) &&
enif_inspect_binary(env, argv[3], &value))
{
WT_SESSION* session = session_handle->session;
WT_CURSOR* cursor;
int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor);
if (rc != 0)
{
return wt_strerror(env, rc);
}
WT_ITEM raw_key, raw_value;
raw_key.data = key.data;
raw_key.size = key.size;
cursor->set_key(cursor, &raw_key);
raw_value.data = value.data;
raw_value.size = value.size;
cursor->set_value(cursor, &raw_value);
rc = cursor->insert(cursor);
cursor->close(cursor);
return rc == 0 ? ATOM_OK : wt_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_cursor_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WtSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wt_session_RESOURCE, (void**)&session_handle))
{
WT_CURSOR* cursor;
Uri uri;
if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1))
{
WT_SESSION* session = session_handle->session;
int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor);
if (rc == 0)
{
WtCursorHandle* cursor_handle =
enif_alloc_resource(wt_cursor_RESOURCE, sizeof(WtCursorHandle));
cursor_handle->cursor = cursor;
ERL_NIF_TERM result = enif_make_resource(env, cursor_handle);
enif_keep_resource(session_handle);
enif_release_resource(cursor_handle);
return enif_make_tuple2(env, ATOM_OK, result);
}
else
{
return wt_strerror(env, rc);
}
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_cursor_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WtCursorHandle *cursor_handle;
if (enif_get_resource(env, argv[0], wt_cursor_RESOURCE, (void**)&cursor_handle))
{
WT_CURSOR* cursor = cursor_handle->cursor;
int rc = cursor->close(cursor);
return rc == 0 ? ATOM_OK : wt_strerror(env, rc);
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_cursor_key_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc)
{
if (rc == 0)
{
WT_ITEM raw_key;
rc = cursor->get_key(cursor, &raw_key);
if (rc == 0)
{
ERL_NIF_TERM key;
memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size);
return enif_make_tuple2(env, ATOM_OK, key);
}
}
return wt_strerror(env, rc);
}
static ERL_NIF_TERM wt_cursor_kv_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc)
{
if (rc == 0)
{
WT_ITEM raw_key, raw_value;
rc = cursor->get_key(cursor, &raw_key);
if (rc == 0)
{
rc = cursor->get_value(cursor, &raw_value);
if (rc == 0)
{
ERL_NIF_TERM key, value;
memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size);
memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size);
return enif_make_tuple3(env, ATOM_OK, key, value);
}
}
}
return wt_strerror(env, rc);
}
static ERL_NIF_TERM wt_cursor_value_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc)
{
if (rc == 0)
{
WT_ITEM raw_value;
rc = cursor->get_value(cursor, &raw_value);
if (rc == 0)
{
ERL_NIF_TERM value;
memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size);
return enif_make_tuple2(env, ATOM_OK, value);
}
}
return wt_strerror(env, rc);
}
static ERL_NIF_TERM wt_cursor_np_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[],
CursorRetFun cursor_ret, int prev)
{
WtCursorHandle *cursor_handle;
if (enif_get_resource(env, argv[0], wt_cursor_RESOURCE, (void**)&cursor_handle))
{
WT_CURSOR* cursor = cursor_handle->cursor;
return cursor_ret(env, cursor, prev == 0 ? cursor->next(cursor) : cursor->prev(cursor));
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_cursor_next(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_cursor_np_worker(env, argc, argv, wt_cursor_kv_ret, 0);
}
static ERL_NIF_TERM wt_cursor_next_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_cursor_np_worker(env, argc, argv, wt_cursor_key_ret, 0);
}
static ERL_NIF_TERM wt_cursor_next_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_cursor_np_worker(env, argc, argv, wt_cursor_value_ret, 0);
}
static ERL_NIF_TERM wt_cursor_prev(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_cursor_np_worker(env, argc, argv, wt_cursor_kv_ret, 1);
}
static ERL_NIF_TERM wt_cursor_prev_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_cursor_np_worker(env, argc, argv, wt_cursor_key_ret, 1);
}
static ERL_NIF_TERM wt_cursor_prev_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_cursor_np_worker(env, argc, argv, wt_cursor_value_ret, 1);
}
static ERL_NIF_TERM wt_cursor_search_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int near)
{
WtCursorHandle *cursor_handle;
ErlNifBinary key;
if (enif_get_resource(env, argv[0], wt_cursor_RESOURCE, (void**)&cursor_handle) &&
enif_inspect_binary(env, argv[1], &key))
{
WT_CURSOR* cursor = cursor_handle->cursor;
WT_ITEM raw_key;
int exact;
raw_key.data = key.data;
raw_key.size = key.size;
cursor->set_key(cursor, &raw_key);
// We currently ignore the less-than, greater-than or equals-to return information
// from the cursor.search_near method.
return wt_cursor_value_ret(env, cursor,
near == 1 ?
cursor->search_near(cursor, &exact) : cursor->search(cursor));
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_cursor_search(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_cursor_search_worker(env, argc, argv, 0);
}
static ERL_NIF_TERM wt_cursor_search_near(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_cursor_search_worker(env, argc, argv, 1);
}
static ERL_NIF_TERM wt_cursor_reset(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WtCursorHandle *cursor_handle;
if (enif_get_resource(env, argv[0], wt_cursor_RESOURCE, (void**)&cursor_handle))
{
WT_CURSOR* cursor = cursor_handle->cursor;
int rc = cursor->reset(cursor);
return rc == 0 ? ATOM_OK : wt_strerror(env, rc);
}
return enif_make_badarg(env);
}
#define WT_OP_CURSOR_INSERT 1
#define WT_OP_CURSOR_UPDATE 2
#define WT_OP_CURSOR_REMOVE 3
static inline ERL_NIF_TERM wt_cursor_data_op(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int op)
{
WtCursorHandle *cursor_handle;
if (enif_get_resource(env, argv[0], wt_cursor_RESOURCE, (void**)&cursor_handle))
{
ErlNifBinary key, value;
int rc;
if (enif_inspect_binary(env, argv[1], &key) && enif_inspect_binary(env, argv[2], &value))
{
WT_CURSOR* cursor = cursor_handle->cursor;
WT_ITEM raw_key, raw_value;
raw_key.data = key.data;
raw_key.size = key.size;
cursor->set_key(cursor, &raw_key);
raw_value.data = value.data;
raw_value.size = value.size;
cursor->set_value(cursor, &raw_value);
switch (op)
{
case WT_OP_CURSOR_INSERT:
rc = cursor->insert(cursor);
break;
case WT_OP_CURSOR_UPDATE:
rc = cursor->update(cursor);
break;
case WT_OP_CURSOR_REMOVE:
default:
rc = cursor->remove(cursor);
break;
}
return rc == 0 ? ATOM_OK : wt_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wt_cursor_insert(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_cursor_data_op(env, argc, argv, WT_OP_CURSOR_INSERT);
}
static ERL_NIF_TERM wt_cursor_update(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_cursor_data_op(env, argc, argv, WT_OP_CURSOR_UPDATE);
}
static ERL_NIF_TERM wt_cursor_remove(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wt_cursor_data_op(env, argc, argv, WT_OP_CURSOR_REMOVE);
}
static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
{
ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER;
wt_conn_RESOURCE = enif_open_resource_type(env, NULL, "wt_conn_resource", NULL, flags, NULL);
wt_session_RESOURCE = enif_open_resource_type(env, NULL, "wt_session_resource", NULL, flags, NULL);
wt_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wt_cursor_resource", NULL, flags, NULL);
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok");
return 0;
}
ERL_NIF_INIT(wt, nif_funcs, &on_load, NULL, NULL, NULL);

View file

@ -1,672 +0,0 @@
// -------------------------------------------------------------------
//
// wterl: Erlang Wrapper for WiredTiger
//
// Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
//
// This file is provided to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// -------------------------------------------------------------------
#include "erl_nif.h"
#include "erl_driver.h"
#include <stdio.h>
#include <string.h>
#include "wiredtiger.h"
static ErlNifResourceType* wterl_conn_RESOURCE;
static ErlNifResourceType* wterl_session_RESOURCE;
static ErlNifResourceType* wterl_cursor_RESOURCE;
typedef struct {
WT_CONNECTION* conn;
} WterlConnHandle;
typedef struct {
WT_SESSION* session;
} WterlSessionHandle;
typedef struct {
WT_CURSOR* cursor;
} WterlCursorHandle;
typedef char Uri[128]; // object names
// Atoms (initialized in on_load)
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
typedef ERL_NIF_TERM (*CursorRetFun)(ErlNifEnv* env, WT_CURSOR* cursor, int rc);
// Prototypes
static ERL_NIF_TERM wterl_conn_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_conn_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_insert(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_key_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc);
static ERL_NIF_TERM wterl_cursor_kv_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc);
static ERL_NIF_TERM wterl_cursor_next(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_next_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_next_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_np_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[],
CursorRetFun cursor_ret_fun, int next);
static ERL_NIF_TERM wterl_cursor_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_prev(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_prev_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_prev_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_remove(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_reset(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_search(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_search_near(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_search_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int near);
static ERL_NIF_TERM wterl_cursor_update(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_cursor_value_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc);
static ERL_NIF_TERM wterl_session_checkpoint(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_create(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_drop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_rename(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_salvage(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_truncate(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_upgrade(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM wterl_session_verify(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
static ErlNifFunc nif_funcs[] =
{
{"conn_close", 1, wterl_conn_close},
{"conn_open", 2, wterl_conn_open},
{"cursor_close", 1, wterl_cursor_close},
{"cursor_insert", 3, wterl_cursor_insert},
{"cursor_next", 1, wterl_cursor_next},
{"cursor_next_key", 1, wterl_cursor_next_key},
{"cursor_next_value", 1, wterl_cursor_next_value},
{"cursor_open", 2, wterl_cursor_open},
{"cursor_prev", 1, wterl_cursor_prev},
{"cursor_prev_key", 1, wterl_cursor_prev_key},
{"cursor_prev_value", 1, wterl_cursor_prev_value},
{"cursor_remove", 3, wterl_cursor_remove},
{"cursor_reset", 1, wterl_cursor_reset},
{"cursor_search", 2, wterl_cursor_search},
{"cursor_search_near", 2, wterl_cursor_search_near},
{"cursor_update", 3, wterl_cursor_update},
{"session_checkpoint", 2, wterl_session_checkpoint},
{"session_close", 1, wterl_session_close},
{"session_create", 3, wterl_session_create},
{"session_delete", 3, wterl_session_delete},
{"session_drop", 3, wterl_session_drop},
{"session_get", 3, wterl_session_get},
{"session_open", 2, wterl_session_open},
{"session_put", 4, wterl_session_put},
{"session_rename", 4, wterl_session_rename},
{"session_salvage", 3, wterl_session_salvage},
{"session_truncate", 3, wterl_session_truncate},
{"session_upgrade", 3, wterl_session_upgrade},
{"session_verify", 3, wterl_session_verify},
};
static inline ERL_NIF_TERM wterl_strerror(ErlNifEnv* env, int rc)
{
return rc == WT_NOTFOUND ?
enif_make_atom(env, "not_found") :
enif_make_tuple2(env, ATOM_ERROR,
enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1));
}
static ERL_NIF_TERM wterl_conn_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
ErlNifBinary config;
char homedir[4096];
if (enif_get_string(env, argv[0], homedir, sizeof homedir, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[1], &config))
{
WT_CONNECTION* conn;
int rc = wiredtiger_open(homedir, NULL, (const char*)config.data, &conn);
if (rc == 0)
{
WterlConnHandle* conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle));
conn_handle->conn = conn;
ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
enif_release_resource(conn_handle);
return enif_make_tuple2(env, ATOM_OK, result);
}
else
{
return wterl_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_conn_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WterlConnHandle* conn_handle;
if (enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&conn_handle))
{
WT_CONNECTION* conn = conn_handle->conn;
int rc = conn->close(conn, NULL);
return rc == 0 ? ATOM_OK : wterl_strerror(env, rc);
}
return enif_make_badarg(env);
}
#define WTERL_OP_CREATE 1
#define WTERL_OP_DROP 2
#define WTERL_OP_SALVAGE 3
#define WTERL_OP_TRUNCATE 4
#define WTERL_OP_UPGRADE 5
#define WTERL_OP_VERIFY 6
static inline ERL_NIF_TERM wterl_session_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int op)
{
WterlSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle))
{
WT_SESSION* session = session_handle->session;
int rc;
Uri uri;
ErlNifBinary config;
if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[2], &config))
{
switch (op)
{
case WTERL_OP_CREATE:
rc = session->create(session, uri, (const char*)config.data);
break;
case WTERL_OP_DROP:
rc = session->drop(session, uri, (const char*)config.data);
break;
case WTERL_OP_SALVAGE:
rc = session->salvage(session, uri, (const char*)config.data);
break;
case WTERL_OP_TRUNCATE:
// Ignore the cursor start/stop form of truncation for now,
// support only the full file truncation.
rc = session->truncate(session, uri, NULL, NULL, (const char*)config.data);
break;
case WTERL_OP_UPGRADE:
rc = session->upgrade(session, uri, (const char*)config.data);
break;
case WTERL_OP_VERIFY:
default:
rc = session->verify(session, uri, (const char*)config.data);
break;
}
return rc == 0 ? ATOM_OK : wterl_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_session_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WterlConnHandle* conn_handle;
ErlNifBinary config;
if (enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&conn_handle) &&
enif_inspect_binary(env, argv[1], &config))
{
WT_CONNECTION* conn = conn_handle->conn;
WT_SESSION* session;
int rc = conn->open_session(conn, NULL, (const char *)config.data, &session);
if (rc == 0)
{
WterlSessionHandle* session_handle =
enif_alloc_resource(wterl_session_RESOURCE, sizeof(WterlSessionHandle));
session_handle->session = session;
ERL_NIF_TERM result = enif_make_resource(env, session_handle);
enif_keep_resource(conn_handle);
enif_release_resource(session_handle);
return enif_make_tuple2(env, ATOM_OK, result);
}
else
{
return wterl_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_session_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WterlSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle))
{
WT_SESSION* session = session_handle->session;
int rc = session->close(session, NULL);
return rc == 0 ? ATOM_OK : wterl_strerror(env, rc);
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_session_create(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_session_worker(env, argc, argv, WTERL_OP_CREATE);
}
static ERL_NIF_TERM wterl_session_drop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_session_worker(env, argc, argv, WTERL_OP_DROP);
}
static ERL_NIF_TERM wterl_session_rename(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WterlSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle))
{
ErlNifBinary config;
Uri oldname, newname;
if (enif_get_string(env, argv[1], oldname, sizeof oldname, ERL_NIF_LATIN1) &&
enif_get_string(env, argv[2], newname, sizeof newname, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[3], &config))
{
WT_SESSION* session = session_handle->session;
int rc = session->rename(session, oldname, newname, (const char*)config.data);
return rc == 0 ? ATOM_OK : wterl_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_session_salvage(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_session_worker(env, argc, argv, WTERL_OP_SALVAGE);
}
static ERL_NIF_TERM wterl_session_checkpoint(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WterlSessionHandle* session_handle;
ErlNifBinary config;
if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle) &&
enif_inspect_binary(env, argv[1], &config))
{
WT_SESSION* session = session_handle->session;
int rc = session->checkpoint(session, (const char*)config.data);
return rc == 0 ? ATOM_OK : wterl_strerror(env, rc);
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_session_truncate(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_session_worker(env, argc, argv, WTERL_OP_TRUNCATE);
}
static ERL_NIF_TERM wterl_session_upgrade(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_session_worker(env, argc, argv, WTERL_OP_UPGRADE);
}
static ERL_NIF_TERM wterl_session_verify(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_session_worker(env, argc, argv, WTERL_OP_VERIFY);
}
static ERL_NIF_TERM wterl_session_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WterlSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle))
{
Uri uri;
ErlNifBinary key;
if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[2], &key))
{
WT_SESSION* session = session_handle->session;
WT_CURSOR* cursor;
int rc = session->open_cursor(session, uri, NULL, "raw", &cursor);
if (rc != 0)
{
return wterl_strerror(env, rc);
}
WT_ITEM raw_key;
raw_key.data = key.data;
raw_key.size = key.size;
cursor->set_key(cursor, &raw_key);
rc = cursor->remove(cursor);
cursor->close(cursor);
return rc == 0 ? ATOM_OK : wterl_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_session_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WterlSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle))
{
Uri uri;
ErlNifBinary key;
if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[2], &key))
{
WT_SESSION* session = session_handle->session;
WT_CURSOR* cursor;
int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor);
if (rc != 0)
{
return wterl_strerror(env, rc);
}
WT_ITEM raw_key, raw_value;
raw_key.data = key.data;
raw_key.size = key.size;
cursor->set_key(cursor, &raw_key);
rc = cursor->search(cursor);
if (rc == 0)
{
rc = cursor->get_value(cursor, &raw_value);
if (rc == 0)
{
ERL_NIF_TERM value;
unsigned char* bin = enif_make_new_binary(env, raw_value.size, &value);
memcpy(bin, raw_value.data, raw_value.size);
cursor->close(cursor);
return enif_make_tuple2(env, ATOM_OK, value);
}
}
cursor->close(cursor);
return wterl_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_session_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WterlSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle))
{
Uri uri;
ErlNifBinary key, value;
if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) &&
enif_inspect_binary(env, argv[2], &key) &&
enif_inspect_binary(env, argv[3], &value))
{
WT_SESSION* session = session_handle->session;
WT_CURSOR* cursor;
int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor);
if (rc != 0)
{
return wterl_strerror(env, rc);
}
WT_ITEM raw_key, raw_value;
raw_key.data = key.data;
raw_key.size = key.size;
cursor->set_key(cursor, &raw_key);
raw_value.data = value.data;
raw_value.size = value.size;
cursor->set_value(cursor, &raw_value);
rc = cursor->insert(cursor);
cursor->close(cursor);
return rc == 0 ? ATOM_OK : wterl_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_cursor_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WterlSessionHandle* session_handle;
if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle))
{
WT_CURSOR* cursor;
Uri uri;
if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1))
{
WT_SESSION* session = session_handle->session;
int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor);
if (rc == 0)
{
WterlCursorHandle* cursor_handle =
enif_alloc_resource(wterl_cursor_RESOURCE, sizeof(WterlCursorHandle));
cursor_handle->cursor = cursor;
ERL_NIF_TERM result = enif_make_resource(env, cursor_handle);
enif_keep_resource(session_handle);
enif_release_resource(cursor_handle);
return enif_make_tuple2(env, ATOM_OK, result);
}
else
{
return wterl_strerror(env, rc);
}
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_cursor_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WterlCursorHandle *cursor_handle;
if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle))
{
WT_CURSOR* cursor = cursor_handle->cursor;
int rc = cursor->close(cursor);
return rc == 0 ? ATOM_OK : wterl_strerror(env, rc);
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_cursor_key_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc)
{
if (rc == 0)
{
WT_ITEM raw_key;
rc = cursor->get_key(cursor, &raw_key);
if (rc == 0)
{
ERL_NIF_TERM key;
memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size);
return enif_make_tuple2(env, ATOM_OK, key);
}
}
return wterl_strerror(env, rc);
}
static ERL_NIF_TERM wterl_cursor_kv_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc)
{
if (rc == 0)
{
WT_ITEM raw_key, raw_value;
rc = cursor->get_key(cursor, &raw_key);
if (rc == 0)
{
rc = cursor->get_value(cursor, &raw_value);
if (rc == 0)
{
ERL_NIF_TERM key, value;
memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size);
memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size);
return enif_make_tuple3(env, ATOM_OK, key, value);
}
}
}
return wterl_strerror(env, rc);
}
static ERL_NIF_TERM wterl_cursor_value_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc)
{
if (rc == 0)
{
WT_ITEM raw_value;
rc = cursor->get_value(cursor, &raw_value);
if (rc == 0)
{
ERL_NIF_TERM value;
memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size);
return enif_make_tuple2(env, ATOM_OK, value);
}
}
return wterl_strerror(env, rc);
}
static ERL_NIF_TERM wterl_cursor_np_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[],
CursorRetFun cursor_ret, int prev)
{
WterlCursorHandle *cursor_handle;
if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle))
{
WT_CURSOR* cursor = cursor_handle->cursor;
return cursor_ret(env, cursor, prev == 0 ? cursor->next(cursor) : cursor->prev(cursor));
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_cursor_next(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_kv_ret, 0);
}
static ERL_NIF_TERM wterl_cursor_next_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_key_ret, 0);
}
static ERL_NIF_TERM wterl_cursor_next_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_value_ret, 0);
}
static ERL_NIF_TERM wterl_cursor_prev(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_kv_ret, 1);
}
static ERL_NIF_TERM wterl_cursor_prev_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_key_ret, 1);
}
static ERL_NIF_TERM wterl_cursor_prev_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_value_ret, 1);
}
static ERL_NIF_TERM wterl_cursor_search_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int near)
{
WterlCursorHandle *cursor_handle;
ErlNifBinary key;
if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle) &&
enif_inspect_binary(env, argv[1], &key))
{
WT_CURSOR* cursor = cursor_handle->cursor;
WT_ITEM raw_key;
int exact;
raw_key.data = key.data;
raw_key.size = key.size;
cursor->set_key(cursor, &raw_key);
// We currently ignore the less-than, greater-than or equals-to return information
// from the cursor.search_near method.
return wterl_cursor_value_ret(env, cursor,
near == 1 ?
cursor->search_near(cursor, &exact) : cursor->search(cursor));
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_cursor_search(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_cursor_search_worker(env, argc, argv, 0);
}
static ERL_NIF_TERM wterl_cursor_search_near(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_cursor_search_worker(env, argc, argv, 1);
}
static ERL_NIF_TERM wterl_cursor_reset(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
WterlCursorHandle *cursor_handle;
if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle))
{
WT_CURSOR* cursor = cursor_handle->cursor;
int rc = cursor->reset(cursor);
return rc == 0 ? ATOM_OK : wterl_strerror(env, rc);
}
return enif_make_badarg(env);
}
#define WTERL_OP_CURSOR_INSERT 1
#define WTERL_OP_CURSOR_UPDATE 2
#define WTERL_OP_CURSOR_REMOVE 3
static inline ERL_NIF_TERM wterl_cursor_data_op(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int op)
{
WterlCursorHandle *cursor_handle;
if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle))
{
ErlNifBinary key, value;
int rc;
if (enif_inspect_binary(env, argv[1], &key) && enif_inspect_binary(env, argv[2], &value))
{
WT_CURSOR* cursor = cursor_handle->cursor;
WT_ITEM raw_key, raw_value;
raw_key.data = key.data;
raw_key.size = key.size;
cursor->set_key(cursor, &raw_key);
raw_value.data = value.data;
raw_value.size = value.size;
cursor->set_value(cursor, &raw_value);
switch (op)
{
case WTERL_OP_CURSOR_INSERT:
rc = cursor->insert(cursor);
break;
case WTERL_OP_CURSOR_UPDATE:
rc = cursor->update(cursor);
break;
case WTERL_OP_CURSOR_REMOVE:
default:
rc = cursor->remove(cursor);
break;
}
return rc == 0 ? ATOM_OK : wterl_strerror(env, rc);
}
}
return enif_make_badarg(env);
}
static ERL_NIF_TERM wterl_cursor_insert(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_cursor_data_op(env, argc, argv, WTERL_OP_CURSOR_INSERT);
}
static ERL_NIF_TERM wterl_cursor_update(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_cursor_data_op(env, argc, argv, WTERL_OP_CURSOR_UPDATE);
}
static ERL_NIF_TERM wterl_cursor_remove(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
return wterl_cursor_data_op(env, argc, argv, WTERL_OP_CURSOR_REMOVE);
}
static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
{
ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER;
wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource", NULL, flags, NULL);
wterl_session_RESOURCE = enif_open_resource_type(env, NULL, "wterl_session_resource", NULL, flags, NULL);
wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource", NULL, flags, NULL);
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok");
return 0;
}
ERL_NIF_INIT(wterl, nif_funcs, &on_load, NULL, NULL, NULL);

View file

@ -1,12 +1,12 @@
#!/bin/sh #!/bin/sh
# This script adds wterl to a riak github repo. Run it in the riak repo # This script adds wt to a riak github repo. Run it in the riak repo
# directory. # directory.
# #
# First it adds wterl, then runs "make all devrel" and then enables the # First it adds wt, then runs "make all devrel" and then enables the
# wterl storage backend in the resulting dev nodes. # wt storage backend in the resulting dev nodes.
# #
# This script is intended to be temporary. Once wterl is made into a proper # This script is intended to be temporary. Once wt is made into a proper
# riak citizen, this script will no longer be needed. # riak citizen, this script will no longer be needed.
set -e set -e
@ -35,17 +35,17 @@ fi
rebar get-deps rebar get-deps
file=./deps/riak_kv/src/riak_kv.app.src file=./deps/riak_kv/src/riak_kv.app.src
if ! grep -q hanoidb $file && ! grep -q wterl $file ; then if ! grep -q hanoidb $file && ! grep -q wt $file ; then
echo echo
echo "Modifying $file, saving the original as ${file}.orig ..." echo "Modifying $file, saving the original as ${file}.orig ..."
perl -i.orig -pe '/\bos_mon,/ && print qq( wterl,\n)' $file perl -i.orig -pe '/\bos_mon,/ && print qq( wt,\n)' $file
fi fi
file=./deps/riak_kv/rebar.config file=./deps/riak_kv/rebar.config
if ! grep -q wterl $file ; then if ! grep -q wt $file ; then
echo echo
echo "Modifying $file, saving the original as ${file}.orig ..." echo "Modifying $file, saving the original as ${file}.orig ..."
perl -i.orig -pe '/\bsext\b/ && print qq( {wterl, ".*", {git, "git\@github.com:basho-labs/wterl.git", "master"}},\n)' $file perl -i.orig -pe '/\bsext\b/ && print qq( {wt, ".*", {git, "git\@github.com:basho-labs/wt.git", "master"}},\n)' $file
fi fi
rebar get-deps rebar get-deps
@ -55,6 +55,6 @@ make all stagedevrel
echo echo
echo 'Modifying all dev/dev*/etc/app.config files, saving originals with .orig suffix...' echo 'Modifying all dev/dev*/etc/app.config files, saving originals with .orig suffix...'
perl -i.orig -ne 'if (/\bstorage_backend,/) { s/(storage_backend, )[^\}]+/\1riak_kv_wterl_backend/; print } elsif (/\{eleveldb,/) { $eleveldb++; print } elsif ($eleveldb && /^\s+\]\},/) { $eleveldb = 0; print; print qq(\n {wterl, [\n {data_root, "./data/wt"}\n ]},\n\n) } else { print }' dev/dev*/etc/app.config perl -i.orig -ne 'if (/\bstorage_backend,/) { s/(storage_backend, )[^\}]+/\1riak_kv_wt_backend/; print } elsif (/\{eleveldb,/) { $eleveldb++; print } elsif ($eleveldb && /^\s+\]\},/) { $eleveldb = 0; print; print qq(\n {wt, [\n {data_root, "./data/wt"}\n ]},\n\n) } else { print }' dev/dev*/etc/app.config
exit 0 exit 0

View file

@ -1,6 +1,39 @@
%%-*- mode: erlang -*- %%-*- mode: erlang -*-
%% ex: ft=erlang ts=4 sw=4 et %% ex: ft=erlang ts=4 sw=4 et
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
{require_otp_vsn, "R1[456]"}.
{cover_enabled, true}.
{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
{erl_opts, [%{d,'DEBUG',true},
debug_info,
fail_on_warning,
warn_unused_vars,
warn_export_all,
warn_shadow_vars,
warn_unused_import,
warn_unused_function,
warn_bif_clash,
warn_unused_record,
warn_deprecated_function,
warn_obsolete_guard,
warn_export_vars,
warn_exported_vars,
warn_untyped_record,
{parse_transform, lager_transform}
%warn_missing_spec,
%strict_validation
]}.
{xref_checks, [undefined_function_calls]}.
{deps, [
{lager, "1.2.2", {git, "git://github.com/basho/lager", {tag, "1.2.2"}}}
]}.
{port_specs, [{"priv/wt.so", ["c_src/*.c"]}]}.
{port_env, [ {port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"}, {"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"},
@ -10,4 +43,3 @@
{pre_hooks, [{compile, "c_src/build_deps.sh"}]}. {pre_hooks, [{compile, "c_src/build_deps.sh"}]}.
{post_hooks, [{clean, "c_src/build_deps.sh clean"}]}. {post_hooks, [{clean, "c_src/build_deps.sh clean"}]}.

View file

@ -1,6 +1,6 @@
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% %%
%% riak_kv_wterl_backend: WiredTiger Driver for Riak %% riak_kv_wiredtiger_backend: Use WiredTiger for Riak/KV storage
%% %%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. %% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% %%
@ -20,7 +20,7 @@
%% %%
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
-module(riak_kv_wterl_backend). -module(riak_kv_wiredtiger_backend).
-behavior(temp_riak_kv_backend). -behavior(temp_riak_kv_backend).
-author('Steve Vinoski <steve@basho.com>'). -author('Steve Vinoski <steve@basho.com>').
@ -50,9 +50,9 @@
%%-define(CAPABILITIES, [async_fold, indexes]). %%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]). -define(CAPABILITIES, [async_fold]).
-record(state, {conn :: wterl:connection(), -record(state, {conn :: wt:connection(), %% There is one shared conection
session :: wt:session(), %% But a session per
table :: string(), table :: string(),
session :: wterl:session(),
partition :: integer()}). partition :: integer()}).
-type state() :: #state{}. -type state() :: #state{}.
@ -78,33 +78,57 @@ capabilities(_) ->
capabilities(_, _) -> capabilities(_, _) ->
{ok, ?CAPABILITIES}. {ok, ?CAPABILITIES}.
%% @doc Start the wterl backend %% @doc Start the WiredTiger backend
-spec start(integer(), config()) -> {ok, state()} | {error, term()}. -spec start(integer(), config()) -> {ok, state()} | {error, term()}.
start(Partition, Config) -> start(Partition, Config) ->
%% Get the data root directory %% Get the data root directory
case app_helper:get_prop_or_env(data_root, Config, wterl) of case app_helper:get_prop_or_env(data_root, Config, wt) of
<<"">> ->
lager:error("Failed to startup WiredTiger: data_root is not valid"),
{error, data_root_unset};
[] ->
lager:error("Failed to startup WiredTiger: data_root is empty"),
{error, data_root_unset};
undefined -> undefined ->
lager:error("Failed to create wterl dir: data_root is not set"), lager:error("Failed to startup WiredTiger: data_root is not set"),
{error, data_root_unset}; {error, data_root_unset};
DataRoot -> DataRoot ->
AppStart = case application:start(wterl) of CacheSize =
case proplists:get_value(cache_size, Config) of
undefined ->
case application:get_env(wt, cache_size) of
{ok, Value} ->
Value;
_ ->
SizeEst = best_guess_at_a_reasonable_cache_size(64),
%% lager:warning("Using estimated best cache size of ~p for WiredTiger backend.", [SizeEst]),
SizeEst
end;
Value ->
Value
end,
AppStarted =
case application:start(wt) of
ok -> ok ->
ok; ok;
{error, {already_started, _}} -> {error, {already_started, _}} ->
ok; ok;
{error, Reason} -> {error, Reason} ->
lager:error("Failed to start wterl: ~p", [Reason]), lager:error("Failed to start WiredTiger: ~p", [Reason]),
{error, Reason} {error, Reason}
end, end,
case AppStart of case AppStarted of
ok -> ok ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")), ConnectionOpts =
ConnectionOpts = [Config, [Config,
{create, true}, {create, true},
{logging, true}, {logging, true},
{transactional, true}, {transactional, true},
{session_max, 128}, {session_max, 128},
{cache_size, "2GB"}, {shared_cache, [{chunk, "64MB"},
{min, "1GB"},
{name, "wt-vnode-cache"},
{size, CacheSize}]},
{sync, false} {sync, false}
%% {verbose, %% {verbose,
%% ["block", "shared_cache", "ckpt", "evict", %% ["block", "shared_cache", "ckpt", "evict",
@ -112,23 +136,24 @@ start(Partition, Config) ->
%% "mutex", "read", "readserver", "reconcile", %% "mutex", "read", "readserver", "reconcile",
%% "salvage", "verify", "write"]} %% "salvage", "verify", "write"]}
], ],
case wterl_conn:open(DataRoot, ConnectionOpts) of ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
case wt_conn:open(DataRoot, ConnectionOpts) of
{ok, ConnRef} -> {ok, ConnRef} ->
Table = "lsm:wt" ++ integer_to_list(Partition), Table = "lsm:wt" ++ integer_to_list(Partition),
SessionOpenOpts = [{isolation, "snapshot"}], {ok, SRef} = wt:session_open(ConnRef),
{ok, SRef} = wterl:session_open(ConnRef, wterl:config_to_bin(SessionOpenOpts)), SessionOpts =
SessionOpts = [%TODO {block_compressor, "snappy"}, [%TODO {block_compressor, "snappy"},
{internal_page_max, "128K"}, {internal_page_max, "128K"},
{leaf_page_max, "256K"}, {leaf_page_max, "256K"},
{lsm_chunk_size, "256MB"}, {lsm_chunk_size, "256MB"},
{lsm_bloom_config, [{leaf_page_max, "16MB"}]} ], {lsm_bloom_config, [{leaf_page_max, "16MB"}]} ],
ok = wterl:session_create(SRef, Table, wterl:config_to_bin(SessionOpts)), ok = wt:session_create(SRef, Table, wt:config_to_bin(SessionOpts)),
{ok, #state{conn=ConnRef, {ok, #state{conn=ConnRef,
table=Table, table=Table,
session=SRef, session=SRef,
partition=Partition}}; partition=Partition}};
{error, ConnReason}=ConnError -> {error, ConnReason}=ConnError ->
lager:error("Failed to start wterl backend: ~p\n", lager:error("Failed to start WiredTiger storage backend: ~p\n",
[ConnReason]), [ConnReason]),
ConnError ConnError
end; end;
@ -137,20 +162,20 @@ start(Partition, Config) ->
end end
end. end.
%% @doc Stop the wterl backend %% @doc Stop the WiredTiger backend
-spec stop(state()) -> ok. -spec stop(state()) -> ok.
stop(#state{conn=ConnRef, session=SRef}) -> stop(#state{conn=ConnRef, session=SRef}) ->
ok = wterl:session_close(SRef), ok = wt:session_close(SRef),
wterl_conn:close(ConnRef). wt_conn:close(ConnRef).
%% @doc Retrieve an object from the wterl backend %% @doc Retrieve an object from the WiredTiger backend
-spec get(riak_object:bucket(), riak_object:key(), state()) -> -spec get(riak_object:bucket(), riak_object:key(), state()) ->
{ok, any(), state()} | {ok, any(), state()} |
{ok, not_found, state()} | {ok, not_found, state()} |
{error, term(), state()}. {error, term(), state()}.
get(Bucket, Key, #state{table=Table, session=SRef}=State) -> get(Bucket, Key, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
case wterl:session_get(SRef, Table, WTKey) of case wt:session_get(SRef, Table, WTKey) of
{ok, Value} -> {ok, Value} ->
{ok, Value, State}; {ok, Value, State};
not_found -> not_found ->
@ -159,8 +184,8 @@ get(Bucket, Key, #state{table=Table, session=SRef}=State) ->
{error, Reason, State} {error, Reason, State}
end. end.
%% @doc Insert an object into the wterl backend. %% @doc Insert an object into the WiredTiger backend.
%% NOTE: The wterl backend does not currently support %% NOTE: The WiredTiger backend does not currently support
%% secondary indexing and the_IndexSpecs parameter %% secondary indexing and the_IndexSpecs parameter
%% is ignored. %% is ignored.
-type index_spec() :: {add, Index, SecondaryKey} | {remove, Index, SecondaryKey}. -type index_spec() :: {add, Index, SecondaryKey} | {remove, Index, SecondaryKey}.
@ -169,15 +194,15 @@ get(Bucket, Key, #state{table=Table, session=SRef}=State) ->
{error, term(), state()}. {error, term(), state()}.
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) -> put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, PrimaryKey), WTKey = to_object_key(Bucket, PrimaryKey),
case wterl:session_put(SRef, Table, WTKey, Val) of case wt:session_put(SRef, Table, WTKey, Val) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
{error, Reason, State} {error, Reason, State}
end. end.
%% @doc Delete an object from the wterl backend %% @doc Delete an object from the WiredTiger backend
%% NOTE: The wterl backend does not currently support %% NOTE: The WiredTiger backend does not currently support
%% secondary indexing and the_IndexSpecs parameter %% secondary indexing and the_IndexSpecs parameter
%% is ignored. %% is ignored.
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
@ -185,7 +210,7 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=Stat
{error, term(), state()}. {error, term(), state()}.
delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) -> delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
case wterl:session_delete(SRef, Table, WTKey) of case wt:session_delete(SRef, Table, WTKey) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
@ -201,18 +226,18 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
FoldFun = fold_buckets_fun(FoldBucketsFun), FoldFun = fold_buckets_fun(FoldBucketsFun),
BucketFolder = BucketFolder =
fun() -> fun() ->
{ok, SRef} = wterl:session_open(ConnRef), {ok, SRef} = wt:session_open(ConnRef),
{ok, Cursor} = wterl:cursor_open(SRef, Table), {ok, Cursor} = wt:cursor_open(SRef, Table),
try try
{FoldResult, _} = {FoldResult, _} =
wterl:fold_keys(Cursor, FoldFun, {Acc, []}), wt:fold_keys(Cursor, FoldFun, {Acc, []}),
FoldResult FoldResult
catch catch
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor), ok = wt:cursor_close(Cursor),
ok = wterl:session_close(SRef) ok = wt:session_close(SRef)
end end
end, end,
case lists:member(async_fold, Opts) of case lists:member(async_fold, Opts) of
@ -244,16 +269,16 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
FoldFun = fold_keys_fun(FoldKeysFun, Limiter), FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
KeyFolder = KeyFolder =
fun() -> fun() ->
{ok, SRef} = wterl:session_open(ConnRef), {ok, SRef} = wt:session_open(ConnRef),
{ok, Cursor} = wterl:cursor_open(SRef, Table), {ok, Cursor} = wt:cursor_open(SRef, Table),
try try
wterl:fold_keys(Cursor, FoldFun, Acc) wt:fold_keys(Cursor, FoldFun, Acc)
catch catch
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor), ok = wt:cursor_close(Cursor),
ok = wterl:session_close(SRef) ok = wt:session_close(SRef)
end end
end, end,
case lists:member(async_fold, Opts) of case lists:member(async_fold, Opts) of
@ -273,16 +298,16 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder = ObjectFolder =
fun() -> fun() ->
{ok, SRef} = wterl:session_open(ConnRef), {ok, SRef} = wt:session_open(ConnRef),
{ok, Cursor} = wterl:cursor_open(SRef, Table), {ok, Cursor} = wt:cursor_open(SRef, Table),
try try
wterl:fold(Cursor, FoldFun, Acc) wt:fold(Cursor, FoldFun, Acc)
catch catch
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor), ok = wt:cursor_close(Cursor),
ok = wterl:session_close(SRef) ok = wt:session_close(SRef)
end end
end, end,
case lists:member(async_fold, Opts) of case lists:member(async_fold, Opts) of
@ -292,36 +317,36 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
{ok, ObjectFolder()} {ok, ObjectFolder()}
end. end.
%% @doc Delete all objects from this wterl backend %% @doc Delete all objects from this WiredTiger backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}. -spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{table=Table, session=SRef}=State) -> drop(#state{table=Table, session=SRef}=State) ->
case wterl:session_truncate(SRef, Table) of case wt:session_truncate(SRef, Table) of
ok -> ok ->
{ok, State}; {ok, State};
Error -> Error ->
{error, Error, State} {error, Error, State}
end. end.
%% @doc Returns true if this wterl backend contains any %% @doc Returns true if this WiredTiger backend contains any
%% non-tombstone values; otherwise returns false. %% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean(). -spec is_empty(state()) -> boolean().
is_empty(#state{table=Table, session=SRef}) -> is_empty(#state{table=Table, session=SRef}) ->
{ok, Cursor} = wterl:cursor_open(SRef, Table), {ok, Cursor} = wt:cursor_open(SRef, Table),
try try
not_found =:= wterl:cursor_next(Cursor) not_found =:= wt:cursor_next(Cursor)
after after
ok = wterl:cursor_close(Cursor) ok = wt:cursor_close(Cursor)
end. end.
%% @doc Get the status information for this wterl backend %% @doc Get the status information for this WiredTiger backend
-spec status(state()) -> [{atom(), term()}]. -spec status(state()) -> [{atom(), term()}].
status(#state{table=Table, session=SRef}) -> status(#state{table=Table, session=SRef}) ->
{ok, Cursor} = wterl:cursor_open(SRef, "statistics:"++Table), {ok, Cursor} = wt:cursor_open(SRef, "statistics:"++Table),
try try
Stats = fetch_status(Cursor), Stats = fetch_status(Cursor),
[{stats, Stats}] [{stats, Stats}]
after after
ok = wterl:cursor_close(Cursor) ok = wt:cursor_close(Cursor)
end. end.
%% @doc Register an asynchronous callback %% @doc Register an asynchronous callback
@ -440,14 +465,42 @@ from_index_key(LKey) ->
end. end.
%% @private %% @private
%% Return all status from wterl statistics cursor %% Return all status from WiredTiger statistics cursor
fetch_status(Cursor) -> fetch_status(Cursor) ->
fetch_status(Cursor, wterl:cursor_next_value(Cursor), []). fetch_status(Cursor, wt:cursor_next_value(Cursor), []).
fetch_status(_Cursor, not_found, Acc) -> fetch_status(_Cursor, not_found, Acc) ->
lists:reverse(Acc); lists:reverse(Acc);
fetch_status(Cursor, {ok, Stat}, Acc) -> fetch_status(Cursor, {ok, Stat}, Acc) ->
[What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])], [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]). fetch_status(Cursor, wt:cursor_next_value(Cursor), [{What,Val}|Acc]).
best_guess_at_a_reasonable_cache_size(ChunkSizeInMB) ->
RunningApps = application:which_applications(),
case proplists:is_defined(sasl, RunningApps) andalso
proplists:is_defined(os_mon, RunningApps) of
true ->
MemInfo = memsup:get_system_memory_data(),
AvailableRAM = proplists:get_value(system_total_memory, MemInfo),
FreeRAM = proplists:get_value(free_memory, MemInfo),
CurrentlyInUseByErlang = proplists:get_value(total, erlang:memory()),
OneThirdOfRemainingRAM = ((AvailableRAM - CurrentlyInUseByErlang) div 3),
Remainder = OneThirdOfRemainingRAM rem (ChunkSizeInMB * 1024 * 1024),
EstCacheSize = (OneThirdOfRemainingRAM - Remainder),
GuessedSize =
case EstCacheSize > FreeRAM of
true ->
FreeRAM - (FreeRAM rem (ChunkSizeInMB * 1024 * 1024));
_ ->
EstCacheSize
end,
case GuessedSize < 809238528 of
true -> "1GB";
false -> integer_to_list(GuessedSize div (1024 * 1024)) ++ "MB"
end;
false ->
"1GB"
end.
%% =================================================================== %% ===================================================================
%% EUnit tests %% EUnit tests
@ -455,13 +508,13 @@ fetch_status(Cursor, {ok, Stat}, Acc) ->
-ifdef(TEST). -ifdef(TEST).
simple_test_() -> simple_test_() ->
?assertCmd("rm -rf test/wterl-backend"), ?assertCmd("rm -rf test/wiredtiger-backend"),
application:set_env(wterl, data_root, "test/wterl-backend"), application:set_env(wt, data_root, "test/wiredtiger-backend"),
temp_riak_kv_backend:standard_test(?MODULE, []). temp_riak_kv_backend:standard_test(?MODULE, []).
custom_config_test_() -> custom_config_test_() ->
?assertCmd("rm -rf test/wterl-backend"), ?assertCmd("rm -rf test/wiredtiger-backend"),
application:set_env(wterl, data_root, ""), application:set_env(wt, data_root, ""),
temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wterl-backend"}]). temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wiredtiger-backend"}]).
-endif. -endif.

View file

@ -23,8 +23,8 @@
%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE %%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
%%% %%%
%%% This is a temporary copy of riak_kv_backend, just here to keep %%% This is a temporary copy of riak_kv_backend, just here to keep
%%% wterl development private for now. When riak_kv_wterl_backend is %%% WiredTiger development private for now. When riak_kv_wiredtiger_backend
%%% moved to riak_kv, delete this file. %%% is moved to riak_kv, delete this file.
%%% %%%
%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE %%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
@ -36,7 +36,6 @@
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-export([standard_test/2]). -export([standard_test/2]).
-endif. -endif.
@ -272,8 +271,7 @@ empty_check({Backend, State}) ->
}. }.
setup({BackendMod, Config}) -> setup({BackendMod, Config}) ->
%% Start the backend {ok, S} = BackendMod:start(0, Config),
{ok, S} = BackendMod:start(42, Config),
{BackendMod, S}. {BackendMod, S}.
cleanup({BackendMod, S}) -> cleanup({BackendMod, S}) ->

12
src/wt.app.src Normal file
View file

@ -0,0 +1,12 @@
{application, wt,
[
{description, "Erlang NIF Wrapper for WiredTiger"},
{vsn, "1.0.0"},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, {wt_app, []}},
{env, []}
]}.

View file

@ -1,6 +1,6 @@
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% %%
%% wterl: Erlang Wrapper for WiredTiger %% wt: Erlang Wrapper for WiredTiger
%% %%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. %% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% %%
@ -19,7 +19,7 @@
%% under the License. %% under the License.
%% %%
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
-module(wterl). -module(wt).
-export([conn_open/2, -export([conn_open/2,
conn_close/1, conn_close/1,
cursor_close/1, cursor_close/1,
@ -64,6 +64,7 @@
fold/3]). fold/3]).
-ifdef(TEST). -ifdef(TEST).
-export([config_to_bin/1]).
-ifdef(EQC). -ifdef(EQC).
-include_lib("eqc/include/eqc.hrl"). -include_lib("eqc/include/eqc.hrl").
-define(QC_OUT(P), -define(QC_OUT(P),
@ -272,6 +273,7 @@ fold(Cursor, Fun, Acc, {ok, Key, Value}) ->
config_types() -> config_types() ->
[{block_compressor, string}, [{block_compressor, string},
{cache_size, string}, {cache_size, string},
{chunk, string},
{create, bool}, {create, bool},
{direct_io, map}, {direct_io, map},
{drop, list}, {drop, list},
@ -290,16 +292,19 @@ config_types() ->
{logging, bool}, {logging, bool},
{lsm_bloom_config, config}, {lsm_bloom_config, config},
{lsm_chunk_size, string}, {lsm_chunk_size, string},
{min, string},
{multiprocess, bool}, {multiprocess, bool},
{name, string}, {name, string},
{session_max, integer}, {session_max, integer},
{shared_cache, config},
{size, string},
{sync, bool}, {sync, bool},
{target, list}, {target, list},
{transactional, bool}, {transactional, bool},
{verbose, map}]. {verbose, map}].
config_value(Key, Config, Default) -> config_value(Key, Config, Default) ->
{Key, app_helper:get_prop_or_env(Key, Config, wterl, Default)}. {Key, app_helper:get_prop_or_env(Key, Config, wt, Default)}.
config_encode(integer, Value) -> config_encode(integer, Value) ->
try try
@ -328,6 +333,8 @@ config_to_bin(Opts) ->
iolist_to_binary([config_to_bin(Opts, []), <<"\0">>]). iolist_to_binary([config_to_bin(Opts, []), <<"\0">>]).
config_to_bin([], Acc) -> config_to_bin([], Acc) ->
iolist_to_binary(Acc); iolist_to_binary(Acc);
config_to_bin([ [] | Rest], Acc) ->
config_to_bin(Rest, Acc);
config_to_bin([{Key, Value} | Rest], Acc) -> config_to_bin([{Key, Value} | Rest], Acc) ->
case lists:keysearch(Key, 1, config_types()) of case lists:keysearch(Key, 1, config_types()) of
{value, {Key, Type}} -> {value, {Key, Type}} ->
@ -353,12 +360,12 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
%% =================================================================== %% ===================================================================
-ifdef(TEST). -ifdef(TEST).
-define(TEST_DATA_DIR, "test/wterl.basic"). -define(TEST_DATA_DIR, "test/wt.basic").
open_test_conn(DataDir) -> open_test_conn(DataDir) ->
?assertCmd("rm -rf "++DataDir), ?assertCmd("rm -rf "++DataDir),
?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))), ?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))),
OpenConfig = config_to_bin([{create,true},{cache_size,"100MB"}]), OpenConfig = config_to_bin([{create,true},{cache_size,"1GB"}]),
{ok, ConnRef} = conn_open(DataDir, OpenConfig), {ok, ConnRef} = conn_open(DataDir, OpenConfig),
ConnRef. ConnRef.
@ -433,7 +440,7 @@ various_session_test_() ->
end}, end},
{"session checkpoint", {"session checkpoint",
fun() -> fun() ->
Cfg = wterl:config_to_bin([{target, ["\"table:test\""]}]), Cfg = wt:config_to_bin([{target, ["\"table:test\""]}]),
?assertMatch(ok, session_checkpoint(SRef, Cfg)), ?assertMatch(ok, session_checkpoint(SRef, Cfg)),
?assertMatch({ok, <<"apple">>}, ?assertMatch({ok, <<"apple">>},
session_get(SRef, "table:test", <<"a">>)) session_get(SRef, "table:test", <<"a">>))
@ -608,10 +615,10 @@ ops(Keys, Values) ->
apply_kv_ops([], _SRef, _Tbl, Acc0) -> apply_kv_ops([], _SRef, _Tbl, Acc0) ->
Acc0; Acc0;
apply_kv_ops([{put, K, V} | Rest], SRef, Tbl, Acc0) -> apply_kv_ops([{put, K, V} | Rest], SRef, Tbl, Acc0) ->
ok = wterl:session_put(SRef, Tbl, K, V), ok = wt:session_put(SRef, Tbl, K, V),
apply_kv_ops(Rest, SRef, Tbl, orddict:store(K, V, Acc0)); apply_kv_ops(Rest, SRef, Tbl, orddict:store(K, V, Acc0));
apply_kv_ops([{delete, K, _} | Rest], SRef, Tbl, Acc0) -> apply_kv_ops([{delete, K, _} | Rest], SRef, Tbl, Acc0) ->
ok = case wterl:session_delete(SRef, Tbl, K) of ok = case wt:session_delete(SRef, Tbl, K) of
ok -> ok ->
ok; ok;
not_found -> not_found ->
@ -625,28 +632,28 @@ prop_put_delete() ->
?LET({Keys, Values}, {keys(), values()}, ?LET({Keys, Values}, {keys(), values()},
?FORALL(Ops, eqc_gen:non_empty(list(ops(Keys, Values))), ?FORALL(Ops, eqc_gen:non_empty(list(ops(Keys, Values))),
begin begin
DataDir = "/tmp/wterl.putdelete.qc", DataDir = "/tmp/wt.putdelete.qc",
Table = "table:eqc", Table = "table:eqc",
?cmd("rm -rf "++DataDir), ?cmd("rm -rf "++DataDir),
ok = filelib:ensure_dir(filename:join(DataDir, "x")), ok = filelib:ensure_dir(filename:join(DataDir, "x")),
Cfg = wterl:config_to_bin([{create,true}]), Cfg = wt:config_to_bin([{create,true}]),
{ok, Conn} = wterl:conn_open(DataDir, Cfg), {ok, Conn} = wt:conn_open(DataDir, Cfg),
{ok, SRef} = wterl:session_open(Conn), {ok, SRef} = wt:session_open(Conn),
try try
wterl:session_create(SRef, Table), wt:session_create(SRef, Table),
Model = apply_kv_ops(Ops, SRef, Table, []), Model = apply_kv_ops(Ops, SRef, Table, []),
%% Validate that all deleted values return not_found %% Validate that all deleted values return not_found
F = fun({K, deleted}) -> F = fun({K, deleted}) ->
?assertEqual(not_found, wterl:session_get(SRef, Table, K)); ?assertEqual(not_found, wt:session_get(SRef, Table, K));
({K, V}) -> ({K, V}) ->
?assertEqual({ok, V}, wterl:session_get(SRef, Table, K)) ?assertEqual({ok, V}, wt:session_get(SRef, Table, K))
end, end,
lists:map(F, Model), lists:map(F, Model),
true true
after after
wterl:session_close(SRef), wt:session_close(SRef),
wterl:conn_close(Conn) wt:conn_close(Conn)
end end
end)). end)).

View file

@ -1,6 +1,6 @@
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% %%
%% wterl: access to WiredTiger database %% wt: access to WiredTiger database
%% %%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. %% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% %%
@ -19,7 +19,7 @@
%% under the License. %% under the License.
%% %%
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
-module(wterl_app). -module(wt_app).
-author('Steve Vinoski <steve@basho.com>'). -author('Steve Vinoski <steve@basho.com>').
-behaviour(application). -behaviour(application).
@ -33,7 +33,7 @@
%% =================================================================== %% ===================================================================
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
wterl_sup:start_link(). wt_sup:start_link().
stop(_State) -> stop(_State) ->
ok. ok.

View file

@ -1,6 +1,6 @@
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% %%
%% wterl_conn: manage a connection to WiredTiger %% wt_conn: manage a connection to WiredTiger
%% %%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. %% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% %%
@ -19,7 +19,7 @@
%% under the License. %% under the License.
%% %%
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
-module(wterl_conn). -module(wt_conn).
-author('Steve Vinoski <steve@basho.com>'). -author('Steve Vinoski <steve@basho.com>').
-behaviour(gen_server). -behaviour(gen_server).
@ -37,7 +37,7 @@
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, { -record(state, {
conn :: wterl:connection() conn :: wt:connection()
}). }).
-type config_list() :: [{atom(), any()}]. -type config_list() :: [{atom(), any()}].
@ -54,11 +54,11 @@ start_link() ->
stop() -> stop() ->
gen_server:cast(?MODULE, stop). gen_server:cast(?MODULE, stop).
-spec open(string()) -> {ok, wterl:connection()} | {error, term()}. -spec open(string()) -> {ok, wt:connection()} | {error, term()}.
open(Dir) -> open(Dir) ->
open(Dir, []). open(Dir, []).
-spec open(string(), config_list()) -> {ok, wterl:connection()} | {error, term()}. -spec open(string(), config_list()) -> {ok, wt:connection()} | {error, term()}.
open(Dir, Config) -> open(Dir, Config) ->
gen_server:call(?MODULE, {open, Dir, Config, self()}, infinity). gen_server:call(?MODULE, {open, Dir, Config, self()}, infinity).
@ -70,7 +70,7 @@ is_open() ->
get() -> get() ->
gen_server:call(?MODULE, get, infinity). gen_server:call(?MODULE, get, infinity).
-spec close(wterl:connection()) -> ok. -spec close(wt:connection()) -> ok.
close(_Conn) -> close(_Conn) ->
gen_server:call(?MODULE, {close, self()}, infinity). gen_server:call(?MODULE, {close, self()}, infinity).
@ -79,17 +79,35 @@ close(_Conn) ->
%% ==================================================================== %% ====================================================================
init([]) -> init([]) ->
true = wterl_ets:table_ready(), true = wt_conn_deputy:table_ready(),
{ok, #state{}}. {ok, #state{}}.
handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) -> handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) ->
Opts = [{create, true}, OptsA =
config_value(cache_size, Config, "100MB"), case proplists:get_bool(create, Config) of
config_value(session_max, Config, 100)], false -> [{create, false}];
{Reply, NState} = case wterl:conn_open(Dir, wterl:config_to_bin(Opts)) of _ -> [{create, true}]
end,
OptsB =
case proplists:is_defined(shared_cache, Config) of
true ->
[];
false ->
[{cache_size, config_value(cache_size, Config, "512MB")}]
end,
OptsC =
case proplists:is_defined(session_max, Config) of
true ->
[];
false ->
[{session_max, config_value(session_max, Config, 100)}]
end,
Opts = lists:merge([OptsA, OptsB, OptsC, Config]),
{Reply, NState} =
case wt:conn_open(Dir, wt:config_to_bin(Opts)) of
{ok, ConnRef}=OK -> {ok, ConnRef}=OK ->
Monitor = erlang:monitor(process, Caller), Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}), true = ets:insert(wt_conn_deputy, {Monitor, Caller}),
{OK, State#state{conn = ConnRef}}; {OK, State#state{conn = ConnRef}};
Error -> Error ->
{Error, State} {Error, State}
@ -97,7 +115,7 @@ handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) ->
{reply, Reply, NState}; {reply, Reply, NState};
handle_call({open, _Dir, _Config, Caller}, _From,#state{conn=ConnRef}=State) -> handle_call({open, _Dir, _Config, Caller}, _From,#state{conn=ConnRef}=State) ->
Monitor = erlang:monitor(process, Caller), Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}), true = ets:insert(wt_conn_deputy, {Monitor, Caller}),
{reply, {ok, ConnRef}, State}; {reply, {ok, ConnRef}, State};
handle_call(is_open, _From, #state{conn=ConnRef}=State) -> handle_call(is_open, _From, #state{conn=ConnRef}=State) ->
@ -109,10 +127,10 @@ handle_call(get, _From, #state{conn=ConnRef}=State) ->
{reply, {ok, ConnRef}, State}; {reply, {ok, ConnRef}, State};
handle_call({close, Caller}, _From, #state{conn=ConnRef}=State) -> handle_call({close, Caller}, _From, #state{conn=ConnRef}=State) ->
{[{Monitor, Caller}], _} = ets:match_object(wterl_ets, {'_', Caller}, 1), {[{Monitor, Caller}], _} = ets:match_object(wt_conn_deputy, {'_', Caller}, 1),
true = erlang:demonitor(Monitor, [flush]), true = erlang:demonitor(Monitor, [flush]),
true = ets:delete(wterl_ets, Monitor), true = ets:delete(wt_conn_deputy, Monitor),
NState = case ets:info(wterl_ets, size) of NState = case ets:info(wt_conn_deputy, size) of
0 -> 0 ->
do_close(ConnRef), do_close(ConnRef),
State#state{conn=undefined}; State#state{conn=undefined};
@ -129,17 +147,17 @@ handle_cast(stop, #state{conn=ConnRef}=State) ->
do_close(ConnRef), do_close(ConnRef),
ets:foldl(fun({Monitor, _}, _) -> ets:foldl(fun({Monitor, _}, _) ->
true = erl:demonitor(Monitor, [flush]), true = erl:demonitor(Monitor, [flush]),
ets:delete(wterl_ets, Monitor) ets:delete(wt_conn_deputy, Monitor)
end, true, wterl_ets), end, true, wt_conn_deputy),
{stop, normal, State#state{conn=undefined}}; {stop, normal, State#state{conn=undefined}};
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({'DOWN', Monitor, _, _, _}, #state{conn=ConnRef}=State) -> handle_info({'DOWN', Monitor, _, _, _}, #state{conn=ConnRef}=State) ->
NState = case ets:lookup(wterl_ets, Monitor) of NState = case ets:lookup(wt_conn_deputy, Monitor) of
[{Monitor, _}] -> [{Monitor, _}] ->
true = ets:delete(wterl_ets, Monitor), true = ets:delete(wt_conn_deputy, Monitor),
case ets:info(wterl_ets, size) of case ets:info(wt_conn_deputy, size) of
0 -> 0 ->
do_close(ConnRef), do_close(ConnRef),
State#state{conn=undefined}; State#state{conn=undefined};
@ -168,16 +186,16 @@ code_change(_OldVsn, State, _Extra) ->
do_close(undefined) -> do_close(undefined) ->
ok; ok;
do_close(ConnRef) -> do_close(ConnRef) ->
wterl:conn_close(ConnRef). wt:conn_close(ConnRef).
%% @private %% @private
config_value(Key, Config, Default) -> config_value(Key, Config, Default) ->
{Key, app_helper:get_prop_or_env(Key, Config, wterl, Default)}. {Key, app_helper:get_prop_or_env(Key, Config, wt, Default)}.
-ifdef(TEST). -ifdef(TEST).
-define(DATADIR, "test/wterl-backend"). -define(DATADIR, "test/wt-backend").
simple_test_() -> simple_test_() ->
{spawn, {spawn,
@ -185,7 +203,7 @@ simple_test_() ->
fun() -> fun() ->
?assertCmd("rm -rf " ++ ?DATADIR), ?assertCmd("rm -rf " ++ ?DATADIR),
?assertMatch(ok, filelib:ensure_dir(filename:join(?DATADIR, "x"))), ?assertMatch(ok, filelib:ensure_dir(filename:join(?DATADIR, "x"))),
EtsPid = case wterl_ets:start_link() of EtsPid = case wt_conn_deputy:start_link() of
{ok, Pid1} -> {ok, Pid1} ->
Pid1; Pid1;
{error, {already_started, Pid1}} -> {error, {already_started, Pid1}} ->
@ -201,7 +219,7 @@ simple_test_() ->
end, end,
fun(_) -> fun(_) ->
stop(), stop(),
wterl_ets:stop() wt_conn_deputy:stop()
end, end,
fun(_) -> fun(_) ->
{inorder, {inorder,
@ -215,14 +233,14 @@ simple_test_() ->
end}]}. end}]}.
open_one() -> open_one() ->
{ok, Ref} = open("test/wterl-backend", [{session_max, 20},{cache_size, "1MB"}]), {ok, Ref} = open("test/wt-backend", [{session_max, 20}]),
true = is_open(), true = is_open(),
close(Ref), close(Ref),
false = is_open(), false = is_open(),
ok. ok.
open_and_wait(Pid) -> open_and_wait(Pid) ->
{ok, Ref} = open("test/wterl-backend"), {ok, Ref} = open("test/wt-backend"),
Pid ! open, Pid ! open,
receive receive
close -> close ->

View file

@ -1,6 +1,6 @@
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% %%
%% wterl_ets: ets table owner for wterl_conn %% wt_conn_deputy: ets table owner for wt_conn
%% %%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. %% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% %%
@ -19,17 +19,17 @@
%% under the License. %% under the License.
%% %%
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
-module(wterl_ets). -module(wt_conn_deputy).
-author('Steve Vinoski <steve@basho.com>'). -author('Steve Vinoski <steve@basho.com>').
-behaviour(gen_server). -behaviour(gen_server).
%% ==================================================================== %% ====================================================================
%% The sole purpose of this module is to own the ets table used by the %% The sole purpose of this module is to own the ets table used by the
%% wterl_conn module. Holding the ets table in an otherwise do-nothing %% wt_conn module. Holding the ets table in an otherwise do-nothing
%% server avoids losing the table and its contents should an unexpected %% server avoids losing the table and its contents should an unexpected
%% error occur in wterl_conn if it were the owner instead. This module %% error occur in wt_conn if it were the owner instead. This module
%% is unit-tested as part of the wterl_conn module. %% is unit-tested as part of the wt_conn module.
%% ==================================================================== %% ====================================================================
%% API %% API

View file

@ -1,6 +1,6 @@
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% %%
%% wterl_sup: supervisor for WiredTiger database app %% wt_sup: supervisor for WiredTiger database app
%% %%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. %% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% %%
@ -19,7 +19,7 @@
%% under the License. %% under the License.
%% %%
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
-module(wterl_sup). -module(wt_sup).
-author('Steve Vinoski <steve@basho.com>'). -author('Steve Vinoski <steve@basho.com>').
-behaviour(supervisor). -behaviour(supervisor).
@ -45,5 +45,5 @@ start_link() ->
%% =================================================================== %% ===================================================================
init([]) -> init([]) ->
{ok, {{one_for_one, 5, 10}, [?CHILD(wterl_ets, worker), {ok, {{one_for_one, 5, 10}, [?CHILD(wt_conn_deputy, worker),
?CHILD(wterl_conn, worker)]}}. ?CHILD(wt_conn, worker)]}}.

View file

@ -1,13 +0,0 @@
{application, wterl,
[
{description, "Erlang Wrapper for WiredTiger"},
{vsn, "0.9.0"},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, {wterl_app, []}},
{env, [
]}
]}.