Refactored to eliminate some code duplication.

This commit is contained in:
Phillip Toland 2009-02-03 09:37:08 -06:00
parent ed24fd72e4
commit 1c11f81416
2 changed files with 81 additions and 151 deletions

View file

@ -46,8 +46,6 @@ static void* deadlock_check(void* arg);
static void* trickle_write(void* arg); static void* trickle_write(void* arg);
static void* txn_checkpoint(void* arg); static void* txn_checkpoint(void* arg);
static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc);
/** /**
* Global instance of DB_ENV; only a single one exists per O/S process. * Global instance of DB_ENV; only a single one exists per O/S process.
*/ */
@ -872,6 +870,81 @@ static void tune_system(int target, void* values, BinHelper* bh)
} }
} }
static void async_cleanup(PortData* d)
{
// Release the port for another operation
d->async_pool = 0;
d->async_job = 0;
d->work_buffer_offset = 0;
erl_drv_mutex_lock(d->port_lock);
d->async_op = CMD_NONE;
erl_drv_mutex_unlock(d->port_lock);
}
static void async_cleanup_and_send_rc(PortData* d, int rc)
{
// Save the port and pid references -- we need copies independent from the PortData
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData pid = d->port_owner;
async_cleanup(d);
// TODO: May need to tag the messages a bit more explicitly so that if another async
// job runs to completion before the message gets delivered we don't mis-interpret this
// response code.
if (rc == 0)
{
ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
}
static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value)
{
// Save the port and pid references -- we need copies independent from the PortData
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData pid = d->port_owner;
async_cleanup(d);
// Notify port of result
if (rc == 0)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
ERL_DRV_BUF2BINARY, (ErlDrvTermData)key->data, (ErlDrvUInt)key->size,
ERL_DRV_BUF2BINARY, (ErlDrvTermData)value->data, (ErlDrvUInt)value->size,
ERL_DRV_TUPLE, 3};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else if (rc == DB_NOTFOUND)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") };
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
}
static void do_async_put(void* arg) static void do_async_put(void* arg)
{ {
// Payload is: <<DbRef:32, Flags:32, KeyLen:32, Key:KeyLen, ValLen:32, Val:ValLen>> // Payload is: <<DbRef:32, Flags:32, KeyLen:32, Key:KeyLen, ValLen:32, Val:ValLen>>
@ -914,26 +987,7 @@ static void do_async_put(void* arg)
d->txn = 0; d->txn = 0;
} }
// Save the port and pid references -- we need copies independent from the PortData async_cleanup_and_send_rc(d, rc);
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData port_owner = d->port_owner;
// Release the port for another operation
d->async_pool = 0;
d->async_job = 0;
d->work_buffer_offset = 0;
erl_drv_mutex_lock(d->port_lock);
d->async_op = CMD_NONE;
erl_drv_mutex_unlock(d->port_lock);
// TODO: May need to tag the messages a bit more explicitly so that if another async
// job runs to completion before the message gets delivered we don't mis-interpret this
// response code.
send_ok_or_error(port, port_owner, rc);
} }
static void do_async_get(void* arg) static void do_async_get(void* arg)
@ -979,42 +1033,7 @@ static void do_async_get(void* arg)
d->txn = 0; d->txn = 0;
} }
// Save the port and pid references -- we need copies independent from the PortData async_cleanup_and_send_kv(d, rc, &key, &value);
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData port_owner = d->port_owner;
// Release the port for another operation
d->async_pool = 0;
d->async_job = 0;
d->work_buffer_offset = 0;
erl_drv_mutex_lock(d->port_lock);
d->async_op = CMD_NONE;
erl_drv_mutex_unlock(d->port_lock);
// Notify port of result
if (rc == 0)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
ERL_DRV_BUF2BINARY, (ErlDrvTermData)value.data, (ErlDrvUInt)value.size,
ERL_DRV_TUPLE, 2};
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
else if (rc == DB_NOTFOUND)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") };
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2};
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
// Finally, clean up value buffer (driver_send_term made a copy) // Finally, clean up value buffer (driver_send_term made a copy)
driver_free(value.data); driver_free(value.data);
@ -1038,26 +1057,7 @@ static void do_async_txnop(void* arg)
// The transaction is now invalid, regardless of the outcome. // The transaction is now invalid, regardless of the outcome.
d->txn = 0; d->txn = 0;
// Save the port and pid references -- we need copies independent from the PortData async_cleanup_and_send_rc(d, rc);
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData port_owner = d->port_owner;
// Release the port for another operation
d->async_pool = 0;
d->async_job = 0;
d->work_buffer_offset = 0;
erl_drv_mutex_lock(d->port_lock);
d->async_op = CMD_NONE;
erl_drv_mutex_unlock(d->port_lock);
// TODO: May need to tag the messages a bit more explicitly so that if another async
// job runs to completion before the message gets delivered we don't mis-interpret this
// response code.
send_ok_or_error(port, port_owner, rc);
} }
@ -1100,42 +1100,7 @@ static void do_async_cursor_get(void* arg)
} }
} }
// Save the port and pid references -- we need copies independent from the PortData async_cleanup_and_send_kv(d, rc, &key, &value);
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData port_owner = d->port_owner;
// Release the port for another operation
d->async_pool = 0;
d->async_job = 0;
erl_drv_mutex_lock(d->port_lock);
d->async_op = CMD_NONE;
erl_drv_mutex_unlock(d->port_lock);
// Notify port of result
if (rc == 0)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
ERL_DRV_BUF2BINARY, (ErlDrvTermData)key.data, (ErlDrvUInt)key.size,
ERL_DRV_BUF2BINARY, (ErlDrvTermData)value.data, (ErlDrvUInt)value.size,
ERL_DRV_TUPLE, 3};
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
else if (rc == DB_NOTFOUND)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") };
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2};
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
} }
static void do_async_truncate(void* arg) static void do_async_truncate(void* arg)
@ -1159,26 +1124,7 @@ static void do_async_truncate(void* arg)
d->txn = 0; d->txn = 0;
} }
// Save the port and pid references -- we need copies independent from the PortData async_cleanup_and_send_rc(d, rc);
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData port_owner = d->port_owner;
// Release the port for another operation
d->async_pool = 0;
d->async_job = 0;
d->work_buffer_offset = 0;
erl_drv_mutex_lock(d->port_lock);
d->async_op = CMD_NONE;
erl_drv_mutex_unlock(d->port_lock);
// TODO: May need to tag the messages a bit more explicitly so that if another async
// job runs to completion before the message gets delivered we don't mis-interpret this
// response code.
send_ok_or_error(port, port_owner, rc);
} }
@ -1450,19 +1396,3 @@ static void* txn_checkpoint(void* arg)
DBG("Checkpointer exiting.\r\n"); DBG("Checkpointer exiting.\r\n");
return 0; return 0;
} }
static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc)
{
if (rc == 0)
{
ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
}

View file

@ -174,7 +174,7 @@ get(Db, Key, Opts) ->
case decode_rc(Result) of case decode_rc(Result) of
ok -> ok ->
receive receive
{ok, Bin} -> {ok, binary_to_term(Bin)}; {ok, _, Bin} -> {ok, binary_to_term(Bin)};
not_found -> not_found; not_found -> not_found;
{error, Reason} -> {error, {get, decode_rc(Reason)}} {error, Reason} -> {error, {get, decode_rc(Reason)}}
end; end;