diff --git a/c_src/async_nif.h b/c_src/async_nif.h index b5a3ab5..30ad669 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -49,10 +49,8 @@ struct async_nif_state { volatile unsigned int req_count; volatile unsigned int shutdown; ErlNifMutex *req_mutex; - ErlNifMutex *worker_mutex; ErlNifCond *cnd; STAILQ_HEAD(reqs, async_nif_req_entry) reqs; - LIST_HEAD(workers, async_nif_worker_entry) workers; unsigned int num_workers; struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; }; @@ -111,9 +109,32 @@ struct async_nif_worker_info { return async_nif_enqueue_req(async_nif, req); \ } -#define ASYNC_NIF_LOAD() async_nif_load(); -#define ASYNC_NIF_UNLOAD(env) async_nif_unload(env); -#define ASYNC_NIF_UPGRADE(env) async_nif_unload(env); +#define ASYNC_NIF_INIT(name) \ + static ErlNifMutex *name##_async_nif_coord = NULL; + +#define ASYNC_NIF_LOAD(name, priv) do { \ + if (!name##_async_nif_coord) \ + name##_async_nif_coord = enif_mutex_create(NULL); \ + enif_mutex_lock(name##_async_nif_coord); \ + priv = async_nif_load(); \ + enif_mutex_unlock(name##_async_nif_coord); \ + } while(0); +#define ASYNC_NIF_UNLOAD(name, env) do { \ + if (!name##_async_nif_coord) \ + name##_async_nif_coord = enif_mutex_create(NULL); \ + enif_mutex_lock(name##_async_nif_coord); \ + async_nif_unload(env); \ + enif_mutex_unlock(name##_async_nif_coord); \ + enif_mutex_destroy(name##_async_nif_coord); \ + name##_async_nif_coord = NULL; \ + } while(0); +#define ASYNC_NIF_UPGRADE(name, env) do { \ + if (!name##_async_nif_coord) \ + name##_async_nif_coord = enif_mutex_create(NULL); \ + enif_mutex_lock(name##_async_nif_coord); \ + async_nif_upgrade(env); \ + enif_mutex_unlock(name##_async_nif_coord); \ + } while(0); #define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env); #define ASYNC_NIF_WORK_ENV new_env @@ -133,51 +154,41 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en enif_mutex_lock(async_nif->req_mutex); STAILQ_INSERT_TAIL(&async_nif->reqs, req, entries); async_nif->req_count++; + /* Build the term before releasing the lock so as not to race on the use of + the req pointer. */ + ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), + enif_make_tuple2(req->env, enif_make_atom(req->env, "enqueued"), + enif_make_int(req->env, async_nif->req_count))); enif_mutex_unlock(async_nif->req_mutex); enif_cond_broadcast(async_nif->cnd); - - return enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), - enif_make_tuple2(req->env, enif_make_atom(req->env, "enqueued"), - enif_make_int(req->env, async_nif->req_count))); \ + return reply; } static void * async_nif_worker_fn(void *arg) { struct async_nif_worker_info *wi = (struct async_nif_worker_info *)arg; - struct async_nif_worker_entry *worker = wi->worker; struct async_nif_state *async_nif = wi->async_nif; unsigned int worker_id = wi->worker_id; - free(arg); // Allocated when starting the thread, now no longer needed. + enif_free(arg); // Allocated when starting the thread, now no longer needed. - /* - * Workers are active while there is work on the queue to do and - * only in the idle list when they are waiting on new work. - */ for(;;) { struct async_nif_req_entry *req = NULL; /* Examine the request queue, are there things to be done? */ enif_mutex_lock(async_nif->req_mutex); - enif_mutex_lock(async_nif->worker_mutex); - LIST_INSERT_HEAD(&async_nif->workers, worker, entries); - enif_mutex_unlock(async_nif->worker_mutex); check_again_for_work: - if (async_nif->shutdown) { enif_mutex_unlock(async_nif->req_mutex); break; } + if (async_nif->shutdown) { + enif_mutex_unlock(async_nif->req_mutex); + break; + } if ((req = STAILQ_FIRST(&async_nif->reqs)) == NULL) { - /* Queue is empty, join the list of idle workers and wait for work */ + /* Queue is empty, wait for work */ enif_cond_wait(async_nif->cnd, async_nif->req_mutex); goto check_again_for_work; } else { /* `req` is our work request and we hold the req_mutex lock. */ - // TODO: do we need this broadcast? - enif_cond_broadcast(async_nif->cnd); - - /* Remove this thread from the list of idle threads. */ - enif_mutex_lock(async_nif->worker_mutex); - LIST_REMOVE(worker, entries); - enif_mutex_unlock(async_nif->worker_mutex); do { /* Take the request off the queue. */ @@ -192,17 +203,16 @@ async_nif_worker_fn(void *arg) enif_free_env(req->env); enif_free(req); - /* Finally, check the request queue for more work before switching - into idle mode. */ + /* Review the work queue, start more worker threads if they are needed. */ + // TODO: if queue_depth > last_depth && num_workers < MAX, start one up + + /* Continue working if more requests are in the queue, otherwise wait + for new work to arrive. */ enif_mutex_lock(async_nif->req_mutex); if ((req = STAILQ_FIRST(&async_nif->reqs)) == NULL) { enif_mutex_unlock(async_nif->req_mutex); } - /* Take a second to see if we need to adjust the number of active - worker threads up or down. */ - // TODO: if queue_depth > last_depth && num_workers < MAX, start one up - } while(req); } } @@ -210,7 +220,8 @@ async_nif_worker_fn(void *arg) return 0; } -static void async_nif_unload(ErlNifEnv *env) +static void +async_nif_unload(ErlNifEnv *env) { unsigned int i; struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); @@ -247,11 +258,12 @@ static void async_nif_unload(ErlNifEnv *env) enif_mutex_unlock(async_nif->req_mutex); bzero(async_nif->worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - enif_cond_destroy(async_nif->cnd); async_nif->cnd = NULL; - enif_mutex_destroy(async_nif->req_mutex); async_nif->req_mutex = NULL; - enif_mutex_destroy(async_nif->worker_mutex); async_nif->worker_mutex = NULL; + enif_cond_destroy(async_nif->cnd); + async_nif->cnd = NULL; + enif_mutex_destroy(async_nif->req_mutex); + async_nif->req_mutex = NULL; bzero(async_nif, sizeof(struct async_nif_state)); - free(async_nif); + enif_free(async_nif); } static void * @@ -271,22 +283,19 @@ async_nif_load(void) num_schedulers = info.scheduler_threads; /* Init our portion of priv_data's module-specific state. */ - async_nif = malloc(sizeof(struct async_nif_state)); + async_nif = enif_alloc(sizeof(struct async_nif_state)); if (!async_nif) return NULL; STAILQ_INIT(&(async_nif->reqs)); - LIST_INIT(&(async_nif->workers)); async_nif->shutdown = 0; async_nif->req_mutex = enif_mutex_create(NULL); - async_nif->worker_mutex = enif_mutex_create(NULL); async_nif->cnd = enif_cond_create(NULL); /* Setup the requests management. */ async_nif->req_count = 0; /* Setup the thread pool management. */ - enif_mutex_lock(async_nif->worker_mutex); bzero(async_nif->worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); /* Start the minimum of max workers allowed or number of scheduler threads running. */ @@ -295,10 +304,11 @@ async_nif_load(void) num_worker_threads = num_schedulers; if (num_worker_threads < 1) num_worker_threads = 1; + num_worker_threads = ASYNC_NIF_MAX_WORKERS; // TODO: make this dynamic at some point for (i = 0; i < num_worker_threads; i++) { struct async_nif_worker_info *wi; - wi = malloc(sizeof(struct async_nif_worker_info)); // TODO: check + wi = enif_alloc(sizeof(struct async_nif_worker_info)); // TODO: check wi->async_nif = async_nif; wi->worker = &async_nif->worker_entries[i]; wi->worker_id = i; @@ -306,7 +316,6 @@ async_nif_load(void) &async_nif_worker_fn, (void*)wi, NULL) != 0) { async_nif->shutdown = 1; enif_cond_broadcast(async_nif->cnd); - enif_mutex_unlock(async_nif->worker_mutex); while(i-- > 0) { void *exit_value = 0; /* Ignore this. */ enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); @@ -316,16 +325,20 @@ async_nif_load(void) async_nif->cnd = NULL; enif_mutex_destroy(async_nif->req_mutex); async_nif->req_mutex = NULL; - enif_mutex_destroy(async_nif->worker_mutex); - async_nif->worker_mutex = NULL; return NULL; } } - async_nif->num_workers = num_worker_threads; - enif_mutex_unlock(async_nif->worker_mutex); + async_nif->num_workers = i; return async_nif; } +static void +async_nif_upgrade(ErlNifEnv *env) +{ + // TODO: +} + + #if defined(__cplusplus) } #endif diff --git a/c_src/wterl.c b/c_src/wterl.c index 5a45f6e..0be476f 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -46,7 +46,7 @@ void debugf(const char *fmt, ...) static ErlNifResourceType *wterl_conn_RESOURCE; static ErlNifResourceType *wterl_cursor_RESOURCE; -KHASH_INIT(cursors, char*, WT_CURSOR*, 1, kh_str_hash_func, kh_str_hash_equal); +KHASH_MAP_INIT_STR(cursors, WT_CURSOR*); /** * We will have exactly one (1) WterlCtx for each async worker thread. As @@ -90,6 +90,8 @@ static ERL_NIF_TERM ATOM_NOT_FOUND; static ERL_NIF_TERM ATOM_FIRST; static ERL_NIF_TERM ATOM_LAST; +ASYNC_NIF_INIT(wterl); + /** * Get the per-worker reusable WT_SESSION for a worker_id. */ @@ -109,35 +111,43 @@ __session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION * } ctx->session = *session; ctx->cursors = kh_init(cursors); + conn_handle->num_contexts++; enif_mutex_unlock(conn_handle->context_mutex); } return 0; } +/** + * Close all sessions and all cursors open on any objects. + * + * Note: always call within enif_mutex_lock/unlock(conn_handle->context_mutex) + */ void __close_all_sessions(WterlConnHandle *conn_handle) { int i; - for (i = 0; i < conn_handle->num_contexts; i++) { + for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { WterlCtx *ctx = &conn_handle->contexts[i]; - WT_SESSION *session = ctx->session; - khash_t(cursors) *h = ctx->cursors; - khiter_t itr; - for (itr = kh_begin(h); itr != kh_end(h); ++itr) { - if (kh_exist(h, itr)) { - WT_CURSOR *cursor = kh_val(h, itr); - enif_free(kh_key(h, itr)); - cursor->close(cursor); + if (ctx->session) { + WT_SESSION *session = ctx->session; + khash_t(cursors) *h = ctx->cursors; + khiter_t itr; + for (itr = kh_begin(h); itr != kh_end(h); ++itr) { + if (kh_exist(h, itr)) { + // WT_CURSOR *cursor = kh_val(h, itr); + // cursor->close(cursor); + char *key = (char *)kh_key(h, itr); + enif_free(key); + } } + kh_destroy(cursors, h); + session->close(session, NULL); + bzero(&conn_handle->contexts[i], sizeof(WterlCtx)); } - kh_destroy(cursors, h); - session->close(session, NULL); - ctx->session = NULL; - ctx->cursors = NULL; } + conn_handle->num_contexts = 0; } - /** * Close cursors open on 'uri' object. * @@ -147,17 +157,18 @@ void __close_cursors_on(WterlConnHandle *conn_handle, const char *uri) // TODO: race? { int i; - for (i = 0; i < conn_handle->num_contexts; i++) { + for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { WterlCtx *ctx = &conn_handle->contexts[i]; - khash_t(cursors) *h = ctx->cursors; - khiter_t itr = kh_get(cursors, h, (char *)uri); - if (itr != kh_end(h)) { - WT_CURSOR *cursor = kh_value(h, itr); - char *key = kh_key(h, itr); - kh_del(cursors, h, itr); - enif_free(key); - cursor->close(cursor); - debugf("closing worker_id: %d 0x%p %s", i, cursor, uri); + if (ctx->session) { + khash_t(cursors) *h = ctx->cursors; + khiter_t itr = kh_get(cursors, h, (char *)uri); + if (itr != kh_end(h)) { + WT_CURSOR *cursor = kh_value(h, itr); + char *key = (char *)kh_key(h, itr); + cursor->close(cursor); + kh_del(cursors, h, itr); + enif_free(key); + } } } } @@ -293,7 +304,7 @@ ASYNC_NIF_DECL( bzero(conn_handle->contexts, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS); conn_handle->context_mutex = enif_mutex_create(NULL); ERL_NIF_TERM result = enif_make_resource(env, conn_handle); - enif_release_resource(conn_handle); // When GC'ed the BEAM calls __resource_conn_dtor() + enif_release_resource(conn_handle); // Note: when GC'ed the BEAM calls __resource_conn_dtor() ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); } else @@ -648,8 +659,8 @@ ASYNC_NIF_DECL( WterlConnHandle *conn_handle; Uri uri; - int start_first; - int stop_last; + int from_first; + int to_last; ERL_NIF_TERM start; ERL_NIF_TERM stop; ERL_NIF_TERM config; @@ -663,18 +674,20 @@ ASYNC_NIF_DECL( ASYNC_NIF_RETURN_BADARG(); } if (enif_is_binary(env, argv[2])) { - args->start_first = 0; + args->from_first = 0; args->start = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); - } else if (enif_is_atom(env, argv[2]) && argv[2] == ATOM_FIRST) { - args->start_first = 1; + } else if (enif_is_atom(env, argv[2])) { // TODO && argv[2] == ATOM_FIRST) { + args->from_first = 1; + args->start = 0; } else { ASYNC_NIF_RETURN_BADARG(); } if (enif_is_binary(env, argv[3])) { - args->stop_last = 0; + args->to_last = 0; args->stop = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); - } else if (enif_is_atom(env, argv[3]) && argv[3] == ATOM_LAST) { - args->stop_last = 1; + } else if (enif_is_atom(env, argv[3])) { // TODO && argv[3] == ATOM_LAST) { + args->to_last = 1; + args->stop = 0; } else { ASYNC_NIF_RETURN_BADARG(); } @@ -708,51 +721,101 @@ ASYNC_NIF_DECL( } ErlNifBinary start_key; + ErlNifBinary stop_key; WT_CURSOR *start = NULL; - if (!args->start_first) { + WT_CURSOR *stop = NULL; + + /* The truncate method should be passed either a URI or start/stop cursors, + but not both. So we simply open cursors no matter what to avoid the + mess. */ + if (!args->from_first) { if (!enif_inspect_binary(env, args->start, &start_key)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); enif_mutex_unlock(args->conn_handle->context_mutex); return; } - rc = session->open_cursor(session, args->uri, NULL, "raw", &start); - if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - session->close(session, NULL); + } + rc = session->open_cursor(session, args->uri, NULL, "raw", &start); + if (rc != 0) { + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + session->close(session, NULL); + enif_mutex_unlock(args->conn_handle->context_mutex); + return; + } + /* Position the start cursor at the first record or the specified record. */ + if (args->from_first) { + rc = start->next(start); + if (rc != 0) { + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + start->close(start); + session->close(session, NULL); enif_mutex_unlock(args->conn_handle->context_mutex); - return; - } + return; + } + } else { WT_ITEM item_start; item_start.data = start_key.data; item_start.size = start_key.size; start->set_key(start, item_start); + rc = start->search(start); + if (rc != 0) { + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + start->close(start); + session->close(session, NULL); + enif_mutex_unlock(args->conn_handle->context_mutex); + return; + } } - ErlNifBinary stop_key; - WT_CURSOR *stop = NULL; - if (!args->stop_last) { + if (!args->to_last) { if (!enif_inspect_binary(env, args->stop, &stop_key)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); enif_mutex_unlock(args->conn_handle->context_mutex); return; } - rc = session->open_cursor(session, args->uri, NULL, "raw", &stop); - if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - session->close(session, NULL); + } + rc = session->open_cursor(session, args->uri, NULL, "raw", &stop); + if (rc != 0) { + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + start->close(start); + session->close(session, NULL); + enif_mutex_unlock(args->conn_handle->context_mutex); + return; + } + /* Position the stop cursor at the last record or the specified record. */ + if (args->to_last) { + rc = stop->prev(stop); + if (rc != 0) { + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + start->close(start); + stop->close(stop); + session->close(session, NULL); enif_mutex_unlock(args->conn_handle->context_mutex); - return; - } + return; + } + } else { WT_ITEM item_stop; item_stop.data = stop_key.data; item_stop.size = stop_key.size; stop->set_key(stop, item_stop); + rc = stop->search(stop); + if (rc != 0) { + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + start->close(start); + stop->close(stop); + session->close(session, NULL); + enif_mutex_unlock(args->conn_handle->context_mutex); + return; + } } - rc = session->truncate(session, args->uri, start, stop, (const char*)config.data); + /* Always pass NULL for URI here because we always specify the range with the + start and stop cursors which were opened referencing that URI. */ + rc = session->truncate(session, NULL, start, stop, (const char*)config.data); + if (start) start->close(start); if (stop) stop->close(stop); - (void)session->close(session, NULL); + if (session) session->close(session, NULL); enif_mutex_unlock(args->conn_handle->context_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, @@ -1149,7 +1212,7 @@ ASYNC_NIF_DECL( cursor_handle->session = session; cursor_handle->cursor = cursor; ERL_NIF_TERM result = enif_make_resource(env, cursor_handle); - enif_release_resource(cursor_handle); // When GC'ed the BEAM calls __resource_cursor_dtor() + enif_release_resource(cursor_handle); // Note: when GC'ed the BEAM calls __resource_cursor_dtor() ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); }, { // post @@ -1702,33 +1765,23 @@ ASYNC_NIF_DECL( /** * Called when the resource handle is about to be garbage collected. */ +#if 0 +TODO: static void __resource_conn_dtor(ErlNifEnv *env, void *obj) { - int i; WterlConnHandle *conn_handle = (WterlConnHandle *)obj; /* Free up the shared sessions and cursors. */ enif_mutex_lock(conn_handle->context_mutex); - for (i = 0; i < conn_handle->num_contexts; i++) { - WterlCtx *ctx = &conn_handle->contexts[i]; - WT_SESSION *session = ctx->session; - khash_t(cursors) *h = ctx->cursors; - khiter_t itr; - for (itr = kh_begin(h); itr != kh_end(h); ++itr) { - if (kh_exist(h, itr)) { - WT_CURSOR *cursor = kh_val(h, itr); - enif_free(kh_key(h, itr)); - cursor->close(cursor); - } - } - kh_destroy(cursors, h); - session->close(session, NULL); + __close_all_sessions(conn_handle); + if (conn_handle->session_config) { + enif_free((void *)conn_handle->session_config); + conn_handle->session_config = NULL; } enif_mutex_unlock(conn_handle->context_mutex); enif_mutex_destroy(conn_handle->context_mutex); - if (conn_handle->session_config) - enif_free((void *)conn_handle->session_config); } +#endif /** * Called as this driver is loaded by the Erlang BEAM runtime triggered by the @@ -1746,7 +1799,8 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER; wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource", - __resource_conn_dtor, flags, NULL); + NULL, flags, NULL); + // TODO: __resource_conn_dtor, flags, NULL); wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource", NULL, flags, NULL); @@ -1756,7 +1810,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) ATOM_FIRST = enif_make_atom(env, "first"); ATOM_LAST = enif_make_atom(env, "last"); - *priv_data = ASYNC_NIF_LOAD(); + ASYNC_NIF_LOAD(wterl, *priv_data); return *priv_data ? 0 : -1; } @@ -1770,14 +1824,14 @@ on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) static void on_unload(ErlNifEnv *env, void *priv_data) { - ASYNC_NIF_UNLOAD(env); + ASYNC_NIF_UNLOAD(wterl, env); } static int on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info) { - ASYNC_NIF_UPGRADE(env); - return 0; + ASYNC_NIF_UPGRADE(wterl, env); + return 0; } static ErlNifFunc nif_funcs[] = diff --git a/src/wterl.erl b/src/wterl.erl index c166a91..aae5f7e 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -522,7 +522,7 @@ remove_all_files(Dir, Files) -> _ -> file:delete(FilePath) end; - {error, Reason} -> + {error, _Reason} -> ok end end, Files). @@ -619,32 +619,44 @@ various_online_test_() -> ?assertMatch(ok, checkpoint(ConnRef, [{target, ["table:test"]}])), ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) end}, - {"truncate", + {"truncate entire table", fun() -> ?assertMatch(ok, truncate(ConnRef, "table:test")), ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) end}, - %% {"truncate range, found", - %% fun() -> - %% ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, last)), - %% ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) - %% end}, - %% {"truncate range, not_found", - %% fun() -> - %% ?assertMatch(ok, truncate(ConnRef, "table:test", first, <<"b">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) - %% end}, - %% {"truncate range, middle", - %% fun() -> - %% ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, <<"f">>)), - %% ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"b">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"c">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"d">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"e">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"f">>)), - %% ?assertMatch({ok, <<"gooseberry">>}, get(ConnRef, "table:test", <<"g">>)) - %% end}, + {"truncate range [<>..last], ensure value outside range is found after", + fun() -> + ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, last)), + ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) + end}, + {"truncate range [first..<>], ensure value inside range is not_found after", + fun() -> + ?assertMatch(ok, truncate(ConnRef, "table:test", first, <<"b">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) + end}, + {"truncate range [first..not_found] with a key that doesn't exist", + fun() -> + ?assertMatch(not_found, truncate(ConnRef, "table:test", first, <<"z">>)) + end}, + {"truncate range [not_found..last] with a key that doesn't exist", + fun() -> + ?assertMatch(not_found, truncate(ConnRef, "table:test", <<"0">>, last)) + end}, + {"truncate range [not_found..not_found] with keys that don't exist", + fun() -> + ?assertMatch(not_found, truncate(ConnRef, "table:test", <<"0">>, <<"0">>)) + end}, + {"truncate range [<...<>], ensure value before & after range still exist", + fun() -> + ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, <<"f">>)), + ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"b">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"c">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"d">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"e">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"f">>)), + ?assertMatch({ok, <<"gooseberry">>}, get(ConnRef, "table:test", <<"g">>)) + end}, {"drop table", fun() -> ?assertMatch(ok, drop(ConnRef, "table:test"))