Skip to content

Commit

Permalink
Prioritise ordering of bulk sends for blocks and workinfo, fixing sta…
Browse files Browse the repository at this point in the history
…ts of message counts
  • Loading branch information
ckolivas committed Jan 17, 2016
1 parent 1a003db commit c5cdb6c
Showing 1 changed file with 37 additions and 25 deletions.
62 changes: 37 additions & 25 deletions src/stratifier.c
Original file line number Diff line number Diff line change
Expand Up @@ -796,13 +796,42 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char

#define ckdbq_add(ckp, idtype, val) _ckdbq_add(ckp, idtype, val, __FILE__, __func__, __LINE__)

/* Append a bulk list already created to the ssends list */
static void ssend_bulk_append(sdata_t *sdata, ckmsg_t *bulk_send, const int messages)
{
ckmsgq_t *ssends = sdata->ssends;

mutex_lock(ssends->lock);
ssends->messages += messages;
DL_CONCAT(ssends->msgs, bulk_send);
pthread_cond_signal(ssends->cond);
mutex_unlock(ssends->lock);
}

/* As ssend_bulk_append but for high priority messages to be put at the front
* of the list. */
static void ssend_bulk_prepend(sdata_t *sdata, ckmsg_t *bulk_send, const int messages)
{
ckmsgq_t *ssends = sdata->ssends;
ckmsg_t *tmp;

mutex_lock(ssends->lock);
tmp = ssends->msgs;
ssends->msgs = bulk_send;
ssends->messages += messages;
DL_CONCAT(ssends->msgs, tmp);
pthread_cond_signal(ssends->cond);
mutex_unlock(ssends->lock);
}

static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_id,
const int msg_type);

static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb)
{
stratum_instance_t *client;
ckmsg_t *bulk_send = NULL;
int messages = 0;

ck_rlock(&sdata->instance_lock);
if (sdata->node_instances) {
Expand Down Expand Up @@ -845,20 +874,15 @@ static void send_node_workinfo(sdata_t *sdata, const workbase_t *wb)
msg->client_id = client->id;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
json_decref(wb_val);
}
ck_runlock(&sdata->instance_lock);

if (bulk_send) {
ckmsgq_t *ssends = sdata->ssends;

LOGINFO("Sending workinfo to mining nodes");

mutex_lock(ssends->lock);
DL_CONCAT(ssends->msgs, bulk_send);
pthread_cond_signal(ssends->cond);
mutex_unlock(ssends->lock);
ssend_bulk_prepend(sdata, bulk_send, messages);
}
}

Expand Down Expand Up @@ -1274,8 +1298,8 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non
const double diff, const int64_t client_id)
{
stratum_instance_t *client;
int64_t skip, messages = 0;
ckmsg_t *bulk_send = NULL;
int64_t skip;

/* Don't send the block back to a remote node if that's where it was
* found. */
Expand Down Expand Up @@ -1309,20 +1333,15 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non
msg->client_id = client->id;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
json_decref(val);
}
ck_runlock(&sdata->instance_lock);

if (bulk_send) {
ckmsgq_t *ssends = sdata->ssends;

LOGNOTICE("Sending block to mining nodes");

mutex_lock(ssends->lock);
DL_CONCAT(ssends->msgs, bulk_send);
pthread_cond_signal(ssends->cond);
mutex_unlock(ssends->lock);
ssend_bulk_prepend(sdata, bulk_send, messages);
}
}

Expand Down Expand Up @@ -2543,7 +2562,7 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val, const int msg_type)
stratum_instance_t *client, *tmp;
ckmsg_t *bulk_send = NULL;
time_t now_t = time(NULL);
ckmsgq_t *ssends;
int messages = 0;

if (unlikely(!val)) {
LOGERR("Sent null json to stratum_broadcast");
Expand Down Expand Up @@ -2598,6 +2617,7 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val, const int msg_type)
msg->client_id = client->id;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
ck_runlock(&ckp_sdata->instance_lock);

Expand All @@ -2606,15 +2626,7 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val, const int msg_type)
if (!bulk_send)
return;

ssends = sdata->ssends;

mutex_lock(ssends->lock);
if (ssends->msgs)
DL_CONCAT(ssends->msgs, bulk_send);
else
ssends->msgs = bulk_send;
pthread_cond_signal(ssends->cond);
mutex_unlock(ssends->lock);
ssend_bulk_append(sdata, bulk_send, messages);
}

static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_id,
Expand Down

0 comments on commit c5cdb6c

Please sign in to comment.