Add support to use async thread pool for driver backend

Enhance driver implementation to optionally use Erlang's asynchronous
driver thread pool for all LevelDB operations with the intention to
avoid blocking of Erlang's scheduler threads.
This commit is contained in:
Joseph Wayne Norton 2011-11-06 23:42:47 +09:00
parent e94c7a2091
commit e3c129b105
19 changed files with 737 additions and 317 deletions

View file

@ -498,19 +498,6 @@ Explain how to build and to run lets with valgrind enabled
</li>
<li>
<p>
Performance
</p>
<ul>
<li>
<p>
Update driver implementation to use Erlang's asynchronous driver
thread pool for all LevelDB operations.
</p>
</li>
</ul>
</li>
<li>
<p>
Testing
</p>
<ul>
@ -522,8 +509,13 @@ Functional
<li>
<p>
Update test model to include LevelDB's database, read, and
write options. These options have not undergone any explicit
testing.
write options. These options have not been tested.
</p>
</li>
<li>
<p>
Update test model to include LevelDB's destroy and repair
operations. These operations have not been tested.
</p>
</li>
</ul>
@ -601,6 +593,7 @@ consider adding explicit read_options and write_options for LET's
##Modules##

View file

@ -36,10 +36,11 @@
#define LETS_BADARG 0x00
#define LETS_TRUE 0x01
#define LETS_END_OF_TABLE 0x02
#define LETS_BINARY 0x03
#define LETS_OPEN6 0x00
#define LETS_DESTROY6 0x01
#define LETS_REPAIR6 0x02
#define LETS_OPEN6 0x00 // same as OPEN
#define LETS_DESTROY6 0x01 // same as DESTROY
#define LETS_REPAIR6 0x02 // same as REPAIR
#define LETS_INSERT2 0x03
#define LETS_INSERT3 0x04
#define LETS_INSERT_NEW2 0x05
@ -64,67 +65,112 @@ struct DrvAsync {
ErlDrvTermData caller;
int command;
leveldb::Slice skey;
// outputs
ErlDrvBinary* binary;
int reply;
// inputs
leveldb::WriteBatch batch;
leveldb::Status status;
DrvAsync(DrvData* d, ErlDrvTermData c, int cmd) :
drvdata(d), caller(c), command(cmd) {
drvdata(d), caller(c), command(cmd), binary(NULL), reply(LETS_TRUE) {
}
DrvAsync(DrvData* d, ErlDrvTermData c, int cmd, char* key, int keylen) :
drvdata(d), caller(c), command(cmd), skey((const char*) key, keylen) {
DrvAsync(DrvData* d, ErlDrvTermData c, int cmd, const char* key, int keylen) :
drvdata(d), caller(c), command(cmd), binary(NULL), reply(LETS_TRUE) {
binary = driver_alloc_binary(keylen);
assert(binary);
memcpy(binary->orig_bytes, key, binary->orig_size);
}
void put(char* key, int keylen, char* blob, int bloblen) {
leveldb::Slice skey((const char*) key, keylen);
leveldb::Slice sblob((const char*) blob, bloblen);
~DrvAsync() {
if (binary) {
driver_free_binary(binary);
}
}
void put(const char* key, int keylen, const char* blob, int bloblen) {
leveldb::Slice skey(key, keylen);
leveldb::Slice sblob(blob, bloblen);
batch.Put(skey, sblob);
}
void del(char* key, int keylen) {
leveldb::Slice skey((const char*) key, keylen);
void del(const char* key, int keylen) {
leveldb::Slice skey(key, keylen);
batch.Delete(skey);
}
};
static void lets_output_create6(const char op, DrvData* d, char* buf, int len, int* index, int items);
static void lets_async_create6(void* async_data);
static void lets_output_open6(DrvData* d, char* buf, int len, int* index, int items);
static void lets_output_destroy6(DrvData* d, char* buf, int len, int* index, int items);
static void lets_output_repair6(DrvData* d, char* buf, int len, int* index, int items);
static void lets_output_insert2(DrvData* d, char* buf, int len, int* index, int items);
static void lets_async_insert2(void* async_data);
static void lets_output_insert3(DrvData* d, char* buf, int len, int* index, int items);
static void lets_async_insert3(void* async_data);
// static void lets_output_insert_new2(DrvData* d, char* buf, int len, int* index, int items);
// static void lets_async_insert_new2(void* async_data);
// static void lets_output_insert_new3(DrvData* d, char* buf, int len, int* index, int items);
// static void lets_async_insert_new3(void* async_data);
static void lets_output_delete1(DrvData* d, char* buf, int len, int* index, int items);
static void lets_async_delete1(void* async_data);
static void lets_output_delete2(DrvData* d, char* buf, int len, int* index, int items);
static void lets_async_delete2(void* async_data);
// static void lets_output_delete_all_objects1(DrvData* d, char* buf, int len, int* index, int items);
// static void lets_async_delete_all_objects1(void* async_data);
static void lets_output_lookup2(DrvData* d, char* buf, int len, int* index, int items);
static void lets_async_lookup2(void* async_data);
static void lets_output_first1(DrvData* d, char* buf, int len, int* index, int items);
static void lets_async_first1(void* async_data);
static void lets_output_next2(DrvData* d, char* buf, int len, int* index, int items);
static void lets_async_next2(void* async_data);
// static void lets_output_info_memory1(DrvData* d, char* buf, int len, int* index, int items);
// static void lets_async_info_memory1(void* async_data);
// static void lets_output_info_size1(DrvData* d, char* buf, int len, int* index, int items);
// static void lets_async_info_size1(void* async_data);
static void
driver_send_int(DrvData* d, const int i)
driver_send_int(DrvData* d, const int i, ErlDrvTermData caller=0)
{
ErlDrvTermData caller = driver_caller(d->port);
ErlDrvTermData spec[] = {
ERL_DRV_PORT, driver_mk_port(d->port),
ERL_DRV_INT, i,
ERL_DRV_TUPLE, 2,
};
if (!caller) {
caller = driver_caller(d->port);
}
driver_send_term(d->port, caller, spec, sizeof(spec) / sizeof(spec[0]));
}
static void
driver_send_binary(DrvData* d, const char *buf, const ErlDrvUInt len)
driver_send_binary(DrvData* d, ErlDrvBinary* bin, ErlDrvTermData caller=0)
{
ErlDrvTermData caller = driver_caller(d->port);
ErlDrvTermData spec[] = {
ERL_DRV_PORT, driver_mk_port(d->port),
ERL_DRV_INT, LETS_TRUE,
ERL_DRV_INT, LETS_BINARY,
ERL_DRV_BINARY, (ErlDrvTermData) bin, bin->orig_size, 0,
ERL_DRV_TUPLE, 3,
};
if (!caller) {
caller = driver_caller(d->port);
}
driver_send_term(d->port, caller, spec, sizeof(spec) / sizeof(spec[0]));
}
static void
driver_send_buf(DrvData* d, const char *buf, const ErlDrvUInt len, ErlDrvTermData caller=0)
{
ErlDrvTermData spec[] = {
ERL_DRV_PORT, driver_mk_port(d->port),
ERL_DRV_INT, LETS_BINARY,
ERL_DRV_BUF2BINARY, (ErlDrvTermData) buf, len,
ERL_DRV_TUPLE, 3,
};
if (!caller) {
caller = driver_caller(d->port);
}
driver_send_term(d->port, caller, spec, sizeof(spec) / sizeof(spec[0]));
}
@ -273,69 +319,68 @@ drv_output(ErlDrvData handle, char* buf, int len)
lets_output_repair6(d, buf, len, &index, items);
break;
case LETS_INSERT2:
ng = (items != 1);
ng = (items != 1 || !d->impl.alive);
if (ng) GOTOBADARG;
lets_output_insert2(d, buf, len, &index, items);
break;
case LETS_INSERT3:
ng = (items != 2);
ng = (items != 2 || !d->impl.alive);
if (ng) GOTOBADARG;
lets_output_insert3(d, buf, len, &index, items);
break;
case LETS_INSERT_NEW2:
ng = (items != 1);
ng = (items != 1 || !d->impl.alive);
if (ng) GOTOBADARG;
GOTOBADARG;
break;
case LETS_INSERT_NEW3:
ng = (items != 2);
ng = (items != 2 || !d->impl.alive);
if (ng) GOTOBADARG;
GOTOBADARG;
break;
case LETS_DELETE1:
ng = (items != 0);
ng = (items != 0 || !d->impl.alive);
if (ng) GOTOBADARG;
lets_output_delete1(d, buf, len, &index, items);
break;
case LETS_DELETE2:
ng = (items != 1);
ng = (items != 1 || !d->impl.alive);
if (ng) GOTOBADARG;
lets_output_delete2(d, buf, len, &index, items);
break;
case LETS_DELETE_ALL_OBJECTS1:
ng = (items != 0);
ng = (items != 0 || !d->impl.alive);
if (ng) GOTOBADARG;
GOTOBADARG;
break;
case LETS_LOOKUP2:
ng = (items != 1);
ng = (items != 1 || !d->impl.alive);
if (ng) GOTOBADARG;
lets_output_lookup2(d, buf, len, &index, items);
break;
case LETS_FIRST1:
ng = (items != 0);
ng = (items != 0 || !d->impl.alive);
if (ng) GOTOBADARG;
lets_output_first1(d, buf, len, &index, items);
break;
case LETS_NEXT2:
ng = (items != 1);
ng = (items != 1 || !d->impl.alive);
if (ng) GOTOBADARG;
lets_output_next2(d, buf, len, &index, items);
break;
case LETS_INFO_MEMORY1:
ng = (items != 0);
ng = (items != 0 || !d->impl.alive);
if (ng) GOTOBADARG;
GOTOBADARG;
break;
case LETS_INFO_SIZE1:
ng = (items != 0);
ng = (items != 0 || !d->impl.alive);
if (ng) GOTOBADARG;
GOTOBADARG;
break;
default:
GOTOBADARG;
}
return;
badarg:
@ -348,21 +393,36 @@ drv_ready_async(ErlDrvData handle, ErlDrvThreadData async_data)
{
DrvData* d = (DrvData*) handle;
DrvAsync* a = (DrvAsync*) async_data;
assert(a != NULL);
(void) d;
(void) a;
switch (a->reply) {
case LETS_BADARG:
driver_send_int(d, LETS_BADARG, a->caller);
break;
case LETS_TRUE:
driver_send_int(d, LETS_TRUE, a->caller);
break;
case LETS_END_OF_TABLE:
driver_send_int(d, LETS_END_OF_TABLE, a->caller);
break;
case LETS_BINARY:
driver_send_binary(d, a->binary, a->caller);
break;
default:
driver_send_int(d, LETS_BADARG, a->caller);
}
delete a;
}
void
lets_output_do(void* async_data)
drv_async_free(void* async_data)
{
DrvAsync* a = (DrvAsync*) async_data;
(void) a;
delete a;
}
//
// Commands
//
@ -370,6 +430,7 @@ lets_output_do(void* async_data)
static void
lets_output_create6(const char op, DrvData* d, char* buf, int len, int* index, int items)
{
DrvAsync* drv_async = NULL;
char type;
char privacy;
char *name;
@ -444,18 +505,40 @@ lets_output_create6(const char op, DrvData* d, char* buf, int len, int* index, i
GOTOBADARG;
}
if (!lets_create(d->impl, op)) {
GOTOBADARG;
if (d->impl.async) {
drv_async = new DrvAsync(d, driver_caller(d->port), op);
if (!drv_async) {
GOTOBADARG;
}
driver_async(d->port, NULL, lets_async_create6, drv_async, drv_async_free);
} else {
if (!lets_create(d->impl, op)) {
GOTOBADARG;
}
driver_send_int(d, LETS_TRUE);
}
driver_send_int(d, LETS_TRUE);
return;
badarg:
if (drv_async) { delete drv_async; }
driver_send_int(d, LETS_BADARG);
return;
}
static void
lets_async_create6(void* async_data)
{
DrvAsync* a = (DrvAsync*) async_data;
assert(a != NULL);
DrvData* d = a->drvdata;
if (!lets_create(d->impl, a->command)) {
a->reply = LETS_BADARG;
} else {
a->reply = LETS_TRUE;
}
}
static void
lets_output_open6(DrvData* d, char* buf, int len, int* index, int items)
{
@ -477,6 +560,7 @@ lets_output_repair6(DrvData* d, char* buf, int len, int* index, int items)
static void
lets_output_insert2(DrvData* d, char* buf, int len, int* index, int items)
{
DrvAsync* drv_async = NULL;
int ng, arity;
char *key;
long keylen;
@ -493,6 +577,13 @@ lets_output_insert2(DrvData* d, char* buf, int len, int* index, int items)
return;
}
if (d->impl.async) {
drv_async = new DrvAsync(d, driver_caller(d->port), LETS_INSERT2);
if (!drv_async) {
GOTOBADARG;
}
}
while (items) {
ng = ei_decode_tuple_header(buf, index, &arity);
if (ng) GOTOBADARG;
@ -503,9 +594,13 @@ lets_output_insert2(DrvData* d, char* buf, int len, int* index, int items)
ng = ei_inspect_binary(buf, index, (void**) &blob, &bloblen);
if (ng) GOTOBADARG;
leveldb::Slice skey((const char*) key, keylen);
leveldb::Slice sblob((const char*) blob, bloblen);
batch.Put(skey, sblob);
if (drv_async) {
drv_async->put((const char*) key, keylen, (const char*) blob, bloblen);
} else {
leveldb::Slice skey((const char*) key, keylen);
leveldb::Slice sblob((const char*) blob, bloblen);
batch.Put(skey, sblob);
}
items--;
}
@ -514,26 +609,43 @@ lets_output_insert2(DrvData* d, char* buf, int len, int* index, int items)
ng = (items != 0);
if (ng) GOTOBADARG;
if (!d->impl.alive) {
GOTOBADARG;
}
if (drv_async) {
driver_async(d->port, NULL, lets_async_insert2, drv_async, drv_async_free);
} else {
status = d->impl.db->Write(d->impl.db_write_options, &batch);
if (!status.ok()) {
GOTOBADARG;
}
status = d->impl.db->Write(d->impl.db_write_options, &batch);
if (!status.ok()) {
GOTOBADARG;
driver_send_int(d, LETS_TRUE);
}
driver_send_int(d, LETS_TRUE);
return;
badarg:
if (drv_async) { delete drv_async; }
driver_send_int(d, LETS_BADARG);
return;
}
static void
lets_async_insert2(void* async_data)
{
DrvAsync* a = (DrvAsync*) async_data;
assert(a != NULL);
DrvData* d = a->drvdata;
leveldb::Status status = d->impl.db->Write(d->impl.db_write_options, &(a->batch));
if (!status.ok()) {
a->reply = LETS_BADARG;
} else {
a->reply = LETS_TRUE;
}
}
static void
lets_output_insert3(DrvData* d, char* buf, int len, int* index, int items)
{
DrvAsync* drv_async = NULL;
int ng;
char *key;
long keylen;
@ -547,64 +659,114 @@ lets_output_insert3(DrvData* d, char* buf, int len, int* index, int items)
ng = ei_inspect_binary(buf, index, (void**) &blob, &bloblen);
if (ng) GOTOBADARG;
{
if (d->impl.async) {
drv_async = new DrvAsync(d, driver_caller(d->port), LETS_INSERT3);
if (!drv_async) {
GOTOBADARG;
}
drv_async->put((const char*) key, keylen, (const char*) blob, bloblen);
} else {
leveldb::Slice skey((const char*) key, keylen);
leveldb::Slice sblob((const char*) blob, bloblen);
batch.Put(skey, sblob);
}
if (!d->impl.alive) {
GOTOBADARG;
}
if (drv_async) {
driver_async(d->port, NULL, lets_async_insert3, drv_async, drv_async_free);
} else {
status = d->impl.db->Write(d->impl.db_write_options, &batch);
if (!status.ok()) {
GOTOBADARG;
}
status = d->impl.db->Write(d->impl.db_write_options, &batch);
if (!status.ok()) {
GOTOBADARG;
driver_send_int(d, LETS_TRUE);
}
driver_send_int(d, LETS_TRUE);
return;
badarg:
if (drv_async) { delete drv_async; }
driver_send_int(d, LETS_BADARG);
return;
}
static void
lets_async_insert3(void* async_data)
{
DrvAsync* a = (DrvAsync*) async_data;
assert(a != NULL);
DrvData* d = a->drvdata;
leveldb::Status status = d->impl.db->Write(d->impl.db_write_options, &(a->batch));
if (!status.ok()) {
a->reply = LETS_BADARG;
} else {
a->reply = LETS_TRUE;
}
}
static void
lets_output_delete1(DrvData* d, char* buf, int len, int* index, int items)
{
DrvAsync* drv_async = NULL;
leveldb::WriteOptions db_write_options;
leveldb::WriteBatch batch;
leveldb::Status status;
if (!d->impl.alive) {
GOTOBADARG;
}
// alive
d->impl.alive = 0;
db_write_options.sync = true;
status = d->impl.db->Write(db_write_options, &batch);
if (!status.ok()) {
GOTOBADARG;
if (d->impl.async) {
drv_async = new DrvAsync(d, driver_caller(d->port), LETS_DELETE1);
if (!drv_async) {
GOTOBADARG;
}
driver_async(d->port, NULL, lets_async_delete1, drv_async, drv_async_free);
} else {
db_write_options.sync = true;
status = d->impl.db->Write(db_write_options, &batch);
if (!status.ok()) {
GOTOBADARG;
}
// @TBD This is quite risky ... need to re-consider.
// delete d->impl.db;
// d->impl.db = NULL;
driver_send_int(d, LETS_TRUE);
}
// @TBD This is quite risky ... need to re-consider.
// delete d->impl.db;
// d->impl.db = NULL;
driver_send_int(d, LETS_TRUE);
return;
badarg:
if (drv_async) { delete drv_async; }
driver_send_int(d, LETS_BADARG);
return;
}
static void
lets_async_delete1(void* async_data)
{
DrvAsync* a = (DrvAsync*) async_data;
assert(a != NULL);
DrvData* d = a->drvdata;
leveldb::WriteOptions db_write_options;
leveldb::WriteBatch batch;
db_write_options.sync = true;
leveldb::Status status = d->impl.db->Write(d->impl.db_write_options, &batch);
if (!status.ok()) {
a->reply = LETS_BADARG;
} else {
// @TBD This is quite risky ... need to re-consider.
// delete d->impl.db;
// d->impl.db = NULL;
a->reply = LETS_TRUE;
}
}
static void
lets_output_delete2(DrvData* d, char* buf, int len, int* index, int items)
{
DrvAsync* drv_async = NULL;
int ng;
char *key;
long keylen;
@ -614,31 +776,53 @@ lets_output_delete2(DrvData* d, char* buf, int len, int* index, int items)
ng = ei_inspect_binary(buf, index, (void**) &key, &keylen);
if (ng) GOTOBADARG;
{
if (d->impl.async) {
drv_async = new DrvAsync(d, driver_caller(d->port), LETS_INSERT2);
if (!drv_async) {
GOTOBADARG;
}
drv_async->del((const char*) key, keylen);
} else {
leveldb::Slice skey((const char*) key, keylen);
batch.Delete(skey);
}
if (!d->impl.alive) {
GOTOBADARG;
}
if (drv_async) {
driver_async(d->port, NULL, lets_async_delete2, drv_async, drv_async_free);
} else {
status = d->impl.db->Write(d->impl.db_write_options, &batch);
if (!status.ok()) {
GOTOBADARG;
}
status = d->impl.db->Write(d->impl.db_write_options, &batch);
if (!status.ok()) {
GOTOBADARG;
driver_send_int(d, LETS_TRUE);
}
driver_send_int(d, LETS_TRUE);
return;
badarg:
if (drv_async) { delete drv_async; }
driver_send_int(d, LETS_BADARG);
return;
}
static void
lets_async_delete2(void* async_data)
{
DrvAsync* a = (DrvAsync*) async_data;
assert(a != NULL);
DrvData* d = a->drvdata;
leveldb::Status status = d->impl.db->Write(d->impl.db_write_options, &(a->batch));
if (!status.ok()) {
a->reply = LETS_BADARG;
} else {
a->reply = LETS_TRUE;
}
}
static void
lets_output_lookup2(DrvData* d, char* buf, int len, int* index, int items)
{
DrvAsync* drv_async = NULL;
int ng;
char *key;
long keylen;
@ -646,11 +830,13 @@ lets_output_lookup2(DrvData* d, char* buf, int len, int* index, int items)
ng = ei_inspect_binary(buf, index, (void**) &key, &keylen);
if (ng) GOTOBADARG;
if (!d->impl.alive) {
GOTOBADARG;
}
{
if (d->impl.async) {
drv_async = new DrvAsync(d, driver_caller(d->port), LETS_LOOKUP2, (const char*) key, keylen);
if (!drv_async) {
GOTOBADARG;
}
driver_async(d->port, NULL, lets_async_lookup2, drv_async, drv_async_free);
} else {
leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options);
if (!it) {
GOTOBADARG;
@ -659,29 +845,67 @@ lets_output_lookup2(DrvData* d, char* buf, int len, int* index, int items)
leveldb::Slice skey((const char*) key, keylen);
it->Seek(skey);
if (!it->Valid() || it->key().compare(skey) != 0) {
driver_send_int(d, LETS_TRUE);
driver_send_int(d, LETS_END_OF_TABLE);
delete it;
return;
}
driver_send_binary(d, it->value().data(), it->value().size());
driver_send_buf(d, it->value().data(), it->value().size());
delete it;
}
return;
badarg:
if (drv_async) { delete drv_async; }
driver_send_int(d, LETS_BADARG);
return;
}
static void
lets_output_first1(DrvData* d, char* buf, int len, int* index, int items)
lets_async_lookup2(void* async_data)
{
if (!d->impl.alive) {
GOTOBADARG;
DrvAsync* a = (DrvAsync*) async_data;
assert(a != NULL);
DrvData* d = a->drvdata;
leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options);
if (!it) {
a->reply = LETS_BADARG;
return;
}
{
leveldb::Slice skey((const char*) a->binary->orig_bytes, a->binary->orig_size);
it->Seek(skey);
if (!it->Valid() || it->key().compare(skey) != 0) {
a->reply = LETS_END_OF_TABLE;
delete it;
return;
}
ErlDrvBinary* binary = driver_realloc_binary(a->binary, it->value().size());
if (binary) {
memcpy(binary->orig_bytes, it->value().data(), binary->orig_size);
a->binary = binary;
a->reply = LETS_BINARY;
} else {
a->reply = LETS_BADARG;
}
delete it;
}
static void
lets_output_first1(DrvData* d, char* buf, int len, int* index, int items)
{
DrvAsync* drv_async = NULL;
if (d->impl.async) {
drv_async = new DrvAsync(d, driver_caller(d->port), LETS_FIRST1);
if (!drv_async) {
GOTOBADARG;
}
driver_async(d->port, NULL, lets_async_first1, drv_async, drv_async_free);
} else {
leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options);
if (!it) {
GOTOBADARG;
@ -694,19 +918,53 @@ lets_output_first1(DrvData* d, char* buf, int len, int* index, int items)
return;
}
driver_send_binary(d, it->key().data(), it->key().size());
driver_send_buf(d, it->key().data(), it->key().size());
delete it;
}
return;
badarg:
if (drv_async) { delete drv_async; }
driver_send_int(d, LETS_BADARG);
return;
}
static void
lets_async_first1(void* async_data)
{
DrvAsync* a = (DrvAsync*) async_data;
assert(a != NULL);
DrvData* d = a->drvdata;
leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options);
if (!it) {
a->reply = LETS_BADARG;
return;
}
it->SeekToFirst();
if (!it->Valid()) {
a->reply = LETS_END_OF_TABLE;
delete it;
return;
}
ErlDrvBinary* binary = driver_alloc_binary(it->key().size());
if (binary) {
memcpy(binary->orig_bytes, it->key().data(), binary->orig_size);
a->binary = binary;
a->reply = LETS_BINARY;
} else {
a->reply = LETS_BADARG;
}
delete it;
}
static void
lets_output_next2(DrvData* d, char* buf, int len, int* index, int items)
{
DrvAsync* drv_async = NULL;
int ng;
char *key;
long keylen;
@ -714,11 +972,13 @@ lets_output_next2(DrvData* d, char* buf, int len, int* index, int items)
ng = ei_inspect_binary(buf, index, (void**) &key, &keylen);
if (ng) GOTOBADARG;
if (!d->impl.alive) {
GOTOBADARG;
}
{
if (d->impl.async) {
drv_async = new DrvAsync(d, driver_caller(d->port), LETS_NEXT2, (const char*) key, keylen);
if (!drv_async) {
GOTOBADARG;
}
driver_async(d->port, NULL, lets_async_next2, drv_async, drv_async_free);
} else {
leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options);
if (!it) {
GOTOBADARG;
@ -741,11 +1001,54 @@ lets_output_next2(DrvData* d, char* buf, int len, int* index, int items)
}
}
driver_send_binary(d, it->key().data(), it->key().size());
driver_send_buf(d, it->key().data(), it->key().size());
delete it;
}
return;
badarg:
if (drv_async) { delete drv_async; }
driver_send_int(d, LETS_BADARG);
}
static void
lets_async_next2(void* async_data)
{
DrvAsync* a = (DrvAsync*) async_data;
assert(a != NULL);
DrvData* d = a->drvdata;
leveldb::Iterator* it = d->impl.db->NewIterator(d->impl.db_read_options);
if (!it) {
a->reply = LETS_BADARG;
return;
}
leveldb::Slice skey((const char*) a->binary->orig_bytes, a->binary->orig_size);
it->Seek(skey);
if (!it->Valid()) {
a->reply = LETS_END_OF_TABLE;
delete it;
return;
}
if (it->key().compare(skey) == 0) {
it->Next();
if (!it->Valid()) {
a->reply = LETS_END_OF_TABLE;
delete it;
return;
}
}
ErlDrvBinary* binary = driver_realloc_binary(a->binary, it->key().size());
if (binary) {
memcpy(binary->orig_bytes, it->key().data(), binary->orig_size);
a->binary = binary;
a->reply = LETS_BINARY;
} else {
a->reply = LETS_BADARG;
}
delete it;
}

View file

@ -203,6 +203,16 @@ lets_parse_options(lets_impl& impl,
} else {
return FALSE;
}
} else if (strcmp(atom, "async") == 0) {
ng = ei_decode_atom(buf, &index, atom);
if (ng) return FALSE;
if (strcmp(atom, "true") == 0) {
impl.async = true;
} else if (strcmp(atom, "false") == 0) {
impl.async = false;
} else {
return FALSE;
}
} else {
return FALSE;
}

View file

@ -60,7 +60,8 @@ extern "C" {
typedef struct
{
char alive;
bool async;
bool alive;
char type;
char privacy;
std::string* name;

View file

@ -347,6 +347,10 @@ lets_nif_delete1(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
return MAKEBADARG(env, status);
}
if (!h->impl.alive) {
return MAKEBADARG(env, status);
}
// alive
h->impl.alive = 0;
@ -429,7 +433,7 @@ lets_nif_lookup2(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
it->Seek(skey);
if (!it->Valid() || it->key().compare(skey) != 0) {
delete it;
return lets_atom_true;
return lets_atom_end_of_table;
}
size_t size = it->value().size();

View file

@ -52,6 +52,7 @@ ERL_NIF_TERM lets_atom_block_size = 0;
ERL_NIF_TERM lets_atom_compression = 0;
ERL_NIF_TERM lets_atom_no = 0;
ERL_NIF_TERM lets_atom_snappy = 0;
ERL_NIF_TERM lets_atom_async = 0;
ERL_NIF_TERM lets_atom_block_restart_interval = 0;
ERL_NIF_TERM lets_atom_verify_checksums = 0;
ERL_NIF_TERM lets_atom_fill_cache = 0;
@ -78,6 +79,7 @@ lets_nif_lib_init(ErlNifEnv* env)
lets_atom_compression = enif_make_atom(env, "compression");
lets_atom_no = enif_make_atom(env, "no");
lets_atom_snappy = enif_make_atom(env, "snappy");
lets_atom_async = enif_make_atom(env, "async");
lets_atom_block_restart_interval = enif_make_atom(env, "block_restart_interval");
lets_atom_verify_checksums = enif_make_atom(env, "verify_checksums");
lets_atom_fill_cache = enif_make_atom(env, "fill_cache");
@ -211,6 +213,14 @@ lets_parse_options(ErlNifEnv* env, lets_impl& impl,
} else {
return FALSE;
}
} else if (enif_is_identical(tuple[0], lets_atom_async)) {
if (enif_is_identical(tuple[1], lets_atom_true)) {
impl.async = true;
} else if (enif_is_identical(tuple[1], lets_atom_false)) {
impl.async = false;
} else {
return FALSE;
}
} else {
return FALSE;
}

View file

@ -60,7 +60,8 @@ extern "C" {
typedef struct
{
char alive;
bool async;
bool alive;
char type;
char privacy;
std::string* name;
@ -91,6 +92,7 @@ extern "C" {
extern ERL_NIF_TERM lets_atom_compression;
extern ERL_NIF_TERM lets_atom_no;
extern ERL_NIF_TERM lets_atom_snappy;
extern ERL_NIF_TERM lets_atom_async;
extern ERL_NIF_TERM lets_atom_block_restart_interval;
extern ERL_NIF_TERM lets_atom_verify_checksums;
extern ERL_NIF_TERM lets_atom_fill_cache;

View file

@ -498,19 +498,6 @@ Explain how to build and to run lets with valgrind enabled
</li>
<li>
<p>
Performance
</p>
<ul>
<li>
<p>
Update driver implementation to use Erlang's asynchronous driver
thread pool for all LevelDB operations.
</p>
</li>
</ul>
</li>
<li>
<p>
Testing
</p>
<ul>
@ -522,8 +509,13 @@ Functional
<li>
<p>
Update test model to include LevelDB's database, read, and
write options. These options have not undergone any explicit
testing.
write options. These options have not been tested.
</p>
</li>
<li>
<p>
Update test model to include LevelDB's destroy and repair
operations. These operations have not been tested.
</p>
</li>
</ul>

View file

@ -44,7 +44,7 @@
<pre>ets_opt() = set | ordered_set | named_table | {key_pos, pos_integer()} | public | protected | private | compressed</pre>
<pre>ets_opt() = set | ordered_set | named_table | {key_pos, pos_integer()} | public | protected | private | compressed | async</pre>
@ -138,6 +138,11 @@ is empty, <tt><em>$end_of_table</em></tt> will be returned.</p>.</td></tr><tr><t
</li>
<li>
<p>
<tt>async</tt> <em>only the drv implementation</em>
</p>
</li>
<li>
<p>
<tt>memory</tt> <em>only the ets implementation</em>
</p>
</li>
@ -212,6 +217,13 @@ stored in a compressed format.
</li>
<li>
<p>
<tt>async</tt> If this option is present, the emulator's async thread
pool will be used when accessing the table data. <em>only the drv
implementation</em>
</p>
</li>
<li>
<p>
<tt>drv</tt> If this option is present, the table data will be stored
with LevelDB backend via an Erlang Driver. This is the default
setting for the table implementation.
@ -482,6 +494,11 @@ __See also:__ [ets:first/1](ets.md#first-1).<a name="info-2"></a>
</li>
<li>
<p>
<tt>async</tt> <em>only the drv implementation</em>
</p>
</li>
<li>
<p>
<tt>memory</tt> <em>only the ets implementation</em>
</p>
</li>
@ -620,6 +637,13 @@ stored in a compressed format.
</li>
<li>
<p>
<tt>async</tt> If this option is present, the emulator's async thread
pool will be used when accessing the table data. <em>only the drv
implementation</em>
</p>
</li>
<li>
<p>
<tt>drv</tt> If this option is present, the table data will be stored
with LevelDB backend via an Erlang Driver. This is the default
setting for the table implementation.

View file

@ -26,7 +26,7 @@
`delete(Tab, Drv) -> any()`
`delete(Tab, Impl) -> any()`
<a name="delete-3"></a>
@ -35,7 +35,7 @@
`delete(Tab, Drv, Key) -> any()`
`delete(Tab, Impl, Key) -> any()`
<a name="delete_all_objects-2"></a>
@ -44,7 +44,7 @@
`delete_all_objects(Tab, Drv) -> any()`
`delete_all_objects(Tab, Impl) -> any()`
<a name="destroy-4"></a>
@ -62,7 +62,7 @@
`first(Tab, Drv) -> any()`
`first(Tab, Impl) -> any()`
<a name="info_memory-2"></a>
@ -71,7 +71,7 @@
`info_memory(Tab, Drv) -> any()`
`info_memory(Tab, Impl) -> any()`
<a name="info_size-2"></a>
@ -80,7 +80,7 @@
`info_size(Tab, Drv) -> any()`
`info_size(Tab, Impl) -> any()`
<a name="insert-3"></a>
@ -89,7 +89,7 @@
`insert(Tab, Drv, Object) -> any()`
`insert(Tab, Impl, Object) -> any()`
<a name="insert_new-3"></a>
@ -98,7 +98,7 @@
`insert_new(Tab, Drv, Object) -> any()`
`insert_new(Tab, Impl, Object) -> any()`
<a name="lookup-3"></a>
@ -107,7 +107,7 @@
`lookup(Tab, Drv, Key) -> any()`
`lookup(Tab, Impl, Key) -> any()`
<a name="next-3"></a>
@ -116,7 +116,7 @@
`next(Tab, Drv, Key) -> any()`
`next(Tab, Impl, Key) -> any()`
<a name="open-4"></a>
@ -143,5 +143,5 @@
`tab2list(Tab, Drv) -> any()`
`tab2list(Tab, Impl) -> any()`

View file

@ -26,7 +26,7 @@
`delete(Tab, Nif) -> any()`
`delete(Tab, Impl) -> any()`
<a name="delete-3"></a>
@ -35,7 +35,7 @@
`delete(Tab, Nif, Key) -> any()`
`delete(Tab, Impl, Key) -> any()`
<a name="delete_all_objects-2"></a>
@ -44,7 +44,7 @@
`delete_all_objects(Tab, Nif) -> any()`
`delete_all_objects(Tab, Impl) -> any()`
<a name="destroy-4"></a>
@ -62,7 +62,7 @@
`first(Tab, Nif) -> any()`
`first(Tab, Impl) -> any()`
<a name="info_memory-2"></a>
@ -71,7 +71,7 @@
`info_memory(Tab, Nif) -> any()`
`info_memory(Tab, Impl) -> any()`
<a name="info_size-2"></a>
@ -80,7 +80,7 @@
`info_size(Tab, Nif) -> any()`
`info_size(Tab, Impl) -> any()`
<a name="insert-3"></a>
@ -89,7 +89,7 @@
`insert(Tab, Nif, Object) -> any()`
`insert(Tab, Impl, Object) -> any()`
<a name="insert_new-3"></a>
@ -98,7 +98,7 @@
`insert_new(Tab, Nif, Object) -> any()`
`insert_new(Tab, Impl, Object) -> any()`
<a name="lookup-3"></a>
@ -107,7 +107,7 @@
`lookup(Tab, Nif, Key) -> any()`
`lookup(Tab, Impl, Key) -> any()`
<a name="next-3"></a>
@ -116,7 +116,7 @@
`next(Tab, Nif, Key) -> any()`
`next(Tab, Impl, Key) -> any()`
<a name="open-4"></a>
@ -143,5 +143,5 @@
`tab2list(Tab, Nif) -> any()`
`tab2list(Tab, Impl) -> any()`

View file

@ -255,15 +255,12 @@ $ erl -smp +A 5 -pz ../../sext/ebin -pz ../../qc/ebin
* Explain how to build and to run lets with valgrind enabled
OTP/Erlang virtual machine
- Performance
* Update driver implementation to use Erlang\'s asynchronous driver
thread pool for all LevelDB operations.
- Testing
* Functional
** Update test model to include LevelDB\'s database, read, and
write options. These options have not undergone any explicit
testing.
write options. These options have not been tested.
** Update test model to include LevelDB\'s destroy and repair
operations. These operations have not been tested.
* Performance (TBD)
* Stability (TBD)

View file

@ -50,7 +50,7 @@
-opaque tab() :: #tab{}.
-type opts() :: [ets_opt() | impl_opt() | db_opts() | db_read_opts() | db_write_opts()].
-type ets_opt() :: set | ordered_set | named_table | {key_pos,pos_integer()} | public | protected | private | compressed.
-type ets_opt() :: set | ordered_set | named_table | {key_pos,pos_integer()} | public | protected | private | compressed | async.
-type impl_opt() :: drv | nif | ets.
-type db_opts() :: {db, [{path,file:filename()} | create_if_missing | {create_if_missing,boolean()} | error_if_exists | {error_if_exists,boolean()} | paranoid_checks | {paranoid_checks,boolean()} | {write_buffer_size,pos_integer()} | {max_open_files,pos_integer()} | {block_cache_size,pos_integer()} | {block_size,pos_integer()} | {block_restart_interval,pos_integer()}]}.
@ -98,6 +98,10 @@
%% - +compressed+ If this option is present, the table data will be
%% stored in a compressed format.
%%
%% - +async+ If this option is present, the emulator\'s async thread
%% pool will be used when accessing the table data. _only the drv
%% implementation_
%%
%% - +drv+ If this option is present, the table data will be stored
%% with LevelDB backend via an Erlang Driver. This is the default
%% setting for the table implementation.
@ -310,6 +314,7 @@ next(Tab, Key) ->
%% - +keypos+
%% - +protection+
%% - +compressed+
%% - +async+ _only the drv implementation_
%% - +memory+ _only the ets implementation_
%% - +size+ _only the ets implementation_
%%
@ -337,6 +342,8 @@ info(Tab, Item) ->
Tab#tab.protection;
compressed ->
Tab#tab.compressed;
async ->
Tab#tab.async;
memory ->
Mod:info_memory(Tab, Impl);
size ->
@ -414,6 +421,7 @@ create(Op, Name, Opts) ->
end
end,
Compressed = proplists:get_bool(compressed, POpts),
Async = proplists:get_bool(async, POpts),
Drv = proplists:get_bool(drv, POpts),
Nif = proplists:get_bool(nif, POpts),
Ets = proplists:get_bool(ets, POpts),
@ -424,7 +432,8 @@ create(Op, Name, Opts) ->
type=Type,
keypos=KeyPos,
protection=Protection,
compressed=Compressed},
compressed=Compressed,
async=Async},
DBOptions = fix_db_options(Tab, proplists:get_value(db, POpts, [])),
DBReadOptions = proplists:get_value(db_read, POpts, []),
@ -440,8 +449,11 @@ create(Op, Name, Opts) ->
lets_drv:Op(Tab, DBOptions, DBReadOptions, DBWriteOptions)
end.
fix_db_options(#tab{name=Name, compressed=Compressed}, Options) ->
fix_db_options_compression(Compressed, fix_db_options_path(Name, Options)).
fix_db_options(#tab{name=Name, compressed=Compressed, async=Async}, Options0) ->
Options1 = fix_db_options_path(Name, Options0),
Options2 = fix_db_options_compression(Compressed, Options1),
Options3 = fix_db_options_async(Async, Options2),
Options3.
fix_db_options_path(Name, Options) ->
case proplists:lookup(path, Options) of
@ -456,6 +468,11 @@ fix_db_options_compression(false, Options) ->
fix_db_options_compression(true, Options) ->
[{compression, snappy}|Options].
fix_db_options_async(false, Options) ->
[{async, false}|Options];
fix_db_options_async(true, Options) ->
[{async, true}|Options].
binify(X) when is_atom(X) ->
list_to_binary(atom_to_list(X));
binify(X) when is_list(X) ->
@ -464,7 +481,7 @@ binify(X) when is_binary(X) ->
X.
options(Options) ->
Keys = [set, ordered_set, named_table, keypos, public, protected, private, compressed, drv, nif, ets, db, db_read, db_write],
Keys = [set, ordered_set, named_table, keypos, public, protected, private, compressed, async, drv, nif, ets, db, db_read, db_write],
options(Options, Keys).
options(Options, Keys) when is_list(Options) ->

View file

@ -32,6 +32,7 @@
keypos=1 :: pos_integer(),
protection=protected :: public|protected|private,
compressed=false :: boolean(),
async=false :: boolean(),
drv :: port() | undefined,
nif :: nif() | undefined,
ets :: ets:tab() | undefined

View file

@ -49,6 +49,7 @@
-define(LETS_BADARG, 16#00).
-define(LETS_TRUE, 16#01).
-define(LETS_END_OF_TABLE, 16#02).
-define(LETS_BINARY, 16#03).
-define(LETS_OPEN6, 16#00).
-define(LETS_DESTROY6, 16#01).
@ -96,9 +97,9 @@ init() ->
open(#tab{name=_Name, named_table=_Named, type=Type, protection=Protection}=Tab, Options, ReadOptions, WriteOptions) ->
{value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options),
Drv = impl_open(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions),
%% @TODO implement named Drv (of sorts)
Tab#tab{drv=Drv}.
Impl = impl_open(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions),
%% @TODO implement named Impl (of sorts)
Tab#tab{drv=Impl}.
destroy(#tab{type=Type, protection=Protection}, Options, ReadOptions, WriteOptions) ->
{value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options),
@ -108,81 +109,81 @@ repair(#tab{type=Type, protection=Protection}, Options, ReadOptions, WriteOption
{value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options),
impl_repair(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions).
insert(#tab{keypos=KeyPos, type=Type}, Drv, Object) when is_tuple(Object) ->
insert(#tab{keypos=KeyPos, type=Type}, Impl, Object) when is_tuple(Object) ->
Key = element(KeyPos,Object),
Val = Object,
impl_insert(Drv, encode(Type, Key), encode(Type, Val));
insert(#tab{keypos=KeyPos, type=Type}, Drv, Objects) when is_list(Objects) ->
impl_insert(Impl, encode(Type, Key), encode(Type, Val));
insert(#tab{keypos=KeyPos, type=Type}, Impl, Objects) when is_list(Objects) ->
List = [{encode(Type, element(KeyPos,Object)), encode(Type, Object)} || Object <- Objects ],
impl_insert(Drv, List).
impl_insert(Impl, List).
insert_new(#tab{keypos=KeyPos, type=Type}, Drv, Object) when is_tuple(Object) ->
insert_new(#tab{keypos=KeyPos, type=Type}, Impl, Object) when is_tuple(Object) ->
Key = element(KeyPos,Object),
Val = Object,
impl_insert_new(Drv, encode(Type, Key), encode(Type, Val));
insert_new(#tab{keypos=KeyPos, type=Type}, Drv, Objects) when is_list(Objects) ->
impl_insert_new(Impl, encode(Type, Key), encode(Type, Val));
insert_new(#tab{keypos=KeyPos, type=Type}, Impl, Objects) when is_list(Objects) ->
List = [{encode(Type, element(KeyPos,Object)), encode(Type, Object)} || Object <- Objects ],
impl_insert_new(Drv, List).
impl_insert_new(Impl, List).
delete(_Tab, Drv) ->
impl_delete(Drv).
delete(_Tab, Impl) ->
impl_delete(Impl).
delete(#tab{type=Type}, Drv, Key) ->
impl_delete(Drv, encode(Type, Key)).
delete(#tab{type=Type}, Impl, Key) ->
impl_delete(Impl, encode(Type, Key)).
delete_all_objects(_Tab, Drv) ->
impl_delete_all_objects(Drv).
delete_all_objects(_Tab, Impl) ->
impl_delete_all_objects(Impl).
lookup(#tab{type=Type}, Drv, Key) ->
case impl_lookup(Drv, encode(Type, Key)) of
true ->
lookup(#tab{type=Type}, Impl, Key) ->
case impl_lookup(Impl, encode(Type, Key)) of
'$end_of_table' ->
[];
Object when is_binary(Object) ->
[decode(Type, Object)]
end.
first(#tab{type=Type}, Drv) ->
case impl_first(Drv) of
first(#tab{type=Type}, Impl) ->
case impl_first(Impl) of
'$end_of_table' ->
'$end_of_table';
Key ->
decode(Type, Key)
end.
next(#tab{type=Type}, Drv, Key) ->
case impl_next(Drv, encode(Type, Key)) of
next(#tab{type=Type}, Impl, Key) ->
case impl_next(Impl, encode(Type, Key)) of
'$end_of_table' ->
'$end_of_table';
Next ->
decode(Type, Next)
end.
info_memory(_Tab, Drv) ->
case impl_info_memory(Drv) of
info_memory(_Tab, Impl) ->
case impl_info_memory(Impl) of
Memory when is_integer(Memory) ->
erlang:round(Memory / erlang:system_info(wordsize));
Else ->
Else
end.
info_size(_Tab, Drv) ->
impl_info_size(Drv).
info_size(_Tab, Impl) ->
impl_info_size(Impl).
tab2list(Tab, Drv) ->
tab2list(Tab, Drv, impl_first(Drv), []).
tab2list(Tab, Impl) ->
tab2list(Tab, Impl, impl_first(Impl), []).
tab2list(_Tab, _Drv, '$end_of_table', Acc) ->
tab2list(_Tab, _Impl, '$end_of_table', Acc) ->
lists:reverse(Acc);
tab2list(#tab{type=Type}=Tab, Drv, Key, Acc) ->
tab2list(#tab{type=Type}=Tab, Impl, Key, Acc) ->
NewAcc =
case impl_lookup(Drv, Key) of
true ->
case impl_lookup(Impl, Key) of
'$end_of_table' ->
%% @NOTE This is not an atomic operation
Acc;
Object when is_binary(Object) ->
[decode(Type, Object)|Acc]
end,
tab2list(Tab, Drv, impl_next(Drv, Key), NewAcc).
tab2list(Tab, Impl, impl_next(Impl, Key), NewAcc).
%%%----------------------------------------------------------------------
@ -200,79 +201,73 @@ decode(ordered_set, Term) ->
sext:decode(Term).
impl_open(Type, Protection, Path, Options, ReadOptions, WriteOptions) ->
Drv = init(),
true = call(Drv, {?LETS_OPEN6, Type, Protection, Path, Options, ReadOptions, WriteOptions}),
Drv.
Impl = init(),
true = call(Impl, {?LETS_OPEN6, Type, Protection, Path, Options, ReadOptions, WriteOptions}),
Impl.
impl_destroy(Type, Protection, Path, Options, ReadOptions, WriteOptions) ->
Drv = init(),
true = call(Drv, {?LETS_OPEN6, Type, Protection, Path, Options, ReadOptions, WriteOptions}),
_ = port_close(Drv),
Impl = init(),
true = call(Impl, {?LETS_OPEN6, Type, Protection, Path, Options, ReadOptions, WriteOptions}),
_ = port_close(Impl),
_ = erl_ddll:unload(lets_drv),
true.
impl_repair(Type, Protection, Path, Options, ReadOptions, WriteOptions) ->
Drv = init(),
true = call(Drv, {?LETS_REPAIR6, Type, Protection, Path, Options, ReadOptions, WriteOptions}),
_ = port_close(Drv),
Impl = init(),
true = call(Impl, {?LETS_REPAIR6, Type, Protection, Path, Options, ReadOptions, WriteOptions}),
_ = port_close(Impl),
_ = erl_ddll:unload(lets_drv),
true.
impl_insert(Drv, Key, Object) ->
call(Drv, {?LETS_INSERT3, Key, Object}).
impl_insert(Impl, Key, Object) ->
call(Impl, {?LETS_INSERT3, Key, Object}).
impl_insert(Drv, List) ->
call(Drv, {?LETS_INSERT2, List}).
impl_insert(Impl, List) ->
call(Impl, {?LETS_INSERT2, List}).
impl_insert_new(Drv, Key, Object) ->
call(Drv, {?LETS_INSERT_NEW3, Key, Object}).
impl_insert_new(Impl, Key, Object) ->
call(Impl, {?LETS_INSERT_NEW3, Key, Object}).
impl_insert_new(Drv, List) ->
call(Drv, {?LETS_INSERT_NEW2, List}).
impl_insert_new(Impl, List) ->
call(Impl, {?LETS_INSERT_NEW2, List}).
impl_delete(Drv) ->
Res = call(Drv, {?LETS_DELETE1}),
_ = port_close(Drv),
impl_delete(Impl) ->
Res = call(Impl, {?LETS_DELETE1}),
_ = port_close(Impl),
_ = erl_ddll:unload(lets_drv),
Res.
impl_delete(Drv, Key) ->
call(Drv, {?LETS_DELETE2, Key}).
impl_delete(Impl, Key) ->
call(Impl, {?LETS_DELETE2, Key}).
impl_delete_all_objects(Drv) ->
call(Drv, {?LETS_DELETE_ALL_OBJECTS1}).
impl_delete_all_objects(Impl) ->
call(Impl, {?LETS_DELETE_ALL_OBJECTS1}).
impl_lookup(Drv, Key) ->
call(Drv, {?LETS_LOOKUP2, Key}).
impl_lookup(Impl, Key) ->
call(Impl, {?LETS_LOOKUP2, Key}).
impl_first(Drv) ->
call(Drv, {?LETS_FIRST1}).
impl_first(Impl) ->
call(Impl, {?LETS_FIRST1}).
impl_next(Drv, Key) ->
call(Drv, {?LETS_NEXT2, Key}).
impl_next(Impl, Key) ->
call(Impl, {?LETS_NEXT2, Key}).
impl_info_memory(Drv) ->
call(Drv, {?LETS_INFO_MEMORY1}).
impl_info_memory(Impl) ->
call(Impl, {?LETS_INFO_MEMORY1}).
impl_info_size(Drv) ->
call(Drv, {?LETS_INFO_SIZE1}).
impl_info_size(Impl) ->
call(Impl, {?LETS_INFO_SIZE1}).
call(Drv, Tuple) ->
call(Impl, Tuple) ->
Data = term_to_binary(Tuple),
port_command(Drv, Data),
port_command(Impl, Data),
receive
{Drv, ?LETS_TRUE, Reply} ->
{Impl, ?LETS_BINARY, Reply} ->
Reply;
{Drv, ?LETS_TRUE} ->
{Impl, ?LETS_TRUE} ->
true;
{Drv, ?LETS_END_OF_TABLE} ->
{Impl, ?LETS_END_OF_TABLE} ->
'$end_of_table';
{Drv, ?LETS_BADARG} ->
erlang:error(badarg, [Drv])
%% after 1000 ->
%% receive X ->
%% erlang:error(timeout, [Drv, X])
%% after 0 ->
%% erlang:error(timeout, [Drv])
%% end
{Impl, ?LETS_BADARG} ->
erlang:error(badarg, [Impl])
end.

View file

@ -67,9 +67,9 @@ init() ->
open(#tab{name=_Name, named_table=_Named, type=Type, protection=Protection}=Tab, Options, ReadOptions, WriteOptions) ->
{value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options),
Nif = impl_open(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions),
%% @TODO implement named Nif (of sorts)
Tab#tab{nif=Nif}.
Impl = impl_open(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions),
%% @TODO implement named Impl (of sorts)
Tab#tab{nif=Impl}.
destroy(#tab{type=Type, protection=Protection}, Options, ReadOptions, WriteOptions) ->
{value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options),
@ -79,81 +79,81 @@ repair(#tab{type=Type, protection=Protection}, Options, ReadOptions, WriteOption
{value, {path,Path}, NewOptions} = lists:keytake(path, 1, Options),
impl_repair(Type, Protection, Path, NewOptions, ReadOptions, WriteOptions).
insert(#tab{keypos=KeyPos, type=Type}, Nif, Object) when is_tuple(Object) ->
insert(#tab{keypos=KeyPos, type=Type}, Impl, Object) when is_tuple(Object) ->
Key = element(KeyPos,Object),
Val = Object,
impl_insert(Nif, encode(Type, Key), encode(Type, Val));
insert(#tab{keypos=KeyPos, type=Type}, Nif, Objects) when is_list(Objects) ->
impl_insert(Impl, encode(Type, Key), encode(Type, Val));
insert(#tab{keypos=KeyPos, type=Type}, Impl, Objects) when is_list(Objects) ->
List = [{encode(Type, element(KeyPos,Object)), encode(Type, Object)} || Object <- Objects ],
impl_insert(Nif, List).
impl_insert(Impl, List).
insert_new(#tab{keypos=KeyPos, type=Type}, Nif, Object) when is_tuple(Object) ->
insert_new(#tab{keypos=KeyPos, type=Type}, Impl, Object) when is_tuple(Object) ->
Key = element(KeyPos,Object),
Val = Object,
impl_insert_new(Nif, encode(Type, Key), encode(Type, Val));
insert_new(#tab{keypos=KeyPos, type=Type}, Nif, Objects) when is_list(Objects) ->
impl_insert_new(Impl, encode(Type, Key), encode(Type, Val));
insert_new(#tab{keypos=KeyPos, type=Type}, Impl, Objects) when is_list(Objects) ->
List = [{encode(Type, element(KeyPos,Object)), encode(Type, Object)} || Object <- Objects ],
impl_insert_new(Nif, List).
impl_insert_new(Impl, List).
delete(_Tab, Nif) ->
impl_delete(Nif).
delete(_Tab, Impl) ->
impl_delete(Impl).
delete(#tab{type=Type}, Nif, Key) ->
impl_delete(Nif, encode(Type, Key)).
delete(#tab{type=Type}, Impl, Key) ->
impl_delete(Impl, encode(Type, Key)).
delete_all_objects(_Tab, Nif) ->
impl_delete_all_objects(Nif).
delete_all_objects(_Tab, Impl) ->
impl_delete_all_objects(Impl).
lookup(#tab{type=Type}, Nif, Key) ->
case impl_lookup(Nif, encode(Type, Key)) of
true ->
lookup(#tab{type=Type}, Impl, Key) ->
case impl_lookup(Impl, encode(Type, Key)) of
'$end_of_table' ->
[];
Object when is_binary(Object) ->
[decode(Type, Object)]
end.
first(#tab{type=Type}, Nif) ->
case impl_first(Nif) of
first(#tab{type=Type}, Impl) ->
case impl_first(Impl) of
'$end_of_table' ->
'$end_of_table';
Key ->
decode(Type, Key)
end.
next(#tab{type=Type}, Nif, Key) ->
case impl_next(Nif, encode(Type, Key)) of
next(#tab{type=Type}, Impl, Key) ->
case impl_next(Impl, encode(Type, Key)) of
'$end_of_table' ->
'$end_of_table';
Next ->
decode(Type, Next)
end.
info_memory(_Tab, Nif) ->
case impl_info_memory(Nif) of
info_memory(_Tab, Impl) ->
case impl_info_memory(Impl) of
Memory when is_integer(Memory) ->
erlang:round(Memory / erlang:system_info(wordsize));
Else ->
Else
end.
info_size(_Tab, Nif) ->
impl_info_size(Nif).
info_size(_Tab, Impl) ->
impl_info_size(Impl).
tab2list(Tab, Nif) ->
tab2list(Tab, Nif, impl_first(Nif), []).
tab2list(Tab, Impl) ->
tab2list(Tab, Impl, impl_first(Impl), []).
tab2list(_Tab, _Nif, '$end_of_table', Acc) ->
tab2list(_Tab, _Impl, '$end_of_table', Acc) ->
lists:reverse(Acc);
tab2list(#tab{type=Type}=Tab, Nif, Key, Acc) ->
tab2list(#tab{type=Type}=Tab, Impl, Key, Acc) ->
NewAcc =
case impl_lookup(Nif, Key) of
true ->
case impl_lookup(Impl, Key) of
'$end_of_table' ->
%% @NOTE This is not an atomic operation
Acc;
Object when is_binary(Object) ->
[decode(Type, Object)|Acc]
end,
tab2list(Tab, Nif, impl_next(Nif, Key), NewAcc).
tab2list(Tab, Impl, impl_next(Impl, Key), NewAcc).
%%%----------------------------------------------------------------------
@ -182,38 +182,38 @@ impl_destroy(_Type, _Protection, _Path, _Options, _ReadOptions, _WriteOptions) -
impl_repair(_Type, _Protection, _Path, _Options, _ReadOptions, _WriteOptions) ->
?NIF_STUB.
impl_insert(_Nif, _Key, _Object) ->
impl_insert(_Impl, _Key, _Object) ->
?NIF_STUB.
impl_insert(_Nif, _List) ->
impl_insert(_Impl, _List) ->
?NIF_STUB.
impl_insert_new(_Nif, _Key, _Object) ->
impl_insert_new(_Impl, _Key, _Object) ->
?NIF_STUB.
impl_insert_new(_Nif, _List) ->
impl_insert_new(_Impl, _List) ->
?NIF_STUB.
impl_delete(_Nif) ->
impl_delete(_Impl) ->
?NIF_STUB.
impl_delete(_Nif, _Key) ->
impl_delete(_Impl, _Key) ->
?NIF_STUB.
impl_delete_all_objects(_Nif) ->
impl_delete_all_objects(_Impl) ->
?NIF_STUB.
impl_lookup(_Nif, _Key) ->
impl_lookup(_Impl, _Key) ->
?NIF_STUB.
impl_first(_Nif) ->
impl_first(_Impl) ->
?NIF_STUB.
impl_next(_Nif, _Key) ->
impl_next(_Impl, _Key) ->
?NIF_STUB.
impl_info_memory(_Nif) ->
impl_info_memory(_Impl) ->
?NIF_STUB.
impl_info_size(_Nif) ->
impl_info_size(_Impl) ->
?NIF_STUB.

View file

@ -30,6 +30,8 @@
%% lets
, open/0, open/1
, reopen/0, reopen/1
, destroy/0, destroy/1
, repair/0, repair/1
, close/1
, put/2, put/3
, delete/2, delete/3
@ -106,6 +108,40 @@ reopen(Options) ->
free_ptr(ErrPtr)
end.
destroy() ->
Options = leveldb:leveldb_options_create(),
try
destroy(Options)
after
leveldb:leveldb_options_destroy(Options)
end.
destroy(Options) ->
ErrPtr = errptr(),
try
leveldb:leveldb_destroy(Options, ?MODULE_STRING, ErrPtr),
read_errptr(ErrPtr)
after
free_ptr(ErrPtr)
end.
repair() ->
Options = leveldb:leveldb_options_create(),
try
repair(Options)
after
leveldb:leveldb_options_repair(Options)
end.
repair(Options) ->
ErrPtr = errptr(),
try
leveldb:leveldb_repair(Options, ?MODULE_STRING, ErrPtr),
read_errptr(ErrPtr)
after
free_ptr(ErrPtr)
end.
close(Db) ->
ok == leveldb:leveldb_close(Db).

View file

@ -30,6 +30,9 @@
-export([initial_state/0, state_is_sane/1, next_state/3, precondition/2, postcondition/3]).
-export([commands_setup/1, commands_teardown/1, commands_teardown/2]).
%% @TODO remove at time of db, db_read, db_write options testing
-compile(export_all).
%% @NOTE For boilerplate exports, see "qc_statem.hrl"
-include_lib("qc/include/qc_statem.hrl").
@ -75,16 +78,18 @@ command_gen(Mod,#state{parallel=true}=S) ->
serial_command_gen(_Mod,#state{tab=undefined, type=undefined, impl=undefined}=S) ->
{call,?IMPL,new,[?TAB,gen_options(new,S)]};
serial_command_gen(_Mod,#state{tab=undefined}=S) ->
{call,?IMPL,new,[undefined,?TAB,gen_options(new,S)]};
oneof([{call,?IMPL,new,[undefined,?TAB,gen_options(new,S)]}]
%% @TODO ++ [{call,?IMPL,destroy,[undefined,?TAB,gen_options(destroy,S)]}]
%% @TODO ++ [{call,?IMPL,repair,[undefined,?TAB,gen_options(repair,S)]}]
);
serial_command_gen(_Mod,#state{tab=Tab, type=Type}=S) ->
%% @TODO insert/3, insert_new/3, delete/3, delete_all_objs/2 write_gen_options
%% @TODO lookup/3 read_gen_options
oneof([{call,?IMPL,insert,[Tab,oneof([gen_obj(S),gen_objs(S)])]}]
++ [{call,?IMPL,insert_new,[Tab,oneof([gen_obj(S),gen_objs(S)])]} || Type == ets]
%% @TODO ++ [{call,?IMPL,delete,[Tab]}]
++ [{call,?IMPL,insert_new,[Tab,oneof([gen_obj(S),gen_objs(S)])]} || Type =:= ets]
++ [{call,?IMPL,delete,[Tab]}]
++ [{call,?IMPL,delete,[Tab,gen_key(S)]}]
++ [{call,?IMPL,delete_all_objs,[Tab]} || Type == ets]
++ [{call,?IMPL,delete_all_objs,[Tab]} || Type =:= ets]
++ [{call,?IMPL,lookup,[Tab,gen_key(S)]}]
++ [{call,?IMPL,first,[Tab]}]
++ [{call,?IMPL,next,[Tab,gen_key(S)]}]
@ -98,9 +103,9 @@ parallel_command_gen(_Mod,#state{tab=Tab, type=Type}=S) ->
%% @TODO insert/3, insert_new/3, delete_all_objs/2 write_gen_options
%% @TODO lookup/3 read_gen_options
oneof([{call,?IMPL,insert,[Tab,oneof([gen_obj(S),gen_objs(S)])]}]
++ [{call,?IMPL,insert_new,[Tab,oneof([gen_obj(S),gen_objs(S)])]} || Type == ets]
++ [{call,?IMPL,insert_new,[Tab,oneof([gen_obj(S),gen_objs(S)])]} || Type =:= ets]
++ [{call,?IMPL,delete,[Tab,gen_key(S)]}]
++ [{call,?IMPL,delete_all_objs,[Tab]} || Type == ets]
++ [{call,?IMPL,delete_all_objs,[Tab]} || Type =:= ets]
++ [{call,?IMPL,lookup,[Tab,gen_key(S)]}]
++ [{call,?IMPL,first,[Tab]}]
++ [{call,?IMPL,next,[Tab,gen_key(S)]}]
@ -152,7 +157,7 @@ next_state(#state{tab=undefined}=S, V, {call,_,new,[_Tab,?TAB,Options]}) ->
end,
S#state{type=Type, impl=Impl, exists=true, tab=V};
next_state(#state{impl=Impl}=S, _V, {call,_,destroy,[_Tab,?TAB,_Options]})
when Impl /= ets ->
when Impl =/= ets ->
S#state{tab=undefined, exists=false, objs=[]};
next_state(S, _V, {call,_,insert,[_Tab,Objs]}) when is_list(Objs) ->
insert_objs(S, Objs);
@ -181,12 +186,18 @@ next_state(S, _V, {call,_,_,_}) ->
precondition(#state{tab=undefined, type=undefined, impl=undefined}, {call,_,new,[?TAB,Options]}) ->
L = proplists:get_value(db, Options, []),
proplists:get_bool(create_if_missing, L) andalso proplists:get_bool(error_if_exists, L);
precondition(#state{tab=Tab}, {call,_,new,[?TAB,_Options]}) ->
Tab =:= undefined;
precondition(#state{tab=undefined, type=undefined, impl=undefined}, {call,_,new,[_Tab,?TAB,Options]}) ->
L = proplists:get_value(db, Options, []),
proplists:get_bool(create_if_missing, L) andalso proplists:get_bool(error_if_exists, L);
precondition(#state{tab=Tab}, {call,_,new,[?TAB,_Options]}) when Tab /= undefined ->
precondition(#state{tab=Tab}, {call,_,new,[_Tab,?TAB,_Options]}) ->
Tab =:= undefined;
precondition(#state{tab=undefined, type=undefined, impl=undefined}, {call,_,new,[_Tab,?TAB,_Options]}) ->
false;
precondition(#state{tab=Tab}, {call,_,new,[_Tab,?TAB,_Options]}) when Tab /= undefined ->
precondition(#state{tab=undefined, type=undefined, impl=undefined}, {call,_,destroy,[_Tab,?TAB,_Options]}) ->
false;
precondition(#state{tab=undefined, type=undefined, impl=undefined}, {call,_,repair,[_Tab,?TAB,_Options]}) ->
false;
precondition(_S, {call,_,_,_}) ->
true.
@ -252,7 +263,7 @@ postcondition(#state{type=ordered_set}=S, {call,_,next,[_Tab, Key]}, Res) ->
Res =:= K
end;
postcondition(#state{type=set}=S, {call,_,tab2list,[_Tab]}, Res) ->
[] == (S#state.objs -- Res);
[] =:= (S#state.objs -- Res);
postcondition(#state{type=ordered_set}=S, {call,_,tab2list,[_Tab]}, Res) ->
sort(S) =:= Res;
postcondition(_S, {call,_,_,_}, _Res) ->
@ -281,28 +292,35 @@ gen_options(Op,#state{tab=undefined, type=undefined, impl=undefined}=S) ->
?LET({Type,Impl}, {gen_ets_type(), gen_ets_impl()},
gen_options(Op,S#state{type=Type, impl=Impl}));
gen_options(Op,#state{type=Type, impl=drv=Impl}=S) ->
[Type, public, named_table, {keypos,#obj.key}, Impl]
[Type, public, named_table, {keypos,#obj.key},
{compressed, gen_boolean()}, {async, gen_boolean()}, Impl]
++ gen_leveldb_options(Op,S);
gen_options(Op,#state{type=Type, impl=nif=Impl}=S) ->
[Type, public, named_table, {keypos,#obj.key}, Impl]
[Type, public, named_table, {keypos,#obj.key},
{compressed, gen_boolean()}, {async, gen_boolean()}, Impl]
++ gen_leveldb_options(Op,S);
gen_options(_Op,#state{type=Type, impl=ets=Impl}) ->
[Type, public, named_table, {keypos,#obj.key}, Impl].
[Type, public, named_table, {keypos,#obj.key},
{compressed, gen_boolean()}, Impl].
gen_leveldb_options(Op,S) ->
[gen_db_options(Op,S), gen_db_read_options(Op,S), gen_db_write_options(Op,S)].
gen_db_options(new,#state{exists=Exists}) ->
ExistsOptions = if Exists -> []; true -> [create_if_missing, error_if_exists] end,
?LET(Options, ulist(gen_db_options()), {db, Options ++ ExistsOptions});
%% @TODO ?LET(Options, ulist(gen_db_options()), {db, Options ++ ExistsOptions});
{db, ExistsOptions};
gen_db_options(_Op,_S) ->
?LET(Options, ulist(gen_db_options()), {db, Options}).
%% @TODO ?LET(Options, ulist(gen_db_options()), {db, Options}).
{db, []} .
gen_db_read_options(_Op,_S) ->
?LET(Options, ulist(gen_db_read_options()), {db_read, Options}).
%% @TODO ?LET(Options, ulist(gen_db_read_options()), {db_read, Options}).
{db_read, []}.
gen_db_write_options(_Op,_S) ->
?LET(Options, ulist(gen_db_write_options()), {db_write, Options}).
%% @TODO ?LET(Options, ulist(gen_db_write_options()), {db_write, Options}).
{db_write, []}.
gen_db_options() ->
oneof([paranoid_checks, {paranoid_checks,gen_boolean()}, {write_buffer_size,gen_pos_integer()}, {max_open_files,gen_pos_integer()}, {block_cache_size,gen_pos_integer()}, {block_size,gen_pos_integer()}, {block_restart_interval,gen_pos_integer()}]).
@ -412,7 +430,7 @@ keyfind(X, #state{objs=L}=S) ->
lists:filter(fun(#obj{key=K}) -> eq(X, K, S) end, L).
keymember(X, S) ->
[] /= keyfind(X, S).
[] =/= keyfind(X, S).
eq(X, Y, #state{type=set, impl=ets}) ->
X =:= Y;

View file

@ -66,7 +66,10 @@ command_gen(Mod,#state{parallel=true}=S) ->
serial_command_gen(_Mod,#state{db=undefined, exists=false}) ->
{call,?IMPL,open,[]};
serial_command_gen(_Mod,#state{db=undefined, exists=true}) ->
{call,?IMPL,reopen,[]};
oneof([{call,?IMPL,reopen,[]}
%% @TODO {call,?IMPL,destroy,[]},
%% @TODO {call,?IMPL,repair,[]}
]);
serial_command_gen(_Mod,#state{db=Db}=S) ->
oneof([{call,?IMPL,close,[Db]},
{call,?IMPL,put,[Db,gen_obj(S)]},
@ -100,6 +103,8 @@ next_state(#state{db=undefined, exists=false}=S, V, {call,_,open,[]}) ->
S#state{db=V, exists=true};
next_state(#state{db=undefined, exists=true}=S, V, {call,_,reopen,[]}) ->
S#state{db=V, exists=true};
next_state(#state{db=undefined, exists=true}=S, V, {call,_,destroy,[]}) ->
S#state{db=V, exists=false, objs=[]};
next_state(#state{db=Db}=S, _V, {call,_,close,[Db]}) when Db /= undefined ->
S#state{db=undefined};
next_state(S, _V, {call,_,put,[_Db,Obj]}) ->
@ -114,10 +119,18 @@ precondition(#state{exists=true}, {call,_,open,[]}) ->
false;
precondition(#state{exists=false}, {call,_,reopen,[]}) ->
false;
precondition(#state{exists=false}, {call,_,destroy,[]}) ->
false;
precondition(#state{exists=false}, {call,_,repair,[]}) ->
false;
precondition(#state{db=Db}, {call,_,open,[]}) when Db /= undefined->
false;
precondition(#state{db=Db}, {call,_,reopen,[]}) when Db /= undefined->
false;
precondition(#state{db=Db}, {call,_,destroy,[]}) when Db /= undefined->
false;
precondition(#state{db=Db}, {call,_,repair,[]}) when Db /= undefined->
false;
precondition(_S, {call,_,_,_}) ->
true.
@ -126,6 +139,10 @@ postcondition(#state{exists=false}, {call,_,open,[]}, Res) ->
?IMPL:is_db(Res);
postcondition(#state{exists=true}, {call,_,reopen,[]}, Res) ->
?IMPL:is_db(Res);
postcondition(#state{exists=true}, {call,_,destroy,[]}, Res) ->
Res;
postcondition(#state{exists=true}, {call,_,repair,[]}, Res) ->
Res;
postcondition(#state{db=Db}, {call,_,close,[_Db]}, Res) ->
Res andalso Db /= undefined;
postcondition(_S, {call,_,put,[_Db,_]}, Res) ->