- improvement to error handling in pobj: now properly handles resource
  allocation errors (at least in most cases), including hash/queue
  library calls.
This commit is contained in:
Gilad Arnold 2005-02-08 00:54:30 +00:00
parent c94bee33a3
commit eec80a0e38
2 changed files with 288 additions and 151 deletions

View file

@ -90,7 +90,7 @@ struct pobj_repo_list {
struct pobj_repo_list_item *list;
};
static struct pobj_repo_list *g_pobj_repo_list;
static struct pobj_repo_list *g_pobj_repo_list = NULL;
static int g_pobj_repo_list_max;
static recordid g_pobj_last_seg_rid;
struct repo_seg g_pobj_last_seg;
@ -111,7 +111,7 @@ struct static_repo_list {
struct static_repo_list_item *list;
};
static struct static_repo_list *g_static_repo_list;
static struct static_repo_list *g_static_repo_list = NULL;
static int g_static_repo_list_max;
static recordid g_static_last_seg_rid;
struct repo_seg g_static_last_seg;
@ -1422,36 +1422,6 @@ pobj_static_update_ref (void *static_tmp_ptr)
/* Reconstruct a single persistent object from the backing store.
 * objid/size/rid describe the object's repository slot; a fresh in-memory
 * copy is allocated, filled via Tread, and the old->new objid pair is
 * recorded in pobj_convert_hash for the later reference-adjustment pass.
 * NOTE(review): neither the malloc result nor the hash_insert result is
 * checked here -- an allocation failure would be passed straight into
 * Tread; this unchecked path appears to be what the surrounding change
 * set removes by inlining this logic with explicit error handling. */
static void
pobj_restore (void *objid, size_t size, recordid *rid,
struct hash *pobj_convert_hash, int xid)
{
struct pobj *p;
void *obj;
debug_start ();
debug ("restoring objid=%p size=%d (%d) rid={%d,%d,%ld}",
objid, size, POBJ_SIZE (size), rid->page, rid->slot, rid->size);
/* Allocate memory. */
p = (struct pobj *) g_memfunc.malloc (POBJ_SIZE (size));
/* Derive the user-visible object pointer from the pobj header. */
obj = POBJ2OBJ (p);
debug ("new object allocated at %p (%p)", obj, (void *) p);
/* Read object from backing store. */
Tread (xid, *rid, p);
debug ("inserting %p->%p conversion pair",
objid, obj);
/* Insert old/new objid pair to conversion table. */
hash_insert (pobj_convert_hash, OBJ2KEY (objid), (unsigned long) obj);
debug_end ();
}
static void *
pobj_adjust (void *old_objid, struct hash *pobj_convert_hash, int xid)
{
@ -1554,12 +1524,15 @@ pobj_boot_restore (int xid)
{
struct hash *pobj_convert_hash;
recordid tmp_rid;
void *new_objid;
struct pobj *p;
void *objid, *new_objid;
size_t size;
int i = 0, j;
int changed = 0;
int count;
struct pobj_repo_list_item *pobj_slot;
struct static_repo_list_item *static_slot;
int adjusted = 0;
int ret = 0;
debug_start ();
@ -1591,6 +1564,7 @@ pobj_boot_restore (int xid)
while (i-- > 0)
XFREE (g_pobj_repo_list[i].list);
XFREE (g_pobj_repo_list);
g_pobj_repo_list = NULL;
}
debug_end ();
return -1;
@ -1622,110 +1596,236 @@ pobj_boot_restore (int xid)
* Reconstruct heap objects.
*/
debug ("reconstructing objects and forming conversion table...");
pobj_convert_hash = hash_new (CONVERT_HASH_NBUCKEXP);
for (count = 0, i = g_pobj_repo.occupied_head; i >= 0;
count++, i = pobj_slot->next_index)
{
pobj_slot = g_pobj_repo_list[POBJ_REPO_SEG (i)].list + POBJ_REPO_OFF (i);
pobj_restore (pobj_slot->objid, pobj_slot->size, &pobj_slot->rid,
pobj_convert_hash, xid);
if (! (pobj_convert_hash = hash_new (CONVERT_HASH_NBUCKEXP))) {
debug ("error: hash initialization failed");
debug ("...interrupted");
ret = -1;
}
else {
for (count = 0, i = g_pobj_repo.occupied_head; i >= 0;
count++, i = pobj_slot->next_index)
{
pobj_slot = g_pobj_repo_list[POBJ_REPO_SEG (i)].list
+ POBJ_REPO_OFF (i);
objid = pobj_slot->objid;
size = pobj_slot->size;
debug ("restoring objid=%p size=%d (%d) rid={%d,%d,%ld}",
objid, size, POBJ_SIZE (size), pobj_slot->rid.page,
pobj_slot->rid.slot, pobj_slot->rid.size);
/* Allocate memory. */
p = (struct pobj *) g_memfunc.malloc (POBJ_SIZE (size));
if (! p) {
debug ("error :object allocation failed");
ret = -1;
}
else {
new_objid = POBJ2OBJ (p);
debug ("new object allocated at %p (%p)", new_objid, (void *) p);
/* Read object from backing store. */
Tread (xid, pobj_slot->rid, p);
debug ("inserting %p->%p conversion pair",
objid, new_objid);
/* Insert old/new objid pair to conversion table. */
if (hash_insert (pobj_convert_hash, OBJ2KEY (objid),
(unsigned long) new_objid) < 0) {
debug ("error: hash insertion failed");
ret = -1;
}
}
if (ret < 0) {
if (p)
g_memfunc.free (p);
break;
}
}
if (ret < 0)
debug ("...interrupted (%d object reconstructed)", count);
else
debug ("...done (%d objects reconstructed)", count);
}
debug ("...done (%d objects reconstructed)", count);
/*
* Restore static references repository.
*/
debug ("restoring static references repository...");
if (! ret) {
debug ("restoring static references repository...");
debug ("restoring repository control block");
Tread (xid, g_boot.static_repo_rid, &g_static_repo);
debug ("restoring repository control block");
Tread (xid, g_boot.static_repo_rid, &g_static_repo);
debug ("allocating memory for %d segments", g_static_repo.nseg);
g_static_repo_list = (struct static_repo_list *)
XMALLOC (sizeof (struct static_repo_list) * g_static_repo.nseg);
if (g_static_repo_list)
for (i = 0; i < g_static_repo.nseg; i++) {
g_static_repo_list[i].list = (struct static_repo_list_item *)
XMALLOC (sizeof (struct static_repo_list_item) * STATIC_REPO_SEG_MAX);
if (! g_static_repo_list[i].list)
break;
debug ("allocating memory for %d segments", g_static_repo.nseg);
g_static_repo_list = (struct static_repo_list *)
XMALLOC (sizeof (struct static_repo_list) * g_static_repo.nseg);
if (g_static_repo_list)
for (i = 0; i < g_static_repo.nseg; i++) {
g_static_repo_list[i].list = (struct static_repo_list_item *)
XMALLOC (sizeof (struct static_repo_list_item) * STATIC_REPO_SEG_MAX);
if (! g_static_repo_list[i].list)
break;
}
if (! (g_static_repo_list && i == g_static_repo.nseg)) {
debug ("error: allocation failed");
if (g_static_repo_list) {
while (i-- > 0)
XFREE (g_static_repo_list[i].list);
XFREE (g_static_repo_list);
g_static_repo_list = NULL;
}
debug ("...interrupted");
ret = -1;
}
if (! (g_static_repo_list && i == g_static_repo.nseg)) {
debug ("error: allocation failed");
if (g_static_repo_list) {
while (i-- > 0)
XFREE (g_static_repo_list[i].list);
XFREE (g_static_repo_list);
else {
debug ("reading list segments...");
g_static_last_seg_rid = g_static_repo.first_seg_rid;
g_static_repo_list_max = 0;
for (i = 0; i < g_static_repo.nseg; i++) {
Tread (xid, g_static_last_seg_rid, &g_static_last_seg);
g_static_repo_list[i].rid = g_static_last_seg.list_rid;
tmp_rid.page = g_static_last_seg.list_rid.page;
tmp_rid.size = sizeof (struct static_repo_list_item);
for (j = 0; j < STATIC_REPO_SEG_MAX; j++) {
tmp_rid.slot = j;
Tread (xid, tmp_rid, g_static_repo_list[i].list + j);
}
g_static_repo_list_max += STATIC_REPO_SEG_MAX;
if (g_static_last_seg.next_seg_rid.size > 0)
g_static_last_seg_rid = g_static_last_seg.next_seg_rid;
}
debug ("...done (%d segments read)", i);
debug ("...done");
}
debug_end ();
return -1;
}
debug ("reading list segments...");
g_static_last_seg_rid = g_static_repo.first_seg_rid;
g_static_repo_list_max = 0;
for (i = 0; i < g_static_repo.nseg; i++) {
Tread (xid, g_static_last_seg_rid, &g_static_last_seg);
g_static_repo_list[i].rid = g_static_last_seg.list_rid;
tmp_rid.page = g_static_last_seg.list_rid.page;
tmp_rid.size = sizeof (struct static_repo_list_item);
for (j = 0; j < STATIC_REPO_SEG_MAX; j++) {
tmp_rid.slot = j;
Tread (xid, tmp_rid, g_static_repo_list[i].list + j);
}
g_static_repo_list_max += STATIC_REPO_SEG_MAX;
if (g_static_last_seg.next_seg_rid.size > 0)
g_static_last_seg_rid = g_static_last_seg.next_seg_rid;
}
debug ("...done (%d segments read)", i);
debug ("...done");
/*
* Adjust references within objects.
*/
debug ("adjusting object references...");
for (i = g_pobj_repo.occupied_head, count = 0; i >= 0;
i = pobj_slot->next_index, count++) {
pobj_slot = g_pobj_repo_list[POBJ_REPO_SEG (i)].list + POBJ_REPO_OFF (i);
pobj_adjust (pobj_slot->objid, pobj_convert_hash, xid);
if (! ret) {
debug ("adjusting object references...");
for (i = g_pobj_repo.occupied_head, count = 0; i >= 0;
i = pobj_slot->next_index, count++) {
pobj_slot = g_pobj_repo_list[POBJ_REPO_SEG (i)].list + POBJ_REPO_OFF (i);
pobj_adjust (pobj_slot->objid, pobj_convert_hash, xid);
}
adjusted = 1;
debug ("...done (%d objects adjusted)", count);
}
debug ("...done (%d objects adjusted)", count);
/*
* Adjust/tie static references to objects.
*/
debug ("tying adjusted static references, building fast lookup table...");
g_static_repo_hash = hash_new (STATIC_REPO_HASH_NBUCKEXP);
tmp_rid.size = sizeof (struct static_repo_list_item);
for (i = g_static_repo.occupied_head; i >= 0; i = static_slot->next_index) {
static_slot = g_static_repo_list[STATIC_REPO_SEG(i)].list + STATIC_REPO_OFF(i);
if (! ret) {
debug ("tying adjusted static references, building fast lookup table...");
new_objid = (void *) hash_lookup (pobj_convert_hash,
OBJ2KEY (static_slot->objid));
if (! (g_static_repo_hash = hash_new (STATIC_REPO_HASH_NBUCKEXP))) {
debug ("error: hash initialization failed");
ret = -1;
debug ("...interrupted");
}
else {
tmp_rid.size = sizeof (struct static_repo_list_item);
for (count = 0, i = g_static_repo.occupied_head; i >= 0;
count++, i = static_slot->next_index) {
static_slot = g_static_repo_list[STATIC_REPO_SEG(i)].list
+ STATIC_REPO_OFF(i);
new_objid = (void *) hash_lookup (pobj_convert_hash,
OBJ2KEY (static_slot->objid));
debug ("adjusting %p: %p->%p",
(void *) static_slot->static_ptr, static_slot->objid, new_objid);
*static_slot->static_ptr = new_objid;
static_slot->objid = new_objid;
debug ("adjusting %p: %p->%p",
(void *) static_slot->static_ptr, static_slot->objid, new_objid);
*static_slot->static_ptr = new_objid;
static_slot->objid = new_objid;
/* Note: we apply a deliberate offset of 1 to index values,
* so as to be able to distinguish a 0 index value from "key not found"
* on subsequent table lookups. */
hash_insert (g_static_repo_hash, OBJ2KEY (static_slot->static_ptr), i + 1);
/* Note: we apply a deliberate offset of 1 to index values,
* so as to be able to distinguish a 0 index value from "key not found"
* on subsequent table lookups. */
if (hash_insert (g_static_repo_hash,
OBJ2KEY (static_slot->static_ptr), i + 1) < 0) {
debug ("error: hash insertion failed");
ret = -1;
break;
}
tmp_rid.page = g_static_repo_list[STATIC_REPO_SEG(i)].rid.page;
tmp_rid.slot = i;
Tset (xid, tmp_rid, static_slot);
changed++;
tmp_rid.page = g_static_repo_list[STATIC_REPO_SEG(i)].rid.page;
tmp_rid.slot = i;
Tset (xid, tmp_rid, static_slot);
}
if (ret < 0)
debug ("...interrupted");
else
debug ("...done (%d tied)", count);
}
}
debug ("...done (%d tied)", changed);
hash_free (pobj_convert_hash);
/*
* Deconstruct everything in case of failure.
*/
if (ret < 0) {
if (g_static_repo_hash) {
debug ("nullifying static references and deallocating lookup table...");
for (i = g_static_repo.occupied_head; i >= 0;
i = static_slot->next_index) {
static_slot = g_static_repo_list[STATIC_REPO_SEG(i)].list
+ STATIC_REPO_OFF(i);
*static_slot->static_ptr = NULL;
}
hash_free (g_static_repo_hash);
g_static_repo_hash = NULL;
debug ("...done");
}
if (g_static_repo_list) {
debug ("deallocating static references repository...");
for (i = 0; i < g_static_repo.nseg; i++)
XFREE (g_static_repo_list[i].list);
XFREE (g_static_repo_list);
g_static_repo_list = NULL;
debug ("...done");
}
if (pobj_convert_hash) {
debug ("deallocating persistent objects...");
for (count = 0, i = g_pobj_repo.occupied_head; i >= 0;
count++, i = pobj_slot->next_index) {
pobj_slot = g_pobj_repo_list[POBJ_REPO_SEG (i)].list
+ POBJ_REPO_OFF (i);
if (adjusted)
new_objid = pobj_slot->objid;
else
new_objid = (void *) hash_lookup (pobj_convert_hash,
OBJ2KEY (pobj_slot->objid));
if (new_objid) {
p = OBJ2POBJ (new_objid);
debug ("deallocating %p (%p)", new_objid, (void *) p);
g_memfunc.free (p);
}
else
break;
}
debug ("...done (%d object deallocated)", count);
}
debug ("deallocating object repository...");
for (i = 0; i < g_pobj_repo.nseg; i++)
XFREE (g_pobj_repo_list[i].list);
XFREE (g_pobj_repo_list);
g_pobj_repo_list = NULL;
debug ("...done");
}
if (pobj_convert_hash)
hash_free (pobj_convert_hash);
debug_end ();
return 0;
return ret;
}
@ -1740,25 +1840,57 @@ pobj_boot_init (int xid)
debug_start ();
/*
* Create a single-segment persistent object repository.
* Allocate memory for single-segment object/static repositories and
* fast lookup table.
*/
debug ("creating persistent object repository...");
debug ("allocating memory for repositories and fast lookup table...");
debug ("allocating memory");
g_pobj_repo_list = (struct pobj_repo_list *)
XMALLOC (sizeof (struct pobj_repo_list));
if (g_pobj_repo_list)
g_pobj_repo_list[0].list = (struct pobj_repo_list_item *)
XMALLOC (sizeof (struct pobj_repo_list_item) * POBJ_REPO_SEG_MAX);
if (! (g_pobj_repo_list && g_pobj_repo_list[0].list)) {
g_static_repo_list = (struct static_repo_list *)
XMALLOC (sizeof (struct static_repo_list));
if (g_static_repo_list)
g_static_repo_list[0].list = (struct static_repo_list_item *)
XMALLOC (sizeof (struct static_repo_list_item) * STATIC_REPO_SEG_MAX);
g_static_repo_hash = hash_new (STATIC_REPO_HASH_NBUCKEXP);
if (! (g_pobj_repo_list && g_pobj_repo_list[0].list
&& g_static_repo_list && g_static_repo_list[0].list
&& g_static_repo_hash))
{
debug ("error: allocation failed");
if (g_pobj_repo_list)
if (g_pobj_repo_list) {
if (g_pobj_repo_list[0].list)
XFREE (g_pobj_repo_list[0].list);
XFREE (g_pobj_repo_list);
}
if (g_static_repo_list) {
if (g_static_repo_list[0].list)
XFREE (g_static_repo_list[0].list);
XFREE (g_static_repo_list);
}
if (g_static_repo_hash)
hash_free (g_static_repo_hash);
debug_end ();
return -1;
}
debug ("initializing in-memory single-segment object list");
debug ("...done");
/*
* Initialize persistent object repository.
*/
debug ("creating persistent object repository...");
debug ("initializing single-segment list");
/* Note: don't care for back pointers in vacant list. */
pobj_slot = g_pobj_repo_list[0].list;
next_index = 1;
@ -1798,25 +1930,11 @@ pobj_boot_init (int xid)
debug ("...done");
/*
* Create a single-segment static reference repository.
* Initialize static reference repository.
*/
debug ("creating static reference repository...");
debug ("allocating memory");
g_static_repo_list = (struct static_repo_list *)
XMALLOC (sizeof (struct static_repo_list));
if (g_static_repo_list)
g_static_repo_list[0].list = (struct static_repo_list_item *)
XMALLOC (sizeof (struct static_repo_list_item) * STATIC_REPO_SEG_MAX);
if (! (g_static_repo_list && g_static_repo_list[0].list)) {
debug ("error: allocation failed");
if (g_static_repo_list)
XFREE (g_static_repo_list);
debug_end ();
return -1;
}
debug ("initializing in-memory single-segment static reference list");
debug ("initializing single-segment list");
/* Note: don't care for back pointers in vacant list. */
static_slot = g_static_repo_list[0].list;
next_index = 1;
@ -1853,11 +1971,11 @@ pobj_boot_init (int xid)
g_boot.static_repo_rid = Talloc (xid, sizeof (struct repo));
Tset (xid, g_boot.static_repo_rid, &g_static_repo);
g_static_repo_hash = hash_new (STATIC_REPO_HASH_NBUCKEXP);
debug ("...done");
/*
* Initialize boot record.
*/
debug ("writing boot record...");
Tset (xid, g_boot_rid, &g_boot);
debug ("...done");

View file

@ -18,6 +18,7 @@ struct queue_seg {
struct queue {
struct queue_seg *head_seg; /* segment dequeued from (see queue_deq) */
struct queue_seg *tail_seg; /* segment enqueued into (see queue_enq) */
struct queue_seg *recycle_seg; /* single cached empty segment, saved by
                                * queue_deq and reused by queue_enq to avoid
                                * repeated allocation/deallocation; freed in
                                * queue_free */
int seg_size; /* slot capacity of each segment (head/tail indices wrap at it) */
};
@ -52,6 +53,7 @@ queue_new (int seg_size)
return NULL;
}
q->recycle_seg = NULL;
q->seg_size = seg_size;
debug_end ();
return q;
@ -66,6 +68,8 @@ queue_free (struct queue *q)
next = seg->next;
XFREE (seg);
}
if (q->recycle_seg)
XFREE (q->recycle_seg);
XFREE (q);
}
@ -95,10 +99,18 @@ queue_deq (struct queue *q)
if (seg->head == q->seg_size)
seg->head = 0;
/* Deallocate empty head segment, if it isn't the last one. */
/* Deallocate empty head segment, if it isn't the last one.
* Note: we "recycle" a single segment, to avoid repeated allocation /
* deallocation when a constant-length cyclic enq/deq sequence occurs. */
if (seg->head == seg->tail && seg->next) {
q->head_seg = seg->next;
XFREE (seg);
if (q->recycle_seg)
XFREE (seg);
else {
debug ("recycling seg=%p", seg);
memset (seg, 0, sizeof (struct queue_seg));
q->recycle_seg = seg;
}
}
debug_end ();
@ -123,13 +135,20 @@ queue_enq (struct queue *q, unsigned long val)
/* Allocate new segment if current was exhausted. */
if (seg->head == seg->tail) {
seg->next = queue_seg_alloc (q->seg_size);
if (! seg->next) {
if (! seg->tail--)
seg->tail += q->seg_size;
debug ("segment allocation failed, enqueue undone");
debug_end ();
return -1;
if (q->recycle_seg) {
debug ("using recycled segment seg=%p", q->recycle_seg);
seg->next = q->recycle_seg;
q->recycle_seg = NULL;
}
else {
seg->next = queue_seg_alloc (q->seg_size);
if (! seg->next) {
if (! seg->tail--)
seg->tail += q->seg_size;
debug ("segment allocation failed, enqueue undone");
debug_end ();
return -1;
}
}
q->tail_seg = seg->next;
}