diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index bf057ab..eff05f1 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -46,8 +46,6 @@ static void* deadlock_check(void* arg); static void* trickle_write(void* arg); static void* txn_checkpoint(void* arg); -static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc); - /** * Global instance of DB_ENV; only a single one exists per O/S process. */ @@ -872,6 +870,81 @@ static void tune_system(int target, void* values, BinHelper* bh) } } +static void async_cleanup(PortData* d) +{ + // Release the port for another operation + d->async_pool = 0; + d->async_job = 0; + d->work_buffer_offset = 0; + erl_drv_mutex_lock(d->port_lock); + d->async_op = CMD_NONE; + erl_drv_mutex_unlock(d->port_lock); +} + +static void async_cleanup_and_send_rc(PortData* d, int rc) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + + async_cleanup(d); + + // TODO: May need to tag the messages a bit more explicitly so that if another async + // job runs to completion before the message gets delivered we don't mis-interpret this + // response code. + if (rc == 0) + { + ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_INT, rc, + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } +} + +static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + + async_cleanup(d); + + // Notify port of result + if (rc == 0) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), + ERL_DRV_BUF2BINARY, (ErlDrvTermData)key->data, (ErlDrvUInt)key->size, + ERL_DRV_BUF2BINARY, (ErlDrvTermData)value->data, (ErlDrvUInt)value->size, + ERL_DRV_TUPLE, 3}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else if (rc == DB_NOTFOUND) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_INT, rc, + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } +} + static void do_async_put(void* arg) { // Payload is: <> @@ -914,26 +987,7 @@ static void do_async_put(void* arg) d->txn = 0; } - // Save the port and pid references -- we need copies independent from the PortData - // structure. Once we release the port_lock after clearing the cmd, it's possible that - // the port could go away without waiting on us to finish. This is acceptable, but we need - // to be certain that there is no overlap of data between the two threads. driver_send_term - // is safe to use from a thread, even if the port you're sending from has already expired. - ErlDrvPort port = d->port; - ErlDrvTermData port_owner = d->port_owner; - - // Release the port for another operation - d->async_pool = 0; - d->async_job = 0; - d->work_buffer_offset = 0; - erl_drv_mutex_lock(d->port_lock); - d->async_op = CMD_NONE; - erl_drv_mutex_unlock(d->port_lock); - - // TODO: May need to tag the messages a bit more explicitly so that if another async - // job runs to completion before the message gets delivered we don't mis-interpret this - // response code. - send_ok_or_error(port, port_owner, rc); + async_cleanup_and_send_rc(d, rc); } static void do_async_get(void* arg) @@ -979,42 +1033,7 @@ static void do_async_get(void* arg) d->txn = 0; } - // Save the port and pid references -- we need copies independent from the PortData - // structure. Once we release the port_lock after clearing the cmd, it's possible that - // the port could go away without waiting on us to finish. This is acceptable, but we need - // to be certain that there is no overlap of data between the two threads. driver_send_term - // is safe to use from a thread, even if the port you're sending from has already expired. - ErlDrvPort port = d->port; - ErlDrvTermData port_owner = d->port_owner; - - // Release the port for another operation - d->async_pool = 0; - d->async_job = 0; - d->work_buffer_offset = 0; - erl_drv_mutex_lock(d->port_lock); - d->async_op = CMD_NONE; - erl_drv_mutex_unlock(d->port_lock); - - // Notify port of result - if (rc == 0) - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), - ERL_DRV_BUF2BINARY, (ErlDrvTermData)value.data, (ErlDrvUInt)value.size, - ERL_DRV_TUPLE, 2}; - driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); - } - else if (rc == DB_NOTFOUND) - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") }; - driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); - } - else - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_INT, rc, - ERL_DRV_TUPLE, 2}; - driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); - } + async_cleanup_and_send_kv(d, rc, &key, &value); // Finally, clean up value buffer (driver_send_term made a copy) driver_free(value.data); @@ -1038,26 +1057,7 @@ static void do_async_txnop(void* arg) // The transaction is now invalid, regardless of the outcome. d->txn = 0; - // Save the port and pid references -- we need copies independent from the PortData - // structure. Once we release the port_lock after clearing the cmd, it's possible that - // the port could go away without waiting on us to finish. This is acceptable, but we need - // to be certain that there is no overlap of data between the two threads. driver_send_term - // is safe to use from a thread, even if the port you're sending from has already expired. - ErlDrvPort port = d->port; - ErlDrvTermData port_owner = d->port_owner; - - // Release the port for another operation - d->async_pool = 0; - d->async_job = 0; - d->work_buffer_offset = 0; - erl_drv_mutex_lock(d->port_lock); - d->async_op = CMD_NONE; - erl_drv_mutex_unlock(d->port_lock); - - // TODO: May need to tag the messages a bit more explicitly so that if another async - // job runs to completion before the message gets delivered we don't mis-interpret this - // response code. - send_ok_or_error(port, port_owner, rc); + async_cleanup_and_send_rc(d, rc); } @@ -1100,42 +1100,7 @@ static void do_async_cursor_get(void* arg) } } - // Save the port and pid references -- we need copies independent from the PortData - // structure. Once we release the port_lock after clearing the cmd, it's possible that - // the port could go away without waiting on us to finish. This is acceptable, but we need - // to be certain that there is no overlap of data between the two threads. driver_send_term - // is safe to use from a thread, even if the port you're sending from has already expired. - ErlDrvPort port = d->port; - ErlDrvTermData port_owner = d->port_owner; - - // Release the port for another operation - d->async_pool = 0; - d->async_job = 0; - erl_drv_mutex_lock(d->port_lock); - d->async_op = CMD_NONE; - erl_drv_mutex_unlock(d->port_lock); - - // Notify port of result - if (rc == 0) - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), - ERL_DRV_BUF2BINARY, (ErlDrvTermData)key.data, (ErlDrvUInt)key.size, - ERL_DRV_BUF2BINARY, (ErlDrvTermData)value.data, (ErlDrvUInt)value.size, - ERL_DRV_TUPLE, 3}; - driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); - } - else if (rc == DB_NOTFOUND) - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") }; - driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); - } - else - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_INT, rc, - ERL_DRV_TUPLE, 2}; - driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); - } + async_cleanup_and_send_kv(d, rc, &key, &value); } static void do_async_truncate(void* arg) @@ -1159,26 +1124,7 @@ static void do_async_truncate(void* arg) d->txn = 0; } - // Save the port and pid references -- we need copies independent from the PortData - // structure. Once we release the port_lock after clearing the cmd, it's possible that - // the port could go away without waiting on us to finish. This is acceptable, but we need - // to be certain that there is no overlap of data between the two threads. driver_send_term - // is safe to use from a thread, even if the port you're sending from has already expired. - ErlDrvPort port = d->port; - ErlDrvTermData port_owner = d->port_owner; - - // Release the port for another operation - d->async_pool = 0; - d->async_job = 0; - d->work_buffer_offset = 0; - erl_drv_mutex_lock(d->port_lock); - d->async_op = CMD_NONE; - erl_drv_mutex_unlock(d->port_lock); - - // TODO: May need to tag the messages a bit more explicitly so that if another async - // job runs to completion before the message gets delivered we don't mis-interpret this - // response code. - send_ok_or_error(port, port_owner, rc); + async_cleanup_and_send_rc(d, rc); } @@ -1450,19 +1396,3 @@ static void* txn_checkpoint(void* arg) DBG("Checkpointer exiting.\r\n"); return 0; } - -static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc) -{ - if (rc == 0) - { - ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } - else - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_INT, rc, - ERL_DRV_TUPLE, 2}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } -} diff --git a/src/bdberl.erl b/src/bdberl.erl index fefe3cd..9167270 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -174,7 +174,7 @@ get(Db, Key, Opts) -> case decode_rc(Result) of ok -> receive - {ok, Bin} -> {ok, binary_to_term(Bin)}; + {ok, _, Bin} -> {ok, binary_to_term(Bin)}; not_found -> not_found; {error, Reason} -> {error, {get, decode_rc(Reason)}} end;