Lots more work on getting basic system fleshed out. Basic structures in place for tracking open databases.

This commit is contained in:
Dave Smith 2008-12-06 22:08:36 -07:00
parent f1c52ba1c9
commit 20f872cdef
6 changed files with 415 additions and 57 deletions

View file

@ -1,7 +1,7 @@
load "base.rake"
C_SRCS = FileList["c_src/bdberl_drv.c"]
C_OBJS = FileList["c_src/bdberl_drv.o"]
C_SRCS = FileList["c_src/*.c"]
C_OBJS = C_SRCS.pathmap("%X.o")
CLEAN.include %w( c_src/*.o priv/*.so )
CLOBBER.include %w( c_src/system )
@ -15,17 +15,21 @@ file DB_LIB do
sh "cd c_src && ./buildlib.sh 2>&1"
end
file DRIVER do
file DRIVER => [:compile_c] do
puts "linking priv/#{DRIVER}..."
sh "gcc #{erts_link_cflags()} c_src/*.o c_src/system/lib/libdb-*.a -o #{DRIVER}", :verbose => false
end
rule ".o" => ["%X.c"] do |t|
rule ".o" => ["%X.c", "%X.h"] do |t|
puts "compiling #{t.source}..."
sh "gcc -c -Wall -Werror -Ic_src/system/include -I#{erts_dir()}/include #{t.source} -o #{t.name}", :verbose => false
end
task :compile_c => ['c_src'] + C_OBJS
task :compile => [DB_LIB, :compile_c, DRIVER]
task :compile => [DB_LIB, DRIVER]
task :test do
run_tests "test"
end

View file

@ -136,7 +136,7 @@ def run_tests(dir, rest = "")
-noshell -s ct_run script_start -s erlang halt \
#{get_cover(dir)} \
#{get_suites(dir)} -logdir #{dir}/logs -env TEST_DIR #{PWD}/#{dir} \
#{rest}"
#{rest}", :verbose => false
fail if $?.exitstatus != 0 && !ENV["stop_on_fail"].nil?

View file

@ -9,70 +9,87 @@
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include "erl_driver.h"
#include "db.h"
#include "hive_hash.h"
#include "bdberl_drv.h"
/**
* Driver functions
* Global instance of DB_ENV; only a single one exists per O/S process.
*/
static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer);
static void bdberl_drv_stop(ErlDrvData handle);
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
char* inbuf, int inbuf_sz,
char** outbuf, int outbuf_sz);
static void bdberl_ready_async(ErlDrvData handle, ErlDrvThreadData thread_data);
static DB_ENV* G_DB_ENV;
/**
* Command codes
* Global variable to track the return code from opening the DB_ENV. We track this
* value so as to provide a useful error code when the user attempts to open the
* port and it fails due to an error that occurred when opening the environment.
*/
#define CMD_OPEN_DB 0
#define CMD_CLOSE_DB 1
#define CMD_GET 2
#define CMD_PUT 3
static int G_DB_ENV_ERROR;
/**
* Driver Entry
*/
ErlDrvEntry bdberl_drv_entry =
{
NULL, /* F_PTR init, N/A */
bdberl_drv_start, /* L_PTR start, called when port is opened */
bdberl_drv_stop, /* F_PTR stop, called when port is closed */
NULL, /* F_PTR output, called when erlang has sent */
NULL, /* F_PTR ready_input, called when input descriptor ready */
NULL, /* F_PTR ready_output, called when output descriptor ready */
"bdberl_drv", /* driver_name */
NULL, /* F_PTR finish, called when unloaded */
NULL, /* handle */
bdberl_drv_control, /* F_PTR control, port_command callback */
NULL, /* F_PTR timeout, reserved */
NULL, /* F_PTR outputv, reserved */
bdberl_ready_async, /* F_PTR ready_async */
NULL, /* F_PTR flush */
NULL, /* F_PTR call */
NULL, /* F_PTR event */
ERL_DRV_EXTENDED_MARKER,
ERL_DRV_EXTENDED_MAJOR_VERSION,
ERL_DRV_EXTENDED_MINOR_VERSION,
ERL_DRV_FLAG_USE_PORT_LOCKING,
NULL, /* Reserved */
NULL /* F_PTR process_exit */
};
/**
* Structure for holding port instance data
* G_DATABASES is a global array of Database structs. Used to track currently opened DB*
* handles and ensure that they get cleaned up when all ports which were using them exit or
* explicitly close them.
*
* This array is allocated when the driver is first initialized and does not grow/shrink
* dynamically. G_DATABASES_SIZE contains the size of the array. G_DATABASES_NAMES is a hash of
* filenames to array index for an opened Database.
*
* All access to G_DATABASES and G_DATABASES_NAMES must be protected by the read/write lock
* G_DATABASES_RWLOCK.
*/
typedef struct
{
ErlDrvPort port;
} PortData;
static Database* G_DATABASES;
static int G_DATABASES_SIZE;
static ErlDrvRWLock* G_DATABASES_RWLOCK;
static hive_hash* G_DATABASES_NAMES;
DRIVER_INIT(bdberl_drv)
{
// Setup flags we'll use to init the environment
int flags =
DB_INIT_LOCK | /* Enable support for locking */
DB_INIT_TXN | /* Enable support for transactions */
DB_INIT_MPOOL | /* Enable support for memory pools */
DB_RECOVER | /* Enable support for recovering from failures */
DB_CREATE | /* Create files as necessary */
DB_REGISTER | /* Run recovery if needed */
DB_THREAD; /* Make the environment free-threaded */
// Initialize global environment -- use environment variable DB_HOME to
// specify where the working directory is
db_env_create(&G_DB_ENV, 0);
G_DB_ENV_ERROR = G_DB_ENV->open(G_DB_ENV, 0, flags, 0);
if (G_DB_ENV_ERROR == 0)
{
// Use the BDBERL_MAX_DBS environment value to determine the max # of
// databases to permit the VM to open at once. Defaults to 1024.
G_DATABASES_SIZE = 1024;
char* max_dbs_str = getenv("BDBERL_MAX_DBS");
if (max_dbs_str != 0)
{
G_DATABASES_SIZE = atoi(max_dbs_str);
if (G_DATABASES_SIZE <= 0)
{
G_DATABASES_SIZE = 1024;
}
}
// BDB is setup -- allocate structures for tracking databases
G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE);
G_DATABASES_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_DATABASES_RWLOCK");
G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE);
}
else
{
// Something bad happened while initializing BDB; in this situation we
// cleanup and set the environment to zero. Attempts to open ports will
// fail and the user will have to sort out how to resolve the issue.
G_DB_ENV->close(G_DB_ENV, 0);
G_DB_ENV = 0;
}
return &bdberl_drv_entry;
}
@ -111,6 +128,10 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
return 0;
}
static void bdberl_ready_async(ErlDrvData handle, ErlDrvThreadData thread_data)
static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_data)
{
}
static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor)
{
}

111
c_src/bdberl_drv.h Normal file
View file

@ -0,0 +1,111 @@
/* -------------------------------------------------------------------
*
* bdberl: Berkeley DB Driver for Erlang
* Copyright (c) 2008 The Hive. All rights reserved.
*
* ------------------------------------------------------------------- */
#ifndef _BDBERL_DRV
#define _BDBERL_DRV
#include "erl_driver.h"
#include "db.h"
/**
* Driver functions
*/
static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer);
static void bdberl_drv_stop(ErlDrvData handle);
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
char* inbuf, int inbuf_sz,
char** outbuf, int outbuf_sz);
static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_data);
static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor);
/**
* Command codes
*/
#define CMD_OPEN_DB 0
#define CMD_CLOSE_DB 1
#define CMD_TXN_BEGIN 2
#define CMD_TXN_COMMIT 3
#define CMD_TXN_ABORT 4
#define CMD_GET 5
#define CMD_PUT 6
#define CMD_PUT_ATOMIC 7
/**
* Driver Entry
*/
ErlDrvEntry bdberl_drv_entry =
{
NULL, /* F_PTR init, N/A */
bdberl_drv_start, /* L_PTR start, called when port is opened */
bdberl_drv_stop, /* F_PTR stop, called when port is closed */
NULL, /* F_PTR output, called when erlang has sent */
NULL, /* F_PTR ready_input, called when input descriptor ready */
NULL, /* F_PTR ready_output, called when output descriptor ready */
"bdberl_drv", /* driver_name */
NULL, /* F_PTR finish, called when unloaded */
NULL, /* handle */
bdberl_drv_control, /* F_PTR control, port_command callback */
NULL, /* F_PTR timeout, reserved */
NULL, /* F_PTR outputv, reserved */
bdberl_drv_ready_async, /* F_PTR ready_async */
NULL, /* F_PTR flush */
NULL, /* F_PTR call */
NULL, /* F_PTR event */
ERL_DRV_EXTENDED_MARKER,
ERL_DRV_EXTENDED_MAJOR_VERSION,
ERL_DRV_EXTENDED_MINOR_VERSION,
ERL_DRV_FLAG_USE_PORT_LOCKING,
NULL, /* Reserved */
bdberl_drv_process_exit /* F_PTR process_exit */
};
typedef struct
{
unsigned int dbref;
struct DbRefList* next;
} DbRefList;
typedef struct
{
ErlDrvPort port;
struct PortList* next;
} PortList;
typedef struct
{
DB* db;
const char* name;
PortList* ports;
} Database;
/**
* Structure for holding port instance data
*/
typedef struct
{
ErlDrvPort port;
DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn
* active */
int in_flight; /* Flag indicating if this port has an operation pending on the async
* pool. */
DbRefList* dbrefs; /* List of databases that this port has opened */
} PortData;
#endif

188
c_src/hive_hash.c Normal file
View file

@ -0,0 +1,188 @@
/* Copyright 2006 David Crawshaw, released under the new BSD license.
* Version 2, from http://www.zentus.com/c/hash.html */
/* Changed from just "hash" to "hive_hash" to reduce collisions when linked
* in with erlang
*
* Dave Smith (dsmith@thehive.com) 12/08
*/
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include "hive_hash.h"
/* Table is sized by primes to minimise clustering.
See: http://planetmath.org/encyclopedia/GoodHashTablePrimes.html */
static const unsigned int sizes[] = {
53, 97, 193, 389, 769, 1543, 3079, 6151, 12289, 24593, 49157, 98317,
196613, 393241, 786433, 1572869, 3145739, 6291469, 12582917, 25165843,
50331653, 100663319, 201326611, 402653189, 805306457, 1610612741
};
static const int sizes_count = sizeof(sizes) / sizeof(sizes[0]);
static const float load_factor = 0.65;
struct record {
unsigned int hash;
const char *key;
void *value;
};
struct hive_hash {
struct record *records;
unsigned int records_count;
unsigned int size_index;
};
static int hive_hash_grow(hive_hash *h)
{
int i;
struct record *old_recs;
unsigned int old_recs_length;
old_recs_length = sizes[h->size_index];
old_recs = h->records;
if (h->size_index == sizes_count - 1) return -1;
if ((h->records = calloc(sizes[++h->size_index],
sizeof(struct record))) == NULL) {
h->records = old_recs;
return -1;
}
h->records_count = 0;
// rehash table
for (i=0; i < old_recs_length; i++)
if (old_recs[i].hash && old_recs[i].key)
hive_hash_add(h, old_recs[i].key, old_recs[i].value);
free(old_recs);
return 0;
}
/* algorithm djb2 */
static unsigned int strhash(const char *str)
{
int c;
int hash = 5381;
while ((c = *str++))
hash = hash * 33 + c;
return hash == 0 ? 1 : hash;
}
hive_hash * hive_hash_new(unsigned int capacity) {
struct hive_hash *h;
int i, sind;
capacity /= load_factor;
for (i=0; i < sizes_count; i++)
if (sizes[i] > capacity) { sind = i; break; }
if ((h = malloc(sizeof(struct hive_hash))) == NULL) return NULL;
if ((h->records = calloc(sizes[sind], sizeof(struct record))) == NULL) {
free(h);
return NULL;
}
h->records_count = 0;
h->size_index = sind;
return h;
}
void hive_hash_destroy(hive_hash *h)
{
free(h->records);
free(h);
}
int hive_hash_add(hive_hash *h, const char *key, void *value)
{
struct record *recs;
int rc;
unsigned int off, ind, size, code;
if (key == NULL || *key == '\0') return -2;
if (h->records_count > sizes[h->size_index] * load_factor) {
rc = hive_hash_grow(h);
if (rc) return rc;
}
code = strhash(key);
recs = h->records;
size = sizes[h->size_index];
ind = code % size;
off = 0;
while (recs[ind].key)
ind = (code + (int)pow(++off,2)) % size;
recs[ind].hash = code;
recs[ind].key = key;
recs[ind].value = value;
h->records_count++;
return 0;
}
void * hive_hash_get(hive_hash *h, const char *key)
{
struct record *recs;
unsigned int off, ind, size;
unsigned int code = strhash(key);
recs = h->records;
size = sizes[h->size_index];
ind = code % size;
off = 0;
// search on hash which remains even if a record has been removed,
// so hash_remove() does not need to move any collision records
while (recs[ind].hash) {
if ((code == recs[ind].hash) && recs[ind].key &&
strcmp(key, recs[ind].key) == 0)
return recs[ind].value;
ind = (code + (int)pow(++off,2)) % size;
}
return NULL;
}
void * hive_hash_remove(hive_hash *h, const char *key)
{
unsigned int code = strhash(key);
struct record *recs;
void * value;
unsigned int off, ind, size;
recs = h->records;
size = sizes[h->size_index];
ind = code % size;
off = 0;
while (recs[ind].hash) {
if ((code == recs[ind].hash) && recs[ind].key &&
strcmp(key, recs[ind].key) == 0) {
// do not erase hash, so probes for collisions succeed
value = recs[ind].value;
recs[ind].key = 0;
recs[ind].value = 0;
h->records_count--;
return value;
}
ind = (code + (int)pow(++off, 2)) % size;
}
return NULL;
}
unsigned int hive_hash_size(hive_hash *h)
{
return h->records_count;
}

34
c_src/hive_hash.h Normal file
View file

@ -0,0 +1,34 @@
/* Copyright 2006 David Crawshaw, released under the new BSD license.
* Version 2, from http://www.zentus.com/c/hash.html */
/* Changed from just "hash" to "hive_hash" to reduce collisions when linked
* in with erlang
*
* Dave Smith (dsmith@thehive.com) 12/08
*/
#ifndef __HIVE_HASH__
#define __HIVE_HASH__
/* Opaque structure used to represent hashtable. */
typedef struct hive_hash hive_hash;
/* Create new hashtable. */
hive_hash * hive_hash_new(unsigned int size);
/* Free hashtable. */
void hive_hash_destroy(hive_hash *h);
/* Add key/value pair. Returns non-zero value on error (eg out-of-memory). */
int hive_hash_add(hive_hash *h, const char *key, void *value);
/* Return value matching given key. */
void * hive_hash_get(hive_hash *h, const char *key);
/* Remove key from table, returning value. */
void * hive_hash_remove(hive_hash *h, const char *key);
/* Returns total number of keys in the hashtable. */
unsigned int hive_hash_size(hive_hash *h);
#endif