/alps/fastwriter

To get this branch, use:
bzr branch http://suren.me/webbzr/alps/fastwriter

« back to all changes in this revision

Viewing changes to fastwriter.c

  • Committer: Suren A. Chilingaryan
  • Date: 2011-12-13 13:57:51 UTC
  • Revision ID: csa@dside.dyndns.org-20111213135751-bpzkwwn7ujnkdekc
Initial release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#define _GNU_SOURCE
 
2
 
 
3
#include <stdio.h>
 
4
#include <stdlib.h>
 
5
#include <string.h>
 
6
#include <unistd.h>
 
7
#include <limits.h>
 
8
#include <errno.h>
 
9
 
 
10
#include <pthread.h>
 
11
 
 
12
#include <sys/types.h>
 
13
#include <sys/stat.h>
 
14
#include <sys/time.h>
 
15
 
 
16
#include <fcntl.h>
 
17
 
 
18
 
 
19
#include "private.h"
 
20
#include "default.h"
 
21
#include "sysinfo.h"
 
22
 
 
23
fastwriter_t *fastwriter_init(const char *fs, fastwriter_flags_t flags) {
 
24
    fastwriter_t *ctx;
 
25
    
 
26
    ctx = (fastwriter_t*)malloc(sizeof(fastwriter_t));
 
27
    if (!ctx) return ctx;
 
28
    
 
29
    memset(ctx, 0, sizeof(fastwriter_t));
 
30
    ctx->params.flags = flags;
 
31
    ctx->api = &fastwriter_default_api;
 
32
    
 
33
    return ctx;
 
34
}
 
35
 
 
36
void fastwriter_destroy(fastwriter_t *ctx) {
 
37
    free(ctx);
 
38
}
 
39
 
 
40
int fastwriter_set_buffer_size(fastwriter_t *ctx, size_t buffer_size) {
 
41
    ctx->params.buffer_size = buffer_size;
 
42
    
 
43
    return 0;
 
44
}
 
45
 
 
46
static void *fastwriter_writer_thread(void *user);
 
47
 
 
48
int fastwriter_open(fastwriter_t *ctx, const char *name, fastwriter_flags_t flags) {
 
49
    int i;
 
50
    int err;
 
51
    int e[4];
 
52
    
 
53
    ctx->flags = flags | ctx->params.flags;
 
54
    
 
55
    switch (ctx->params.buffer_size) {
 
56
     case FASTWRITER_BUFFER_DEFAULT:
 
57
        ctx->size = FASTWRITER_DEFAULT_BUFFER_SIZE;
 
58
        break;
 
59
     case FASTWRITER_BUFFER_MAX:
 
60
        ctx->size = get_free_memory();
 
61
 
 
62
        if ((ctx->size - FASTWRITER_RESERVE_MEMORY) < FASTWRITER_DEFAULT_BUFFER_SIZE)
 
63
            ctx->size = FASTWRITER_DEFAULT_BUFFER_SIZE;
 
64
        else
 
65
            ctx->size -= FASTWRITER_RESERVE_MEMORY;
 
66
 
 
67
        break;
 
68
     default:
 
69
        ctx->size = ctx->params.buffer_size;
 
70
    }
 
71
    
 
72
    ctx->buffer = malloc(ctx->size);
 
73
    if (!ctx->buffer) {
 
74
        fastwriter_close(ctx);
 
75
        return ENOMEM;
 
76
    }
 
77
    ctx->err = 0;
 
78
    ctx->written = 0;
 
79
    ctx->commited = 0;
 
80
    ctx->chunked = 0;
 
81
    
 
82
    ctx->tail = 0;
 
83
    ctx->head = 0;
 
84
    ctx->pos = 0;
 
85
    
 
86
    err = ctx->api->open(ctx, name, ctx->flags);
 
87
    if (err) {
 
88
        fastwriter_close(ctx);
 
89
        return err;
 
90
    }
 
91
 
 
92
    e[0] = pthread_mutex_init(&ctx->data_cond_mutex, NULL);
 
93
    e[1] = pthread_mutex_init(&ctx->space_cond_mutex, NULL);
 
94
    e[2] = pthread_cond_init(&ctx->data_cond, NULL);
 
95
    e[3] = pthread_cond_init(&ctx->space_cond, NULL);
 
96
    
 
97
    if (e[0]|e[1]|e[2]|e[3]) {
 
98
        if (!e[3]) pthread_cond_destroy(&ctx->space_cond);
 
99
        if (!e[2]) pthread_cond_destroy(&ctx->data_cond);
 
100
        if (!e[1]) pthread_mutex_destroy(&ctx->space_cond_mutex);
 
101
        if (!e[0]) pthread_mutex_destroy(&ctx->data_cond_mutex);
 
102
        
 
103
        fastwriter_close(ctx);
 
104
        
 
105
        for (i = 0; i < 4; i++) 
 
106
            if (e[i]) return e[i];
 
107
    }
 
108
    
 
109
    ctx->clean_locks = 1;
 
110
    ctx->run_flag = 1;
 
111
    
 
112
    err = pthread_create(&ctx->wthread, NULL, &fastwriter_writer_thread, ctx);
 
113
    if (err) {
 
114
        ctx->run_flag = 0;
 
115
        fastwriter_close(ctx);
 
116
        return err;
 
117
    }
 
118
    
 
119
    return 0;
 
120
}
 
121
 
 
122
int fastwriter_close(fastwriter_t *ctx) {
 
123
    if ((!ctx->err)&&(ctx->pos != ctx->head))
 
124
        return EBADFD;
 
125
 
 
126
    if (ctx->run_flag) {
 
127
        ctx->run_flag = 0;
 
128
 
 
129
        pthread_mutex_lock(&ctx->data_cond_mutex);
 
130
        pthread_cond_broadcast(&ctx->data_cond);
 
131
        pthread_mutex_unlock(&ctx->data_cond_mutex);
 
132
        
 
133
        pthread_join(ctx->wthread, NULL);
 
134
    }
 
135
    
 
136
    if (ctx->clean_locks) {
 
137
        pthread_cond_destroy(&ctx->space_cond);
 
138
        pthread_cond_destroy(&ctx->data_cond);
 
139
        pthread_mutex_destroy(&ctx->space_cond_mutex);
 
140
        pthread_mutex_destroy(&ctx->data_cond_mutex);
 
141
    
 
142
        ctx->clean_locks = 0;
 
143
    }
 
144
    
 
145
    ctx->api->close(ctx);
 
146
 
 
147
    if (ctx->buffer) {
 
148
        free(ctx->buffer);
 
149
        ctx->buffer = NULL;
 
150
    }
 
151
    
 
152
    return 0;
 
153
    
 
154
}
 
155
 
 
156
 
 
157
static inline size_t fastwriter_compute_free_space(fastwriter_t *ctx) {
 
158
    if (ctx->pos < ctx->tail) return ctx->tail - ctx->pos;
 
159
    return ctx->tail + ctx->size - ctx->pos - 1;
 
160
}
 
161
 
 
162
int fastwriter_get_stats(fastwriter_t *ctx, fastwriter_stats_t *stats) {
 
163
    stats->buffer_size = ctx->size;
 
164
    stats->buffer_used = ctx->size - fastwriter_compute_free_space(ctx);
 
165
    stats->buffer_max = ctx->max_usage;
 
166
    stats->commited = ctx->commited;
 
167
    stats->written = ctx->written;
 
168
    return 0;
 
169
}
 
170
 
 
171
 
 
172
static void *fastwriter_writer_thread(void *user) {
 
173
    int err = 0;
 
174
    fastwriter_write_flags_t flags;
 
175
    size_t size;
 
176
    size_t head;
 
177
 
 
178
    fastwriter_t *ctx = (fastwriter_t*)user;
 
179
 
 
180
    while ((ctx->run_flag)||(ctx->head != ctx->tail)) {
 
181
        if (ctx->head != ctx->tail) {
 
182
            head = ctx->head;
 
183
 
 
184
            if (head > ctx->tail) {
 
185
                size = head - ctx->tail;
 
186
                flags = FASTWRITER_WRITE_FLAGS_DEFAULT;
 
187
            } else { 
 
188
                size = ctx->size - ctx->tail;
 
189
                flags = FASTWRITER_WRITE_FLAG_FORCE;
 
190
            }
 
191
            
 
192
            if (!ctx->run_flag) 
 
193
                flags |= FASTWRITER_WRITE_FLAG_FORCE;
 
194
 
 
195
            err = ctx->api->write(ctx, flags, size, ctx->buffer + ctx->tail, &size);
 
196
            if (err) {
 
197
                ctx->err = err;
 
198
                ctx->run_flag = 0;
 
199
 
 
200
                pthread_mutex_lock(&ctx->space_cond_mutex);
 
201
                pthread_cond_broadcast(&ctx->space_cond);
 
202
                pthread_mutex_unlock(&ctx->space_cond_mutex);
 
203
                
 
204
                return NULL;
 
205
            }
 
206
            
 
207
            if (size > 0) {
 
208
                ctx->written += size;
 
209
                
 
210
                size += ctx->tail;
 
211
                if (size == ctx->size) ctx->tail = 0;
 
212
                else ctx->tail = size;
 
213
            
 
214
                pthread_mutex_lock(&ctx->space_cond_mutex);
 
215
                pthread_cond_broadcast(&ctx->space_cond);
 
216
                pthread_mutex_unlock(&ctx->space_cond_mutex);
 
217
            } else {
 
218
                pthread_mutex_lock(&ctx->data_cond_mutex);
 
219
                while ((ctx->run_flag)&&(ctx->head == head)) {
 
220
                    pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex);
 
221
                }
 
222
            }
 
223
        } else {
 
224
            pthread_mutex_lock(&ctx->data_cond_mutex);
 
225
            while ((ctx->run_flag)&&(ctx->head == ctx->tail)) {
 
226
                pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex);
 
227
            }
 
228
        }
 
229
    }
 
230
    
 
231
    return NULL;
 
232
}
 
233
 
 
234
 
 
235
int fastwriter_push_chunk(fastwriter_t *ctx, size_t size, const void *data) {
 
236
    size_t part1, end;
 
237
    size_t free = fastwriter_compute_free_space(ctx);
 
238
 
 
239
    if (free < size) {
 
240
        ctx->max_usage = ctx->size;
 
241
        
 
242
        if ((ctx->flags&FASTWRITER_FLAGS_BLOCK)==0)
 
243
            return EWOULDBLOCK;
 
244
        
 
245
        pthread_mutex_lock(&ctx->space_cond_mutex);
 
246
        while ((ctx->run_flag)&&(fastwriter_compute_free_space(ctx) < size)) {
 
247
            pthread_cond_wait(&ctx->space_cond, &ctx->space_cond_mutex);
 
248
        }
 
249
        pthread_mutex_unlock(&ctx->space_cond_mutex);
 
250
    } else {
 
251
        end = ctx->size - (free - size);
 
252
        if (end > ctx->max_usage) ctx->max_usage = end;
 
253
    }
 
254
    
 
255
    if (!ctx->run_flag) {
 
256
        if (ctx->err) return ctx->err;
 
257
        return EBADFD;
 
258
    }
 
259
    
 
260
    if (ctx->pos < ctx->tail) end = ctx->tail;
 
261
    else end = ctx->size;
 
262
    
 
263
 
 
264
    part1 = end - ctx->pos;
 
265
    
 
266
    if (part1 > size) {
 
267
            // tail < pos (we have checked for free space)
 
268
        end = size - part1;
 
269
        memcpy(ctx->buffer + ctx->pos, data, part1);
 
270
        memcpy(ctx->buffer, data + part1, end);
 
271
        ctx->pos = end;
 
272
    } else {
 
273
        memcpy(ctx->buffer + ctx->pos, data, size);
 
274
        ctx->pos += size;
 
275
        
 
276
        if (ctx->pos == ctx->size) ctx->pos = 0;
 
277
    }
 
278
    
 
279
    ctx->chunked += size;
 
280
    
 
281
    return 0;
 
282
}
 
283
 
 
284
 
 
285
int fastwriter_commit(fastwriter_t *ctx) {
 
286
    ctx->head = ctx->pos;
 
287
 
 
288
    pthread_mutex_lock(&ctx->data_cond_mutex);
 
289
    pthread_cond_broadcast(&ctx->data_cond);
 
290
    pthread_mutex_unlock(&ctx->data_cond_mutex);
 
291
    
 
292
    ctx->commited += ctx->chunked;
 
293
    ctx->chunked = 0;
 
294
 
 
295
    return 0;
 
296
}
 
297
 
 
298
 
 
299
int fastwriter_cancel(fastwriter_t *ctx) {
 
300
    ctx->pos = ctx->head;
 
301
    
 
302
    ctx->chunked = 0;
 
303
    
 
304
    return 0;
 
305
}
 
306
 
 
307
 
 
308
int fastwriter_push_data(fastwriter_t *ctx, size_t size, const void *buf) {
 
309
    int err;
 
310
    err = fastwriter_push_chunk(ctx, size, buf);
 
311
    if (err) return err;
 
312
    
 
313
    err = fastwriter_commit(ctx);
 
314
    if (err) fastwriter_cancel(ctx);
 
315
 
 
316
    return err;
 
317
}