Merge pull request #6 from basho-labs/gsb-async-nifs3
Execute NIF calls on non-scheduler threads asynchronously
This commit is contained in:
commit
88416d1991
22 changed files with 4122 additions and 1102 deletions
4
.gdbinit
Normal file
4
.gdbinit
Normal file
|
@ -0,0 +1,4 @@
|
|||
handle SIGPIPE nostop noprint pass
|
||||
#b erl_nif.c:1203
|
||||
#b sys/unix/erl_unix_sys_ddll.c:234
|
||||
|
6
.gitignore
vendored
6
.gitignore
vendored
|
@ -1,8 +1,12 @@
|
|||
*.beam
|
||||
.eunit
|
||||
ebin
|
||||
priv/*.so
|
||||
c_src/system
|
||||
c_src/wiredtiger*/
|
||||
c_src/*.o
|
||||
c_src/bzip2-1.0.6
|
||||
c_src/snappy-1.0.4
|
||||
deps/
|
||||
priv/
|
||||
log/
|
||||
*~
|
||||
|
|
2
AUTHORS
Normal file
2
AUTHORS
Normal file
|
@ -0,0 +1,2 @@
|
|||
-author('Steve Vinoski <steve@basho.com>').
|
||||
-author('Gregory Burd <steve@basho.com>'). % greg@burd.me @gregburd
|
38
Makefile
38
Makefile
|
@ -3,6 +3,7 @@ TARGET= wterl
|
|||
REBAR= ./rebar
|
||||
#REBAR= /usr/bin/env rebar
|
||||
ERL= /usr/bin/env erl
|
||||
ERLEXEC= ${ERL_ROOTDIR}/lib/erlang/erts-5.9.1/bin/erlexec
|
||||
DIALYZER= /usr/bin/env dialyzer
|
||||
|
||||
|
||||
|
@ -13,12 +14,14 @@ all: compile
|
|||
deps: get-deps
|
||||
|
||||
get-deps:
|
||||
c_src/build_deps.sh get-deps
|
||||
@$(REBAR) get-deps
|
||||
|
||||
update-deps:
|
||||
c_src/build_deps.sh update-deps
|
||||
@$(REBAR) update-deps
|
||||
|
||||
c_src/wterl.o:
|
||||
c_src/wterl.o: c_src/async_nif.h
|
||||
touch c_src/wterl.c
|
||||
|
||||
ebin/app_helper.beam:
|
||||
|
@ -38,22 +41,41 @@ eunit: compile
|
|||
@$(REBAR) eunit skip_deps=true
|
||||
|
||||
eunit_console:
|
||||
@$(ERL) -pa .eunit deps/*/ebin
|
||||
@$(ERL) -pa .eunit deps/lager/ebin
|
||||
|
||||
plt: compile
|
||||
@$(DIALYZER) --build_plt --output_plt .$(TARGET).plt -pa deps/*/ebin --apps kernel stdlib
|
||||
@$(DIALYZER) --build_plt --output_plt .$(TARGET).plt -pa deps/lager/ebin --apps kernel stdlib
|
||||
|
||||
analyze: compile
|
||||
$(DIALYZER) --plt .$(TARGET).plt -pa deps/*/ebin ebin
|
||||
$(DIALYZER) --plt .$(TARGET).plt -pa deps/lager/ebin ebin
|
||||
|
||||
repl:
|
||||
$(ERL) -pz deps/*/ebin -pa ebin
|
||||
$(ERL) -pz deps/lager/ebin -pa ebin
|
||||
|
||||
gdb-repl:
|
||||
USE_GDB=1 $(ERL) -pz deps/*/ebin -pa ebin
|
||||
USE_GDB=1 $(ERL) -pz deps/lager/ebin -pa ebin
|
||||
|
||||
eunit-repl:
|
||||
$(ERL) -pa .eunit -pz deps/*/ebin -pz ebin -exec 'cd(".eunit").'
|
||||
$(ERL) -pz deps/lager/ebin -pa ebin -pa .eunit
|
||||
|
||||
gdb-eunit-repl:
|
||||
USE_GDB=1 $(ERL) -pa .eunit -pz deps/*/ebin -pz ebin -exec 'cd(".eunit").'
|
||||
USE_GDB=1 $(ERL) -pa .eunit -pz deps/lager/ebin -pz ebin -exec 'cd(".eunit").'
|
||||
|
||||
|
||||
ERL_TOP= /home/gburd/eng/otp_R15B01
|
||||
CERL= ${ERL_TOP}/bin/cerl
|
||||
VALGRIND_MISC_FLAGS= "--verbose --leak-check=full --show-reachable=yes --trace-children=yes --track-origins=yes --suppressions=${ERL_TOP}/erts/emulator/valgrind/suppress.standard --show-possibly-lost=no --malloc-fill=AB --free-fill=CD"
|
||||
|
||||
helgrind:
|
||||
valgrind --verbose --tool=helgrind \
|
||||
--leak-check=full
|
||||
--show-reachable=yes \
|
||||
--trace-children=yes \
|
||||
--track-origins=yes \
|
||||
--suppressions=${ERL_TOP}/erts/emulator/valgrind/suppress.standard \
|
||||
--show-possibly-lost=no \
|
||||
--malloc-fill=AB \
|
||||
--free-fill=CD ${ERLEXEC} -pz deps/lager/ebin -pa ebin -pa .eunit
|
||||
|
||||
valgrind:
|
||||
${CERL} -valgrind ${VALGRIND_FLAGS} --log-file=${ROOTDIR}/valgrind_log-beam.smp.%p -- -pz deps/lager/ebin -pa ebin -pa .eunit -exec 'eunit:test(wterl).'
|
||||
|
|
411
c_src/async_nif.h
Normal file
411
c_src/async_nif.h
Normal file
|
@ -0,0 +1,411 @@
|
|||
/*
|
||||
* async_nif: An async thread-pool layer for Erlang's NIF API
|
||||
*
|
||||
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
|
||||
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
|
||||
*
|
||||
* This file is provided to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file
|
||||
* except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
#ifndef __ASYNC_NIF_H__
|
||||
#define __ASYNC_NIF_H__
|
||||
|
||||
#if defined(__cplusplus)
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <assert.h>
|
||||
#include "fifo_q.h"
|
||||
#ifdef ASYNC_NIF_STATS
|
||||
#include "stats.h" // TODO: measure, measure... measure again
|
||||
#endif
|
||||
|
||||
#define ASYNC_NIF_MAX_WORKERS 128
|
||||
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
|
||||
|
||||
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
|
||||
|
||||
struct async_nif_work_queue {
|
||||
ErlNifMutex *reqs_mutex;
|
||||
ErlNifCond *reqs_cnd;
|
||||
FIFO_QUEUE_TYPE(reqs) reqs;
|
||||
};
|
||||
|
||||
struct async_nif_worker_entry {
|
||||
ErlNifTid tid;
|
||||
unsigned int worker_id;
|
||||
struct async_nif_state *async_nif;
|
||||
struct async_nif_work_queue *q;
|
||||
};
|
||||
|
||||
struct async_nif_state {
|
||||
unsigned int shutdown;
|
||||
unsigned int num_workers;
|
||||
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
|
||||
unsigned int num_queues;
|
||||
unsigned int next_q;
|
||||
struct async_nif_work_queue queues[];
|
||||
};
|
||||
|
||||
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
|
||||
struct decl ## _args frame; \
|
||||
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) work_block \
|
||||
static void fn_post_ ## decl (struct decl ## _args *args) { \
|
||||
do post_block while(0); \
|
||||
} \
|
||||
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
|
||||
struct decl ## _args on_stack_args; \
|
||||
struct decl ## _args *args = &on_stack_args; \
|
||||
struct decl ## _args *copy_of_args; \
|
||||
struct async_nif_req_entry *req = NULL; \
|
||||
const char *affinity = NULL; \
|
||||
ErlNifEnv *new_env = NULL; \
|
||||
/* argv[0] is a ref used for selective recv */ \
|
||||
const ERL_NIF_TERM *argv = argv_in + 1; \
|
||||
argc -= 1; \
|
||||
/* Note: !!! this assumes that the first element of priv_data is ours */ \
|
||||
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
|
||||
if (async_nif->shutdown) \
|
||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
||||
enif_make_atom(env, "shutdown")); \
|
||||
if (!(new_env = enif_alloc_env())) { \
|
||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
||||
enif_make_atom(env, "enomem")); \
|
||||
} \
|
||||
do pre_block while(0); \
|
||||
req = (struct async_nif_req_entry*)enif_alloc(sizeof(struct async_nif_req_entry)); \
|
||||
if (!req) { \
|
||||
fn_post_ ## decl (args); \
|
||||
enif_free_env(new_env); \
|
||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
||||
enif_make_atom(env, "enomem")); \
|
||||
} \
|
||||
memset(req, 0, sizeof(struct async_nif_req_entry)); \
|
||||
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
|
||||
if (!copy_of_args) { \
|
||||
fn_post_ ## decl (args); \
|
||||
enif_free(req); \
|
||||
enif_free_env(new_env); \
|
||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
||||
enif_make_atom(env, "enomem")); \
|
||||
} \
|
||||
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
|
||||
req->env = new_env; \
|
||||
req->ref = enif_make_copy(new_env, argv_in[0]); \
|
||||
enif_self(env, &req->pid); \
|
||||
req->args = (void*)copy_of_args; \
|
||||
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
|
||||
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
|
||||
int h = -1; \
|
||||
if (affinity) \
|
||||
h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \
|
||||
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
|
||||
if (!reply) { \
|
||||
fn_post_ ## decl (args); \
|
||||
enif_free(req); \
|
||||
enif_free_env(new_env); \
|
||||
enif_free(copy_of_args); \
|
||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
||||
enif_make_atom(env, "shutdown")); \
|
||||
} \
|
||||
return reply; \
|
||||
}
|
||||
|
||||
#define ASYNC_NIF_INIT(name) \
|
||||
static ErlNifMutex *name##_async_nif_coord = NULL;
|
||||
|
||||
#define ASYNC_NIF_LOAD(name, priv) do { \
|
||||
if (!name##_async_nif_coord) \
|
||||
name##_async_nif_coord = enif_mutex_create(NULL); \
|
||||
enif_mutex_lock(name##_async_nif_coord); \
|
||||
priv = async_nif_load(); \
|
||||
enif_mutex_unlock(name##_async_nif_coord); \
|
||||
} while(0);
|
||||
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
|
||||
if (!name##_async_nif_coord) \
|
||||
name##_async_nif_coord = enif_mutex_create(NULL); \
|
||||
enif_mutex_lock(name##_async_nif_coord); \
|
||||
async_nif_unload(env, priv); \
|
||||
enif_mutex_unlock(name##_async_nif_coord); \
|
||||
enif_mutex_destroy(name##_async_nif_coord); \
|
||||
name##_async_nif_coord = NULL; \
|
||||
} while(0);
|
||||
#define ASYNC_NIF_UPGRADE(name, env) do { \
|
||||
if (!name##_async_nif_coord) \
|
||||
name##_async_nif_coord = enif_mutex_create(NULL); \
|
||||
enif_mutex_lock(name##_async_nif_coord); \
|
||||
async_nif_upgrade(env); \
|
||||
enif_mutex_unlock(name##_async_nif_coord); \
|
||||
} while(0);
|
||||
|
||||
#define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env);
|
||||
#define ASYNC_NIF_WORK_ENV new_env
|
||||
|
||||
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
|
||||
|
||||
/**
|
||||
* TODO:
|
||||
*/
|
||||
static inline unsigned int async_nif_str_hash_func(const char *s)
|
||||
{
|
||||
unsigned int h = (unsigned int)*s;
|
||||
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
|
||||
return h;
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO:
|
||||
*/
|
||||
static ERL_NIF_TERM
|
||||
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
|
||||
{
|
||||
/* Identify the most appropriate worker for this request. */
|
||||
unsigned int qid = (hint != -1) ? hint : async_nif->next_q;
|
||||
struct async_nif_work_queue *q = NULL;
|
||||
do {
|
||||
q = &async_nif->queues[qid];
|
||||
enif_mutex_lock(q->reqs_mutex);
|
||||
|
||||
/* Now that we hold the lock, check for shutdown. As long as we
|
||||
hold this lock either a) we're shutting down so exit now or
|
||||
b) this queue will be valid until we release the lock. */
|
||||
if (async_nif->shutdown) {
|
||||
enif_mutex_unlock(q->reqs_mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (fifo_q_full(reqs, q->reqs)) { // TODO: || (q->avg_latency > median_latency)
|
||||
enif_mutex_unlock(q->reqs_mutex);
|
||||
qid = (qid + 1) % async_nif->num_queues;
|
||||
q = &async_nif->queues[qid];
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} while(1);
|
||||
|
||||
/* We hold the queue's lock, and we've seletect a reasonable queue for this
|
||||
new request so add the request. */
|
||||
fifo_q_put(reqs, q->reqs, req);
|
||||
|
||||
/* Build the term before releasing the lock so as not to race on the use of
|
||||
the req pointer (which will soon become invalid in another thread
|
||||
performing the request). */
|
||||
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
|
||||
enif_make_atom(req->env, "enqueued"));
|
||||
enif_mutex_unlock(q->reqs_mutex);
|
||||
enif_cond_signal(q->reqs_cnd);
|
||||
return reply;
|
||||
}
|
||||
|
||||
static void *
|
||||
async_nif_worker_fn(void *arg)
|
||||
{
|
||||
struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg;
|
||||
unsigned int worker_id = we->worker_id;
|
||||
struct async_nif_state *async_nif = we->async_nif;
|
||||
struct async_nif_work_queue *q = we->q;
|
||||
struct async_nif_req_entry *req = NULL;
|
||||
|
||||
for(;;) {
|
||||
/* Examine the request queue, are there things to be done? */
|
||||
enif_mutex_lock(q->reqs_mutex);
|
||||
check_again_for_work:
|
||||
if (async_nif->shutdown) {
|
||||
enif_mutex_unlock(q->reqs_mutex);
|
||||
break;
|
||||
}
|
||||
if (fifo_q_empty(reqs, q->reqs)) {
|
||||
/* Queue is empty, wait for work */
|
||||
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
|
||||
goto check_again_for_work;
|
||||
} else {
|
||||
assert(fifo_q_size(reqs, q->reqs) > 0);
|
||||
assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs));
|
||||
/* At this point the next req is ours to process and we hold the
|
||||
reqs_mutex lock. Take the request off the queue. */
|
||||
req = fifo_q_get(reqs, q->reqs);
|
||||
enif_mutex_unlock(q->reqs_mutex);
|
||||
|
||||
/* Ensure that there is at least one other worker thread watching this
|
||||
queue. */
|
||||
enif_cond_signal(q->reqs_cnd);
|
||||
|
||||
/* Perform the work. */
|
||||
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
|
||||
|
||||
/* Now call the post-work cleanup function. */
|
||||
req->fn_post(req->args);
|
||||
|
||||
/* Free resources allocated for this async request. */
|
||||
enif_free_env(req->env);
|
||||
enif_free(req->args);
|
||||
enif_free(req);
|
||||
req = NULL;
|
||||
}
|
||||
}
|
||||
enif_thread_exit(0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
||||
{
|
||||
unsigned int i;
|
||||
unsigned int num_queues = async_nif->num_queues;
|
||||
struct async_nif_work_queue *q = NULL;
|
||||
|
||||
/* Signal the worker threads, stop what you're doing and exit. To
|
||||
ensure that we don't race with the enqueue() process we first
|
||||
lock all the worker queues, then set shutdown to true, then
|
||||
unlock. The enqueue function will take the queue mutex, then
|
||||
test for shutdown condition, then enqueue only if not shutting
|
||||
down. */
|
||||
for (i = 0; i < num_queues; i++) {
|
||||
q = &async_nif->queues[i];
|
||||
enif_mutex_lock(q->reqs_mutex);
|
||||
}
|
||||
async_nif->shutdown = 1;
|
||||
for (i = 0; i < num_queues; i++) {
|
||||
q = &async_nif->queues[i];
|
||||
enif_cond_broadcast(q->reqs_cnd);
|
||||
enif_mutex_unlock(q->reqs_mutex);
|
||||
}
|
||||
|
||||
/* Join for the now exiting worker threads. */
|
||||
for (i = 0; i < async_nif->num_workers; ++i) {
|
||||
void *exit_value = 0; /* We ignore the thread_join's exit value. */
|
||||
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
|
||||
}
|
||||
|
||||
/* Cleanup requests, mutexes and conditions in each work queue. */
|
||||
for (i = 0; i < num_queues; i++) {
|
||||
q = &async_nif->queues[i];
|
||||
|
||||
/* Worker threads are stopped, now toss anything left in the queue. */
|
||||
struct async_nif_req_entry *req = NULL;
|
||||
fifo_q_foreach(reqs, q->reqs, req, {
|
||||
enif_send(NULL, &req->pid, req->env,
|
||||
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
|
||||
enif_make_atom(req->env, "shutdown")));
|
||||
req->fn_post(req->args);
|
||||
enif_free_env(req->env);
|
||||
enif_free(req->args);
|
||||
enif_free(req);
|
||||
});
|
||||
fifo_q_free(reqs, q->reqs);
|
||||
enif_mutex_destroy(q->reqs_mutex);
|
||||
enif_cond_destroy(q->reqs_cnd);
|
||||
}
|
||||
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
|
||||
enif_free(async_nif);
|
||||
}
|
||||
|
||||
static void *
|
||||
async_nif_load()
|
||||
{
|
||||
static int has_init = 0;
|
||||
unsigned int i, j, num_queues;
|
||||
ErlNifSysInfo info;
|
||||
struct async_nif_state *async_nif;
|
||||
|
||||
/* Don't init more than once. */
|
||||
if (has_init) return 0;
|
||||
else has_init = 1;
|
||||
|
||||
/* Find out how many schedulers there are. */
|
||||
enif_system_info(&info, sizeof(ErlNifSysInfo));
|
||||
|
||||
/* Size the number of work queues according to schedulers. */
|
||||
if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) {
|
||||
num_queues = ASYNC_NIF_MAX_WORKERS / 2;
|
||||
} else {
|
||||
int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads;
|
||||
if (remainder != 0)
|
||||
num_queues = info.scheduler_threads - remainder;
|
||||
else
|
||||
num_queues = info.scheduler_threads;
|
||||
if (num_queues < 2)
|
||||
num_queues = 2;
|
||||
}
|
||||
|
||||
/* Init our portion of priv_data's module-specific state. */
|
||||
async_nif = enif_alloc(sizeof(struct async_nif_state) +
|
||||
sizeof(struct async_nif_work_queue) * num_queues);
|
||||
if (!async_nif)
|
||||
return NULL;
|
||||
memset(async_nif, 0, sizeof(struct async_nif_state) +
|
||||
sizeof(struct async_nif_work_queue) * num_queues);
|
||||
|
||||
async_nif->num_queues = num_queues;
|
||||
async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed
|
||||
async_nif->next_q = 0;
|
||||
async_nif->shutdown = 0;
|
||||
|
||||
for (i = 0; i < async_nif->num_queues; i++) {
|
||||
struct async_nif_work_queue *q = &async_nif->queues[i];
|
||||
q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE);
|
||||
q->reqs_mutex = enif_mutex_create(NULL);
|
||||
q->reqs_cnd = enif_cond_create(NULL);
|
||||
}
|
||||
|
||||
/* Setup the thread pool management. */
|
||||
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
|
||||
|
||||
/* Start the worker threads. */
|
||||
for (i = 0; i < async_nif->num_workers; i++) {
|
||||
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
|
||||
we->async_nif = async_nif;
|
||||
we->worker_id = i;
|
||||
we->q = &async_nif->queues[i % async_nif->num_queues];
|
||||
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
|
||||
&async_nif_worker_fn, (void*)we, NULL) != 0) {
|
||||
async_nif->shutdown = 1;
|
||||
|
||||
for (j = 0; j < async_nif->num_queues; j++) {
|
||||
struct async_nif_work_queue *q = &async_nif->queues[j];
|
||||
enif_cond_broadcast(q->reqs_cnd);
|
||||
}
|
||||
|
||||
while(i-- > 0) {
|
||||
void *exit_value = 0; /* Ignore this. */
|
||||
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
|
||||
}
|
||||
|
||||
for (j = 0; j < async_nif->num_queues; j++) {
|
||||
struct async_nif_work_queue *q = &async_nif->queues[j];
|
||||
enif_mutex_destroy(q->reqs_mutex);
|
||||
enif_cond_destroy(q->reqs_cnd);
|
||||
}
|
||||
|
||||
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
|
||||
enif_free(async_nif);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
return async_nif;
|
||||
}
|
||||
|
||||
static void
|
||||
async_nif_upgrade(ErlNifEnv *env)
|
||||
{
|
||||
// TODO:
|
||||
}
|
||||
|
||||
|
||||
#if defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // __ASYNC_NIF_H__
|
|
@ -1,40 +1,185 @@
|
|||
#!/bin/bash
|
||||
|
||||
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
|
||||
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
|
||||
POSIX_SHELL="true"
|
||||
export POSIX_SHELL
|
||||
exec /usr/bin/ksh $0 $@
|
||||
fi
|
||||
unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well
|
||||
|
||||
set -e
|
||||
|
||||
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
|
||||
WT_BRANCH=basho
|
||||
WT_REMOTE_REPO=http://github.com/wiredtiger/wiredtiger.git
|
||||
WT_VSN=""
|
||||
WT_DIR=wiredtiger-$WT_BRANCH
|
||||
|
||||
SNAPPY_VSN="1.0.4"
|
||||
SNAPPY_DIR=snappy-$SNAPPY_VSN
|
||||
|
||||
BZIP2_VSN="1.0.6"
|
||||
BZIP2_DIR=bzip2-$BZIP2_VSN
|
||||
|
||||
[ `basename $PWD` != "c_src" ] && cd c_src
|
||||
|
||||
BASEDIR="$PWD"
|
||||
export BASEDIR="$PWD"
|
||||
|
||||
case "$1" in
|
||||
clean)
|
||||
rm -rf system wiredtiger
|
||||
;;
|
||||
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
|
||||
MAKE=${MAKE:-make}
|
||||
|
||||
*)
|
||||
test -f system/lib/libwiredtiger.a && exit 0
|
||||
export CFLAGS="$CFLAGS -g -I $BASEDIR/system/include"
|
||||
export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
|
||||
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
|
||||
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
|
||||
|
||||
if [ -d wiredtiger/.git ]; then
|
||||
(cd wiredtiger && \
|
||||
git fetch && \
|
||||
git merge origin/$WT_BRANCH)
|
||||
get_wt ()
|
||||
{
|
||||
if [ -d $BASEDIR/$WT_DIR/.git ]; then
|
||||
(cd $BASEDIR/$WT_DIR && git pull -u) || exit 1
|
||||
else
|
||||
git clone -b $WT_BRANCH --single-branch $WT_REMOTE_REPO && \
|
||||
(cd wiredtiger && \
|
||||
patch -p1 < ../wiredtiger-extension-link.patch && \
|
||||
./autogen.sh)
|
||||
if [ "X$WT_VSN" == "X" ]; then
|
||||
git clone ${WT_REPO} && \
|
||||
(cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1)
|
||||
else
|
||||
git clone -b ${WT_BRANCH} --single-branch ${WT_REPO} && \
|
||||
(cd $BASEDIR/wiredtiger && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1)
|
||||
fi
|
||||
(cd wiredtiger/build_posix && \
|
||||
mv wiredtiger $WT_DIR || exit 1
|
||||
fi
|
||||
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
|
||||
(cd $BASEDIR/$WT_DIR && git cherry-pick a3c8c2a13758ae9c44edabcc1a780984a7882904 || exit 1)
|
||||
(cd $BASEDIR/$WT_DIR
|
||||
[ -e $BASEDIR/wiredtiger-build.patch ] && \
|
||||
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
|
||||
./autogen.sh || exit 1
|
||||
cd ./build_posix || exit 1
|
||||
[ -e Makefile ] && $MAKE distclean
|
||||
../configure --with-pic \
|
||||
--enable-snappy \
|
||||
--enable-bzip2 \
|
||||
--prefix=$BASEDIR/system && \
|
||||
make -j && make install)
|
||||
[ -d $BASEDIR/../priv ] || mkdir $BASEDIR/../priv
|
||||
cp $BASEDIR/system/bin/wt $BASEDIR/../priv
|
||||
cp $BASEDIR/system/lib/*.so $BASEDIR/../priv
|
||||
--prefix=${BASEDIR}/system || exit 1
|
||||
)
|
||||
}
|
||||
|
||||
get_snappy ()
|
||||
{
|
||||
[ -e snappy-$SNAPPY_VSN.tar.gz ] || (echo "Missing Snappy ($SNAPPY_VSN) source package" && exit 1)
|
||||
[ -d $BASEDIR/$SNAPPY_DIR ] || tar -xzf snappy-$SNAPPY_VSN.tar.gz
|
||||
[ -e $BASEDIR/snappy-build.patch ] && \
|
||||
(cd $BASEDIR/$SNAPPY_DIR
|
||||
patch -p1 --forward < $BASEDIR/snappy-build.patch || exit 1)
|
||||
(cd $BASEDIR/$SNAPPY_DIR
|
||||
./configure --with-pic --prefix=$BASEDIR/system || exit 1)
|
||||
}
|
||||
|
||||
get_bzip2 ()
|
||||
{
|
||||
[ -e bzip2-$BZIP2_VSN.tar.gz ] || (echo "Missing bzip2 ($BZIP2_VSN) source package" && exit 1)
|
||||
[ -d $BASEDIR/$BZIP2_DIR ] || tar -xzf bzip2-$BZIP2_VSN.tar.gz
|
||||
[ -e $BASEDIR/bzip2-build.patch ] && \
|
||||
(cd $BASEDIR/$BZIP2_DIR
|
||||
patch -p1 --forward < $BASEDIR/bzip2-build.patch || exit 1)
|
||||
}
|
||||
|
||||
get_deps ()
|
||||
{
|
||||
get_wt;
|
||||
get_snappy;
|
||||
get_bzip2;
|
||||
}
|
||||
|
||||
update_deps ()
|
||||
{
|
||||
if [ -d $BASEDIR/$WT_DIR/.git ]; then
|
||||
(cd $BASEDIR/$WT_DIR
|
||||
if [ "X$WT_VSN" == "X" ]; then
|
||||
git pull -u || exit 1
|
||||
else
|
||||
git checkout $WT_VSN || exit 1
|
||||
fi
|
||||
)
|
||||
fi
|
||||
}
|
||||
|
||||
build_wt ()
|
||||
{
|
||||
(cd $BASEDIR/$WT_DIR/build_posix && \
|
||||
$MAKE -j && $MAKE install)
|
||||
}
|
||||
|
||||
build_snappy ()
|
||||
{
|
||||
(cd $BASEDIR/$SNAPPY_DIR && \
|
||||
$MAKE -j && \
|
||||
$MAKE install
|
||||
)
|
||||
}
|
||||
|
||||
build_bzip2 ()
|
||||
{
|
||||
(cd $BASEDIR/$BZIP2_DIR && \
|
||||
$MAKE -j -f Makefile-libbz2_so && \
|
||||
mkdir -p $BASEDIR/system/lib && \
|
||||
cp -f bzlib.h $BASEDIR/system/include && \
|
||||
cp -f libbz2.so.1.0.6 $BASEDIR/system/lib && \
|
||||
ln -s $BASEDIR/system/lib/libbz2.so.1.0.6 $BASEDIR/system/lib/libbz2.so && \
|
||||
ln -s $BASEDIR/system/lib/libbz2.so.1.0.6 $BASEDIR/system/lib/libbz2-1.so && \
|
||||
ln -s $BASEDIR/system/lib/libbz2.so.1.0.6 $BASEDIR/system/lib/libbz2-1.0.so
|
||||
)
|
||||
}
|
||||
|
||||
case "$1" in
|
||||
clean)
|
||||
rm -rf system $WT_DIR $SNAPPY_DIR $BZIP2_DIR
|
||||
rm -f ${BASEDIR}/../priv/wt
|
||||
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
|
||||
rm -f ${BASEDIR}/../priv/libwiredtiger_*.so
|
||||
rm -f ${BASEDIR}/../priv/libbz2.so.*
|
||||
rm -f ${BASEDIR}/../priv/libsnappy.so.*
|
||||
;;
|
||||
|
||||
test)
|
||||
(cd $BASEDIR/$WT_DIR && $MAKE -j test)
|
||||
;;
|
||||
|
||||
update-deps)
|
||||
update-deps;
|
||||
;;
|
||||
|
||||
get-deps)
|
||||
get_deps;
|
||||
;;
|
||||
|
||||
*)
|
||||
[ -d $WT_DIR ] || get_wt;
|
||||
[ -d $SNAPPY_DIR ] || get_snappy;
|
||||
[ -d $BZIP2_DIR ] || get_bzip2;
|
||||
|
||||
# Build Snappy
|
||||
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
|
||||
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
|
||||
|
||||
# Build BZIP2
|
||||
[ -d $BASEDIR/$BZIP2_DIR ] || (echo "Missing BZip2 source directory" && exit 1)
|
||||
test -f $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] || build_bzip2;
|
||||
|
||||
# Build WiredTiger
|
||||
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
|
||||
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
|
||||
-a -f $BASEDIR/system/lib/libwiredtiger_snappy.so \
|
||||
-a -f $BASEDIR/system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] || build_wt;
|
||||
|
||||
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
|
||||
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
|
||||
cp -p -P $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv
|
||||
cp -p -P $BASEDIR/system/lib/libwiredtiger_snappy.so ${BASEDIR}/../priv
|
||||
cp -p -P $BASEDIR/system/lib/libwiredtiger_bzip2.so* ${BASEDIR}/../priv
|
||||
cp -p -P $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] ${BASEDIR}/../priv
|
||||
(cd ${BASEDIR}/../priv
|
||||
[ -L libbz2.so ] || ln -s libbz2.so.1.0.6 libbz2.so
|
||||
[ -L libbz2.so.1 ] || ln -s libbz2.so.1.0.6 libbz2.so.1
|
||||
[ -L libbz2.so.1.0 ] || ln -s libbz2.so.1.0.6 libbz2.so.1.0)
|
||||
cp -p -P $BASEDIR/system/lib/libsnappy.so* ${BASEDIR}/../priv
|
||||
;;
|
||||
esac
|
||||
|
|
BIN
c_src/bzip2-1.0.6.tar.gz
Normal file
BIN
c_src/bzip2-1.0.6.tar.gz
Normal file
Binary file not shown.
0
c_src/bzip2-build.patch
Normal file
0
c_src/bzip2-build.patch
Normal file
102
c_src/fifo_q.h
Normal file
102
c_src/fifo_q.h
Normal file
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* fifo_q: a macro-based implementation of a FIFO Queue
|
||||
*
|
||||
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
|
||||
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
|
||||
*
|
||||
* This file is provided to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file
|
||||
* except in compliance with the License. You may obtain
|
||||
* a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
#ifndef __FIFO_Q_H__
|
||||
#define __FIFO_Q_H__
|
||||
|
||||
#if defined(__cplusplus)
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define FIFO_QUEUE_TYPE(name) \
|
||||
struct fifo_q__ ## name *
|
||||
#define DECL_FIFO_QUEUE(name, type) \
|
||||
struct fifo_q__ ## name { \
|
||||
unsigned int h, t, s; \
|
||||
type *items[]; \
|
||||
}; \
|
||||
static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \
|
||||
int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\
|
||||
struct fifo_q__ ## name *q = enif_alloc(sz); \
|
||||
if (!q) \
|
||||
return 0; \
|
||||
memset(q, 0, sz); \
|
||||
q->s = n + 1; \
|
||||
return q; \
|
||||
} \
|
||||
static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
|
||||
memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \
|
||||
enif_free(q); \
|
||||
} \
|
||||
static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
|
||||
q->items[q->h] = n; \
|
||||
q->h = (q->h + 1) % q->s; \
|
||||
return n; \
|
||||
} \
|
||||
static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \
|
||||
type *n = q->items[q->t]; \
|
||||
q->items[q->t] = 0; \
|
||||
q->t = (q->t + 1) % q->s; \
|
||||
return n; \
|
||||
} \
|
||||
static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
|
||||
return (q->h - q->t + q->s) % q->s; \
|
||||
} \
|
||||
static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \
|
||||
return q->s - 1; \
|
||||
} \
|
||||
static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \
|
||||
return (q->t == q->h); \
|
||||
} \
|
||||
static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \
|
||||
return ((q->h + 1) % q->s) == q->t; \
|
||||
}
|
||||
|
||||
#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size)
|
||||
#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue)
|
||||
#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue)
|
||||
#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item)
|
||||
#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue)
|
||||
#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue)
|
||||
#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue)
|
||||
#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue)
|
||||
#define fifo_q_foreach(name, queue, item, task) do { \
|
||||
while(!fifo_q_ ## name ## _empty(queue)) { \
|
||||
item = fifo_q_ ## name ## _get(queue); \
|
||||
do task while(0); \
|
||||
} \
|
||||
} while(0);
|
||||
|
||||
struct async_nif_req_entry {
|
||||
ERL_NIF_TERM ref;
|
||||
ErlNifEnv *env;
|
||||
ErlNifPid pid;
|
||||
void *args;
|
||||
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
|
||||
void (*fn_post)(void *);
|
||||
};
|
||||
|
||||
|
||||
#if defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // __FIFO_Q_H__
|
643
c_src/khash.h
Normal file
643
c_src/khash.h
Normal file
|
@ -0,0 +1,643 @@
|
|||
/* The MIT License
|
||||
|
||||
Copyright (c) 2008, 2009, 2011 by Attractive Chaos <attractor@live.co.uk>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining
|
||||
a copy of this software and associated documentation files (the
|
||||
"Software"), to deal in the Software without restriction, including
|
||||
without limitation the rights to use, copy, modify, merge, publish,
|
||||
distribute, sublicense, and/or sell copies of the Software, and to
|
||||
permit persons to whom the Software is furnished to do so, subject to
|
||||
the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be
|
||||
included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
||||
BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
||||
ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
|
||||
/*
|
||||
An example:
|
||||
|
||||
#include "khash.h"
|
||||
KHASH_MAP_INIT_INT(32, char)
|
||||
int main() {
|
||||
int ret, is_missing;
|
||||
khiter_t k;
|
||||
khash_t(32) *h = kh_init(32);
|
||||
k = kh_put(32, h, 5, &ret);
|
||||
kh_value(h, k) = 10;
|
||||
k = kh_get(32, h, 10);
|
||||
is_missing = (k == kh_end(h));
|
||||
k = kh_get(32, h, 5);
|
||||
kh_del(32, h, k);
|
||||
for (k = kh_begin(h); k != kh_end(h); ++k)
|
||||
if (kh_exist(h, k)) kh_value(h, k) = 1;
|
||||
kh_destroy(32, h);
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
|
||||
/*
|
||||
2011-12-29 (0.2.7):
|
||||
|
||||
* Minor code clean up; no actual effect.
|
||||
|
||||
2011-09-16 (0.2.6):
|
||||
|
||||
* The capacity is a power of 2. This seems to dramatically improve the
|
||||
speed for simple keys. Thank Zilong Tan for the suggestion. Reference:
|
||||
|
||||
- http://code.google.com/p/ulib/
|
||||
- http://nothings.org/computer/judy/
|
||||
|
||||
* Allow to optionally use linear probing which usually has better
|
||||
performance for random input. Double hashing is still the default as it
|
||||
is more robust to certain non-random input.
|
||||
|
||||
* Added Wang's integer hash function (not used by default). This hash
|
||||
function is more robust to certain non-random input.
|
||||
|
||||
2011-02-14 (0.2.5):
|
||||
|
||||
* Allow to declare global functions.
|
||||
|
||||
2009-09-26 (0.2.4):
|
||||
|
||||
* Improve portability
|
||||
|
||||
2008-09-19 (0.2.3):
|
||||
|
||||
* Corrected the example
|
||||
* Improved interfaces
|
||||
|
||||
2008-09-11 (0.2.2):
|
||||
|
||||
* Improved speed a little in kh_put()
|
||||
|
||||
2008-09-10 (0.2.1):
|
||||
|
||||
* Added kh_clear()
|
||||
* Fixed a compiling error
|
||||
|
||||
2008-09-02 (0.2.0):
|
||||
|
||||
* Changed to token concatenation which increases flexibility.
|
||||
|
||||
2008-08-31 (0.1.2):
|
||||
|
||||
* Fixed a bug in kh_get(), which has not been tested previously.
|
||||
|
||||
2008-08-31 (0.1.1):
|
||||
|
||||
* Added destructor
|
||||
*/
|
||||
|
||||
|
||||
#ifndef __AC_KHASH_H
|
||||
#define __AC_KHASH_H
|
||||
|
||||
/*!
|
||||
@header
|
||||
|
||||
Generic hash table library.
|
||||
*/
|
||||
|
||||
#define AC_VERSION_KHASH_H "0.2.6"
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <limits.h>
|
||||
|
||||
/* compiler specific configuration */
|
||||
|
||||
#if UINT_MAX == 0xffffffffu
|
||||
typedef unsigned int khint32_t;
|
||||
#elif ULONG_MAX == 0xffffffffu
|
||||
typedef unsigned long khint32_t;
|
||||
#endif
|
||||
|
||||
#if ULONG_MAX == ULLONG_MAX
|
||||
typedef unsigned long khint64_t;
|
||||
#else
|
||||
typedef unsigned long long khint64_t;
|
||||
#endif
|
||||
|
||||
#ifdef _MSC_VER
|
||||
#define kh_inline __inline
|
||||
#else
|
||||
#define kh_inline inline
|
||||
#endif
|
||||
|
||||
typedef khint32_t khint_t;
|
||||
typedef khint_t khiter_t;
|
||||
|
||||
#define __ac_isempty(flag, i) ((flag[i>>4]>>((i&0xfU)<<1))&2)
|
||||
#define __ac_isdel(flag, i) ((flag[i>>4]>>((i&0xfU)<<1))&1)
|
||||
#define __ac_iseither(flag, i) ((flag[i>>4]>>((i&0xfU)<<1))&3)
|
||||
#define __ac_set_isdel_false(flag, i) (flag[i>>4]&=~(1ul<<((i&0xfU)<<1)))
|
||||
#define __ac_set_isempty_false(flag, i) (flag[i>>4]&=~(2ul<<((i&0xfU)<<1)))
|
||||
#define __ac_set_isboth_false(flag, i) (flag[i>>4]&=~(3ul<<((i&0xfU)<<1)))
|
||||
#define __ac_set_isdel_true(flag, i) (flag[i>>4]|=1ul<<((i&0xfU)<<1))
|
||||
|
||||
#ifdef KHASH_LINEAR
|
||||
#define __ac_inc(k, m) 1
|
||||
#else
|
||||
#define __ac_inc(k, m) (((k)>>3 ^ (k)<<3) | 1) & (m)
|
||||
#endif
|
||||
|
||||
#define __ac_fsize(m) ((m) < 16? 1 : (m)>>4)
|
||||
|
||||
#ifndef kroundup32
|
||||
#define kroundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
|
||||
#endif
|
||||
|
||||
#ifndef kcalloc
|
||||
#define kcalloc(N,Z) calloc(N,Z)
|
||||
#endif
|
||||
#ifndef kmalloc
|
||||
#define kmalloc(Z) malloc(Z)
|
||||
#endif
|
||||
#ifndef krealloc
|
||||
#define krealloc(P,Z) realloc(P,Z)
|
||||
#endif
|
||||
#ifndef kfree
|
||||
#define kfree(P) free(P)
|
||||
#endif
|
||||
|
||||
static const double __ac_HASH_UPPER = 0.77;
|
||||
|
||||
#define __KHASH_TYPE(name, khkey_t, khval_t) \
|
||||
typedef struct { \
|
||||
khint_t n_buckets, size, n_occupied, upper_bound; \
|
||||
khint32_t *flags; \
|
||||
khkey_t *keys; \
|
||||
khval_t *vals; \
|
||||
} kh_##name##_t;
|
||||
|
||||
#define __KHASH_PROTOTYPES(name, khkey_t, khval_t) \
|
||||
extern kh_##name##_t *kh_init_##name(void); \
|
||||
extern void kh_destroy_##name(kh_##name##_t *h); \
|
||||
extern void kh_clear_##name(kh_##name##_t *h); \
|
||||
extern khint_t kh_get_##name(const kh_##name##_t *h, khkey_t key); \
|
||||
extern int kh_resize_##name(kh_##name##_t *h, khint_t new_n_buckets); \
|
||||
extern khint_t kh_put_##name(kh_##name##_t *h, khkey_t key, int *ret); \
|
||||
extern void kh_del_##name(kh_##name##_t *h, khint_t x);
|
||||
|
||||
#define __KHASH_IMPL(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \
|
||||
SCOPE kh_##name##_t *kh_init_##name(void) { \
|
||||
return (kh_##name##_t*)kcalloc(1, sizeof(kh_##name##_t)); \
|
||||
} \
|
||||
SCOPE void kh_destroy_##name(kh_##name##_t *h) \
|
||||
{ \
|
||||
if (h) { \
|
||||
kfree((void *)h->keys); kfree(h->flags); \
|
||||
kfree((void *)h->vals); \
|
||||
kfree(h); \
|
||||
} \
|
||||
} \
|
||||
SCOPE void kh_clear_##name(kh_##name##_t *h) \
|
||||
{ \
|
||||
if (h && h->flags) { \
|
||||
memset(h->flags, 0xaa, __ac_fsize(h->n_buckets) * sizeof(khint32_t)); \
|
||||
h->size = h->n_occupied = 0; \
|
||||
} \
|
||||
} \
|
||||
SCOPE khint_t kh_get_##name(const kh_##name##_t *h, khkey_t key) \
|
||||
{ \
|
||||
if (h->n_buckets) { \
|
||||
khint_t inc, k, i, last, mask; \
|
||||
mask = h->n_buckets - 1; \
|
||||
k = __hash_func(key); i = k & mask; \
|
||||
inc = __ac_inc(k, mask); last = i; /* inc==1 for linear probing */ \
|
||||
while (!__ac_isempty(h->flags, i) && (__ac_isdel(h->flags, i) || !__hash_equal(h->keys[i], key))) { \
|
||||
i = (i + inc) & mask; \
|
||||
if (i == last) return h->n_buckets; \
|
||||
} \
|
||||
return __ac_iseither(h->flags, i)? h->n_buckets : i; \
|
||||
} else return 0; \
|
||||
} \
|
||||
SCOPE int kh_resize_##name(kh_##name##_t *h, khint_t new_n_buckets) \
|
||||
{ /* This function uses 0.25*n_buckets bytes of working space instead of [sizeof(key_t+val_t)+.25]*n_buckets. */ \
|
||||
khint32_t *new_flags = 0; \
|
||||
khint_t j = 1; \
|
||||
{ \
|
||||
kroundup32(new_n_buckets); \
|
||||
if (new_n_buckets < 4) new_n_buckets = 4; \
|
||||
if (h->size >= (khint_t)(new_n_buckets * __ac_HASH_UPPER + 0.5)) j = 0; /* requested size is too small */ \
|
||||
else { /* hash table size to be changed (shrink or expand); rehash */ \
|
||||
new_flags = (khint32_t*)kmalloc(__ac_fsize(new_n_buckets) * sizeof(khint32_t)); \
|
||||
if (!new_flags) return -1; \
|
||||
memset(new_flags, 0xaa, __ac_fsize(new_n_buckets) * sizeof(khint32_t)); \
|
||||
if (h->n_buckets < new_n_buckets) { /* expand */ \
|
||||
khkey_t *new_keys = (khkey_t*)krealloc((void *)h->keys, new_n_buckets * sizeof(khkey_t)); \
|
||||
if (!new_keys) return -1; \
|
||||
h->keys = new_keys; \
|
||||
if (kh_is_map) { \
|
||||
khval_t *new_vals = (khval_t*)krealloc((void *)h->vals, new_n_buckets * sizeof(khval_t)); \
|
||||
if (!new_vals) return -1; \
|
||||
h->vals = new_vals; \
|
||||
} \
|
||||
} /* otherwise shrink */ \
|
||||
} \
|
||||
} \
|
||||
if (j) { /* rehashing is needed */ \
|
||||
for (j = 0; j != h->n_buckets; ++j) { \
|
||||
if (__ac_iseither(h->flags, j) == 0) { \
|
||||
khkey_t key = h->keys[j]; \
|
||||
khval_t val; \
|
||||
khint_t new_mask; \
|
||||
new_mask = new_n_buckets - 1; \
|
||||
if (kh_is_map) val = h->vals[j]; \
|
||||
__ac_set_isdel_true(h->flags, j); \
|
||||
while (1) { /* kick-out process; sort of like in Cuckoo hashing */ \
|
||||
khint_t inc, k, i; \
|
||||
k = __hash_func(key); \
|
||||
i = k & new_mask; \
|
||||
inc = __ac_inc(k, new_mask); \
|
||||
while (!__ac_isempty(new_flags, i)) i = (i + inc) & new_mask; \
|
||||
__ac_set_isempty_false(new_flags, i); \
|
||||
if (i < h->n_buckets && __ac_iseither(h->flags, i) == 0) { /* kick out the existing element */ \
|
||||
{ khkey_t tmp = h->keys[i]; h->keys[i] = key; key = tmp; } \
|
||||
if (kh_is_map) { khval_t tmp = h->vals[i]; h->vals[i] = val; val = tmp; } \
|
||||
__ac_set_isdel_true(h->flags, i); /* mark it as deleted in the old hash table */ \
|
||||
} else { /* write the element and jump out of the loop */ \
|
||||
h->keys[i] = key; \
|
||||
if (kh_is_map) h->vals[i] = val; \
|
||||
break; \
|
||||
} \
|
||||
} \
|
||||
} \
|
||||
} \
|
||||
if (h->n_buckets > new_n_buckets) { /* shrink the hash table */ \
|
||||
h->keys = (khkey_t*)krealloc((void *)h->keys, new_n_buckets * sizeof(khkey_t)); \
|
||||
if (kh_is_map) h->vals = (khval_t*)krealloc((void *)h->vals, new_n_buckets * sizeof(khval_t)); \
|
||||
} \
|
||||
kfree(h->flags); /* free the working space */ \
|
||||
h->flags = new_flags; \
|
||||
h->n_buckets = new_n_buckets; \
|
||||
h->n_occupied = h->size; \
|
||||
h->upper_bound = (khint_t)(h->n_buckets * __ac_HASH_UPPER + 0.5); \
|
||||
} \
|
||||
return 0; \
|
||||
} \
|
||||
SCOPE khint_t kh_put_##name(kh_##name##_t *h, khkey_t key, int *ret) \
|
||||
{ \
|
||||
khint_t x; \
|
||||
if (h->n_occupied >= h->upper_bound) { /* update the hash table */ \
|
||||
if (h->n_buckets > (h->size<<1)) { \
|
||||
if (kh_resize_##name(h, h->n_buckets - 1) < 0) { /* clear "deleted" elements */ \
|
||||
*ret = -1; return h->n_buckets; \
|
||||
} \
|
||||
} else if (kh_resize_##name(h, h->n_buckets + 1) < 0) { /* expand the hash table */ \
|
||||
*ret = -1; return h->n_buckets; \
|
||||
} \
|
||||
} /* TODO: to implement automatically shrinking; resize() already support shrinking */ \
|
||||
{ \
|
||||
khint_t inc, k, i, site, last, mask = h->n_buckets - 1; \
|
||||
x = site = h->n_buckets; k = __hash_func(key); i = k & mask; \
|
||||
if (__ac_isempty(h->flags, i)) x = i; /* for speed up */ \
|
||||
else { \
|
||||
inc = __ac_inc(k, mask); last = i; \
|
||||
while (!__ac_isempty(h->flags, i) && (__ac_isdel(h->flags, i) || !__hash_equal(h->keys[i], key))) { \
|
||||
if (__ac_isdel(h->flags, i)) site = i; \
|
||||
i = (i + inc) & mask; \
|
||||
if (i == last) { x = site; break; } \
|
||||
} \
|
||||
if (x == h->n_buckets) { \
|
||||
if (__ac_isempty(h->flags, i) && site != h->n_buckets) x = site; \
|
||||
else x = i; \
|
||||
} \
|
||||
} \
|
||||
} \
|
||||
if (__ac_isempty(h->flags, x)) { /* not present at all */ \
|
||||
h->keys[x] = key; \
|
||||
__ac_set_isboth_false(h->flags, x); \
|
||||
++h->size; ++h->n_occupied; \
|
||||
*ret = 1; \
|
||||
} else if (__ac_isdel(h->flags, x)) { /* deleted */ \
|
||||
h->keys[x] = key; \
|
||||
__ac_set_isboth_false(h->flags, x); \
|
||||
++h->size; \
|
||||
*ret = 2; \
|
||||
} else *ret = 0; /* Don't touch h->keys[x] if present and not deleted */ \
|
||||
return x; \
|
||||
} \
|
||||
SCOPE void kh_del_##name(kh_##name##_t *h, khint_t x) \
|
||||
{ \
|
||||
if (x != h->n_buckets && !__ac_iseither(h->flags, x)) { \
|
||||
__ac_set_isdel_true(h->flags, x); \
|
||||
--h->size; \
|
||||
} \
|
||||
}
|
||||
|
||||
#define KHASH_DECLARE(name, khkey_t, khval_t) \
|
||||
__KHASH_TYPE(name, khkey_t, khval_t) \
|
||||
__KHASH_PROTOTYPES(name, khkey_t, khval_t)
|
||||
|
||||
#define KHASH_INIT2(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \
|
||||
__KHASH_TYPE(name, khkey_t, khval_t) \
|
||||
__KHASH_IMPL(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal)
|
||||
|
||||
#define KHASH_INIT(name, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \
|
||||
KHASH_INIT2(name, static kh_inline, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal)
|
||||
|
||||
/* --- BEGIN OF HASH FUNCTIONS --- */
|
||||
|
||||
/*! @function
|
||||
@abstract Integer hash function
|
||||
@param key The integer [khint32_t]
|
||||
@return The hash value [khint_t]
|
||||
*/
|
||||
#define kh_int_hash_func(key) (khint32_t)(key)
|
||||
/*! @function
|
||||
@abstract Integer comparison function
|
||||
*/
|
||||
#define kh_int_hash_equal(a, b) ((a) == (b))
|
||||
/*! @function
|
||||
@abstract 64-bit integer hash function
|
||||
@param key The integer [khint64_t]
|
||||
@return The hash value [khint_t]
|
||||
*/
|
||||
#define kh_int64_hash_func(key) (khint32_t)((key)>>33^(key)^(key)<<11)
|
||||
/*! @function
|
||||
@abstract 64-bit integer comparison function
|
||||
*/
|
||||
#define kh_int64_hash_equal(a, b) ((a) == (b))
|
||||
/*! @function
|
||||
@abstract Pointer hash function
|
||||
@param key The integer void *
|
||||
@return The hash value [khint_t]
|
||||
*/
|
||||
#define kh_ptr_hash_func(key) (khint32_t)(key)
|
||||
/*! @function
|
||||
@abstract Pointer comparison function
|
||||
*/
|
||||
#define kh_ptr_hash_equal(a, b) ((a) == (b))
|
||||
/*! @function
|
||||
@abstract 64-bit pointer hash function
|
||||
@param key The integer void *
|
||||
@return The hash value [khint_t]
|
||||
*/
|
||||
#define kh_ptr64_hash_func(key) (khint32_t)(((khint64_t)key)>>33^((khint64_t)key)^((khint64_t)key)<<11)
|
||||
/*! @function
|
||||
@abstract 64-bit pointer comparison function
|
||||
*/
|
||||
#define kh_ptr64_hash_equal(a, b) ((a) == (b))
|
||||
/*! @function
|
||||
@abstract const char* hash function
|
||||
@param s Pointer to a null terminated string
|
||||
@return The hash value
|
||||
*/
|
||||
static kh_inline khint_t __ac_X31_hash_string(const char *s)
|
||||
{
|
||||
khint_t h = (khint_t)*s;
|
||||
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (khint_t)*s;
|
||||
return h;
|
||||
}
|
||||
/*! @function
|
||||
@abstract Another interface to const char* hash function
|
||||
@param key Pointer to a null terminated string [const char*]
|
||||
@return The hash value [khint_t]
|
||||
*/
|
||||
#define kh_str_hash_func(key) __ac_X31_hash_string(key)
|
||||
/*! @function
|
||||
@abstract Const char* comparison function
|
||||
*/
|
||||
#define kh_str_hash_equal(a, b) (strcmp(a, b) == 0)
|
||||
|
||||
static kh_inline khint_t __ac_Wang_hash(khint_t key)
|
||||
{
|
||||
key += ~(key << 15);
|
||||
key ^= (key >> 10);
|
||||
key += (key << 3);
|
||||
key ^= (key >> 6);
|
||||
key += ~(key << 11);
|
||||
key ^= (key >> 16);
|
||||
return key;
|
||||
}
|
||||
#define kh_int_hash_func2(k) __ac_Wang_hash((khint_t)key)
|
||||
|
||||
/* --- END OF HASH FUNCTIONS --- */
|
||||
|
||||
/* Other convenient macros... */
|
||||
|
||||
/*!
|
||||
@abstract Type of the hash table.
|
||||
@param name Name of the hash table [symbol]
|
||||
*/
|
||||
#define khash_t(name) kh_##name##_t
|
||||
|
||||
/*! @function
|
||||
@abstract Initiate a hash table.
|
||||
@param name Name of the hash table [symbol]
|
||||
@return Pointer to the hash table [khash_t(name)*]
|
||||
*/
|
||||
#define kh_init(name) kh_init_##name()
|
||||
|
||||
/*! @function
|
||||
@abstract Destroy a hash table.
|
||||
@param name Name of the hash table [symbol]
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
*/
|
||||
#define kh_destroy(name, h) kh_destroy_##name(h)
|
||||
|
||||
/*! @function
|
||||
@abstract Reset a hash table without deallocating memory.
|
||||
@param name Name of the hash table [symbol]
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
*/
|
||||
#define kh_clear(name, h) kh_clear_##name(h)
|
||||
|
||||
/*! @function
|
||||
@abstract Resize a hash table.
|
||||
@param name Name of the hash table [symbol]
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@param s New size [khint_t]
|
||||
*/
|
||||
#define kh_resize(name, h, s) kh_resize_##name(h, s)
|
||||
|
||||
/*! @function
|
||||
@abstract Insert a key to the hash table.
|
||||
@param name Name of the hash table [symbol]
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@param k Key [type of keys]
|
||||
@param r Extra return code: 0 if the key is present in the hash table;
|
||||
1 if the bucket is empty (never used); 2 if the element in
|
||||
the bucket has been deleted [int*]
|
||||
@return Iterator to the inserted element [khint_t]
|
||||
*/
|
||||
#define kh_put(name, h, k, r) kh_put_##name(h, k, r)
|
||||
|
||||
/*! @function
|
||||
@abstract Retrieve a key from the hash table.
|
||||
@param name Name of the hash table [symbol]
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@param k Key [type of keys]
|
||||
@return Iterator to the found element, or kh_end(h) if the element is absent [khint_t]
|
||||
*/
|
||||
#define kh_get(name, h, k) kh_get_##name(h, k)
|
||||
|
||||
/*! @function
|
||||
@abstract Remove a key from the hash table.
|
||||
@param name Name of the hash table [symbol]
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@param k Iterator to the element to be deleted [khint_t]
|
||||
*/
|
||||
#define kh_del(name, h, k) kh_del_##name(h, k)
|
||||
|
||||
/*! @function
|
||||
@abstract Test whether a bucket contains data.
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@param x Iterator to the bucket [khint_t]
|
||||
@return 1 if containing data; 0 otherwise [int]
|
||||
*/
|
||||
#define kh_exist(h, x) (!__ac_iseither((h)->flags, (x)))
|
||||
|
||||
/*! @function
|
||||
@abstract Get key given an iterator
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@param x Iterator to the bucket [khint_t]
|
||||
@return Key [type of keys]
|
||||
*/
|
||||
#define kh_key(h, x) ((h)->keys[x])
|
||||
|
||||
/*! @function
|
||||
@abstract Get value given an iterator
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@param x Iterator to the bucket [khint_t]
|
||||
@return Value [type of values]
|
||||
@discussion For hash sets, calling this results in segfault.
|
||||
*/
|
||||
#define kh_val(h, x) ((h)->vals[x])
|
||||
|
||||
/*! @function
|
||||
@abstract Alias of kh_val()
|
||||
*/
|
||||
#define kh_value(h, x) ((h)->vals[x])
|
||||
|
||||
/*! @function
|
||||
@abstract Get the start iterator
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@return The start iterator [khint_t]
|
||||
*/
|
||||
#define kh_begin(h) (khint_t)(0)
|
||||
|
||||
/*! @function
|
||||
@abstract Get the end iterator
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@return The end iterator [khint_t]
|
||||
*/
|
||||
#define kh_end(h) ((h)->n_buckets)
|
||||
|
||||
/*! @function
|
||||
@abstract Get the number of elements in the hash table
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@return Number of elements in the hash table [khint_t]
|
||||
*/
|
||||
#define kh_size(h) ((h)->size)
|
||||
|
||||
/*! @function
|
||||
@abstract Get the number of buckets in the hash table
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@return Number of buckets in the hash table [khint_t]
|
||||
*/
|
||||
#define kh_n_buckets(h) ((h)->n_buckets)
|
||||
|
||||
/*! @function
|
||||
@abstract Iterate over the entries in the hash table
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@param kvar Variable to which key will be assigned
|
||||
@param vvar Variable to which value will be assigned
|
||||
@param code Block of code to execute
|
||||
*/
|
||||
#define kh_foreach(h, kvar, vvar, code) { khint_t __i; \
|
||||
for (__i = kh_begin(h); __i != kh_end(h); ++__i) { \
|
||||
if (!kh_exist(h,__i)) continue; \
|
||||
(kvar) = kh_key(h,__i); \
|
||||
(vvar) = kh_val(h,__i); \
|
||||
code; \
|
||||
} }
|
||||
|
||||
/*! @function
|
||||
@abstract Iterate over the values in the hash table
|
||||
@param h Pointer to the hash table [khash_t(name)*]
|
||||
@param vvar Variable to which value will be assigned
|
||||
@param code Block of code to execute
|
||||
*/
|
||||
#define kh_foreach_value(h, vvar, code) { khint_t __i; \
|
||||
for (__i = kh_begin(h); __i != kh_end(h); ++__i) { \
|
||||
if (!kh_exist(h,__i)) continue; \
|
||||
(vvar) = kh_val(h,__i); \
|
||||
code; \
|
||||
} }
|
||||
|
||||
/* More conenient interfaces */
|
||||
|
||||
/*! @function
|
||||
@abstract Instantiate a hash map containing (void *) keys
|
||||
@param name Name of the hash table [symbol]
|
||||
@param khval_t Type of values [type]
|
||||
*/
|
||||
#ifdef __x86_64__
|
||||
#define KHASH_MAP_INIT_PTR(name, khval_t) \
|
||||
KHASH_INIT(name, void*, khval_t, 1, kh_ptr64_hash_func, kh_ptr64_hash_equal)
|
||||
#else
|
||||
#define KHASH_MAP_INIT_PTR(name, khval_t) \
|
||||
KHASH_INIT(name, void*, khval_t, 1, kh_ptr_hash_func, kh_ptr_hash_equal)
|
||||
#endif
|
||||
|
||||
/*! @function
|
||||
@abstract Instantiate a hash set containing integer keys
|
||||
@param name Name of the hash table [symbol]
|
||||
*/
|
||||
#define KHASH_SET_INIT_INT(name) \
|
||||
KHASH_INIT(name, khint32_t, char, 0, kh_int_hash_func, kh_int_hash_equal)
|
||||
|
||||
/*! @function
|
||||
@abstract Instantiate a hash map containing integer keys
|
||||
@param name Name of the hash table [symbol]
|
||||
@param khval_t Type of values [type]
|
||||
*/
|
||||
#define KHASH_MAP_INIT_INT(name, khval_t) \
|
||||
KHASH_INIT(name, khint32_t, khval_t, 1, kh_int_hash_func, kh_int_hash_equal)
|
||||
|
||||
/*! @function
|
||||
@abstract Instantiate a hash map containing 64-bit integer keys
|
||||
@param name Name of the hash table [symbol]
|
||||
*/
|
||||
#define KHASH_SET_INIT_INT64(name) \
|
||||
KHASH_INIT(name, khint64_t, char, 0, kh_int64_hash_func, kh_int64_hash_equal)
|
||||
|
||||
/*! @function
|
||||
@abstract Instantiate a hash map containing 64-bit integer keys
|
||||
@param name Name of the hash table [symbol]
|
||||
@param khval_t Type of values [type]
|
||||
*/
|
||||
#define KHASH_MAP_INIT_INT64(name, khval_t) \
|
||||
KHASH_INIT(name, khint64_t, khval_t, 1, kh_int64_hash_func, kh_int64_hash_equal)
|
||||
|
||||
typedef const char *kh_cstr_t;
|
||||
/*! @function
|
||||
@abstract Instantiate a hash map containing const char* keys
|
||||
@param name Name of the hash table [symbol]
|
||||
*/
|
||||
#define KHASH_SET_INIT_STR(name) \
|
||||
KHASH_INIT(name, kh_cstr_t, char, 0, kh_str_hash_func, kh_str_hash_equal)
|
||||
|
||||
/*! @function
|
||||
@abstract Instantiate a hash map containing const char* keys
|
||||
@param name Name of the hash table [symbol]
|
||||
@param khval_t Type of values [type]
|
||||
*/
|
||||
#define KHASH_MAP_INIT_STR(name, khval_t) \
|
||||
KHASH_INIT(name, kh_cstr_t, khval_t, 1, kh_str_hash_func, kh_str_hash_equal)
|
||||
|
||||
#endif /* __AC_KHASH_H */
|
BIN
c_src/snappy-1.0.4.tar.gz
Normal file
BIN
c_src/snappy-1.0.4.tar.gz
Normal file
Binary file not shown.
|
@ -1,22 +1,24 @@
|
|||
diff --git a/ext/compressors/bzip2/Makefile.am b/ext/compressors/bzip2/Makefile.am
|
||||
index 0aedc2e..1cc4cf6 100644
|
||||
index 0aedc2e..a70ae2e 100644
|
||||
--- a/ext/compressors/bzip2/Makefile.am
|
||||
+++ b/ext/compressors/bzip2/Makefile.am
|
||||
@@ -2,5 +2,5 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
|
||||
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
|
||||
|
||||
lib_LTLIBRARIES = libwiredtiger_bzip2.la
|
||||
libwiredtiger_bzip2_la_SOURCES = bzip2_compress.c
|
||||
-libwiredtiger_bzip2_la_LDFLAGS = -avoid-version -module
|
||||
+libwiredtiger_bzip2_la_LDFLAGS = -avoid-version -module -Wl,-rpath,lib/wterl/priv:priv:/usr/local/lib
|
||||
+libwiredtiger_snappy_la_CFLAGS = -I$(src_builddir)/../../system/include
|
||||
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(src_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
|
||||
libwiredtiger_bzip2_la_LIBADD = -lbz2
|
||||
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
|
||||
index 6d78823..7d35777 100644
|
||||
index 6d78823..2122cf8 100644
|
||||
--- a/ext/compressors/snappy/Makefile.am
|
||||
+++ b/ext/compressors/snappy/Makefile.am
|
||||
@@ -2,5 +2,5 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
|
||||
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
|
||||
|
||||
lib_LTLIBRARIES = libwiredtiger_snappy.la
|
||||
libwiredtiger_snappy_la_SOURCES = snappy_compress.c
|
||||
-libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module
|
||||
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -Wl,-rpath,lib/wterl/priv:priv:/usr/local/lib
|
||||
+libwiredtiger_snappy_la_CFLAGS = -I$(src_builddir)/../../system/include
|
||||
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(src_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
|
||||
libwiredtiger_snappy_la_LIBADD = -lsnappy
|
2445
c_src/wterl.c
2445
c_src/wterl.c
File diff suppressed because it is too large
Load diff
|
@ -37,9 +37,8 @@
|
|||
|
||||
{port_env, [
|
||||
{"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"},
|
||||
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lpriv -lwiredtiger"}
|
||||
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lwiredtiger"}
|
||||
]}.
|
||||
|
||||
{pre_hooks, [{compile, "c_src/build_deps.sh"}]}.
|
||||
|
||||
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.
|
||||
{post_hooks, [{clean, "c_src/build_deps.sh clean"}]}.
|
||||
|
|
42
src/async_nif.hrl
Normal file
42
src/async_nif.hrl
Normal file
|
@ -0,0 +1,42 @@
|
|||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% async_nif: An async thread-pool layer for Erlang's NIF API
|
||||
%%
|
||||
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
|
||||
%% Author: Gregory Burd <greg@basho.com> <greg@burd.me>
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-define(ASYNC_NIF_CALL(Fun, Args),
|
||||
begin
|
||||
NIFRef = erlang:make_ref(),
|
||||
case erlang:apply(Fun, [NIFRef|Args]) of
|
||||
{ok, enqueued} ->
|
||||
receive
|
||||
{NIFRef, {error, shutdown}=Error} ->
|
||||
%% Work unit was queued, but not executed.
|
||||
Error;
|
||||
{NIFRef, {error, _Reason}=Error} ->
|
||||
%% Work unit returned an error.
|
||||
Error;
|
||||
{NIFRef, Reply} ->
|
||||
Reply
|
||||
end;
|
||||
Other ->
|
||||
Other
|
||||
end
|
||||
end).
|
|
@ -22,7 +22,6 @@
|
|||
|
||||
-module(riak_kv_wterl_backend).
|
||||
-behavior(temp_riak_kv_backend).
|
||||
-author('Steve Vinoski <steve@basho.com>').
|
||||
|
||||
%% KV Backend API
|
||||
-export([api_version/0,
|
||||
|
@ -51,13 +50,11 @@
|
|||
%%-define(CAPABILITIES, [async_fold, indexes]).
|
||||
-define(CAPABILITIES, [async_fold]).
|
||||
|
||||
-record(pass, {session :: wterl:session(),
|
||||
cursor :: wterl:cursor()}).
|
||||
-type pass() :: #pass{}.
|
||||
|
||||
-record(state, {table :: string(),
|
||||
type :: string(),
|
||||
connection :: wterl:connection(),
|
||||
passes :: [pass()]}).
|
||||
is_empty_cursor :: wterl:cursor(),
|
||||
status_cursor :: wterl:cursor()}).
|
||||
|
||||
-type state() :: #state{}.
|
||||
-type config() :: [{atom(), term()}].
|
||||
|
@ -97,33 +94,74 @@ start(Partition, Config) ->
|
|||
end,
|
||||
case AppStart of
|
||||
ok ->
|
||||
Table = "lsm:wt" ++ integer_to_list(Partition),
|
||||
{ok, Connection} = establish_connection(Config),
|
||||
Passes = establish_passes(erlang:system_info(schedulers), Connection, Table),
|
||||
{ok, #state{table=Table, connection=Connection, passes=Passes}};
|
||||
Type =
|
||||
case wterl:config_value(type, Config, "lsm") of
|
||||
{type, "lsm"} -> "lsm";
|
||||
{type, "table"} -> "table";
|
||||
{type, "btree"} -> "table";
|
||||
{type, BadType} ->
|
||||
lager:info("wterl:start ignoring unknown type ~p, using lsm instead", [BadType]),
|
||||
"lsm";
|
||||
_ ->
|
||||
lager:info("wterl:start ignoring mistaken setting defaulting to lsm"),
|
||||
"lsm"
|
||||
end,
|
||||
{ok, Connection} = establish_connection(Config, Type),
|
||||
Table = Type ++ ":" ++ integer_to_list(Partition),
|
||||
Compressor =
|
||||
case wterl:config_value(block_compressor, Config, "snappy") of
|
||||
{block_compressor, "snappy"}=C -> [C];
|
||||
{block_compressor, "bzip2"}=C -> [C];
|
||||
{block_compressor, "none"} -> [];
|
||||
{block_compressor, none} -> [];
|
||||
{block_compressor, _} -> [{block_compressor, "snappy"}];
|
||||
_ -> [{block_compressor, "snappy"}]
|
||||
end,
|
||||
TableOpts =
|
||||
case Type of
|
||||
"lsm" ->
|
||||
[{internal_page_max, "128K"},
|
||||
{leaf_page_max, "128K"},
|
||||
{lsm_chunk_size, "25MB"},
|
||||
{prefix_compression, false},
|
||||
{lsm_bloom_newest, true},
|
||||
{lsm_bloom_oldest, true} ,
|
||||
{lsm_bloom_bit_count, 128},
|
||||
{lsm_bloom_hash_count, 64},
|
||||
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}
|
||||
] ++ Compressor;
|
||||
"table" ->
|
||||
Compressor
|
||||
end,
|
||||
case wterl:create(Connection, Table, TableOpts) of
|
||||
ok ->
|
||||
case establish_utility_cursors(Connection, Table) of
|
||||
{ok, IsEmptyCursor, StatusCursor} ->
|
||||
{ok, #state{table=Table, type=Type,
|
||||
connection=Connection,
|
||||
is_empty_cursor=IsEmptyCursor,
|
||||
status_cursor=StatusCursor}};
|
||||
{error, Reason2} ->
|
||||
{error, Reason2}
|
||||
end;
|
||||
{error, Reason3} ->
|
||||
{error, Reason3}
|
||||
end
|
||||
end.
|
||||
|
||||
%% @doc Stop the wterl backend
|
||||
-spec stop(state()) -> ok.
|
||||
stop(#state{passes=Passes}) ->
|
||||
lists:foreach(fun(Elem) ->
|
||||
{Session, Cursor} = Elem,
|
||||
ok = wterl:cursor_close(Cursor),
|
||||
ok = wterl:session_close(Session)
|
||||
end, Passes),
|
||||
ok.
|
||||
stop(_State) ->
|
||||
ok. %% The connection is closed by wterl_conn:stop()
|
||||
|
||||
%% @doc Retrieve an object from the wterl backend
|
||||
-spec get(riak_object:bucket(), riak_object:key(), state()) ->
|
||||
{ok, any(), state()} |
|
||||
{ok, not_found, state()} |
|
||||
{error, term(), state()}.
|
||||
get(Bucket, Key, #state{passes=Passes}=State) ->
|
||||
get(Bucket, Key, #state{connection=Connection, table=Table}=State) ->
|
||||
WTKey = to_object_key(Bucket, Key),
|
||||
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
||||
case wterl:cursor_search(Cursor, WTKey) of
|
||||
case wterl:get(Connection, Table, WTKey) of
|
||||
{ok, Value} ->
|
||||
{ok, Value, State};
|
||||
not_found ->
|
||||
|
@ -140,10 +178,8 @@ get(Bucket, Key, #state{passes=Passes}=State) ->
|
|||
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
|
||||
{ok, state()} |
|
||||
{error, term(), state()}.
|
||||
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) ->
|
||||
WTKey = to_object_key(Bucket, PrimaryKey),
|
||||
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
||||
case wterl:cursor_insert(Cursor, WTKey, Val) of
|
||||
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection, table=Table}=State) ->
|
||||
case wterl:put(Connection, Table, to_object_key(Bucket, PrimaryKey), Val) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
|
@ -157,10 +193,8 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) ->
|
|||
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
|
||||
{ok, state()} |
|
||||
{error, term(), state()}.
|
||||
delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) ->
|
||||
WTKey = to_object_key(Bucket, Key),
|
||||
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
||||
case wterl:cursor_remove(Cursor, WTKey) of
|
||||
delete(Bucket, Key, _IndexSpecs, #state{connection=Connection, table=Table}=State) ->
|
||||
case wterl:delete(Connection, Table, to_object_key(Bucket, Key)) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
|
@ -172,14 +206,12 @@ delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) ->
|
|||
any(),
|
||||
[],
|
||||
state()) -> {ok, any()} | {async, fun()}.
|
||||
fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
|
||||
{ok, Connection} = wterl_conn:get(),
|
||||
fold_buckets(FoldBucketsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
|
||||
FoldFun = fold_buckets_fun(FoldBucketsFun),
|
||||
BucketFolder =
|
||||
fun() ->
|
||||
{ok, Session} = wterl:session_open(Connection),
|
||||
case wterl:cursor_open(Session, Table) of
|
||||
{error, "No such file or directory"} ->
|
||||
case wterl:cursor_open(Connection, Table) of
|
||||
{error, {enoent, _Message}} ->
|
||||
Acc;
|
||||
{ok, Cursor} ->
|
||||
try
|
||||
|
@ -190,8 +222,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
|
|||
{break, AccFinal} ->
|
||||
AccFinal
|
||||
after
|
||||
ok = wterl:cursor_close(Cursor),
|
||||
ok = wterl:session_close(Session)
|
||||
ok = wterl:cursor_close(Cursor)
|
||||
end
|
||||
end
|
||||
end,
|
||||
|
@ -207,7 +238,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
|
|||
any(),
|
||||
[{atom(), term()}],
|
||||
state()) -> {ok, term()} | {async, fun()}.
|
||||
fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
|
||||
fold_keys(FoldKeysFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
|
||||
%% Figure out how we should limit the fold: by bucket, by
|
||||
%% secondary index, or neither (fold across everything.)
|
||||
Bucket = lists:keyfind(bucket, 1, Opts),
|
||||
|
@ -220,15 +251,12 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
|
|||
true -> undefined
|
||||
end,
|
||||
|
||||
{ok, Connection} = wterl_conn:get(),
|
||||
|
||||
%% Set up the fold...
|
||||
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
|
||||
KeyFolder =
|
||||
fun() ->
|
||||
{ok, Session} = wterl:session_open(Connection),
|
||||
case wterl:cursor_open(Session, Table) of
|
||||
{error, "No such file or directory"} ->
|
||||
case wterl:cursor_open(Connection, Table) of
|
||||
{error, {enoent, _Message}} ->
|
||||
Acc;
|
||||
{ok, Cursor} ->
|
||||
try
|
||||
|
@ -237,8 +265,7 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
|
|||
{break, AccFinal} ->
|
||||
AccFinal
|
||||
after
|
||||
ok = wterl:cursor_close(Cursor),
|
||||
ok = wterl:session_close(Session)
|
||||
ok = wterl:cursor_close(Cursor)
|
||||
end
|
||||
end
|
||||
end,
|
||||
|
@ -254,15 +281,13 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
|
|||
any(),
|
||||
[{atom(), term()}],
|
||||
state()) -> {ok, any()} | {async, fun()}.
|
||||
fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
|
||||
{ok, Connection} = wterl_conn:get(),
|
||||
fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
|
||||
Bucket = proplists:get_value(bucket, Opts),
|
||||
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
|
||||
ObjectFolder =
|
||||
fun() ->
|
||||
{ok, Session} = wterl:session_open(Connection),
|
||||
case wterl:cursor_open(Session, Table) of
|
||||
{error, "No such file or directory"} ->
|
||||
case wterl:cursor_open(Connection, Table) of
|
||||
{error, {enoent, _Message}} ->
|
||||
Acc;
|
||||
{ok, Cursor} ->
|
||||
try
|
||||
|
@ -271,8 +296,14 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
|
|||
{break, AccFinal} ->
|
||||
AccFinal
|
||||
after
|
||||
ok = wterl:cursor_close(Cursor),
|
||||
ok = wterl:session_close(Session)
|
||||
case wterl:cursor_close(Cursor) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, {eperm, _}} -> %% TODO: review/fix
|
||||
ok;
|
||||
{error, _}=E ->
|
||||
E
|
||||
end
|
||||
end
|
||||
end
|
||||
end,
|
||||
|
@ -285,11 +316,12 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
|
|||
|
||||
%% @doc Delete all objects from this wterl backend
|
||||
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
|
||||
drop(#state{passes=Passes, table=Table}=State) ->
|
||||
{Session, _Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
||||
case wterl:session_truncate(Session, Table) of
|
||||
drop(#state{connection=Connection, table=Table}=State) ->
|
||||
case wterl:drop(Connection, Table) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{error, {ebusy, _}} -> %% TODO: review/fix
|
||||
{ok, State};
|
||||
Error ->
|
||||
{error, Error, State}
|
||||
end.
|
||||
|
@ -297,24 +329,25 @@ drop(#state{passes=Passes, table=Table}=State) ->
|
|||
%% @doc Returns true if this wterl backend contains any
|
||||
%% non-tombstone values; otherwise returns false.
|
||||
-spec is_empty(state()) -> boolean().
|
||||
is_empty(#state{passes=Passes}) ->
|
||||
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
||||
is_empty(#state{is_empty_cursor=Cursor}) ->
|
||||
wterl:cursor_reset(Cursor),
|
||||
try
|
||||
not_found =:= wterl:cursor_next(Cursor)
|
||||
after
|
||||
ok = wterl:cursor_close(Cursor)
|
||||
case wterl:cursor_next(Cursor) of
|
||||
not_found -> true;
|
||||
{error, {eperm, _}} -> false; % TODO: review/fix this logic
|
||||
_ -> false
|
||||
end.
|
||||
|
||||
%% @doc Get the status information for this wterl backend
|
||||
-spec status(state()) -> [{atom(), term()}].
|
||||
status(#state{passes=Passes}) ->
|
||||
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
||||
try
|
||||
Stats = fetch_status(Cursor),
|
||||
[{stats, Stats}]
|
||||
after
|
||||
ok = wterl:cursor_close(Cursor)
|
||||
status(#state{status_cursor=Cursor}) ->
|
||||
wterl:cursor_reset(Cursor),
|
||||
case fetch_status(Cursor) of
|
||||
{ok, Stats} ->
|
||||
Stats;
|
||||
{error, {eperm, _}} -> % TODO: review/fix this logic
|
||||
{ok, []};
|
||||
_ ->
|
||||
{ok, []}
|
||||
end.
|
||||
|
||||
%% @doc Register an asynchronous callback
|
||||
|
@ -327,16 +360,35 @@ callback(_Ref, _Msg, State) ->
|
|||
%% Internal functions
|
||||
%% ===================================================================
|
||||
|
||||
%% @private
|
||||
max_sessions(Config) ->
|
||||
RingSize =
|
||||
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
|
||||
undefined -> 1024;
|
||||
Size -> Size
|
||||
end,
|
||||
2 * (RingSize * erlang:system_info(schedulers)).
|
||||
Est = 100 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic
|
||||
case Est > 1000000000 of % Note: WiredTiger uses a signed int for this
|
||||
true -> 1000000000;
|
||||
false -> Est
|
||||
end.
|
||||
|
||||
%% @private
|
||||
establish_connection(Config) ->
|
||||
establish_utility_cursors(Connection, Table) ->
|
||||
case wterl:cursor_open(Connection, Table) of
|
||||
{ok, IsEmptyCursor} ->
|
||||
case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
|
||||
{ok, StatusCursor} ->
|
||||
{ok, IsEmptyCursor, StatusCursor};
|
||||
{error, Reason1} ->
|
||||
{error, Reason1}
|
||||
end;
|
||||
{error, Reason2} ->
|
||||
{error, Reason2}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
establish_connection(Config, Type) ->
|
||||
%% Get the data root directory
|
||||
case app_helper:get_prop_or_env(data_root, Config, wterl) of
|
||||
undefined ->
|
||||
|
@ -344,8 +396,18 @@ establish_connection(Config) ->
|
|||
{error, data_root_unset};
|
||||
DataRoot ->
|
||||
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
|
||||
|
||||
%% WT Connection Options:
|
||||
%% NOTE: LSM auto-checkpoints, so we don't have too.
|
||||
CheckpointSetting =
|
||||
case Type =:= "lsm" of
|
||||
true ->
|
||||
[];
|
||||
false ->
|
||||
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 10}])
|
||||
end,
|
||||
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
|
||||
Opts =
|
||||
ConnectionOpts =
|
||||
orddict:from_list(
|
||||
[ wterl:config_value(create, Config, true),
|
||||
wterl:config_value(sync, Config, false),
|
||||
|
@ -354,14 +416,18 @@ establish_connection(Config) ->
|
|||
wterl:config_value(session_max, Config, max_sessions(Config)),
|
||||
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
|
||||
wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec
|
||||
%% NOTE: LSM auto-checkpoints, so we don't have too.
|
||||
%% wterl:config_value(checkpoint, Config, [{wait, 10}]), % sec
|
||||
wterl:config_value(verbose, Config, [
|
||||
%"ckpt" "block", "shared_cache", "evictserver", "fileops",
|
||||
%"hazard", "mutex", "read", "readserver", "reconcile",
|
||||
%"salvage", "verify", "write", "evict", "lsm"
|
||||
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
|
||||
case wterl_conn:open(DataRoot, Opts) of
|
||||
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec
|
||||
|
||||
|
||||
|
||||
%% WT Session Options:
|
||||
SessionOpts = [{isolation, "snapshot"}],
|
||||
|
||||
case wterl_conn:open(DataRoot, ConnectionOpts, SessionOpts) of
|
||||
{ok, Connection} ->
|
||||
{ok, Connection};
|
||||
{error, Reason2} ->
|
||||
|
@ -370,40 +436,6 @@ establish_connection(Config) ->
|
|||
end
|
||||
end.
|
||||
|
||||
establish_passes(Count, Connection, Table)
|
||||
when is_number(Count), Count > 0 ->
|
||||
lists:map(fun(_Elem) ->
|
||||
{ok, Session} = establish_session(Connection, Table),
|
||||
{ok, Cursor} = wterl:cursor_open(Session, Table),
|
||||
{Session, Cursor}
|
||||
end, lists:seq(1, Count)).
|
||||
|
||||
%% @private
|
||||
establish_session(Connection, Table) ->
|
||||
case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of
|
||||
{ok, Session} ->
|
||||
SessionOpts =
|
||||
[{block_compressor, "snappy"},
|
||||
{internal_page_max, "128K"},
|
||||
{leaf_page_max, "128K"},
|
||||
{lsm_chunk_size, "25MB"},
|
||||
{lsm_bloom_newest, true},
|
||||
{lsm_bloom_oldest, true} ,
|
||||
{lsm_bloom_bit_count, 128},
|
||||
{lsm_bloom_hash_count, 64},
|
||||
{lsm_bloom_config, [{leaf_page_max, "8MB"}]} ],
|
||||
case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of
|
||||
ok ->
|
||||
{ok, Session};
|
||||
{error, Reason} ->
|
||||
lager:error("Failed to start wterl backend: ~p\n", [Reason]),
|
||||
{error, Reason}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
%% Return a function to fold over the buckets on this backend
|
||||
fold_buckets_fun(FoldBucketsFun) ->
|
||||
|
@ -512,7 +544,9 @@ from_index_key(LKey) ->
|
|||
%% @private
|
||||
%% Return all status from wterl statistics cursor
|
||||
fetch_status(Cursor) ->
|
||||
fetch_status(Cursor, wterl:cursor_next_value(Cursor), []).
|
||||
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
|
||||
fetch_status(_Cursor, {error, _}, Acc) ->
|
||||
lists:reverse(Acc);
|
||||
fetch_status(_Cursor, not_found, Acc) ->
|
||||
lists:reverse(Acc);
|
||||
fetch_status(Cursor, {ok, Stat}, Acc) ->
|
||||
|
@ -563,12 +597,14 @@ size_cache(RequestedSize) ->
|
|||
-ifdef(TEST).
|
||||
|
||||
simple_test_() ->
|
||||
?assertCmd("rm -rf test/wterl-backend"),
|
||||
{ok, CWD} = file:get_cwd(),
|
||||
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
|
||||
application:set_env(wterl, data_root, "test/wterl-backend"),
|
||||
temp_riak_kv_backend:standard_test(?MODULE, []).
|
||||
|
||||
custom_config_test_() ->
|
||||
?assertCmd("rm -rf test/wterl-backend"),
|
||||
{ok, CWD} = file:get_cwd(),
|
||||
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
|
||||
application:set_env(wterl, data_root, ""),
|
||||
temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wterl-backend"}]).
|
||||
|
||||
|
|
26
src/rmdir.erl
Normal file
26
src/rmdir.erl
Normal file
|
@ -0,0 +1,26 @@
|
|||
-module(rmdir).
|
||||
|
||||
-export([path/1]).
|
||||
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
|
||||
path(Dir) ->
|
||||
remove_all_files(".", [Dir]).
|
||||
|
||||
remove_all_files(Dir, Files) ->
|
||||
lists:foreach(fun(File) ->
|
||||
FilePath = filename:join([Dir, File]),
|
||||
case file:read_file_info(FilePath) of
|
||||
{ok, FileInfo} ->
|
||||
case FileInfo#file_info.type of
|
||||
directory ->
|
||||
{ok, DirFiles} = file:list_dir(FilePath),
|
||||
remove_all_files(FilePath, DirFiles),
|
||||
file:del_dir(FilePath);
|
||||
_ ->
|
||||
file:delete(FilePath)
|
||||
end;
|
||||
{error, _Reason} ->
|
||||
ok
|
||||
end
|
||||
end, Files).
|
|
@ -272,13 +272,16 @@ empty_check({Backend, State}) ->
|
|||
}.
|
||||
|
||||
setup({BackendMod, Config}) ->
|
||||
lager:start(),
|
||||
application:start(lager),
|
||||
application:start(sasl),
|
||||
application:start(os_mon),
|
||||
{ok, S} = BackendMod:start(42, Config),
|
||||
{BackendMod, S}.
|
||||
|
||||
cleanup({BackendMod, S}) ->
|
||||
ok = BackendMod:stop(S).
|
||||
ok = BackendMod:stop(S),
|
||||
application:stop(lager),
|
||||
application:stop(sasl),
|
||||
application:stop(os_mon).
|
||||
|
||||
-endif. % TEST
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, wterl,
|
||||
[
|
||||
{description, "Erlang Wrapper for WiredTiger"},
|
||||
{description, "Erlang NIF Wrapper for WiredTiger"},
|
||||
{vsn, "0.9.0"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
|
|
739
src/wterl.erl
739
src/wterl.erl
File diff suppressed because it is too large
Load diff
|
@ -30,7 +30,7 @@
|
|||
|
||||
%% API
|
||||
-export([start_link/0, stop/0,
|
||||
open/1, open/2, is_open/0, get/0, close/1]).
|
||||
open/1, open/2, open/3, is_open/0, get/0, close/1]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
|
@ -53,12 +53,14 @@ stop() ->
|
|||
gen_server:cast(?MODULE, stop).
|
||||
|
||||
-spec open(string()) -> {ok, wterl:connection()} | {error, term()}.
|
||||
open(Dir) ->
|
||||
open(Dir, []).
|
||||
|
||||
-spec open(string(), config_list()) -> {ok, wterl:connection()} | {error, term()}.
|
||||
open(Dir, Config) ->
|
||||
gen_server:call(?MODULE, {open, Dir, Config, self()}, infinity).
|
||||
-spec open(string(), config_list(), config_list()) -> {ok, wterl:connection()} | {error, term()}.
|
||||
open(Dir) ->
|
||||
open(Dir, [], []).
|
||||
open(Dir, ConnectionConfig) ->
|
||||
gen_server:call(?MODULE, {open, Dir, ConnectionConfig, [], self()}, infinity).
|
||||
open(Dir, ConnectionConfig, SessionConfig) ->
|
||||
gen_server:call(?MODULE, {open, Dir, ConnectionConfig, SessionConfig, self()}, infinity).
|
||||
|
||||
-spec is_open() -> boolean().
|
||||
is_open() ->
|
||||
|
@ -80,9 +82,9 @@ init([]) ->
|
|||
true = wterl_ets:table_ready(),
|
||||
{ok, #state{}}.
|
||||
|
||||
handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) ->
|
||||
handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{conn=undefined}=State) ->
|
||||
{Reply, NState} =
|
||||
case wterl:connection_open(Dir, wterl:config_to_bin(Config)) of
|
||||
case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of
|
||||
{ok, ConnRef}=OK ->
|
||||
Monitor = erlang:monitor(process, Caller),
|
||||
true = ets:insert(wterl_ets, {Monitor, Caller}),
|
||||
|
@ -91,7 +93,7 @@ handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) ->
|
|||
{Error, State}
|
||||
end,
|
||||
{reply, Reply, NState};
|
||||
handle_call({open, _Dir, _Config, Caller}, _From,#state{conn=ConnRef}=State) ->
|
||||
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) ->
|
||||
Monitor = erlang:monitor(process, Caller),
|
||||
true = ets:insert(wterl_ets, {Monitor, Caller}),
|
||||
{reply, {ok, ConnRef}, State};
|
||||
|
|
11
update-version.sh
Executable file
11
update-version.sh
Executable file
|
@ -0,0 +1,11 @@
|
|||
#!/bin/sh -
|
||||
|
||||
# Note: also, remember to update version numbers in rpath specs so that shared libs can be found at runtime!!!
|
||||
|
||||
wterl=`git log -n 1 --pretty=format:"%H"`
|
||||
wiredtiger0=`(cd c_src/wiredtiger && git log -n 1 --pretty=format:"%H")`
|
||||
wiredtiger=`echo $wiredtiger0 | awk '{print $2}'`
|
||||
|
||||
echo $wterl
|
||||
echo $wiredtiger
|
||||
|
Loading…
Reference in a new issue