From e3c129b1056fb0f6d34c61abbfd631e7ba10ca64 Mon Sep 17 00:00:00 2001 From: Joseph Wayne Norton Date: Sun, 6 Nov 2011 23:42:47 +0900 Subject: [PATCH] Add support to use async thread pool for driver backend Enhance driver implementation to optionally use Erlang's asynchronous driver thread pool for all LevelDB operations with the intention to avoid blocking of Erlang's scheduler threads. --- README.md | 23 +- c_src/lets_drv.cc | 507 ++++++++++++++++++++++++++++-------- c_src/lets_drv_lib.cc | 10 + c_src/lets_drv_lib.h | 3 +- c_src/lets_nif.cc | 6 +- c_src/lets_nif_lib.cc | 10 + c_src/lets_nif_lib.h | 4 +- doc/README.md | 22 +- doc/lets.md | 26 +- doc/lets_drv.md | 22 +- doc/lets_nif.md | 22 +- doc/overview.edoc | 9 +- src/lets.erl | 27 +- src/lets.hrl | 1 + src/lets_drv.erl | 159 ++++++----- src/lets_nif.erl | 94 +++---- test/qc/qc_leveldb.erl | 36 +++ test/qc/qc_statem_lets.erl | 54 ++-- test/qc/qc_statemc_lets.erl | 19 +- 19 files changed, 737 insertions(+), 317 deletions(-) diff --git a/README.md b/README.md index a72224d..97b7153 100644 --- a/README.md +++ b/README.md @@ -498,19 +498,6 @@ Explain how to build and to run lets with valgrind enabled
  • -Performance -

    - -
  • -
  • -

    Testing

    @@ -601,6 +593,7 @@ consider adding explicit read_options and write_options for LET's + ##Modules## diff --git a/c_src/lets_drv.cc b/c_src/lets_drv.cc index 9659acb..dfada2a 100644 --- a/c_src/lets_drv.cc +++ b/c_src/lets_drv.cc @@ -36,10 +36,11 @@ #define LETS_BADARG 0x00 #define LETS_TRUE 0x01 #define LETS_END_OF_TABLE 0x02 +#define LETS_BINARY 0x03 -#define LETS_OPEN6 0x00 -#define LETS_DESTROY6 0x01 -#define LETS_REPAIR6 0x02 +#define LETS_OPEN6 0x00 // same as OPEN +#define LETS_DESTROY6 0x01 // same as DESTROY +#define LETS_REPAIR6 0x02 // same as REPAIR #define LETS_INSERT2 0x03 #define LETS_INSERT3 0x04 #define LETS_INSERT_NEW2 0x05 @@ -64,67 +65,112 @@ struct DrvAsync { ErlDrvTermData caller; int command; - leveldb::Slice skey; + // outputs + ErlDrvBinary* binary; + int reply; + + // inputs leveldb::WriteBatch batch; - leveldb::Status status; DrvAsync(DrvData* d, ErlDrvTermData c, int cmd) : - drvdata(d), caller(c), command(cmd) { + drvdata(d), caller(c), command(cmd), binary(NULL), reply(LETS_TRUE) { } - DrvAsync(DrvData* d, ErlDrvTermData c, int cmd, char* key, int keylen) : - drvdata(d), caller(c), command(cmd), skey((const char*) key, keylen) { + DrvAsync(DrvData* d, ErlDrvTermData c, int cmd, const char* key, int keylen) : + drvdata(d), caller(c), command(cmd), binary(NULL), reply(LETS_TRUE) { + binary = driver_alloc_binary(keylen); + assert(binary); + memcpy(binary->orig_bytes, key, binary->orig_size); } - void put(char* key, int keylen, char* blob, int bloblen) { - leveldb::Slice skey((const char*) key, keylen); - leveldb::Slice sblob((const char*) blob, bloblen); + ~DrvAsync() { + if (binary) { + driver_free_binary(binary); + } + } + + void put(const char* key, int keylen, const char* blob, int bloblen) { + leveldb::Slice skey(key, keylen); + leveldb::Slice sblob(blob, bloblen); batch.Put(skey, sblob); } - void del(char* key, int keylen) { - leveldb::Slice skey((const char*) key, keylen); + void del(const char* key, int keylen) { + leveldb::Slice skey(key, keylen); batch.Delete(skey); } }; +static void lets_output_create6(const char op, DrvData* d, char* buf, int len, int* index, int items); +static void lets_async_create6(void* async_data); static void lets_output_open6(DrvData* d, char* buf, int len, int* index, int items); static void lets_output_destroy6(DrvData* d, char* buf, int len, int* index, int items); static void lets_output_repair6(DrvData* d, char* buf, int len, int* index, int items); static void lets_output_insert2(DrvData* d, char* buf, int len, int* index, int items); +static void lets_async_insert2(void* async_data); static void lets_output_insert3(DrvData* d, char* buf, int len, int* index, int items); +static void lets_async_insert3(void* async_data); // static void lets_output_insert_new2(DrvData* d, char* buf, int len, int* index, int items); +// static void lets_async_insert_new2(void* async_data); // static void lets_output_insert_new3(DrvData* d, char* buf, int len, int* index, int items); +// static void lets_async_insert_new3(void* async_data); static void lets_output_delete1(DrvData* d, char* buf, int len, int* index, int items); +static void lets_async_delete1(void* async_data); static void lets_output_delete2(DrvData* d, char* buf, int len, int* index, int items); +static void lets_async_delete2(void* async_data); // static void lets_output_delete_all_objects1(DrvData* d, char* buf, int len, int* index, int items); +// static void lets_async_delete_all_objects1(void* async_data); static void lets_output_lookup2(DrvData* d, char* buf, int len, int* index, int items); +static void lets_async_lookup2(void* async_data); static void lets_output_first1(DrvData* d, char* buf, int len, int* index, int items); +static void lets_async_first1(void* async_data); static void lets_output_next2(DrvData* d, char* buf, int len, int* index, int items); +static void lets_async_next2(void* async_data); // static void lets_output_info_memory1(DrvData* d, char* buf, int len, int* index, int items); +// static void lets_async_info_memory1(void* async_data); // static void lets_output_info_size1(DrvData* d, char* buf, int len, int* index, int items); +// static void lets_async_info_size1(void* async_data); static void -driver_send_int(DrvData* d, const int i) +driver_send_int(DrvData* d, const int i, ErlDrvTermData caller=0) { - ErlDrvTermData caller = driver_caller(d->port); ErlDrvTermData spec[] = { ERL_DRV_PORT, driver_mk_port(d->port), ERL_DRV_INT, i, ERL_DRV_TUPLE, 2, }; + if (!caller) { + caller = driver_caller(d->port); + } driver_send_term(d->port, caller, spec, sizeof(spec) / sizeof(spec[0])); } static void -driver_send_binary(DrvData* d, const char *buf, const ErlDrvUInt len) +driver_send_binary(DrvData* d, ErlDrvBinary* bin, ErlDrvTermData caller=0) { - ErlDrvTermData caller = driver_caller(d->port); ErlDrvTermData spec[] = { ERL_DRV_PORT, driver_mk_port(d->port), - ERL_DRV_INT, LETS_TRUE, + ERL_DRV_INT, LETS_BINARY, + ERL_DRV_BINARY, (ErlDrvTermData) bin, bin->orig_size, 0, + ERL_DRV_TUPLE, 3, + }; + if (!caller) { + caller = driver_caller(d->port); + } + driver_send_term(d->port, caller, spec, sizeof(spec) / sizeof(spec[0])); +} + +static void +driver_send_buf(DrvData* d, const char *buf, const ErlDrvUInt len, ErlDrvTermData caller=0) +{ + ErlDrvTermData spec[] = { + ERL_DRV_PORT, driver_mk_port(d->port), + ERL_DRV_INT, LETS_BINARY, ERL_DRV_BUF2BINARY, (ErlDrvTermData) buf, len, ERL_DRV_TUPLE, 3, }; + if (!caller) { + caller = driver_caller(d->port); + } driver_send_term(d->port, caller, spec, sizeof(spec) / sizeof(spec[0])); } @@ -273,69 +319,68 @@ drv_output(ErlDrvData handle, char* buf, int len) lets_output_repair6(d, buf, len, &index, items); break; case LETS_INSERT2: - ng = (items != 1); + ng = (items != 1 || !d->impl.alive); if (ng) GOTOBADARG; lets_output_insert2(d, buf, len, &index, items); break; case LETS_INSERT3: - ng = (items != 2); + ng = (items != 2 || !d->impl.alive); if (ng) GOTOBADARG; lets_output_insert3(d, buf, len, &index, items); break; case LETS_INSERT_NEW2: - ng = (items != 1); + ng = (items != 1 || !d->impl.alive); if (ng) GOTOBADARG; GOTOBADARG; break; case LETS_INSERT_NEW3: - ng = (items != 2); + ng = (items != 2 || !d->impl.alive); if (ng) GOTOBADARG; GOTOBADARG; break; case LETS_DELETE1: - ng = (items != 0); + ng = (items != 0 || !d->impl.alive); if (ng) GOTOBADARG; lets_output_delete1(d, buf, len, &index, items); break; case LETS_DELETE2: - ng = (items != 1); + ng = (items != 1 || !d->impl.alive); if (ng) GOTOBADARG; lets_output_delete2(d, buf, len, &index, items); break; case LETS_DELETE_ALL_OBJECTS1: - ng = (items != 0); + ng = (items != 0 || !d->impl.alive); if (ng) GOTOBADARG; GOTOBADARG; break; case LETS_LOOKUP2: - ng = (items != 1); + ng = (items != 1 || !d->impl.alive); if (ng) GOTOBADARG; lets_output_lookup2(d, buf, len, &index, items); break; case LETS_FIRST1: - ng = (items != 0); + ng = (items != 0 || !d->impl.alive); if (ng) GOTOBADARG; lets_output_first1(d, buf, len, &index, items); break; case LETS_NEXT2: - ng = (items != 1); + ng = (items != 1 || !d->impl.alive); if (ng) GOTOBADARG; lets_output_next2(d, buf, len, &index, items); break; case LETS_INFO_MEMORY1: - ng = (items != 0); + ng = (items != 0 || !d->impl.alive); if (ng) GOTOBADARG; GOTOBADARG; break; case LETS_INFO_SIZE1: - ng = (items != 0); + ng = (items != 0 || !d->impl.alive); if (ng) GOTOBADARG; GOTOBADARG; break; default: GOTOBADARG; } - return; badarg: @@ -348,21 +393,36 @@ drv_ready_async(ErlDrvData handle, ErlDrvThreadData async_data) { DrvData* d = (DrvData*) handle; DrvAsync* a = (DrvAsync*) async_data; + assert(a != NULL); - (void) d; - (void) a; + switch (a->reply) { + case LETS_BADARG: + driver_send_int(d, LETS_BADARG, a->caller); + break; + case LETS_TRUE: + driver_send_int(d, LETS_TRUE, a->caller); + break; + case LETS_END_OF_TABLE: + driver_send_int(d, LETS_END_OF_TABLE, a->caller); + break; + case LETS_BINARY: + driver_send_binary(d, a->binary, a->caller); + break; + default: + driver_send_int(d, LETS_BADARG, a->caller); + } + + delete a; } void -lets_output_do(void* async_data) +drv_async_free(void* async_data) { DrvAsync* a = (DrvAsync*) async_data; - - (void) a; + delete a; } - // // Commands // @@ -370,6 +430,7 @@ lets_output_do(void* async_data) static void lets_output_create6(const char op, DrvData* d, char* buf, int len, int* index, int items) { + DrvAsync* drv_async = NULL; char type; char privacy; char *name; @@ -444,18 +505,40 @@ lets_output_create6(const char op, DrvData* d, char* buf, int len, int* index, i GOTOBADARG; } - if (!lets_create(d->impl, op)) { - GOTOBADARG; + if (d->impl.async) { + drv_async = new DrvAsync(d, driver_caller(d->port), op); + if (!drv_async) { + GOTOBADARG; + } + driver_async(d->port, NULL, lets_async_create6, drv_async, drv_async_free); + } else { + if (!lets_create(d->impl, op)) { + GOTOBADARG; + } + driver_send_int(d, LETS_TRUE); } - - driver_send_int(d, LETS_TRUE); return; badarg: + if (drv_async) { delete drv_async; } driver_send_int(d, LETS_BADARG); return; } +static void +lets_async_create6(void* async_data) +{ + DrvAsync* a = (DrvAsync*) async_data; + assert(a != NULL); + DrvData* d = a->drvdata; + + if (!lets_create(d->impl, a->command)) { + a->reply = LETS_BADARG; + } else { + a->reply = LETS_TRUE; + } +} + static void lets_output_open6(DrvData* d, char* buf, int len, int* index, int items) { @@ -477,6 +560,7 @@ lets_output_repair6(DrvData* d, char* buf, int len, int* index, int items) static void lets_output_insert2(DrvData* d, char* buf, int len, int* index, int items) { + DrvAsync* drv_async = NULL; int ng, arity; char *key; long keylen; @@ -493,6 +577,13 @@ lets_output_insert2(DrvData* d, char* buf, int len, int* index, int items) return; } + if (d->impl.async) { + drv_async = new DrvAsync(d, driver_caller(d->port), LETS_INSERT2); + if (!drv_async) { + GOTOBADARG; + } + } + while (items) { ng = ei_decode_tuple_header(buf, index, &arity); if (ng) GOTOBADARG; @@ -503,9 +594,13 @@ lets_output_insert2(DrvData* d, char* buf, int len, int* index, int items) ng = ei_inspect_binary(buf, index, (void**) &blob, &bloblen); if (ng) GOTOBADARG; - leveldb::Slice skey((const char*) key, keylen); - leveldb::Slice sblob((const char*) blob, bloblen); - batch.Put(skey, sblob); + if (drv_async) { + drv_async->put((const char*) key, keylen, (const char*) blob, bloblen); + } else { + leveldb::Slice skey((const char*) key, keylen); + leveldb::Slice sblob((const char*) blob, bloblen); + batch.Put(skey, sblob); + } items--; } @@ -514,26 +609,43 @@ lets_output_insert2(DrvData* d, char* buf, int len, int* index, int items) ng = (items != 0); if (ng) GOTOBADARG; - if (!d->impl.alive) { - GOTOBADARG; - } + if (drv_async) { + driver_async(d->port, NULL, lets_async_insert2, drv_async, drv_async_free); + } else { + status = d->impl.db->Write(d->impl.db_write_options, &batch); + if (!status.ok()) { + GOTOBADARG; + } - status = d->impl.db->Write(d->impl.db_write_options, &batch); - if (!status.ok()) { - GOTOBADARG; + driver_send_int(d, LETS_TRUE); } - - driver_send_int(d, LETS_TRUE); return; badarg: + if (drv_async) { delete drv_async; } driver_send_int(d, LETS_BADARG); return; } +static void +lets_async_insert2(void* async_data) +{ + DrvAsync* a = (DrvAsync*) async_data; + assert(a != NULL); + DrvData* d = a->drvdata; + + leveldb::Status status = d->impl.db->Write(d->impl.db_write_options, &(a->batch)); + if (!status.ok()) { + a->reply = LETS_BADARG; + } else { + a->reply = LETS_TRUE; + } +} + static void lets_output_insert3(DrvData* d, char* buf, int len, int* index, int items) { + DrvAsync* drv_async = NULL; int ng; char *key; long keylen; @@ -547,64 +659,114 @@ lets_output_insert3(DrvData* d, char* buf, int len, int* index, int items) ng = ei_inspect_binary(buf, index, (void**) &blob, &bloblen); if (ng) GOTOBADARG; - { + if (d->impl.async) { + drv_async = new DrvAsync(d, driver_caller(d->port), LETS_INSERT3); + if (!drv_async) { + GOTOBADARG; + } + drv_async->put((const char*) key, keylen, (const char*) blob, bloblen); + } else { leveldb::Slice skey((const char*) key, keylen); leveldb::Slice sblob((const char*) blob, bloblen); batch.Put(skey, sblob); } - if (!d->impl.alive) { - GOTOBADARG; - } + if (drv_async) { + driver_async(d->port, NULL, lets_async_insert3, drv_async, drv_async_free); + } else { + status = d->impl.db->Write(d->impl.db_write_options, &batch); + if (!status.ok()) { + GOTOBADARG; + } - status = d->impl.db->Write(d->impl.db_write_options, &batch); - if (!status.ok()) { - GOTOBADARG; + driver_send_int(d, LETS_TRUE); } - - driver_send_int(d, LETS_TRUE); return; badarg: + if (drv_async) { delete drv_async; } driver_send_int(d, LETS_BADARG); return; } +static void +lets_async_insert3(void* async_data) +{ + DrvAsync* a = (DrvAsync*) async_data; + assert(a != NULL); + DrvData* d = a->drvdata; + + leveldb::Status status = d->impl.db->Write(d->impl.db_write_options, &(a->batch)); + if (!status.ok()) { + a->reply = LETS_BADARG; + } else { + a->reply = LETS_TRUE; + } +} + static void lets_output_delete1(DrvData* d, char* buf, int len, int* index, int items) { + DrvAsync* drv_async = NULL; leveldb::WriteOptions db_write_options; leveldb::WriteBatch batch; leveldb::Status status; - if (!d->impl.alive) { - GOTOBADARG; - } - // alive d->impl.alive = 0; - db_write_options.sync = true; - status = d->impl.db->Write(db_write_options, &batch); - if (!status.ok()) { - GOTOBADARG; + if (d->impl.async) { + drv_async = new DrvAsync(d, driver_caller(d->port), LETS_DELETE1); + if (!drv_async) { + GOTOBADARG; + } + driver_async(d->port, NULL, lets_async_delete1, drv_async, drv_async_free); + } else { + db_write_options.sync = true; + status = d->impl.db->Write(db_write_options, &batch); + if (!status.ok()) { + GOTOBADARG; + } + + // @TBD This is quite risky ... need to re-consider. + // delete d->impl.db; + // d->impl.db = NULL; + + driver_send_int(d, LETS_TRUE); } - - // @TBD This is quite risky ... need to re-consider. - // delete d->impl.db; - // d->impl.db = NULL; - - driver_send_int(d, LETS_TRUE); return; badarg: + if (drv_async) { delete drv_async; } driver_send_int(d, LETS_BADARG); return; } +static void +lets_async_delete1(void* async_data) +{ + DrvAsync* a = (DrvAsync*) async_data; + assert(a != NULL); + DrvData* d = a->drvdata; + + leveldb::WriteOptions db_write_options; + leveldb::WriteBatch batch; + db_write_options.sync = true; + leveldb::Status status = d->impl.db->Write(d->impl.db_write_options, &batch); + if (!status.ok()) { + a->reply = LETS_BADARG; + } else { + // @TBD This is quite risky ... need to re-consider. + // delete d->impl.db; + // d->impl.db = NULL; + a->reply = LETS_TRUE; + } +} + static void lets_output_delete2(DrvData* d, char* buf, int len, int* index, int items) { + DrvAsync* drv_async = NULL; int ng; char *key; long keylen; @@ -614,31 +776,53 @@ lets_output_delete2(DrvData* d, char* buf, int len, int* index, int items) ng = ei_inspect_binary(buf, index, (void**) &key, &keylen); if (ng) GOTOBADARG; - { + if (d->impl.async) { + drv_async = new DrvAsync(d, driver_caller(d->port), LETS_INSERT2); + if (!drv_async) { + GOTOBADARG; + } + drv_async->del((const char*) key, keylen); + } else { leveldb::Slice skey((const char*) key, keylen); batch.Delete(skey); } - if (!d->impl.alive) { - GOTOBADARG; - } + if (drv_async) { + driver_async(d->port, NULL, lets_async_delete2, drv_async, drv_async_free); + } else { + status = d->impl.db->Write(d->impl.db_write_options, &batch); + if (!status.ok()) { + GOTOBADARG; + } - status = d->impl.db->Write(d->impl.db_write_options, &batch); - if (!status.ok()) { - GOTOBADARG; + driver_send_int(d, LETS_TRUE); } - - driver_send_int(d, LETS_TRUE); return; badarg: + if (drv_async) { delete drv_async; } driver_send_int(d, LETS_BADARG); return; } +static void +lets_async_delete2(void* async_data) +{ + DrvAsync* a = (DrvAsync*) async_data; + assert(a != NULL); + DrvData* d = a->drvdata; + + leveldb::Status status = d->impl.db->Write(d->impl.db_write_options, &(a->batch)); + if (!status.ok()) { + a->reply = LETS_BADARG; + } else { + a->reply = LETS_TRUE; + } +} static void lets_output_lookup2(DrvData* d, char* buf, int len, int* index, int items) { + DrvAsync* drv_async = NULL; int ng; char *key; long keylen; @@ -646,11 +830,13 @@ lets_output_lookup2(DrvData* d, char* buf, int len, int* index, int items) ng = ei_inspect_binary(buf, index, (void**) &key, &keylen); if (ng) GOTOBADARG; - if (!d->impl.alive) { - GOTOBADARG; - } - - { + if (d->impl.async) { + drv_async = new DrvAsync(d, driver_caller(d->port), LETS_LOOKUP2, (const char*) key, keylen); + if (!drv_async) { + GOTOBADARG; + } + driver_async(d->port, NULL, lets_async_lookup2, drv_async, drv_async_free); + } else { leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options); if (!it) { GOTOBADARG; @@ -659,29 +845,67 @@ lets_output_lookup2(DrvData* d, char* buf, int len, int* index, int items) leveldb::Slice skey((const char*) key, keylen); it->Seek(skey); if (!it->Valid() || it->key().compare(skey) != 0) { - driver_send_int(d, LETS_TRUE); + driver_send_int(d, LETS_END_OF_TABLE); delete it; return; } - driver_send_binary(d, it->value().data(), it->value().size()); + driver_send_buf(d, it->value().data(), it->value().size()); delete it; } return; badarg: + if (drv_async) { delete drv_async; } driver_send_int(d, LETS_BADARG); return; } static void -lets_output_first1(DrvData* d, char* buf, int len, int* index, int items) +lets_async_lookup2(void* async_data) { - if (!d->impl.alive) { - GOTOBADARG; + DrvAsync* a = (DrvAsync*) async_data; + assert(a != NULL); + DrvData* d = a->drvdata; + + leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options); + if (!it) { + a->reply = LETS_BADARG; + return; } - { + leveldb::Slice skey((const char*) a->binary->orig_bytes, a->binary->orig_size); + it->Seek(skey); + if (!it->Valid() || it->key().compare(skey) != 0) { + a->reply = LETS_END_OF_TABLE; + delete it; + return; + } + + ErlDrvBinary* binary = driver_realloc_binary(a->binary, it->value().size()); + if (binary) { + memcpy(binary->orig_bytes, it->value().data(), binary->orig_size); + a->binary = binary; + a->reply = LETS_BINARY; + } else { + a->reply = LETS_BADARG; + } + + delete it; +} + +static void +lets_output_first1(DrvData* d, char* buf, int len, int* index, int items) +{ + DrvAsync* drv_async = NULL; + + if (d->impl.async) { + drv_async = new DrvAsync(d, driver_caller(d->port), LETS_FIRST1); + if (!drv_async) { + GOTOBADARG; + } + driver_async(d->port, NULL, lets_async_first1, drv_async, drv_async_free); + } else { leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options); if (!it) { GOTOBADARG; @@ -694,19 +918,53 @@ lets_output_first1(DrvData* d, char* buf, int len, int* index, int items) return; } - driver_send_binary(d, it->key().data(), it->key().size()); + driver_send_buf(d, it->key().data(), it->key().size()); delete it; } return; badarg: + if (drv_async) { delete drv_async; } driver_send_int(d, LETS_BADARG); return; } +static void +lets_async_first1(void* async_data) +{ + DrvAsync* a = (DrvAsync*) async_data; + assert(a != NULL); + DrvData* d = a->drvdata; + + leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options); + if (!it) { + a->reply = LETS_BADARG; + return; + } + + it->SeekToFirst(); + if (!it->Valid()) { + a->reply = LETS_END_OF_TABLE; + delete it; + return; + } + + ErlDrvBinary* binary = driver_alloc_binary(it->key().size()); + if (binary) { + memcpy(binary->orig_bytes, it->key().data(), binary->orig_size); + a->binary = binary; + a->reply = LETS_BINARY; + } else { + a->reply = LETS_BADARG; + } + + delete it; +} + static void lets_output_next2(DrvData* d, char* buf, int len, int* index, int items) { + DrvAsync* drv_async = NULL; int ng; char *key; long keylen; @@ -714,11 +972,13 @@ lets_output_next2(DrvData* d, char* buf, int len, int* index, int items) ng = ei_inspect_binary(buf, index, (void**) &key, &keylen); if (ng) GOTOBADARG; - if (!d->impl.alive) { - GOTOBADARG; - } - - { + if (d->impl.async) { + drv_async = new DrvAsync(d, driver_caller(d->port), LETS_NEXT2, (const char*) key, keylen); + if (!drv_async) { + GOTOBADARG; + } + driver_async(d->port, NULL, lets_async_next2, drv_async, drv_async_free); + } else { leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options); if (!it) { GOTOBADARG; @@ -741,11 +1001,54 @@ lets_output_next2(DrvData* d, char* buf, int len, int* index, int items) } } - driver_send_binary(d, it->key().data(), it->key().size()); + driver_send_buf(d, it->key().data(), it->key().size()); delete it; } return; badarg: + if (drv_async) { delete drv_async; } driver_send_int(d, LETS_BADARG); } + +static void +lets_async_next2(void* async_data) +{ + DrvAsync* a = (DrvAsync*) async_data; + assert(a != NULL); + DrvData* d = a->drvdata; + + leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options); + if (!it) { + a->reply = LETS_BADARG; + return; + } + + leveldb::Slice skey((const char*) a->binary->orig_bytes, a->binary->orig_size); + it->Seek(skey); + if (!it->Valid()) { + a->reply = LETS_END_OF_TABLE; + delete it; + return; + } + + if (it->key().compare(skey) == 0) { + it->Next(); + if (!it->Valid()) { + a->reply = LETS_END_OF_TABLE; + delete it; + return; + } + } + + ErlDrvBinary* binary = driver_realloc_binary(a->binary, it->key().size()); + if (binary) { + memcpy(binary->orig_bytes, it->key().data(), binary->orig_size); + a->binary = binary; + a->reply = LETS_BINARY; + } else { + a->reply = LETS_BADARG; + } + + delete it; +} diff --git a/c_src/lets_drv_lib.cc b/c_src/lets_drv_lib.cc index 15895d9..dd7afe5 100644 --- a/c_src/lets_drv_lib.cc +++ b/c_src/lets_drv_lib.cc @@ -203,6 +203,16 @@ lets_parse_options(lets_impl& impl, } else { return FALSE; } + } else if (strcmp(atom, "async") == 0) { + ng = ei_decode_atom(buf, &index, atom); + if (ng) return FALSE; + if (strcmp(atom, "true") == 0) { + impl.async = true; + } else if (strcmp(atom, "false") == 0) { + impl.async = false; + } else { + return FALSE; + } } else { return FALSE; } diff --git a/c_src/lets_drv_lib.h b/c_src/lets_drv_lib.h index bff8def..eb0ad7c 100644 --- a/c_src/lets_drv_lib.h +++ b/c_src/lets_drv_lib.h @@ -60,7 +60,8 @@ extern "C" { typedef struct { - char alive; + bool async; + bool alive; char type; char privacy; std::string* name; diff --git a/c_src/lets_nif.cc b/c_src/lets_nif.cc index f3c4e65..88152e1 100644 --- a/c_src/lets_nif.cc +++ b/c_src/lets_nif.cc @@ -347,6 +347,10 @@ lets_nif_delete1(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) return MAKEBADARG(env, status); } + if (!h->impl.alive) { + return MAKEBADARG(env, status); + } + // alive h->impl.alive = 0; @@ -429,7 +433,7 @@ lets_nif_lookup2(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) it->Seek(skey); if (!it->Valid() || it->key().compare(skey) != 0) { delete it; - return lets_atom_true; + return lets_atom_end_of_table; } size_t size = it->value().size(); diff --git a/c_src/lets_nif_lib.cc b/c_src/lets_nif_lib.cc index 31adfb8..774579e 100644 --- a/c_src/lets_nif_lib.cc +++ b/c_src/lets_nif_lib.cc @@ -52,6 +52,7 @@ ERL_NIF_TERM lets_atom_block_size = 0; ERL_NIF_TERM lets_atom_compression = 0; ERL_NIF_TERM lets_atom_no = 0; ERL_NIF_TERM lets_atom_snappy = 0; +ERL_NIF_TERM lets_atom_async = 0; ERL_NIF_TERM lets_atom_block_restart_interval = 0; ERL_NIF_TERM lets_atom_verify_checksums = 0; ERL_NIF_TERM lets_atom_fill_cache = 0; @@ -78,6 +79,7 @@ lets_nif_lib_init(ErlNifEnv* env) lets_atom_compression = enif_make_atom(env, "compression"); lets_atom_no = enif_make_atom(env, "no"); lets_atom_snappy = enif_make_atom(env, "snappy"); + lets_atom_async = enif_make_atom(env, "async"); lets_atom_block_restart_interval = enif_make_atom(env, "block_restart_interval"); lets_atom_verify_checksums = enif_make_atom(env, "verify_checksums"); lets_atom_fill_cache = enif_make_atom(env, "fill_cache"); @@ -211,6 +213,14 @@ lets_parse_options(ErlNifEnv* env, lets_impl& impl, } else { return FALSE; } + } else if (enif_is_identical(tuple[0], lets_atom_async)) { + if (enif_is_identical(tuple[1], lets_atom_true)) { + impl.async = true; + } else if (enif_is_identical(tuple[1], lets_atom_false)) { + impl.async = false; + } else { + return FALSE; + } } else { return FALSE; } diff --git a/c_src/lets_nif_lib.h b/c_src/lets_nif_lib.h index 4413663..57ab0c4 100644 --- a/c_src/lets_nif_lib.h +++ b/c_src/lets_nif_lib.h @@ -60,7 +60,8 @@ extern "C" { typedef struct { - char alive; + bool async; + bool alive; char type; char privacy; std::string* name; @@ -91,6 +92,7 @@ extern "C" { extern ERL_NIF_TERM lets_atom_compression; extern ERL_NIF_TERM lets_atom_no; extern ERL_NIF_TERM lets_atom_snappy; + extern ERL_NIF_TERM lets_atom_async; extern ERL_NIF_TERM lets_atom_block_restart_interval; extern ERL_NIF_TERM lets_atom_verify_checksums; extern ERL_NIF_TERM lets_atom_fill_cache; diff --git a/doc/README.md b/doc/README.md index ec85905..c40bb24 100644 --- a/doc/README.md +++ b/doc/README.md @@ -498,19 +498,6 @@ Explain how to build and to run lets with valgrind enabled
  • -Performance -

    - -
  • -
  • -

    Testing

    diff --git a/doc/lets.md b/doc/lets.md index 4abfe57..5db689e 100644 --- a/doc/lets.md +++ b/doc/lets.md @@ -44,7 +44,7 @@ -
    ets_opt() = set | ordered_set | named_table | {key_pos, pos_integer()} | public | protected | private | compressed
    +
    ets_opt() = set | ordered_set | named_table | {key_pos, pos_integer()} | public | protected | private | compressed | async
    @@ -138,6 +138,11 @@ is empty, $end_of_table will be returned.

    .
  • +async only the drv implementation +

    +
  • +
  • +

    memory only the ets implementation

  • @@ -212,6 +217,13 @@ stored in a compressed format.
  • +async If this option is present, the emulator's async thread +pool will be used when accessing the table data. only the drv +implementation +

    +
  • +
  • +

    drv If this option is present, the table data will be stored with LevelDB backend via an Erlang Driver. This is the default setting for the table implementation. @@ -482,6 +494,11 @@ __See also:__ [ets:first/1](ets.md#first-1).

  • +async only the drv implementation +

    +
  • +
  • +

    memory only the ets implementation

  • @@ -620,6 +637,13 @@ stored in a compressed format.
  • +async If this option is present, the emulator's async thread +pool will be used when accessing the table data. only the drv +implementation +

    +
  • +
  • +

    drv If this option is present, the table data will be stored with LevelDB backend via an Erlang Driver. This is the default setting for the table implementation. diff --git a/doc/lets_drv.md b/doc/lets_drv.md index 4ccb273..d70a155 100644 --- a/doc/lets_drv.md +++ b/doc/lets_drv.md @@ -26,7 +26,7 @@ -`delete(Tab, Drv) -> any()` +`delete(Tab, Impl) -> any()` @@ -35,7 +35,7 @@ -`delete(Tab, Drv, Key) -> any()` +`delete(Tab, Impl, Key) -> any()` @@ -44,7 +44,7 @@ -`delete_all_objects(Tab, Drv) -> any()` +`delete_all_objects(Tab, Impl) -> any()` @@ -62,7 +62,7 @@ -`first(Tab, Drv) -> any()` +`first(Tab, Impl) -> any()` @@ -71,7 +71,7 @@ -`info_memory(Tab, Drv) -> any()` +`info_memory(Tab, Impl) -> any()` @@ -80,7 +80,7 @@ -`info_size(Tab, Drv) -> any()` +`info_size(Tab, Impl) -> any()` @@ -89,7 +89,7 @@ -`insert(Tab, Drv, Object) -> any()` +`insert(Tab, Impl, Object) -> any()` @@ -98,7 +98,7 @@ -`insert_new(Tab, Drv, Object) -> any()` +`insert_new(Tab, Impl, Object) -> any()` @@ -107,7 +107,7 @@ -`lookup(Tab, Drv, Key) -> any()` +`lookup(Tab, Impl, Key) -> any()` @@ -116,7 +116,7 @@ -`next(Tab, Drv, Key) -> any()` +`next(Tab, Impl, Key) -> any()` @@ -143,5 +143,5 @@ -`tab2list(Tab, Drv) -> any()` +`tab2list(Tab, Impl) -> any()` diff --git a/doc/lets_nif.md b/doc/lets_nif.md index f7b4a42..b9114c8 100644 --- a/doc/lets_nif.md +++ b/doc/lets_nif.md @@ -26,7 +26,7 @@ -`delete(Tab, Nif) -> any()` +`delete(Tab, Impl) -> any()` @@ -35,7 +35,7 @@ -`delete(Tab, Nif, Key) -> any()` +`delete(Tab, Impl, Key) -> any()` @@ -44,7 +44,7 @@ -`delete_all_objects(Tab, Nif) -> any()` +`delete_all_objects(Tab, Impl) -> any()` @@ -62,7 +62,7 @@ -`first(Tab, Nif) -> any()` +`first(Tab, Impl) -> any()` @@ -71,7 +71,7 @@ -`info_memory(Tab, Nif) -> any()` +`info_memory(Tab, Impl) -> any()` @@ -80,7 +80,7 @@ -`info_size(Tab, Nif) -> any()` +`info_size(Tab, Impl) -> any()` @@ -89,7 +89,7 @@ -`insert(Tab, Nif, Object) -> any()` +`insert(Tab, Impl, Object) -> any()` @@ -98,7 +98,7 @@ -`insert_new(Tab, Nif, Object) -> any()` +`insert_new(Tab, Impl, Object) -> any()` @@ -107,7 +107,7 @@ -`lookup(Tab, Nif, Key) -> any()` +`lookup(Tab, Impl, Key) -> any()` @@ -116,7 +116,7 @@ -`next(Tab, Nif, Key) -> any()` +`next(Tab, Impl, Key) -> any()` @@ -143,5 +143,5 @@ -`tab2list(Tab, Nif) -> any()` +`tab2list(Tab, Impl) -> any()` diff --git a/doc/overview.edoc b/doc/overview.edoc index e85f426..6672fd8 100644 --- a/doc/overview.edoc +++ b/doc/overview.edoc @@ -255,15 +255,12 @@ $ erl -smp +A 5 -pz ../../sext/ebin -pz ../../qc/ebin * Explain how to build and to run lets with valgrind enabled OTP/Erlang virtual machine -- Performance - * Update driver implementation to use Erlang\'s asynchronous driver - thread pool for all LevelDB operations. - - Testing * Functional ** Update test model to include LevelDB\'s database, read, and - write options. These options have not undergone any explicit - testing. + write options. These options have not been tested. + ** Update test model to include LevelDB\'s destroy and repair + operations. These operations have not been tested. * Performance (TBD) * Stability (TBD) diff --git a/src/lets.erl b/src/lets.erl index 63f51f8..b9c8897 100644 --- a/src/lets.erl +++ b/src/lets.erl @@ -50,7 +50,7 @@ -opaque tab() :: #tab{}. -type opts() :: [ets_opt() | impl_opt() | db_opts() | db_read_opts() | db_write_opts()]. --type ets_opt() :: set | ordered_set | named_table | {key_pos,pos_integer()} | public | protected | private | compressed. +-type ets_opt() :: set | ordered_set | named_table | {key_pos,pos_integer()} | public | protected | private | compressed | async. -type impl_opt() :: drv | nif | ets. -type db_opts() :: {db, [{path,file:filename()} | create_if_missing | {create_if_missing,boolean()} | error_if_exists | {error_if_exists,boolean()} | paranoid_checks | {paranoid_checks,boolean()} | {write_buffer_size,pos_integer()} | {max_open_files,pos_integer()} | {block_cache_size,pos_integer()} | {block_size,pos_integer()} | {block_restart_interval,pos_integer()}]}. @@ -98,6 +98,10 @@ %% - +compressed+ If this option is present, the table data will be %% stored in a compressed format. %% +%% - +async+ If this option is present, the emulator\'s async thread +%% pool will be used when accessing the table data. _only the drv +%% implementation_ +%% %% - +drv+ If this option is present, the table data will be stored %% with LevelDB backend via an Erlang Driver. This is the default %% setting for the table implementation. @@ -310,6 +314,7 @@ next(Tab, Key) -> %% - +keypos+ %% - +protection+ %% - +compressed+ +%% - +async+ _only the drv implementation_ %% - +memory+ _only the ets implementation_ %% - +size+ _only the ets implementation_ %% @@ -337,6 +342,8 @@ info(Tab, Item) -> Tab#tab.protection; compressed -> Tab#tab.compressed; + async -> + Tab#tab.async; memory -> Mod:info_memory(Tab, Impl); size -> @@ -414,6 +421,7 @@ create(Op, Name, Opts) -> end end, Compressed = proplists:get_bool(compressed, POpts), + Async = proplists:get_bool(async, POpts), Drv = proplists:get_bool(drv, POpts), Nif = proplists:get_bool(nif, POpts), Ets = proplists:get_bool(ets, POpts), @@ -424,7 +432,8 @@ create(Op, Name, Opts) -> type=Type, keypos=KeyPos, protection=Protection, - compressed=Compressed}, + compressed=Compressed, + async=Async}, DBOptions = fix_db_options(Tab, proplists:get_value(db, POpts, [])), DBReadOptions = proplists:get_value(db_read, POpts, []), @@ -440,8 +449,11 @@ create(Op, Name, Opts) -> lets_drv:Op(Tab, DBOptions, DBReadOptions, DBWriteOptions) end. -fix_db_options(#tab{name=Name, compressed=Compressed}, Options) -> - fix_db_options_compression(Compressed, fix_db_options_path(Name, Options)). +fix_db_options(#tab{name=Name, compressed=Compressed, async=Async}, Options0) -> + Options1 = fix_db_options_path(Name, Options0), + Options2 = fix_db_options_compression(Compressed, Options1), + Options3 = fix_db_options_async(Async, Options2), + Options3. fix_db_options_path(Name, Options) -> case proplists:lookup(path, Options) of @@ -456,6 +468,11 @@ fix_db_options_compression(false, Options) -> fix_db_options_compression(true, Options) -> [{compression, snappy}|Options]. +fix_db_options_async(false, Options) -> + [{async, false}|Options]; +fix_db_options_async(true, Options) -> + [{async, true}|Options]. + binify(X) when is_atom(X) -> list_to_binary(atom_to_list(X)); binify(X) when is_list(X) -> @@ -464,7 +481,7 @@ binify(X) when is_binary(X) -> X. options(Options) -> - Keys = [set, ordered_set, named_table, keypos, public, protected, private, compressed, drv, nif, ets, db, db_read, db_write], + Keys = [set, ordered_set, named_table, keypos, public, protected, private, compressed, async, drv, nif, ets, db, db_read, db_write], options(Options, Keys). options(Options, Keys) when is_list(Options) -> diff --git a/src/lets.hrl b/src/lets.hrl index 00b1479..2dd4510 100644 --- a/src/lets.hrl +++ b/src/lets.hrl @@ -32,6 +32,7 @@ keypos=1 :: pos_integer(), protection=protected :: public|protected|private, compressed=false :: boolean(), + async=false :: boolean(), drv :: port() | undefined, nif :: nif() | undefined, ets :: ets:tab() | undefined diff --git a/src/lets_drv.erl b/src/lets_drv.erl index f4bfdd7..7c4f706 100644 --- a/src/lets_drv.erl +++ b/src/lets_drv.erl @@ -49,6 +49,7 @@ -define(LETS_BADARG, 16#00). -define(LETS_TRUE, 16#01). -define(LETS_END_OF_TABLE, 16#02). +-define(LETS_BINARY, 16#03). -define(LETS_OPEN6, 16#00). -define(LETS_DESTROY6, 16#01). @@ -96,9 +97,9 @@ init() -> open(#tab{name=_Name, named_table=_Named, type=Type, protection=Protection}=Tab, Options, ReadOptions, WriteOptions) -> {value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options), - Drv = impl_open(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions), - %% @TODO implement named Drv (of sorts) - Tab#tab{drv=Drv}. + Impl = impl_open(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions), + %% @TODO implement named Impl (of sorts) + Tab#tab{drv=Impl}. destroy(#tab{type=Type, protection=Protection}, Options, ReadOptions, WriteOptions) -> {value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options), @@ -108,81 +109,81 @@ repair(#tab{type=Type, protection=Protection}, Options, ReadOptions, WriteOption {value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options), impl_repair(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions). -insert(#tab{keypos=KeyPos, type=Type}, Drv, Object) when is_tuple(Object) -> +insert(#tab{keypos=KeyPos, type=Type}, Impl, Object) when is_tuple(Object) -> Key = element(KeyPos,Object), Val = Object, - impl_insert(Drv, encode(Type, Key), encode(Type, Val)); -insert(#tab{keypos=KeyPos, type=Type}, Drv, Objects) when is_list(Objects) -> + impl_insert(Impl, encode(Type, Key), encode(Type, Val)); +insert(#tab{keypos=KeyPos, type=Type}, Impl, Objects) when is_list(Objects) -> List = [{encode(Type, element(KeyPos,Object)), encode(Type, Object)} || Object <- Objects ], - impl_insert(Drv, List). + impl_insert(Impl, List). -insert_new(#tab{keypos=KeyPos, type=Type}, Drv, Object) when is_tuple(Object) -> +insert_new(#tab{keypos=KeyPos, type=Type}, Impl, Object) when is_tuple(Object) -> Key = element(KeyPos,Object), Val = Object, - impl_insert_new(Drv, encode(Type, Key), encode(Type, Val)); -insert_new(#tab{keypos=KeyPos, type=Type}, Drv, Objects) when is_list(Objects) -> + impl_insert_new(Impl, encode(Type, Key), encode(Type, Val)); +insert_new(#tab{keypos=KeyPos, type=Type}, Impl, Objects) when is_list(Objects) -> List = [{encode(Type, element(KeyPos,Object)), encode(Type, Object)} || Object <- Objects ], - impl_insert_new(Drv, List). + impl_insert_new(Impl, List). -delete(_Tab, Drv) -> - impl_delete(Drv). +delete(_Tab, Impl) -> + impl_delete(Impl). -delete(#tab{type=Type}, Drv, Key) -> - impl_delete(Drv, encode(Type, Key)). +delete(#tab{type=Type}, Impl, Key) -> + impl_delete(Impl, encode(Type, Key)). -delete_all_objects(_Tab, Drv) -> - impl_delete_all_objects(Drv). +delete_all_objects(_Tab, Impl) -> + impl_delete_all_objects(Impl). -lookup(#tab{type=Type}, Drv, Key) -> - case impl_lookup(Drv, encode(Type, Key)) of - true -> +lookup(#tab{type=Type}, Impl, Key) -> + case impl_lookup(Impl, encode(Type, Key)) of + '$end_of_table' -> []; Object when is_binary(Object) -> [decode(Type, Object)] end. -first(#tab{type=Type}, Drv) -> - case impl_first(Drv) of +first(#tab{type=Type}, Impl) -> + case impl_first(Impl) of '$end_of_table' -> '$end_of_table'; Key -> decode(Type, Key) end. -next(#tab{type=Type}, Drv, Key) -> - case impl_next(Drv, encode(Type, Key)) of +next(#tab{type=Type}, Impl, Key) -> + case impl_next(Impl, encode(Type, Key)) of '$end_of_table' -> '$end_of_table'; Next -> decode(Type, Next) end. -info_memory(_Tab, Drv) -> - case impl_info_memory(Drv) of +info_memory(_Tab, Impl) -> + case impl_info_memory(Impl) of Memory when is_integer(Memory) -> erlang:round(Memory / erlang:system_info(wordsize)); Else -> Else end. -info_size(_Tab, Drv) -> - impl_info_size(Drv). +info_size(_Tab, Impl) -> + impl_info_size(Impl). -tab2list(Tab, Drv) -> - tab2list(Tab, Drv, impl_first(Drv), []). +tab2list(Tab, Impl) -> + tab2list(Tab, Impl, impl_first(Impl), []). -tab2list(_Tab, _Drv, '$end_of_table', Acc) -> +tab2list(_Tab, _Impl, '$end_of_table', Acc) -> lists:reverse(Acc); -tab2list(#tab{type=Type}=Tab, Drv, Key, Acc) -> +tab2list(#tab{type=Type}=Tab, Impl, Key, Acc) -> NewAcc = - case impl_lookup(Drv, Key) of - true -> + case impl_lookup(Impl, Key) of + '$end_of_table' -> %% @NOTE This is not an atomic operation Acc; Object when is_binary(Object) -> [decode(Type, Object)|Acc] end, - tab2list(Tab, Drv, impl_next(Drv, Key), NewAcc). + tab2list(Tab, Impl, impl_next(Impl, Key), NewAcc). %%%---------------------------------------------------------------------- @@ -200,79 +201,73 @@ decode(ordered_set, Term) -> sext:decode(Term). impl_open(Type, Protection, Path, Options, ReadOptions, WriteOptions) -> - Drv = init(), - true = call(Drv, {?LETS_OPEN6, Type, Protection, Path, Options, ReadOptions, WriteOptions}), - Drv. + Impl = init(), + true = call(Impl, {?LETS_OPEN6, Type, Protection, Path, Options, ReadOptions, WriteOptions}), + Impl. impl_destroy(Type, Protection, Path, Options, ReadOptions, WriteOptions) -> - Drv = init(), - true = call(Drv, {?LETS_OPEN6, Type, Protection, Path, Options, ReadOptions, WriteOptions}), - _ = port_close(Drv), + Impl = init(), + true = call(Impl, {?LETS_OPEN6, Type, Protection, Path, Options, ReadOptions, WriteOptions}), + _ = port_close(Impl), _ = erl_ddll:unload(lets_drv), true. impl_repair(Type, Protection, Path, Options, ReadOptions, WriteOptions) -> - Drv = init(), - true = call(Drv, {?LETS_REPAIR6, Type, Protection, Path, Options, ReadOptions, WriteOptions}), - _ = port_close(Drv), + Impl = init(), + true = call(Impl, {?LETS_REPAIR6, Type, Protection, Path, Options, ReadOptions, WriteOptions}), + _ = port_close(Impl), _ = erl_ddll:unload(lets_drv), true. -impl_insert(Drv, Key, Object) -> - call(Drv, {?LETS_INSERT3, Key, Object}). +impl_insert(Impl, Key, Object) -> + call(Impl, {?LETS_INSERT3, Key, Object}). -impl_insert(Drv, List) -> - call(Drv, {?LETS_INSERT2, List}). +impl_insert(Impl, List) -> + call(Impl, {?LETS_INSERT2, List}). -impl_insert_new(Drv, Key, Object) -> - call(Drv, {?LETS_INSERT_NEW3, Key, Object}). +impl_insert_new(Impl, Key, Object) -> + call(Impl, {?LETS_INSERT_NEW3, Key, Object}). -impl_insert_new(Drv, List) -> - call(Drv, {?LETS_INSERT_NEW2, List}). +impl_insert_new(Impl, List) -> + call(Impl, {?LETS_INSERT_NEW2, List}). -impl_delete(Drv) -> - Res = call(Drv, {?LETS_DELETE1}), - _ = port_close(Drv), +impl_delete(Impl) -> + Res = call(Impl, {?LETS_DELETE1}), + _ = port_close(Impl), _ = erl_ddll:unload(lets_drv), Res. -impl_delete(Drv, Key) -> - call(Drv, {?LETS_DELETE2, Key}). +impl_delete(Impl, Key) -> + call(Impl, {?LETS_DELETE2, Key}). -impl_delete_all_objects(Drv) -> - call(Drv, {?LETS_DELETE_ALL_OBJECTS1}). +impl_delete_all_objects(Impl) -> + call(Impl, {?LETS_DELETE_ALL_OBJECTS1}). -impl_lookup(Drv, Key) -> - call(Drv, {?LETS_LOOKUP2, Key}). +impl_lookup(Impl, Key) -> + call(Impl, {?LETS_LOOKUP2, Key}). -impl_first(Drv) -> - call(Drv, {?LETS_FIRST1}). +impl_first(Impl) -> + call(Impl, {?LETS_FIRST1}). -impl_next(Drv, Key) -> - call(Drv, {?LETS_NEXT2, Key}). +impl_next(Impl, Key) -> + call(Impl, {?LETS_NEXT2, Key}). -impl_info_memory(Drv) -> - call(Drv, {?LETS_INFO_MEMORY1}). +impl_info_memory(Impl) -> + call(Impl, {?LETS_INFO_MEMORY1}). -impl_info_size(Drv) -> - call(Drv, {?LETS_INFO_SIZE1}). +impl_info_size(Impl) -> + call(Impl, {?LETS_INFO_SIZE1}). -call(Drv, Tuple) -> +call(Impl, Tuple) -> Data = term_to_binary(Tuple), - port_command(Drv, Data), + port_command(Impl, Data), receive - {Drv, ?LETS_TRUE, Reply} -> + {Impl, ?LETS_BINARY, Reply} -> Reply; - {Drv, ?LETS_TRUE} -> + {Impl, ?LETS_TRUE} -> true; - {Drv, ?LETS_END_OF_TABLE} -> + {Impl, ?LETS_END_OF_TABLE} -> '$end_of_table'; - {Drv, ?LETS_BADARG} -> - erlang:error(badarg, [Drv]) - %% after 1000 -> - %% receive X -> - %% erlang:error(timeout, [Drv, X]) - %% after 0 -> - %% erlang:error(timeout, [Drv]) - %% end + {Impl, ?LETS_BADARG} -> + erlang:error(badarg, [Impl]) end. diff --git a/src/lets_nif.erl b/src/lets_nif.erl index 2fd7ddd..f8e316f 100644 --- a/src/lets_nif.erl +++ b/src/lets_nif.erl @@ -67,9 +67,9 @@ init() -> open(#tab{name=_Name, named_table=_Named, type=Type, protection=Protection}=Tab, Options, ReadOptions, WriteOptions) -> {value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options), - Nif = impl_open(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions), - %% @TODO implement named Nif (of sorts) - Tab#tab{nif=Nif}. + Impl = impl_open(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions), + %% @TODO implement named Impl (of sorts) + Tab#tab{nif=Impl}. destroy(#tab{type=Type, protection=Protection}, Options, ReadOptions, WriteOptions) -> {value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options), @@ -79,81 +79,81 @@ repair(#tab{type=Type, protection=Protection}, Options, ReadOptions, WriteOption {value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options), impl_repair(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions). -insert(#tab{keypos=KeyPos, type=Type}, Nif, Object) when is_tuple(Object) -> +insert(#tab{keypos=KeyPos, type=Type}, Impl, Object) when is_tuple(Object) -> Key = element(KeyPos,Object), Val = Object, - impl_insert(Nif, encode(Type, Key), encode(Type, Val)); -insert(#tab{keypos=KeyPos, type=Type}, Nif, Objects) when is_list(Objects) -> + impl_insert(Impl, encode(Type, Key), encode(Type, Val)); +insert(#tab{keypos=KeyPos, type=Type}, Impl, Objects) when is_list(Objects) -> List = [{encode(Type, element(KeyPos,Object)), encode(Type, Object)} || Object <- Objects ], - impl_insert(Nif, List). + impl_insert(Impl, List). -insert_new(#tab{keypos=KeyPos, type=Type}, Nif, Object) when is_tuple(Object) -> +insert_new(#tab{keypos=KeyPos, type=Type}, Impl, Object) when is_tuple(Object) -> Key = element(KeyPos,Object), Val = Object, - impl_insert_new(Nif, encode(Type, Key), encode(Type, Val)); -insert_new(#tab{keypos=KeyPos, type=Type}, Nif, Objects) when is_list(Objects) -> + impl_insert_new(Impl, encode(Type, Key), encode(Type, Val)); +insert_new(#tab{keypos=KeyPos, type=Type}, Impl, Objects) when is_list(Objects) -> List = [{encode(Type, element(KeyPos,Object)), encode(Type, Object)} || Object <- Objects ], - impl_insert_new(Nif, List). + impl_insert_new(Impl, List). -delete(_Tab, Nif) -> - impl_delete(Nif). +delete(_Tab, Impl) -> + impl_delete(Impl). -delete(#tab{type=Type}, Nif, Key) -> - impl_delete(Nif, encode(Type, Key)). +delete(#tab{type=Type}, Impl, Key) -> + impl_delete(Impl, encode(Type, Key)). -delete_all_objects(_Tab, Nif) -> - impl_delete_all_objects(Nif). +delete_all_objects(_Tab, Impl) -> + impl_delete_all_objects(Impl). -lookup(#tab{type=Type}, Nif, Key) -> - case impl_lookup(Nif, encode(Type, Key)) of - true -> +lookup(#tab{type=Type}, Impl, Key) -> + case impl_lookup(Impl, encode(Type, Key)) of + '$end_of_table' -> []; Object when is_binary(Object) -> [decode(Type, Object)] end. -first(#tab{type=Type}, Nif) -> - case impl_first(Nif) of +first(#tab{type=Type}, Impl) -> + case impl_first(Impl) of '$end_of_table' -> '$end_of_table'; Key -> decode(Type, Key) end. -next(#tab{type=Type}, Nif, Key) -> - case impl_next(Nif, encode(Type, Key)) of +next(#tab{type=Type}, Impl, Key) -> + case impl_next(Impl, encode(Type, Key)) of '$end_of_table' -> '$end_of_table'; Next -> decode(Type, Next) end. -info_memory(_Tab, Nif) -> - case impl_info_memory(Nif) of +info_memory(_Tab, Impl) -> + case impl_info_memory(Impl) of Memory when is_integer(Memory) -> erlang:round(Memory / erlang:system_info(wordsize)); Else -> Else end. -info_size(_Tab, Nif) -> - impl_info_size(Nif). +info_size(_Tab, Impl) -> + impl_info_size(Impl). -tab2list(Tab, Nif) -> - tab2list(Tab, Nif, impl_first(Nif), []). +tab2list(Tab, Impl) -> + tab2list(Tab, Impl, impl_first(Impl), []). -tab2list(_Tab, _Nif, '$end_of_table', Acc) -> +tab2list(_Tab, _Impl, '$end_of_table', Acc) -> lists:reverse(Acc); -tab2list(#tab{type=Type}=Tab, Nif, Key, Acc) -> +tab2list(#tab{type=Type}=Tab, Impl, Key, Acc) -> NewAcc = - case impl_lookup(Nif, Key) of - true -> + case impl_lookup(Impl, Key) of + '$end_of_table' -> %% @NOTE This is not an atomic operation Acc; Object when is_binary(Object) -> [decode(Type, Object)|Acc] end, - tab2list(Tab, Nif, impl_next(Nif, Key), NewAcc). + tab2list(Tab, Impl, impl_next(Impl, Key), NewAcc). %%%---------------------------------------------------------------------- @@ -182,38 +182,38 @@ impl_destroy(_Type, _Protection, _Path, _Options, _ReadOptions, _WriteOptions) - impl_repair(_Type, _Protection, _Path, _Options, _ReadOptions, _WriteOptions) -> ?NIF_STUB. -impl_insert(_Nif, _Key, _Object) -> +impl_insert(_Impl, _Key, _Object) -> ?NIF_STUB. -impl_insert(_Nif, _List) -> +impl_insert(_Impl, _List) -> ?NIF_STUB. -impl_insert_new(_Nif, _Key, _Object) -> +impl_insert_new(_Impl, _Key, _Object) -> ?NIF_STUB. -impl_insert_new(_Nif, _List) -> +impl_insert_new(_Impl, _List) -> ?NIF_STUB. -impl_delete(_Nif) -> +impl_delete(_Impl) -> ?NIF_STUB. -impl_delete(_Nif, _Key) -> +impl_delete(_Impl, _Key) -> ?NIF_STUB. -impl_delete_all_objects(_Nif) -> +impl_delete_all_objects(_Impl) -> ?NIF_STUB. -impl_lookup(_Nif, _Key) -> +impl_lookup(_Impl, _Key) -> ?NIF_STUB. -impl_first(_Nif) -> +impl_first(_Impl) -> ?NIF_STUB. -impl_next(_Nif, _Key) -> +impl_next(_Impl, _Key) -> ?NIF_STUB. -impl_info_memory(_Nif) -> +impl_info_memory(_Impl) -> ?NIF_STUB. -impl_info_size(_Nif) -> +impl_info_size(_Impl) -> ?NIF_STUB. diff --git a/test/qc/qc_leveldb.erl b/test/qc/qc_leveldb.erl index 3ae4475..dad2a90 100644 --- a/test/qc/qc_leveldb.erl +++ b/test/qc/qc_leveldb.erl @@ -30,6 +30,8 @@ %% lets , open/0, open/1 , reopen/0, reopen/1 + , destroy/0, destroy/1 + , repair/0, repair/1 , close/1 , put/2, put/3 , delete/2, delete/3 @@ -106,6 +108,40 @@ reopen(Options) -> free_ptr(ErrPtr) end. +destroy() -> + Options = leveldb:leveldb_options_create(), + try + destroy(Options) + after + leveldb:leveldb_options_destroy(Options) + end. + +destroy(Options) -> + ErrPtr = errptr(), + try + leveldb:leveldb_destroy(Options, ?MODULE_STRING, ErrPtr), + read_errptr(ErrPtr) + after + free_ptr(ErrPtr) + end. + +repair() -> + Options = leveldb:leveldb_options_create(), + try + repair(Options) + after + leveldb:leveldb_options_repair(Options) + end. + +repair(Options) -> + ErrPtr = errptr(), + try + leveldb:leveldb_repair(Options, ?MODULE_STRING, ErrPtr), + read_errptr(ErrPtr) + after + free_ptr(ErrPtr) + end. + close(Db) -> ok == leveldb:leveldb_close(Db). diff --git a/test/qc/qc_statem_lets.erl b/test/qc/qc_statem_lets.erl index 76c4424..58a7f9e 100644 --- a/test/qc/qc_statem_lets.erl +++ b/test/qc/qc_statem_lets.erl @@ -30,6 +30,9 @@ -export([initial_state/0, state_is_sane/1, next_state/3, precondition/2, postcondition/3]). -export([commands_setup/1, commands_teardown/1, commands_teardown/2]). +%% @TODO remove at time of db, db_read, db_write options testing +-compile(export_all). + %% @NOTE For boilerplate exports, see "qc_statem.hrl" -include_lib("qc/include/qc_statem.hrl"). @@ -75,16 +78,18 @@ command_gen(Mod,#state{parallel=true}=S) -> serial_command_gen(_Mod,#state{tab=undefined, type=undefined, impl=undefined}=S) -> {call,?IMPL,new,[?TAB,gen_options(new,S)]}; serial_command_gen(_Mod,#state{tab=undefined}=S) -> - {call,?IMPL,new,[undefined,?TAB,gen_options(new,S)]}; + oneof([{call,?IMPL,new,[undefined,?TAB,gen_options(new,S)]}] + %% @TODO ++ [{call,?IMPL,destroy,[undefined,?TAB,gen_options(destroy,S)]}] + %% @TODO ++ [{call,?IMPL,repair,[undefined,?TAB,gen_options(repair,S)]}] + ); serial_command_gen(_Mod,#state{tab=Tab, type=Type}=S) -> %% @TODO insert/3, insert_new/3, delete/3, delete_all_objs/2 write_gen_options %% @TODO lookup/3 read_gen_options oneof([{call,?IMPL,insert,[Tab,oneof([gen_obj(S),gen_objs(S)])]}] - ++ [{call,?IMPL,insert_new,[Tab,oneof([gen_obj(S),gen_objs(S)])]} || Type == ets] - %% @TODO ++ [{call,?IMPL,delete,[Tab]}] + ++ [{call,?IMPL,insert_new,[Tab,oneof([gen_obj(S),gen_objs(S)])]} || Type =:= ets] ++ [{call,?IMPL,delete,[Tab]}] ++ [{call,?IMPL,delete,[Tab,gen_key(S)]}] - ++ [{call,?IMPL,delete_all_objs,[Tab]} || Type == ets] + ++ [{call,?IMPL,delete_all_objs,[Tab]} || Type =:= ets] ++ [{call,?IMPL,lookup,[Tab,gen_key(S)]}] ++ [{call,?IMPL,first,[Tab]}] ++ [{call,?IMPL,next,[Tab,gen_key(S)]}] @@ -98,9 +103,9 @@ parallel_command_gen(_Mod,#state{tab=Tab, type=Type}=S) -> %% @TODO insert/3, insert_new/3, delete_all_objs/2 write_gen_options %% @TODO lookup/3 read_gen_options oneof([{call,?IMPL,insert,[Tab,oneof([gen_obj(S),gen_objs(S)])]}] - ++ [{call,?IMPL,insert_new,[Tab,oneof([gen_obj(S),gen_objs(S)])]} || Type == ets] + ++ [{call,?IMPL,insert_new,[Tab,oneof([gen_obj(S),gen_objs(S)])]} || Type =:= ets] ++ [{call,?IMPL,delete,[Tab,gen_key(S)]}] - ++ [{call,?IMPL,delete_all_objs,[Tab]} || Type == ets] + ++ [{call,?IMPL,delete_all_objs,[Tab]} || Type =:= ets] ++ [{call,?IMPL,lookup,[Tab,gen_key(S)]}] ++ [{call,?IMPL,first,[Tab]}] ++ [{call,?IMPL,next,[Tab,gen_key(S)]}] @@ -152,7 +157,7 @@ next_state(#state{tab=undefined}=S, V, {call,_,new,[_Tab,?TAB,Options]}) -> end, S#state{type=Type, impl=Impl, exists=true, tab=V}; next_state(#state{impl=Impl}=S, _V, {call,_,destroy,[_Tab,?TAB,_Options]}) - when Impl /= ets -> + when Impl =/= ets -> S#state{tab=undefined, exists=false, objs=[]}; next_state(S, _V, {call,_,insert,[_Tab,Objs]}) when is_list(Objs) -> insert_objs(S, Objs); @@ -181,12 +186,18 @@ next_state(S, _V, {call,_,_,_}) -> precondition(#state{tab=undefined, type=undefined, impl=undefined}, {call,_,new,[?TAB,Options]}) -> L = proplists:get_value(db, Options, []), proplists:get_bool(create_if_missing, L) andalso proplists:get_bool(error_if_exists, L); +precondition(#state{tab=Tab}, {call,_,new,[?TAB,_Options]}) -> + Tab =:= undefined; precondition(#state{tab=undefined, type=undefined, impl=undefined}, {call,_,new,[_Tab,?TAB,Options]}) -> L = proplists:get_value(db, Options, []), proplists:get_bool(create_if_missing, L) andalso proplists:get_bool(error_if_exists, L); -precondition(#state{tab=Tab}, {call,_,new,[?TAB,_Options]}) when Tab /= undefined -> +precondition(#state{tab=Tab}, {call,_,new,[_Tab,?TAB,_Options]}) -> + Tab =:= undefined; +precondition(#state{tab=undefined, type=undefined, impl=undefined}, {call,_,new,[_Tab,?TAB,_Options]}) -> false; -precondition(#state{tab=Tab}, {call,_,new,[_Tab,?TAB,_Options]}) when Tab /= undefined -> +precondition(#state{tab=undefined, type=undefined, impl=undefined}, {call,_,destroy,[_Tab,?TAB,_Options]}) -> + false; +precondition(#state{tab=undefined, type=undefined, impl=undefined}, {call,_,repair,[_Tab,?TAB,_Options]}) -> false; precondition(_S, {call,_,_,_}) -> true. @@ -252,7 +263,7 @@ postcondition(#state{type=ordered_set}=S, {call,_,next,[_Tab, Key]}, Res) -> Res =:= K end; postcondition(#state{type=set}=S, {call,_,tab2list,[_Tab]}, Res) -> - [] == (S#state.objs -- Res); + [] =:= (S#state.objs -- Res); postcondition(#state{type=ordered_set}=S, {call,_,tab2list,[_Tab]}, Res) -> sort(S) =:= Res; postcondition(_S, {call,_,_,_}, _Res) -> @@ -281,28 +292,35 @@ gen_options(Op,#state{tab=undefined, type=undefined, impl=undefined}=S) -> ?LET({Type,Impl}, {gen_ets_type(), gen_ets_impl()}, gen_options(Op,S#state{type=Type, impl=Impl})); gen_options(Op,#state{type=Type, impl=drv=Impl}=S) -> - [Type, public, named_table, {keypos,#obj.key}, Impl] + [Type, public, named_table, {keypos,#obj.key}, + {compressed, gen_boolean()}, {async, gen_boolean()}, Impl] ++ gen_leveldb_options(Op,S); gen_options(Op,#state{type=Type, impl=nif=Impl}=S) -> - [Type, public, named_table, {keypos,#obj.key}, Impl] + [Type, public, named_table, {keypos,#obj.key}, + {compressed, gen_boolean()}, {async, gen_boolean()}, Impl] ++ gen_leveldb_options(Op,S); gen_options(_Op,#state{type=Type, impl=ets=Impl}) -> - [Type, public, named_table, {keypos,#obj.key}, Impl]. + [Type, public, named_table, {keypos,#obj.key}, + {compressed, gen_boolean()}, Impl]. gen_leveldb_options(Op,S) -> [gen_db_options(Op,S), gen_db_read_options(Op,S), gen_db_write_options(Op,S)]. gen_db_options(new,#state{exists=Exists}) -> ExistsOptions = if Exists -> []; true -> [create_if_missing, error_if_exists] end, - ?LET(Options, ulist(gen_db_options()), {db, Options ++ ExistsOptions}); + %% @TODO ?LET(Options, ulist(gen_db_options()), {db, Options ++ ExistsOptions}); + {db, ExistsOptions}; gen_db_options(_Op,_S) -> - ?LET(Options, ulist(gen_db_options()), {db, Options}). + %% @TODO ?LET(Options, ulist(gen_db_options()), {db, Options}). + {db, []} . gen_db_read_options(_Op,_S) -> - ?LET(Options, ulist(gen_db_read_options()), {db_read, Options}). + %% @TODO ?LET(Options, ulist(gen_db_read_options()), {db_read, Options}). + {db_read, []}. gen_db_write_options(_Op,_S) -> - ?LET(Options, ulist(gen_db_write_options()), {db_write, Options}). + %% @TODO ?LET(Options, ulist(gen_db_write_options()), {db_write, Options}). + {db_write, []}. gen_db_options() -> oneof([paranoid_checks, {paranoid_checks,gen_boolean()}, {write_buffer_size,gen_pos_integer()}, {max_open_files,gen_pos_integer()}, {block_cache_size,gen_pos_integer()}, {block_size,gen_pos_integer()}, {block_restart_interval,gen_pos_integer()}]). @@ -412,7 +430,7 @@ keyfind(X, #state{objs=L}=S) -> lists:filter(fun(#obj{key=K}) -> eq(X, K, S) end, L). keymember(X, S) -> - [] /= keyfind(X, S). + [] =/= keyfind(X, S). eq(X, Y, #state{type=set, impl=ets}) -> X =:= Y; diff --git a/test/qc/qc_statemc_lets.erl b/test/qc/qc_statemc_lets.erl index f2b6397..a883830 100644 --- a/test/qc/qc_statemc_lets.erl +++ b/test/qc/qc_statemc_lets.erl @@ -66,7 +66,10 @@ command_gen(Mod,#state{parallel=true}=S) -> serial_command_gen(_Mod,#state{db=undefined, exists=false}) -> {call,?IMPL,open,[]}; serial_command_gen(_Mod,#state{db=undefined, exists=true}) -> - {call,?IMPL,reopen,[]}; + oneof([{call,?IMPL,reopen,[]} + %% @TODO {call,?IMPL,destroy,[]}, + %% @TODO {call,?IMPL,repair,[]} + ]); serial_command_gen(_Mod,#state{db=Db}=S) -> oneof([{call,?IMPL,close,[Db]}, {call,?IMPL,put,[Db,gen_obj(S)]}, @@ -100,6 +103,8 @@ next_state(#state{db=undefined, exists=false}=S, V, {call,_,open,[]}) -> S#state{db=V, exists=true}; next_state(#state{db=undefined, exists=true}=S, V, {call,_,reopen,[]}) -> S#state{db=V, exists=true}; +next_state(#state{db=undefined, exists=true}=S, V, {call,_,destroy,[]}) -> + S#state{db=V, exists=false, objs=[]}; next_state(#state{db=Db}=S, _V, {call,_,close,[Db]}) when Db /= undefined -> S#state{db=undefined}; next_state(S, _V, {call,_,put,[_Db,Obj]}) -> @@ -114,10 +119,18 @@ precondition(#state{exists=true}, {call,_,open,[]}) -> false; precondition(#state{exists=false}, {call,_,reopen,[]}) -> false; +precondition(#state{exists=false}, {call,_,destroy,[]}) -> + false; +precondition(#state{exists=false}, {call,_,repair,[]}) -> + false; precondition(#state{db=Db}, {call,_,open,[]}) when Db /= undefined-> false; precondition(#state{db=Db}, {call,_,reopen,[]}) when Db /= undefined-> false; +precondition(#state{db=Db}, {call,_,destroy,[]}) when Db /= undefined-> + false; +precondition(#state{db=Db}, {call,_,repair,[]}) when Db /= undefined-> + false; precondition(_S, {call,_,_,_}) -> true. @@ -126,6 +139,10 @@ postcondition(#state{exists=false}, {call,_,open,[]}, Res) -> ?IMPL:is_db(Res); postcondition(#state{exists=true}, {call,_,reopen,[]}, Res) -> ?IMPL:is_db(Res); +postcondition(#state{exists=true}, {call,_,destroy,[]}, Res) -> + Res; +postcondition(#state{exists=true}, {call,_,repair,[]}, Res) -> + Res; postcondition(#state{db=Db}, {call,_,close,[_Db]}, Res) -> Res andalso Db /= undefined; postcondition(_S, {call,_,put,[_Db,_]}, Res) ->