/alps/fastwriter

To get this branch, use:
bzr branch http://suren.me/webbzr/alps/fastwriter
1 by Suren A. Chilingaryan
Initial release
1
#define _GNU_SOURCE
2
3
#include <stdio.h>
4
#include <stdlib.h>
5
#include <string.h>
6
#include <unistd.h>
7
#include <limits.h>
8
#include <errno.h>
9
10
#include <pthread.h>
11
12
#include <sys/types.h>
13
#include <sys/stat.h>
14
#include <sys/time.h>
15
16
#include <fcntl.h>
17
18
#include "private.h"
19
#include "default.h"
20
#include "sysinfo.h"
12.1.2 by Suren A. Chilingaryan
Seems new memcpy is only good for ipepdvcompute2, make it optional and disabled by default
21
22
#ifdef USE_CUSTOM_MEMCPY
23
# include "memcpy.h"
24
#else /* USE_CUSTOM_MEMCPY */
25
# define fast_memcpy memcpy
26
#endif /* USE_CUSTOM_MEMCPY */
27
1 by Suren A. Chilingaryan
Initial release
28
29
/**
 * Allocate a new writer context.
 *
 * The context is zero-filled, then the supplied flags are stored as the
 * context-wide defaults and the default backend API is selected.
 *
 * @param fs    not used by this function
 * @param flags default flags; fastwriter_open() ORs them into its own flags
 * @return the new context, or NULL if the allocation failed
 */
fastwriter_t *fastwriter_init(const char *fs, fastwriter_flags_t flags) {
    fastwriter_t *ctx = malloc(sizeof(*ctx));

    if (ctx) {
        memset(ctx, 0, sizeof(*ctx));

        ctx->params.flags = flags;
        ctx->api = &fastwriter_default_api;
    }

    return ctx;
}
41
42
/**
 * Release the memory of a context obtained from fastwriter_init().
 *
 * Only the context structure itself is freed here; nothing is closed or
 * flushed by this function.
 *
 * @param ctx context to free (may be NULL, in which case nothing happens)
 */
void fastwriter_destroy(fastwriter_t *ctx) {
    free(ctx);
}
45
46
/**
 * Record the requested ring-buffer size in the context parameters.
 *
 * The value is only stored here; fastwriter_open() interprets it
 * (FASTWRITER_BUFFER_DEFAULT and FASTWRITER_BUFFER_MAX are handled
 * specially there, any other value is used as the size).
 *
 * @param ctx         writer context
 * @param buffer_size requested size or one of the special constants
 * @return always 0
 */
int fastwriter_set_buffer_size(fastwriter_t *ctx, size_t buffer_size) {
    ctx->params.buffer_size = buffer_size;
    return 0;
}
51
52
static void *fastwriter_writer_thread(void *user);
53
54
int fastwriter_open(fastwriter_t *ctx, const char *name, fastwriter_flags_t flags) {
55
    int i;
56
    int err;
57
    int e[4];
58
    
59
    ctx->flags = flags | ctx->params.flags;
60
    
61
    switch (ctx->params.buffer_size) {
62
     case FASTWRITER_BUFFER_DEFAULT:
63
        ctx->size = FASTWRITER_DEFAULT_BUFFER_SIZE;
64
	break;
65
     case FASTWRITER_BUFFER_MAX:
5 by Suren A. Chilingaryan
Properly detect /dev/null as raw device and do not set DIRECT flag on raw devices
66
        ctx->size = fastwriter_get_free_memory();
16 by Suren A. Chilingaryan
RPM support
67
	
68
	if ((ctx->size == (size_t)-1)||((ctx->size - FASTWRITER_RESERVE_MEMORY) < FASTWRITER_DEFAULT_BUFFER_SIZE))
1 by Suren A. Chilingaryan
Initial release
69
    	    ctx->size = FASTWRITER_DEFAULT_BUFFER_SIZE;
70
	else
71
	    ctx->size -= FASTWRITER_RESERVE_MEMORY;
72
73
        break;
74
     default:
75
	ctx->size = ctx->params.buffer_size;
76
    }
4 by Suren A. Chilingaryan
Few synchronization and alignment related fixes
77
78
    if (ctx->size%FASTWRITER_SYNCIO_ALIGN)
79
	ctx->size += FASTWRITER_SYNCIO_ALIGN - (ctx->size%FASTWRITER_SYNCIO_ALIGN);
80
81
    err = posix_memalign(&ctx->buffer, FASTWRITER_SYNCIO_ALIGN, ctx->size);
82
    if ((err)||(!ctx->buffer)) {
1 by Suren A. Chilingaryan
Initial release
83
	fastwriter_close(ctx);
84
	return ENOMEM;
85
    }
86
    ctx->err = 0;
87
    ctx->written = 0;
88
    ctx->commited = 0;
89
    ctx->chunked = 0;
90
    
91
    ctx->tail = 0;
92
    ctx->head = 0;
93
    ctx->pos = 0;
94
    
95
    err = ctx->api->open(ctx, name, ctx->flags);
96
    if (err) {
97
	fastwriter_close(ctx);
98
	return err;
99
    }
100
101
    e[0] = pthread_mutex_init(&ctx->data_cond_mutex, NULL);
102
    e[1] = pthread_mutex_init(&ctx->space_cond_mutex, NULL);
103
    e[2] = pthread_cond_init(&ctx->data_cond, NULL);
104
    e[3] = pthread_cond_init(&ctx->space_cond, NULL);
105
    
106
    if (e[0]|e[1]|e[2]|e[3]) {
107
	if (!e[3]) pthread_cond_destroy(&ctx->space_cond);
108
	if (!e[2]) pthread_cond_destroy(&ctx->data_cond);
109
	if (!e[1]) pthread_mutex_destroy(&ctx->space_cond_mutex);
110
	if (!e[0]) pthread_mutex_destroy(&ctx->data_cond_mutex);
111
	
112
	fastwriter_close(ctx);
113
	
114
	for (i = 0; i < 4; i++) 
115
	    if (e[i]) return e[i];
116
    }
117
    
118
    ctx->clean_locks = 1;
119
    ctx->run_flag = 1;
120
    
121
    err = pthread_create(&ctx->wthread, NULL, &fastwriter_writer_thread, ctx);
122
    if (err) {
123
	ctx->run_flag = 0;
124
	fastwriter_close(ctx);
125
	return err;
126
    }
127
    
128
    return 0;
129
}
130
131
/**
 * Stop the writer thread, destroy the synchronisation primitives, close the
 * backend and release the ring buffer.
 *
 * @param ctx writer context
 * @return EBADFD (without closing anything) if no error is recorded and
 *         pushed data has not been committed or cancelled yet; otherwise
 *         the error code recorded in the context (0 if none)
 */
int fastwriter_close(fastwriter_t *ctx) {
    /* Uncommitted data pending and no error recorded: refuse to close. */
    if ((ctx->err == 0) && (ctx->head != ctx->pos)) {
        return EBADFD;
    }

    /*
     * NOTE(review): the writer thread clears run_flag itself when a write
     * fails, in which case this branch is skipped and the thread is never
     * joined -- confirm whether that is intended.
     */
    if (ctx->run_flag) {
        ctx->run_flag = 0;

        /* Wake the writer so it notices the cleared flag, then wait for it. */
        pthread_mutex_lock(&ctx->data_cond_mutex);
        pthread_cond_broadcast(&ctx->data_cond);
        pthread_mutex_unlock(&ctx->data_cond_mutex);

        pthread_join(ctx->wthread, NULL);
    }

    if (ctx->clean_locks) {
        pthread_cond_destroy(&ctx->space_cond);
        pthread_cond_destroy(&ctx->data_cond);
        pthread_mutex_destroy(&ctx->space_cond_mutex);
        pthread_mutex_destroy(&ctx->data_cond_mutex);

        ctx->clean_locks = 0;
    }

    ctx->api->close(ctx);

    /* free(NULL) is a no-op, so no guard is required. */
    free(ctx->buffer);
    ctx->buffer = NULL;

    return ctx->err;
}
164
165
166
/**
 * Number of bytes that can currently be pushed into the ring buffer.
 *
 * One slot is always kept unused so that the producer position can never
 * catch up with the tail: the writer thread treats head == tail as "buffer
 * empty", so letting pos reach tail (and head after a commit) would make a
 * completely full buffer look empty and its contents would be lost.
 *
 * tail and pos are read once into locals so that a change of tail by the
 * writer thread between the comparison and the subtraction cannot produce
 * an inconsistent (wrapped-around) result.
 *
 * @param ctx writer context
 * @return free bytes, at most ctx->size - 1
 */
static inline size_t fastwriter_compute_free_space(fastwriter_t *ctx) {
    size_t tail = ctx->tail;
    size_t pos = ctx->pos;

    if (pos < tail) return tail - pos - 1;
    return tail + ctx->size - pos - 1;
}
170
171
/**
 * Copy the current buffer and throughput counters into *stats.
 *
 * buffer_used is derived as the buffer size minus the currently free space.
 *
 * @param ctx   writer context
 * @param stats structure filled by this call
 * @return always 0
 */
int fastwriter_get_stats(fastwriter_t *ctx, fastwriter_stats_t *stats) {
    size_t unused = fastwriter_compute_free_space(ctx);

    stats->buffer_size = ctx->size;
    stats->buffer_used = ctx->size - unused;
    stats->buffer_max = ctx->max_usage;
    stats->commited = ctx->commited;
    stats->written = ctx->written;

    return 0;
}
179
180
181
static void *fastwriter_writer_thread(void *user) {
182
    int err = 0;
183
    fastwriter_write_flags_t flags;
184
    size_t size;
185
    size_t head;
186
187
    fastwriter_t *ctx = (fastwriter_t*)user;
188
4 by Suren A. Chilingaryan
Few synchronization and alignment related fixes
189
1 by Suren A. Chilingaryan
Initial release
190
    while ((ctx->run_flag)||(ctx->head != ctx->tail)) {
191
	if (ctx->head != ctx->tail) {
192
	    head = ctx->head;
193
194
	    if (head > ctx->tail) {
195
		size = head - ctx->tail;
196
		flags = FASTWRITER_WRITE_FLAGS_DEFAULT;
197
	    } else { 
198
		size = ctx->size - ctx->tail;
199
		flags = FASTWRITER_WRITE_FLAG_FORCE;
200
	    }
201
	    
202
	    if (!ctx->run_flag) 
203
		flags |= FASTWRITER_WRITE_FLAG_FORCE;
204
205
	    err = ctx->api->write(ctx, flags, size, ctx->buffer + ctx->tail, &size);
206
	    if (err) {
207
		ctx->err = err;
208
		ctx->run_flag = 0;
209
210
		pthread_mutex_lock(&ctx->space_cond_mutex);
211
		pthread_cond_broadcast(&ctx->space_cond);
212
		pthread_mutex_unlock(&ctx->space_cond_mutex);
213
		
214
		return NULL;
215
	    }
216
	    
217
	    if (size > 0) {
218
		ctx->written += size;
219
		
220
		size += ctx->tail;
221
		if (size == ctx->size) ctx->tail = 0;
222
		else ctx->tail = size;
223
	    
224
		pthread_mutex_lock(&ctx->space_cond_mutex);
225
		pthread_cond_broadcast(&ctx->space_cond);
226
		pthread_mutex_unlock(&ctx->space_cond_mutex);
227
	    } else {
228
		pthread_mutex_lock(&ctx->data_cond_mutex);
229
		while ((ctx->run_flag)&&(ctx->head == head)) {
230
		    pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex);
231
		}
4 by Suren A. Chilingaryan
Few synchronization and alignment related fixes
232
		pthread_mutex_unlock(&ctx->data_cond_mutex);
1 by Suren A. Chilingaryan
Initial release
233
	    }
234
	} else {
235
	    pthread_mutex_lock(&ctx->data_cond_mutex);
236
	    while ((ctx->run_flag)&&(ctx->head == ctx->tail)) {
237
		pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex);
238
	    }
4 by Suren A. Chilingaryan
Few synchronization and alignment related fixes
239
	    pthread_mutex_unlock(&ctx->data_cond_mutex);
1 by Suren A. Chilingaryan
Initial release
240
	}
241
    }
242
    
243
    return NULL;
244
}
245
246
2 by Suren A. Chilingaryan
Just push instead push_chunk
247
/**
 * Copy a block of data into the ring buffer without committing it.
 *
 * The data becomes visible to the writer thread only after
 * fastwriter_commit(); fastwriter_cancel() discards it.
 *
 * @param ctx  writer context
 * @param size number of bytes to copy
 * @param data source bytes
 * @return 0 on success;
 *         EOVERFLOW if the block can never fit into the buffer;
 *         EWOULDBLOCK if there is not enough free space right now and
 *         FASTWRITER_FLAGS_BLOCK is not set;
 *         the recorded writer error, or EBADFD if none is recorded, when
 *         the writer is not running
 */
int fastwriter_push(fastwriter_t *ctx, size_t size, const void *data) {
    size_t part1, end;
    size_t avail = fastwriter_compute_free_space(ctx);

    if (avail < size) {
        ctx->max_usage = ctx->size;

        /*
         * The free space never exceeds ctx->size - 1, so a block of
         * ctx->size bytes or more cannot fit no matter how long we wait.
         * Reject it instead of blocking forever.
         *
         * NOTE(review): the writer thread only consumes data up to head,
         * and head moves only in fastwriter_commit(); a blocking push that
         * needs space held by earlier uncommitted pushes will therefore
         * also wait indefinitely -- confirm callers never rely on that.
         */
        if (size >= ctx->size) {
            return EOVERFLOW;
        }

        if ((ctx->flags & FASTWRITER_FLAGS_BLOCK) == 0)
            return EWOULDBLOCK;

        /* Blocking mode: wait until the writer frees enough space or stops. */
        pthread_mutex_lock(&ctx->space_cond_mutex);
        while ((ctx->run_flag) && (fastwriter_compute_free_space(ctx) < size)) {
            pthread_cond_wait(&ctx->space_cond, &ctx->space_cond_mutex);
        }
        pthread_mutex_unlock(&ctx->space_cond_mutex);
    } else {
        /* Track the highest buffer occupancy seen so far. */
        end = ctx->size - (avail - size);
        if (end > ctx->max_usage) ctx->max_usage = end;
    }

    if (!ctx->run_flag) {
        if (ctx->err) return ctx->err;
        return EBADFD;
    }

    /* Length of the contiguous region available starting at pos. */
    if (ctx->pos < ctx->tail) end = ctx->tail;
    else end = ctx->size;

    part1 = end - ctx->pos;

    if (part1 < size) {
        /* tail < pos here (free space was checked): wrap around the end. */
        end = size - part1;
        fast_memcpy((char *)ctx->buffer + ctx->pos, data, part1);
        fast_memcpy(ctx->buffer, (const char *)data + part1, end);
        ctx->pos = end;
    } else {
        fast_memcpy((char *)ctx->buffer + ctx->pos, data, size);
        ctx->pos += size;

        if (ctx->pos == ctx->size) ctx->pos = 0;
    }

    ctx->chunked += size;

    return 0;
}
298
299
300
/**
 * Publish everything pushed since the last commit or cancel.
 *
 * head is moved up to the producer position, the writer thread is woken
 * through data_cond, and the pending byte count is added to the committed
 * total.
 *
 * @param ctx writer context
 * @return always 0
 */
int fastwriter_commit(fastwriter_t *ctx) {
    /* Make the pushed data visible to the writer thread. */
    ctx->head = ctx->pos;

    pthread_mutex_lock(&ctx->data_cond_mutex);
    pthread_cond_broadcast(&ctx->data_cond);
    pthread_mutex_unlock(&ctx->data_cond_mutex);

    /* Move the pending byte count into the committed total. */
    ctx->commited += ctx->chunked;
    ctx->chunked = 0;

    return 0;
}
312
313
314
/**
 * Discard everything pushed since the last commit.
 *
 * The producer position is rewound to head and the pending byte count is
 * cleared.
 *
 * @param ctx writer context
 * @return always 0
 */
int fastwriter_cancel(fastwriter_t *ctx) {
    ctx->chunked = 0;
    ctx->pos = ctx->head;

    return 0;
}
321
322
323
/**
 * Push a block and commit it in one call.
 *
 * If the push fails its error is returned and nothing is committed. If the
 * commit fails the pushed data is cancelled and the commit error returned.
 *
 * @param ctx  writer context
 * @param size number of bytes in buf
 * @param buf  data to write
 * @return 0 on success, otherwise the error from push or commit
 */
int fastwriter_push_data(fastwriter_t *ctx, size_t size, const void *buf) {
    int err = fastwriter_push(ctx, size, buf);

    if (!err) {
        err = fastwriter_commit(ctx);
        if (err) {
            fastwriter_cancel(ctx);
        }
    }

    return err;
}