/alps/fastwriter

To get this branch, use:
bzr branch http://suren.me/webbzr/alps/fastwriter
1 by Suren A. Chilingaryan
Initial release
1
#define _FASTWRITER_DEFAULT_C
2
3
#define _GNU_SOURCE
4
#define _XOPEN_SOURCE 600
5
#define _POSIX_C_SOURCE 200112L
6
#define _LARGEFILE64_SOURCE
7
8
#include "config.h"
9
10
#include <stdio.h>
11
#include <stdlib.h>
12
#include <string.h>
13
#include <unistd.h>
14
#include <limits.h>
15
#include <errno.h>
16
17
#include <pthread.h>
18
19
#include <sys/types.h>
20
#include <sys/stat.h>
21
#include <sys/time.h>
22
23
#include <fcntl.h>
24
25
26
#ifdef HAVE_LINUX_FALLOC_H
27
# include <linux/falloc.h>
28
#endif /* HAVE_LINUX_FALLOC_H */
29
9 by Suren A. Chilingaryan
Support XFS RealTime partition
30
#ifndef DISABLE_XFS_REALTIME
31
# include <xfs/xfs.h>
32
#endif /* !DISABLE_XFS_REALTIME */
33
13 by Suren A. Chilingaryan
AIO support
34
1 by Suren A. Chilingaryan
Initial release
35
#include "fastwriter.h"
36
#include "private.h"
37
#include "sysinfo.h"
1.1.1 by Suren A. Chilingaryan
Compile-in default api descriptor
38
#include "default.h"
1 by Suren A. Chilingaryan
Initial release
39
40
#define SYNC_MODE
41
#define HAVE_FALLOCATE
42
#define EXT4_WRITEBLOCK 4194304
43
#define EXT4_PREALLOCATE 1073741824
13 by Suren A. Chilingaryan
AIO support
44
#define OCFS_WRITEBLOCK 262144
45
#define AIO_QUEUE_LENGTH 4
46
#define AIO_BUFFERS 8
47
48
49
#ifndef DISABLE_AIO
50
# include <libaio.h>
51
# if AIO_QUEUE_LENGTH > AIO_BUFFERS
52
#  error "AIO_QUEUE_LENGTH > AIO_BUFFERS"
53
# endif
54
#endif /* DISABLE_AIO */
55
56
57
#ifndef DISABLE_AIO
58
/**
 * Bookkeeping record for one block of data handed to the kernel AIO queue.
 */
typedef struct {
    size_t offset;		/**< file offset the block was scheduled at */
    size_t size;		/**< number of bytes scheduled for the block */
    int ios;			/**< value returned to the free iocb list once the block completes */
    int ready;			/**< 0 - unused, 1 - processing, 2 - done */
} fastwriter_data_t;
64
#endif /* !DISABLE_AIO */
1 by Suren A. Chilingaryan
Initial release
65
66
typedef struct {
67
    int fd;
4 by Suren A. Chilingaryan
Few synchronization and alignment related fixes
68
13 by Suren A. Chilingaryan
AIO support
69
    int sync_mode;		/**< Open with O_DIRECT flag to avoid caches */
70
    int aio_mode;		/**< Use kernel AIO (libaio.h) */
1 by Suren A. Chilingaryan
Initial release
71
    
72
    size_t prior_size;		/**< original size of file */
73
    size_t preallocated;	/**< preallocated bytes */
74
    
75
    size_t wr_block;		/**< minimal block of data to write */
76
    size_t pa_block;		/**< preallocation setp */
13 by Suren A. Chilingaryan
AIO support
77
78
#ifndef DISABLE_AIO
79
    io_context_t aio;
80
    
81
    int ios_ready_n;
82
    int ios_ready[AIO_QUEUE_LENGTH];
83
    struct iocb ios[AIO_QUEUE_LENGTH];
84
85
    int data_head, data_tail;
86
    fastwriter_data_t data[AIO_BUFFERS];
87
    
88
    int ios_status[AIO_QUEUE_LENGTH];
89
    
90
    size_t sched;		/**< how far we ahead of currently writted head */
91
    size_t fd_offset;		/**< current file offset */
92
93
    int page_size;
94
#endif /* !DISABLE_AIO */
1 by Suren A. Chilingaryan
Initial release
95
} fastwriter_default_t;
96
97
1.1.1 by Suren A. Chilingaryan
Compile-in default api descriptor
98
/**
 * Open the output target and initialise the default writer backend.
 *
 * The file system name reported by fastwriter_get_file_fs() selects the write
 * block size, the preallocation step and whether O_DIRECT (sync mode) and/or
 * kernel AIO are requested. If open() rejects O_DIRECT with EINVAL, or an
 * existing file has a size which is not a multiple of FASTWRITER_SYNCIO_ALIGN,
 * the file is (re)opened without O_DIRECT and sync/AIO modes are switched off.
 *
 * @param fw	writer handle; fw->ctx receives the newly allocated context
 * @param name	path of the file or device to write to
 * @param flags	with FASTWRITER_FLAGS_OVERWRITE an existing file is truncated,
 *		otherwise the descriptor is positioned at the end of the file
 * @return	0 on success; the error of fastwriter_get_file_fs(), ENOMEM, or
 *		the errno of a failed open() otherwise
 *
 * On failures after the allocation the context stays attached to fw->ctx with
 * a negative fd; fastwriter_default_close() releases it.
 */
int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags_t flags) {
    int err;
    char fs[16] = {0};		/* zeroed, so the name is terminated even if the helper does not do it */

#ifndef DISABLE_XFS_REALTIME
    struct fsxattr attr;
#endif /* !DISABLE_XFS_REALTIME */

    int open_flags = (O_CREAT|O_WRONLY|O_NOATIME|O_LARGEFILE);
    int open_mode = (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);

    fastwriter_default_t *ctx;

    err = fastwriter_get_file_fs(name, sizeof(fs) - 1, fs);
    if (err) return err;

    ctx = (fastwriter_default_t*)malloc(sizeof(fastwriter_default_t));
    if (!ctx) return ENOMEM;

    memset(ctx, 0, sizeof(fastwriter_default_t));

    fw->ctx = ctx;

#ifdef SYNC_MODE
    ctx->sync_mode = 1;
#endif /* SYNC_MODE */

    ctx->prior_size = 0;

    if (!strcmp(fs, "raw")) {
	ctx->wr_block = EXT4_WRITEBLOCK;
	ctx->pa_block = 0;
	ctx->prior_size = (size_t)-1;	/* marks a raw device: never sized or truncated */
	open_flags &= ~(O_CREAT|O_NOATIME|O_LARGEFILE);
    } else if (!strcmp(fs, "ext4")) {
	ctx->wr_block = EXT4_WRITEBLOCK;
	ctx->pa_block = EXT4_PREALLOCATE;
    } else if (!strcmp(fs, "btrfs")) {
	ctx->wr_block = EXT4_WRITEBLOCK;
	ctx->pa_block = EXT4_PREALLOCATE;
    } else if (!strcmp(fs, "xfs")) {
	ctx->wr_block = EXT4_WRITEBLOCK;
	ctx->pa_block = EXT4_PREALLOCATE;
    } else if (!strcmp(fs, "ocfs2")) {
#ifndef DISABLE_AIO
	ctx->aio_mode = 1;
	ctx->sync_mode = 0;
	ctx->wr_block = OCFS_WRITEBLOCK;
#else /* !DISABLE_AIO */
	ctx->wr_block = EXT4_WRITEBLOCK;
#endif /* !DISABLE_AIO */
	ctx->pa_block = EXT4_PREALLOCATE;
/*    } else if (!strcmp(fs, "fhgfs")) {
	ctx->sync_mode = 0;
	ctx->wr_block = OCFS_WRITEBLOCK;
	ctx->pa_block = EXT4_PREALLOCATE;
    } else if (strstr(fs, "gluster")) {
	ctx->sync_mode = 0;
	ctx->wr_block = OCFS_WRITEBLOCK;
	ctx->pa_block = EXT4_PREALLOCATE;*/
    } else {
	    /* unknown file system: no O_DIRECT, no preallocation */
	ctx->sync_mode = 0;
	ctx->wr_block = OCFS_WRITEBLOCK;
	ctx->pa_block = 0;
    }

    if (ctx->sync_mode) {
	open_flags |= O_DIRECT;
    }

    if (flags&FASTWRITER_FLAGS_OVERWRITE)
	open_flags |= O_TRUNC;

    ctx->fd = open(name, open_flags, open_mode);
    if (ctx->fd < 0) {
	    // Running as normal user, try to disable direct mode
	if ((errno == EINVAL)&&(ctx->sync_mode)) {
	    ctx->sync_mode = 0;
	    open_flags &= ~O_DIRECT;
	    ctx->fd = open(name, open_flags, open_mode);
	}
	if (ctx->fd < 0) return errno;
    }

	/* The caller's flags decide between append and overwrite; open_flags
	holds open(2) bits and must not be tested against FASTWRITER_FLAGS_*. */
    if (((flags&FASTWRITER_FLAGS_OVERWRITE)==0)&&(strcmp(fs, "raw"))) {
	    /* appending: remember the current size and leave the offset at the end */
	ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END);

	if (ctx->prior_size%FASTWRITER_SYNCIO_ALIGN) {
		/* existing data ends unaligned: reopen without O_DIRECT and
		fall back to plain buffered writes */
	    close(ctx->fd);

	    ctx->fd = open(name, open_flags&~O_DIRECT, open_mode);
	    if (ctx->fd < 0) return errno;

	    ctx->prior_size = lseek64(ctx->fd, 0, SEEK_END);

	    ctx->sync_mode = 0;
	    ctx->aio_mode = 0;
	}
    }

#ifndef DISABLE_XFS_REALTIME
    if (!strcmp(fs, "xfs")) {
	    /* best effort: a failure is only reported, the file stays usable */
	err = xfsctl (name, ctx->fd, XFS_IOC_FSGETXATTR, (void *) &attr);
	if (!err) {
	    attr.fsx_xflags |= XFS_XFLAG_REALTIME;
	    err = xfsctl (name, ctx->fd, XFS_IOC_FSSETXATTR, (void *) &attr);
	    if (err) fprintf(stderr, "Error initializing XFS real-time mode (%i), disabling...\n", err);
	}
    }
#endif /* !DISABLE_XFS_REALTIME */

#ifndef DISABLE_AIO
    if (ctx->aio_mode) {
	int i;
	ctx->page_size = getpagesize();
	ctx->fd_offset = ctx->prior_size;

	    /* initially every iocb is free */
	ctx->ios_ready_n = AIO_QUEUE_LENGTH;
	for (i = 0; i < AIO_QUEUE_LENGTH; i++) {
	    ctx->ios_ready[i] = i;
	}

	err = io_queue_init(AIO_QUEUE_LENGTH, &ctx->aio);
	if (err) {
	    fprintf(stderr, "Error initializing AIO mode (%i), disabling...\n", -err);
	    ctx->aio_mode = 0;
	}
    }
#endif /* !DISABLE_AIO */

    ctx->preallocated = 0;

    return 0;
}
233
234
1.1.1 by Suren A. Chilingaryan
Compile-in default api descriptor
235
/**
 * Finish writing and release the default backend context.
 *
 * Waits for outstanding AIO requests (if AIO is active), trims the file to
 * prior_size + fw->written, closes the descriptor and frees the context.
 * Raw devices (prior_size == (size_t)-1) are never truncated. Does nothing
 * when fw->ctx is NULL; fw->ctx is reset to NULL afterwards.
 *
 * @param fw	writer handle whose ctx was set up by fastwriter_default_open()
 */
void fastwriter_default_close(fastwriter_t *fw) {
    if (fw->ctx) {
	fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx;

	if (ctx->fd >= 0) {
#ifndef DISABLE_AIO
	    if ((ctx->aio_mode)&&(ctx->aio)) {
		int n_ev;
		struct io_event ev[AIO_QUEUE_LENGTH];

		    /* every iocb missing from the free list is still in flight */
		while (ctx->ios_ready_n < AIO_QUEUE_LENGTH) {
		    n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL);
		    if (n_ev <= 0) {
			fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev);
			break;
		    }
		    ctx->ios_ready_n += n_ev;
		}

		io_queue_release(ctx->aio);
	    }
#endif /* DISABLE_AIO */

#ifdef HAVE_LINUX_FALLOC_H
	    if (ctx->prior_size != (size_t)-1) {
#else /* HAVE_LINUX_FALLOC_H */
		/* posix_fallocate() grows the file, so the preallocated tail has
		to be cut off even when no padded (sync/AIO) writes were made */
	    if ((ctx->prior_size != (size_t)-1)&&((ctx->sync_mode)||(ctx->aio_mode)||(ctx->preallocated))) {
#endif /* HAVE_LINUX_FALLOC_H */
		    /* drop alignment padding and unused preallocated space */
		if (ftruncate(ctx->fd, ctx->prior_size + fw->written)) {
		    fprintf(stderr, "Error truncating file to the written size (%i)\n", errno);
		}
	    }
	    close(ctx->fd);
	}

	free(ctx);
	fw->ctx = NULL;
    }
}
272
273
1.1.1 by Suren A. Chilingaryan
Compile-in default api descriptor
274
/**
 * Write a chunk of buffered data to the output.
 *
 * Without FASTWRITER_WRITE_FLAG_FORCE only whole multiples of ctx->wr_block
 * are written (nothing if less than one block is available). In sync/AIO mode
 * a size which is not a multiple of FASTWRITER_SYNCIO_ALIGN is padded up to the
 * alignment, i.e. up to FASTWRITER_SYNCIO_ALIGN - 1 bytes past data + size are
 * read; the caller's buffer has to allow this.
 *
 * In AIO mode blocks are only scheduled here; *written reports the bytes of
 * blocks which have completed, which may belong to earlier calls and may
 * be 0 if nothing is in flight and nothing can be scheduled.
 *
 * @param fw		writer handle (fw->ctx is the default backend context)
 * @param flags		FASTWRITER_WRITE_FLAG_FORCE to write incomplete blocks too
 * @param size		number of bytes available at data
 * @param data		start of the data which is not reported as written yet
 * @param written	receives the number of bytes completed by this call
 *			(0 on AIO failures, the partial count on write() failures)
 * @return		0 on success, an errno style code otherwise (-1 if an
 *			incomplete page would have to be written in AIO mode)
 *
 * NOTE(review): the preallocation and posix_fadvise() offsets are relative to
 * fw->written and do not add ctx->prior_size - confirm this is intended for
 * appended files.
 */
int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, size_t size, void *data, size_t *written) {
    size_t sum = 0;
    size_t delta = 0;
    ssize_t res;
    char *bytes = (char*)data;	/* byte pointer: arithmetic on void* is a GNU extension */
    fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx;

    if ((flags&FASTWRITER_WRITE_FLAG_FORCE)==0) {
	if (size < ctx->wr_block) {
	    *written = 0;
	    return 0;
	}

        size -= size % ctx->wr_block;
    }

    if ((ctx->pa_block)&&((fw->written + size) > ctx->preallocated)) {
#ifdef HAVE_LINUX_FALLOC_H
    	if (fallocate(ctx->fd, FALLOC_FL_KEEP_SIZE, ctx->preallocated, ctx->pa_block)) {
#else /* HAVE_LINUX_FALLOC_H */
    	if (posix_fallocate(ctx->fd, ctx->preallocated, ctx->pa_block)) {
#endif /* HAVE_LINUX_FALLOC_H */
		/* preallocation is not supported here, stop trying */
	    ctx->pa_block = 0;
	} else {
	    ctx->preallocated += ctx->pa_block;
	}
    }

	// we expect this to happen only at last iteration (buffer is multiply of the required align)
    if (((ctx->aio_mode)||(ctx->sync_mode))&&(size%FASTWRITER_SYNCIO_ALIGN)) {
	delta = FASTWRITER_SYNCIO_ALIGN - size%FASTWRITER_SYNCIO_ALIGN;
    }

#ifndef DISABLE_AIO
    if (ctx->aio_mode) {
	int err;
	size_t done = 0;	/* bytes of completed blocks retired by this call */
	size_t sched = 0;	/* bytes scheduled by this call */

	fastwriter_data_t *iodata;
	struct iocb *newio;
	size_t wr_block = ctx->wr_block;

	do {
	    int ring_full = (ctx->data_head == ctx->data_tail)&&(ctx->data[ctx->data_head].ready);
	    int pending = ((ctx->sched + sched) < size);

		/* Wait for completions whenever nothing can be scheduled: no
		free iocb, no unscheduled data left, or no free ring slot.
		Otherwise the loop would spin without ever collecting events. */
	    if ((!ctx->ios_ready_n)||(!pending)||(ring_full)) {
		int i, n_ev;
		struct io_event ev[AIO_QUEUE_LENGTH];

		    /* nothing in flight, so there is nothing to wait for */
		if (ctx->ios_ready_n == AIO_QUEUE_LENGTH) break;

		n_ev = io_getevents(ctx->aio, 1, AIO_QUEUE_LENGTH, &ev[0], NULL);
		if (n_ev <= 0) {
		    fprintf(stderr, "AIO io_getevents have failed (%i)", -n_ev);
		    *written = 0;
		    return n_ev ? -n_ev : EIO;
		}

		for (i = 0; i < n_ev; i++) {
		    fastwriter_data_t *ev_data = (fastwriter_data_t *)(ev[i].data);
			/* res carries a negative errno on failure; compare it as
			a signed value, otherwise failures look like huge sizes */
		    long ev_res = (long)ev[i].res;
		    long ev_res2 = (long)ev[i].res2;

		    if ((ev_res2)||(ev_res < 0)||((size_t)ev_res < ev_data->size)) {
			fprintf(stderr, "AIO write failed (res: %li, res2: %li, expected: %zu), no handling data will be corrupted...\n", ev_res, ev_res2, ev_data->size);
			*written = 0;
			    /* never report success for a failed or short write */
			if (ev_res2) return (int)-ev_res2;
			if (ev_res < 0) return (int)-ev_res;
			return EIO;
		    }

		    ctx->ios_ready[ctx->ios_ready_n++] = ev_data->ios;
//		    printf("Data: %i (ios %i)\n", ev_data->ready, ev_data->ios);
		    ev_data->ready = 2;
		}

		    /* retire completed blocks strictly in scheduling order */
		while (ctx->data[ctx->data_tail].ready > 1) {
//		    printf("Done: %i %zu\n", ctx->data_tail, ctx->data[ctx->data_tail].offset);
		    ctx->data[ctx->data_tail].ready = 0;

		    done += ctx->data[ctx->data_tail].size;
		    if ((++ctx->data_tail) == AIO_BUFFERS) ctx->data_tail = 0;
		}
	    }

	    if ((ctx->sched + sched) < size) {
		int slot;

		if ((ctx->data_head == ctx->data_tail)&&(ctx->data[ctx->data_head].ready)) continue;

		    /* peek at the free iocb; it is only taken off the stack
		    once the request is really submitted */
		slot = ctx->ios_ready[ctx->ios_ready_n - 1];
		newio = (struct iocb*)&ctx->ios[slot];
	        iodata = &ctx->data[ctx->data_head];

		if (wr_block > ((size + delta) - (ctx->sched + sched))) {
		    wr_block = (size + delta) - (ctx->sched + sched);
		    if (wr_block % ctx->page_size) {
			fprintf(stderr, "We need to write incomplete page (%zu bytes). This is no supported yet...\n", wr_block);
			*written = 0;
			return -1;
		    }
		}

//		printf("Sched: %lu => %lu (%lu) [tail %lu, head %lu]\n", ctx->sched + sched, ctx->fd_offset, wr_block, ctx->data_tail, ctx->data_head);

		iodata->offset = ctx->fd_offset;
		iodata->size = wr_block;
		iodata->ios = slot;	/* index into ctx->ios, returned to ios_ready on completion */

		io_prep_pwrite(newio, ctx->fd, bytes + ctx->sched + sched, wr_block, ctx->fd_offset);
		io_set_callback(newio, (void*)iodata);
		err = io_submit(ctx->aio, 1, &newio);
		if (err != 1) {
		    fprintf(stderr, "Error submiting AIO job (%i)\n", -err);
		    *written = 0;
		    return err ? -err : EIO;
		}

		ctx->ios_ready_n--;
		iodata->ready = 1;
		sched += wr_block;
		ctx->fd_offset += wr_block;
		if ((++ctx->data_head) == AIO_BUFFERS) ctx->data_head = 0;
	    }
	} while (!done);

	    /* bytes scheduled but not yet reported stay accounted in ctx->sched */
	ctx->sched += sched - done;
	size = done;
    } else {
#endif /* !DISABLE_AIO */
	do {
	    res = write(ctx->fd, bytes + sum, size + delta - sum);
	    if (res < 0) {
		if (errno == EINTR) continue;	/* interrupted before anything was written, retry */
		*written = sum;
		return errno;
	    }

	    if ((!res)&&(sum < size)) {
		    /* no progress on a non-empty request, do not loop forever */
		*written = sum;
		return EIO;
	    }

	    sum += res;
	} while (sum < size);
#ifndef DISABLE_AIO
    }
#endif /* !DISABLE_AIO */

    if ((ctx->sync_mode)||(ctx->aio_mode)) {
	posix_fadvise(ctx->fd, fw->written, size, POSIX_FADV_DONTNEED);
    }

    *written = size;

    return 0;
}