stasis-aries-wal/lang/perl/Stasis.pm

655 lines
16 KiB
Perl
Raw Normal View History

2008-10-27 23:25:03 +00:00
package Stasis;
require Inline;
use strict;
use warnings;
2008-10-27 23:25:03 +00:00
# Root of the Stasis source tree; computed in import() from this module's
# entry in %INC.
my $STASIS_DIR;
# True between Tinit() and Tdeinit(); consulted by open() and the END block.
my $initted = 0;
2008-10-27 23:25:03 +00:00
# Called by "use Stasis [$inline_dir]".  Works out where the Stasis source
# tree lives from this module's own path, then hands the C code in the
# __DATA__ section to Inline for compilation and binding.
#
# Arguments:
#   $modname    - the package name supplied by "use" (unused).
#   $inline_dir - optional directory passed to Inline as DIRECTORY; when
#                 false, $ENV{STASIS_INLINE_DIRECTORY} is used instead.
sub import {
    my $modname    = shift;
    my $inline_dir = shift;
    require Inline;

    # Strip the trailing "/lang/perl/Stasis.pm" to obtain the tree root.
    # The dot is escaped and the pattern is anchored at the end of the
    # string so only this module's own path suffix can be removed.
    $STASIS_DIR = $INC{"Stasis.pm"};
    $STASIS_DIR =~ s{/+lang/+perl/+Stasis\.pm\z}{};

    Inline->import (C => 'DATA' =>
        LIBS => "-L$STASIS_DIR/build/src/stasis/ " .
                "-lstasis -lpthread",
        CCFLAGS => "-Wall -pedantic -Werror -std=c99 -DPERL_GCC_PEDANTIC",
        INC => "-I $STASIS_DIR -I $STASIS_DIR/build/",
        ENABLE => 'AUTOWRAP',
        TYPEMAPS => "$STASIS_DIR/lang/perl/typemap",
        PREFIX => 'stasis_perl_',
        DIRECTORY => $inline_dir ||$ENV{STASIS_INLINE_DIRECTORY}
    );
    Inline->init;
}
2008-10-27 23:25:03 +00:00
# Returns the human-readable version string of these bindings.
sub version {
    my $release = "Stasis 0.1";
    return $release;
}
# Creates a new hash table inside transaction $xid and returns its record
# id, blessed into Stasis::HashHeader.
sub ThashCreate {
    my ($xid) = @_;
    return bless(ThashCreateHelper($xid), 'Stasis::HashHeader');
}
# Initializes Stasis via TinitHelper().  Dies if the module already
# considers Stasis initialized.  The flag is set before the helper runs.
sub Tinit {
    die "Called Tinit() when Stasis was initted!\n" if $initted;
    $initted = 1;
    return TinitHelper();
}
# Shuts Stasis down via TdeinitHelper().  Dies if the module does not
# consider Stasis initialized.  The flag is cleared before the helper runs.
sub Tdeinit {
    die "Called Tdeinit() when Stasis was not initted!\n" unless $initted;
    $initted = 0;
    return TdeinitHelper();
}
# Runs a short transaction that creates a hash when the record at
# ROOT_RID() has type INVALID_SLOT().  Returns the record id of the freshly
# created hash in that case, or ROOT_RID() otherwise.
sub TbootstrapHash {
    my $xid  = Tbegin();
    my $root = ROOT_RID();

    $root = Stasis::ThashCreate($xid)
        if TrecordType($xid, $root) == INVALID_SLOT();

    Tcommit($xid);
    return $root;
}
# Ties the hash referenced by the first argument to Stasis::Hash,
# initializing Stasis first if that has not happened yet.  Returns the
# value of tie().
sub open {
    my ($hashref) = @_;
    Tinit() unless $initted;
    return tie %{$hashref}, 'Stasis::Hash';
}
# At interpreter exit, deinitialize Stasis if it is still marked as
# initialized, reporting progress on STDERR.
END {
    if ($initted) {
        print {*STDERR} "Cleanly shutting Stasis down...";
        $| = 1;
        Tdeinit();
        print {*STDERR} "done.\n";
    }
}
package Stasis::Hash;
require Scalar::Util;
require Tie::Hash;
our @ISA = qw(Tie::Hash);
# Constructor invoked by tie().
#
# Arguments:
#   $class - package to bless into.
#   $xid   - transaction id to use; when undefined (or -1) the hash manages
#            its own transaction ("autoxact" mode).
#   $rid   - record id of the hash; defaults to Stasis::ROOT_RID().
#
# In autoxact mode the rid is replaced by the result of
# Stasis::TbootstrapHash() and a fresh transaction is begun.
# Dies if no rid could be determined.
sub TIEHASH {
    my ($class, $xid, $rid) = @_;

    $rid ||= Stasis::ROOT_RID();
    defined($rid) || die "need rid to tie hash";

    # An absent xid is represented internally by -1.  A "defined($xid)"
    # check after this point could never fail, so none is made.
    $xid = -1 if !defined($xid);

    my $this = {
        xid => $xid,
        rid => $rid,
    };
    if ($xid == -1) {
        $this->{autoxact} = 1;
        $this->{rid} = Stasis::TbootstrapHash();
        $this->{xid} = Stasis::Tbegin();
    }
    return bless $this, $class;
}
# Returns the transaction id for this hash: its own xid when defined,
# otherwise the xid stored on the object referenced by {root}.
sub getXid {
    my ($self) = @_;
    my $xid = $self->{xid};
    $xid = $self->{root}{xid} unless defined $xid;
    return $xid;
}
# Attaches this hash to a root hash: clears the local xid (so getXid()
# falls through to the root), marks it autoxact, and records the root.
# If $parent itself has a {root}, that one is used, so the chain stays flat.
sub setRoot {
    my ($self, $parent) = @_;
    $self->{xid}      = undef;
    $self->{autoxact} = 1;
    $self->{root}     = ($parent->{root} || $parent);
}
# tie() read hook.  Looks $key up with Stasis::ThashLookup().  Unblessed
# results are returned as-is.  A Stasis::HashHeader result is wrapped in a
# new hash tied to Stasis::Hash (rooted at this object) and returned by
# reference.  Any other blessed result is fatal.
sub FETCH {
    my ($self, $key) = @_;
    my $xid   = $self->getXid();
    my $found = Stasis::ThashLookup($xid, $self->{rid}, $key);

    return $found unless Scalar::Util::blessed($found);

    die 'ThashLookup returned an object of unknown type'
        unless $found->isa('Stasis::HashHeader');

    my %nested;
    tie(%nested, 'Stasis::Hash', $xid, $found);
    tied(%nested)->setRoot($self);
    return \%nested;
}
# tie() write hook.
#
# Non-hash values are passed straight to Stasis::ThashInsert().  For a
# plain hash reference, the hash's contents are copied aside, the caller's
# hash is tied to a newly created Stasis hash, the contents are copied back
# in (through the tie), and the new hash's record id is stored under $key.
sub STORE {
    my ($self, $key, $val) = @_;
    my $xid = $self->getXid();

    if (ref($val) ne 'HASH') {
        # XXX assumes the value is a scalar.
        Stasis::ThashInsert($xid, $self->{rid}, $key, $val);
    }
    elsif (Scalar::Util::blessed($val) && $val->isa('Stasis::Hash')) {
        # NOTE(review): this branch appears unreachable -- ref() only
        # yields 'HASH' for unblessed references.  Left as found.
        die "untested?";
        my $obj = tied ($val); # tied returns the object backing the hash.
        Stasis::ThashInsert($xid, $self->{rid}, $key, $$obj{rid});
        die "untested?";
    }
    else {
        # Copy the caller's hash into scratch space.
        my %scratch;
        for my $name (keys %{$val}) {
            $scratch{$name} = $val->{$name};
        }

        # Tie the caller's hash to a brand new Stasis hash.
        my $rid = Stasis::ThashCreate($xid);
        tie %{$val}, 'Stasis::Hash', $xid, $rid;

        # Replay the scratch copy into the now-tied hash.
        for my $name (keys %scratch) {
            $val->{$name} = $scratch{$name};
        }

        # Insert the populated hashtable under $key.
        Stasis::ThashInsert($xid, $self->{rid}, $key, $rid);
        if ($self->{autoxact}) {
            tied(%{$val})->setRoot($self);
        }
    }
}
# tie() delete hook.  Removes $key via Stasis::ThashRemove() and returns
# that call's result.
sub DELETE {
    my ($self, $key) = @_;
    my $xid = $self->getXid();

    # Hashes that were created automatically are leaked here.  Refcounts?
    return Stasis::ThashRemove($xid, $self->{rid}, $key);
}
# tie() iteration hook.  Opens a new iterator over the hash, stores it in
# {it}, and returns the first key.  When the iterator yields nothing it is
# closed and an empty return signals the end of iteration.
sub FIRSTKEY {
    my ($self) = @_;
    my $xid = $self->getXid();

    $self->{it} = Stasis::ThashIterator($xid, $self->{rid});

    return Stasis::Titerator_key($xid, $self->{it})
        if Stasis::Titerator_next($xid, $self->{it});

    Stasis::Titerator_close($xid, $self->{it});
    return;
}
# tie() iteration hook.  Marks the current tuple of the iterator in {it}
# as done, advances it, and returns the next key.  When the iterator is
# exhausted it is closed and an empty return signals the end of iteration.
# The $lastkey argument supplied by tie is not used.
sub NEXTKEY {
    my ($self, $lastkey) = @_;
    my $xid = $self->getXid();

    Stasis::Titerator_tupleDone($xid, $self->{it});

    return Stasis::Titerator_key($xid, $self->{it})
        if Stasis::Titerator_next($xid, $self->{it});

    Stasis::Titerator_close($xid, $self->{it});
    return;
}
# tie() existence hook.  Returns true when $key is present in the hash.
#
# Presence is determined with Stasis::ThashLookup(), whose C implementation
# returns undef when the underlying lookup reports that the key is missing.
sub EXISTS {
    my ($this, $key) = @_;
    my $xid = $this->getXid();
    return defined(Stasis::ThashLookup($xid, $$this{rid}, $key));
}
# tie() clear hook.  Not implemented: emits a warning and leaves the hash
# untouched.
sub CLEAR {
    my ($self) = @_;
    warn "unimplemeted method 'CLEAR' called";
}
# Commits the hash's current transaction and immediately begins a new one.
# Only valid on a root hash in autoxact mode; dies otherwise.
sub commit {
    my ($self) = @_;
    die 'commit() called on non-auto hash' unless $self->{autoxact};
    die 'commit() called on non root'      if $self->{root};
    Stasis::Tcommit($self->{xid});
    $self->{xid} = Stasis::Tbegin();
}
# Aborts the hash's current transaction and immediately begins a new one.
# Only valid on a root hash in autoxact mode; dies otherwise.
sub abort {
    my ($self) = @_;
    die 'abort() called on non-auto hash' unless $self->{autoxact};
    die 'abort() called on non root'      if $self->{root};
    Stasis::Tabort($self->{xid});
    $self->{xid} = Stasis::Tbegin();
}
# Stasis::Recordid has no Perl-side methods; the C helpers bless record ids
# into it, and it serves as the base class of Stasis::HashHeader.
package Stasis::Recordid; # Silence warning about "can't locate package Stasis::Recordid for @Stasis::HashHeader::ISA"
# Record ids that name a hash table; ThashCreate() blesses into this class.
package Stasis::HashHeader;
our @ISA = qw(Stasis::Recordid);
# Switch back to the main package and return a true value for require/use.
package Stasis;
1;
2008-10-27 23:25:03 +00:00
__DATA__
__C__
#include "stasis/transactional.h"
/** Forwards to Tinit() and returns its result; called by Stasis::Tinit() in the Perl code. */
int TinitHelper() {
  int status = Tinit();
  return status;
}
/** Forwards to Tdeinit() and returns its result; called by Stasis::Tdeinit() in the Perl code. */
int TdeinitHelper() {
  int status = Tdeinit();
  return status;
}
2008-10-27 23:25:03 +00:00
/* Prototypes only; no bodies are defined in this file.  Presumably these
   are resolved from the library named in LIBS and bound to Perl by Inline's
   AUTOWRAP option, which import() enables -- confirm against Inline::C. */
int Tbegin();
int Tcommit(int xid);
int TsoftCommit(int xid);
void TforceCommits();
2008-10-27 23:25:03 +00:00
/* More prototypes without bodies in this file (see the AUTOWRAP setting in
   import()).  TrecordType() and TrecordSize() are also called by code in
   this file. */
int Tabort(int xid);
int Tprepare(int xid);
recordid Talloc(int xid, unsigned long size);
int TrecordType(int xid, recordid rid);
int TrecordSize(int xid, recordid rid);
/**
 * Converts a Perl record id into a C recordid.
 *
 * The SV must be a reference to a two-element array, blessed into a class
 * derived from Stasis::Recordid, whose elements are both integers (page,
 * then slot).  Any violation prints a diagnostic and aborts the process.
 * Fields of the result beyond the first two are zero-initialized.
 */
static recordid recordid_SV(SV* sv) {
  int is_array_ref = SvROK(sv) && SvTYPE(SvRV(sv)) == SVt_PVAV;
  if(!is_array_ref) {
    printf("recordid_SV fail 1\n");
    abort();
  }
  if(!sv_isobject(sv) || !sv_derived_from(sv, "Stasis::Recordid")) {
    printf("SV is not a recordid\n");
    abort();
  }
  AV * fields = (AV*)SvRV(sv);
  /* av_len() is the highest index, so exactly two elements means 1. */
  if(av_len(fields) != 1) {
    printf("recordid_SV fail 3\n");
    abort();
  }
  SV ** page_slot = av_fetch(fields, 0, 0);
  SV ** slot_slot = av_fetch(fields, 1, 0);
  if(!page_slot || !slot_slot ||
     !*page_slot || !*slot_slot ||
     !SvIOK(*page_slot) || !SvIOK(*slot_slot)) {
    printf("recordid_SV fail 4\n");
    abort();
  }
  recordid rid = {
    (pageid_t) SvUV(*page_slot),
    (slotid_t) SvUV(*slot_slot)
  };
  return rid;
}
/**
 * Converts a C recordid into a Perl value.
 *
 * A recordid that is byte-identical to NULLRID becomes a plain undefined
 * SV.  It is returned without blessing, because sv_bless() requires a
 * reference and croaks when handed anything else.
 *
 * Any other recordid becomes a reference to a two-element array
 * [page, slot], blessed into Stasis::Recordid.
 */
static SV* SV_recordid(recordid rid) {
  if(!memcmp(&rid, &NULLRID, sizeof(NULLRID))) {
    return newSV(0);
  }
  AV* arry = newAV();
  av_push(arry, newSVuv((UV) rid.page));
  av_push(arry, newSVuv((UV) rid.slot));
  SV* sv = newRV_noinc((SV*)arry);
  return sv_bless(sv, gv_stashpv("Stasis::Recordid", GV_ADD));
}
/**
 * Serializes a Perl value into a freshly malloc()ed buffer.
 *
 * Layout: the raw payload, then (strings only) a NUL byte, then a one-byte
 * type tag as the final byte:
 *   'I' integer (IV)       'N' floating point (NV)   'P' string
 *   'H' Stasis::HashHeader 'R' Stasis::Recordid
 * Checks are made in that order, so a value flagged as both integer and
 * string is stored as an integer.
 *
 * @param sv  value to encode
 * @param sz  out: total number of bytes in the returned buffer
 * @return    buffer the caller must free()
 *
 * Unsupported values print a diagnostic and abort the process.
 */
static byte * bytes_SV(SV* sv, STRLEN * sz) {
  // XXX check for hashes, arrays?
  IV as_int;
  NV as_num;
  recordid as_rid;
  const byte * src;
  STRLEN payload;
  char tag;

  if(SvIOK(sv)) {
    // signed int, machine length
    as_int  = SvIV(sv);
    src     = (const byte*)&as_int;
    payload = (STRLEN)sizeof(IV);
    tag     = 'I';
  } else if(SvNOK(sv)) {
    // double
    as_num  = SvNV(sv);
    src     = (const byte*)&as_num;
    payload = sizeof(NV);
    tag     = 'N';
  } else if(SvPOK(sv)) {
    // string; SvPV stores the length into payload
    src = (const byte*)SvPV(sv, payload);
    tag = 'P';
  } else if(sv_isobject(sv) && sv_isa(sv, "Stasis::HashHeader")) {
    as_rid  = recordid_SV(sv);
    src     = (const byte*)&as_rid;
    payload = sizeof(recordid);
    tag     = 'H';
  } else if(sv_isobject(sv) && sv_isa(sv, "Stasis::Recordid")) {
    as_rid  = recordid_SV(sv);
    src     = (const byte*)&as_rid;
    payload = sizeof(recordid);
    tag     = 'R';
  } else if(sv_isobject(sv)) {
    printf("Stasis.pm: Encountered unsupported object\n");
    abort();
  } else {
    printf("Stasis.pm: Encountered unsupported SV\n");
    abort();
  }

  // one extra byte for the tag, and one more for the NUL after a string
  STRLEN total = payload + 1;
  if(tag == 'P') {
    total++;
  }
  byte * out = malloc(total);
  memcpy(out, src, payload);
  if(tag == 'P') {
    out[payload] = '\0';
  }
  out[total-1] = tag;
  *sz = total;
  return out;
}
/**
 * Decodes a buffer produced by bytes_SV() into a new Perl value.
 *
 * The final byte selects the type: 'I' integer, 'N' floating point,
 * 'P' string (payload length sz-2, dropping the NUL and the tag),
 * 'R' record id, 'H' record id re-blessed as Stasis::HashHeader.
 * Fixed-size payloads are size-checked with assert(); an unknown tag
 * aborts the process.
 */
static SV * SV_bytes(byte* bytes, STRLEN sz) {
  SV * decoded;
  const STRLEN payload = sz - 1;
  const char tag = bytes[payload];

  if(tag == 'I') {
    assert(payload == sizeof(IV));
    decoded = newSViv(*(IV*)bytes);
  } else if(tag == 'N') {
    assert(payload == sizeof(NV));
    decoded = newSVnv(*(NV*)bytes);
  } else if(tag == 'P') {
    decoded = newSVpvn((const char*)bytes, sz-2);
  } else if(tag == 'R') {
    assert(payload == sizeof(recordid));
    decoded = SV_recordid(*(recordid*)bytes);
  } else if(tag == 'H') {
    assert(payload == sizeof(recordid));
    decoded = SV_recordid(*(recordid*)bytes);
    decoded = sv_bless(decoded, gv_stashpv("Stasis::HashHeader", GV_ADD));
  } else {
    abort();
  }
  return decoded;
}
/** Records */
2008-10-27 23:25:03 +00:00
/**
 * Allocates a record big enough to hold the encoded form of sv.
 * The value is encoded only to learn its size; the buffer is discarded.
 */
recordid TallocScalar(int xid, SV* sv) {
  STRLEN encoded_len;
  free(bytes_SV(sv, &encoded_len));
  return Talloc(xid, encoded_len);
}
/**
 * Encodes sv with bytes_SV() and writes it to record rid with Tset().
 * rid.size is overwritten with the encoded length before the write.
 * Returns Tset()'s result.
 */
int stasis_perl_Tset(int xid, recordid rid, SV * sv) {
  STRLEN encoded_len;
  byte * encoded = bytes_SV(sv, &encoded_len);
  rid.size = encoded_len;
  int status = Tset(xid, rid, encoded);
  free(encoded);
  return status;
}
/**
 * Reads record rid and decodes it into a new Perl value with SV_bytes().
 * The record's size is obtained from TrecordSize().
 */
SV* stasis_perl_Tread(int xid, recordid rid) {
  rid.size = TrecordSize(xid, rid);
  byte * raw = malloc(rid.size);
  Tread(xid, rid, raw);
  SV* decoded = SV_bytes(raw, rid.size);
  free(raw);
  return decoded;
}
/**
 * Stores the raw recordid buf into record rid (no type tag is appended).
 * rid.size is set to sizeof(recordid) first.  Returns Tset()'s result.
 */
int stasis_perl_TsetRecordid(int xid, recordid rid, recordid buf) {
  rid.size = sizeof(buf);
  int status = Tset(xid, rid, &buf);
  return status;
}
/**
 * Reads record rid directly into a recordid and returns it.
 * rid.size is passed through to Tread() exactly as received.
 */
recordid stasis_perl_TreadRecordid(int xid, recordid rid) {
  recordid stored;
  Tread(xid, rid, &stored);
  return stored;
}
/** Hash table */
recordid stasis_perl_ThashCreateHelper(int xid) {
2008-10-27 23:25:03 +00:00
return ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
}
/**
 * Encodes key and val with bytes_SV() and inserts the pair into hash.
 * Returns ThashInsert()'s result.
 */
int stasis_perl_ThashInsert(int xid, recordid hash, SV * key, SV * val) {
  STRLEN key_len;
  STRLEN val_len;
  byte * key_bytes = bytes_SV(key, &key_len);
  byte * val_bytes = bytes_SV(val, &val_len);
  int status = ThashInsert(xid, hash, key_bytes, key_len, val_bytes, val_len);
  free(val_bytes);
  free(key_bytes);
  return status;
}
/**
 * Encodes key with bytes_SV() and removes it from hash.
 * Returns ThashRemove()'s result.
 */
int stasis_perl_ThashRemove(int xid, recordid hash, SV * key) {
  STRLEN key_len;
  byte * key_bytes = bytes_SV(key, &key_len);
  int status = ThashRemove(xid, hash, key_bytes, key_len);
  free(key_bytes);
  return status;
}
/**
 * Looks key up in hash.  When ThashLookup() reports a length of -1 the
 * result is Perl's undef; otherwise the value buffer is decoded with
 * SV_bytes(), freed, and the decoded value returned.
 */
SV* stasis_perl_ThashLookup(int xid, recordid hash, SV * key) {
  STRLEN key_len;
  byte* key_bytes = bytes_SV(key, &key_len);
  byte* val_bytes;
  int val_len = ThashLookup(xid, hash, key_bytes, key_len, &val_bytes);
  free(key_bytes);

  if(val_len == -1) {
    return &PL_sv_undef;
  }
  SV* decoded = SV_bytes(val_bytes, val_len);
  free(val_bytes);
  return decoded;
}
/** Opens an iterator over hash via ThashGenericIterator() and returns it as an opaque pointer. */
void * stasis_perl_ThashIterator(int xid, recordid hash) {
  void * iterator = ThashGenericIterator(xid, hash);
  return iterator;
}
/** Arrays */
/**
 * Allocates an array list whose element size equals the encoded size of
 * exemplar.  The exemplar is encoded with bytes_SV() only to measure it;
 * the temporary buffer is released before the allocation call so that it
 * is not leaked.
 *
 * @return the recordid returned by TarrayListAlloc(xid, 4, 2, size)
 */
recordid stasis_perl_TarrayList_alloc(int xid, SV* exemplar) {
  STRLEN sz;
  byte * bytes = bytes_SV(exemplar, &sz);
  free(bytes);
  return TarrayListAlloc(xid, 4, 2, sz);
}
/** Forwards to TarrayListExtend() and returns its result. */
int stasis_perl_TarrayList_extend(int xid, recordid rid, int slots) {
  int status = TarrayListExtend(xid, rid, slots);
  return status;
}
/** Forwards to TarrayListLength() and returns its result. */
int stasis_perl_TarrayList_length(int xid, recordid rid) {
  int length = TarrayListLength(xid, rid);
  return length;
}
/** Iterators */
/** Advances the iterator; returns Titerator_next()'s result, which the Perl side treats as a boolean. */
int stasis_perl_Titerator_next(int xid, void *it) {
  int has_tuple = Titerator_next(xid, it);
  return has_tuple;
}
/**
 * Fetches the current key from the iterator and decodes it with
 * SV_bytes().  The buffer handed back by Titerator_key() is not freed here.
 */
SV* stasis_perl_Titerator_key(int xid, void *it) {
  byte * key_bytes;
  STRLEN key_len = Titerator_key(xid, it, &key_bytes);
  SV * decoded = SV_bytes(key_bytes, key_len);
  return decoded;
}
/**
 * Fetches the current value from the iterator and decodes it with
 * SV_bytes().  The buffer handed back by Titerator_value() is not freed here.
 */
SV* stasis_perl_Titerator_value(int xid, void *it) {
  byte * val_bytes;
  STRLEN val_len = Titerator_value(xid, it, &val_bytes);
  SV * decoded = SV_bytes(val_bytes, val_len);
  return decoded;
}
/** Forwards to Titerator_tupleDone(); the Perl NEXTKEY hook calls this before advancing the iterator. */
void stasis_perl_Titerator_tupleDone(int xid, void *it) {
Titerator_tupleDone(xid, it);
}
/** Forwards to Titerator_close(); the Perl FIRSTKEY/NEXTKEY hooks call this once the iterator is exhausted. */
void stasis_perl_Titerator_close(int xid, void *it) {
Titerator_close(xid, it);
}
2008-10-27 23:25:03 +00:00
/** Returns ROOT_RECORD converted to a Perl value by SV_recordid(). */
SV* stasis_perl_ROOT_RID() {
  SV* root = SV_recordid(ROOT_RECORD);
  return root;
}
/** Returns NULLRID converted to a Perl value by SV_recordid(). */
SV* stasis_perl_NULL_RID() {
  SV* null_rid = SV_recordid(NULLRID);
  return null_rid;
}
/** Exposes the INVALID_SLOT constant to Perl; TbootstrapHash() compares TrecordType() against it. */
int stasis_perl_INVALID_SLOT() {
  const int invalid = INVALID_SLOT;
  return invalid;
}
__END__
=head1 NAME
Stasis - Flexible transactional storage
=head1 SYNOPSIS
use Stasis;
my %h;
Stasis::open(\%h);
$h{foo} = 'bar';
# Create an anonymous hash and insert it into h
$h{bat}{bam} = 'boom';
tied(%h)->commit();
$h{bar} = 'bad update';
tied(%h)->abort();
defined $h{bar} && die;
=head1 DESCRIPTION
Stasis is a lightweight transactional storage library. This perl
module provides bindings for Stasis' C functions (Tinit(),
ThashInsert(), etc), and higher-level interfaces based upon tied perl
hashes.
=head2 Programming Style
The synopsis describes Stasis.pm's high level interface. Lower-level
interactions with Stasis are possible as well:
use Stasis;
Stasis::Tinit(); # Initialize Stasis
Stasis::TbootstrapHash(); # Open or bootstrap a stasis database
# Bootstrapping arranges for a hash to live in ROOT_RECORD
my $rid = Stasis::ROOT_RID();
my $xid = Stasis::Tbegin(); # Start new transaction
# Insert a value into the hash
Stasis::ThashInsert($xid, $rid, "foo", "bar");
# Lookup the value
my $bar = Stasis::ThashLookup($xid, $rid, "foo");
Stasis::Tcommit($xid);
$xid = Stasis::Tbegin();
# This update will not be reflected after abort.
Stasis::ThashRemove($xid, $rid, "foo");
Stasis::Tabort($xid);
#Deinitialize Stasis (Called automatically at shutdown if needed)
Stasis::Tdeinit();
Stasis supports a wide range of other data structures (including
arrays, records, large objects and trees), which are somewhat
supported by Stasis.pm. These bindings are a work in progress;
consult the source code for a list of currently implemented methods.
Note that Stasis (and this module) are thread safe. However, Stasis
does not perform lock management. Refer to the Stasis documentation
for more information before attempting to make use of concurrent (even
if single threaded) transactions.
=head1 CAVEATS AND BUGS
=head2 No garbage collection
Nested hashes are not garbage collected. Therefore, the following
code leaks storage:
my %h;
Stasis::open(\%h);
$h{a}{b} = 1; # Automatically instantiates a hash.
delete $h{a}; # The automatically created hash is now unreachable.
tied(%h)->commit(); # However, abort() would have reclaimed the space
=head2 Small hash performance
Stasis is currently tuned for small numbers of large hashes.
Its hashtable implementation would be more efficient if it included
special cases for small indexes, then dynamically switched to the
current layout for large data sets.
=head2 No tied arrays
Stasis provides an array type, but this module does not export it to
perl as a tied array. Instead, it includes a partial (and untested)
set of C-style bindings. Support for push() and pop() should be
straightforward. However, Stasis' array implementation does not
currently provide anything analogous to shift() and unshift(). Like
the hashtables, Stasis' arrays are tuned for large data sets.
=head2 Type safety / reflection
Stasis records include a 'type' field that allows 'special' data to be
distinguished from 'normal' slots (eg: application data). Hashtables
do not use this feature, so it is possible to attempt to access
headers as scalar values. This will likely fail by crashing the
process. Similarly, the recordids returned by Stasis are blessed
arrays. Tampering with their contents, then attempting to dereference
them will likely lead to crashes and other trouble. These problems
should not affect "well-written" code.
=head2 This documentation is incomplete
See the source for a complete list of exported Stasis functions.
=head2 This module is a work in progress
Expect API instability. Also, note that many Stasis functions are not
yet exported to perl.