forked from liuliu/ccv
-
Notifications
You must be signed in to change notification settings - Fork 0
/
async.c
72 lines (65 loc) · 2.05 KB
/
async.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <stdlib.h>
#include <assert.h>
#include <ev.h>
#include <dispatch/dispatch.h>
typedef struct {
void *context;
void (*cb)(void*);
} main_async_t;
static dispatch_semaphore_t async_queue_semaphore;
static int queue_position = 0;
static int queue_pending = 0;
static int queue_length = 10;
static main_async_t* async_queue;
static ev_async main_async;
void main_async_f(void* context, void (*cb)(void*))
{
assert(cb);
dispatch_semaphore_wait(async_queue_semaphore, DISPATCH_TIME_FOREVER);
++queue_pending;
if (queue_pending > queue_length)
{
queue_length = (queue_length * 3 + 1) / 2;
async_queue = (main_async_t*)realloc(async_queue, sizeof(main_async_t) * queue_length);
// when expand the queue, the order of our circular buffer is not maintained
// thus, have to reset the queue_postion here
queue_position = queue_pending - 1;
}
async_queue[queue_position].context = context;
async_queue[queue_position].cb = cb;
queue_position = (queue_position + 1) % queue_length;
dispatch_semaphore_signal(async_queue_semaphore);
ev_async_send(EV_DEFAULT_ &main_async);
}
static void main_async_drain(EV_P_ ev_async* w, int revents)
{
dispatch_semaphore_wait(async_queue_semaphore, DISPATCH_TIME_FOREVER);
while (queue_pending > 0)
{
main_async_t async;
queue_position = (queue_position + queue_length - 1) % queue_length;
--queue_pending;
async = async_queue[queue_position];
dispatch_semaphore_signal(async_queue_semaphore);
// call the async block outside the lock
async.cb(async.context);
// continue the lock so we can get correct queue_pending
dispatch_semaphore_wait(async_queue_semaphore, DISPATCH_TIME_FOREVER);
}
dispatch_semaphore_signal(async_queue_semaphore);
}
void main_async_init(void)
{
async_queue_semaphore = dispatch_semaphore_create(1);
async_queue = (main_async_t*)malloc(sizeof(main_async_t) * queue_length);
ev_async_init(&main_async, main_async_drain);
}
void main_async_start(EV_P)
{
ev_async_start(EV_A_ &main_async);
}
void main_async_destroy(void)
{
dispatch_release(async_queue_semaphore);
free(async_queue);
}