/alps/fwbench

To get this branch, use:
bzr branch http://suren.me/webbzr/alps/fwbench

« back to all changes in this revision

Viewing changes to seqreader.c

  • Committer: Suren A. Chilingaryan
  • Date: 2012-11-25 07:09:41 UTC
  • Revision ID: csa@dside.dyndns.org-20121125070941-p96btyg84ca93e1x
Use kernel asynchronous i/o in seqreader

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
 
3
3
#include <stdio.h>
4
4
#include <stdlib.h>
 
5
#include <stdint.h>
5
6
#include <sys/types.h>
6
7
#include <sys/stat.h>
7
8
#include <sys/time.h>
11
12
#include <string.h>
12
13
#include <errno.h>
13
14
 
 
15
#include <libaio.h>
 
16
 
14
17
#define FASTWRITER_SYNCIO_ALIGN 512
15
18
 
16
19
#define SYNC_MODE
17
 
#define BUFSIZE 2097152
 
20
#define AIO_MODE 2
 
21
#define EXTRA_BUFFERS 2
 
22
#define WRITE_INTERVAL 1
 
23
 
 
24
#define RAID_STRIP_SIZE         256
 
25
#define RAID_DISKS              8
 
26
#define STRIPS_AT_ONCE          2
 
27
 
 
28
#ifdef AIO_MODE
 
29
# define SYNC_MODE
 
30
#endif /* AIO_MODE */
 
31
 
18
32
#ifdef SYNC_MODE
19
 
# define BLOCK_SIZE 2097152
 
33
# define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE)
20
34
#else /* SYNC_MODE */
21
35
# define BLOCK_SIZE 16384
22
36
#endif /* SYNC_MODE */
23
 
#define WRITE_INTERVAL 1
 
37
 
 
38
#ifdef AIO_MODE
 
39
# define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS))
 
40
#else /* AIO_MODE */
 
41
# define BUFSIZE BLOCK_SIZE
 
42
#endif /* AIO_MODE */
24
43
 
25
44
 
26
45
int main(int argc, char *argv[]) {
33
52
    size_t files = 0;
34
53
    size_t total_size = 0;
35
54
    size_t last_write = 0;
 
55
    size_t last_size = 0;
36
56
    size_t skip;
37
57
    size_t run;
 
58
    size_t ready;
38
59
    ssize_t res;
39
60
    size_t max_size = (size_t)-1;
40
61
    char *buffer;//[BUFSIZE];
41
62
    long double mcoef = 1000000. / (1024 * 1024);
42
63
    int flags = O_RDONLY|O_NOATIME|O_LARGEFILE;
43
64
 
 
65
#ifdef AIO_MODE
 
66
    int i;
 
67
    size_t curio, schedio;
 
68
    int done[AIO_MODE + EXTRA_BUFFERS];
 
69
    
 
70
    io_context_t aio;
 
71
    struct iocb io[AIO_MODE], *ioptr[AIO_MODE];
 
72
 
 
73
    int events;
 
74
    struct io_event ev[AIO_MODE];
 
75
#endif /* AIO_MODE */
 
76
 
44
77
#ifdef SYNC_MODE
45
78
    flags |= O_DIRECT;
46
79
#endif
47
 
    
 
80
 
 
81
    printf("Used buffer: %i MB, Block: %i KB\n", BUFSIZE / 1024 / 1024, BLOCK_SIZE/1024);
 
82
 
48
83
    posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE);
49
84
    
50
85
    if (argc < 2) {
63
98
            printf("Unable to open device %s\n", argv[1]);
64
99
            exit(1);
65
100
        }
66
 
        
 
101
 
67
102
        size_t size = BLOCK_SIZE;
 
103
 
 
104
#ifdef AIO_MODE 
 
105
        memset(done, 0, sizeof(done));
 
106
        memset(&aio, 0, sizeof(aio));
 
107
        io_queue_init(AIO_MODE, &aio);
 
108
        for (i = 0; i < AIO_MODE; i++) {
 
109
            ioptr[i] = &io[i];
 
110
            memset(ioptr[i], 0, sizeof(struct iocb));
 
111
            io_prep_pread(ioptr[i], fd, buffer + i * BLOCK_SIZE, BLOCK_SIZE, i * BLOCK_SIZE);
 
112
            io_set_callback(ioptr[i], (void*)(uintptr_t)i);
 
113
        }
 
114
 
 
115
        curio = 0;
 
116
        schedio = AIO_MODE;
 
117
        events = 0;
 
118
#endif /* AIO_MODE */
68
119
        
69
120
        gettimeofday(&start, NULL);
70
 
        
 
121
 
 
122
#ifdef AIO_MODE 
 
123
        err = io_submit(aio, AIO_MODE, ioptr);
 
124
        if (err != AIO_MODE) {
 
125
            printf("io_submit returned %i\n", err);
 
126
            perror("Failed to submit initial AIO jobs");
 
127
        }
 
128
#endif /* AIO_MODE */
 
129
        
 
130
#ifdef AIO_MODE 
 
131
        ready = 0;
 
132
        while (1) {
 
133
            if (!done[curio%(AIO_MODE + EXTRA_BUFFERS)]) {
 
134
                err = io_getevents(aio, 1, AIO_MODE - events, &ev[events], NULL);
 
135
                if (err < 0) perror("Error waiting for AIO\n");
 
136
                
 
137
                if ((!ready)&&(err > 1)) {
 
138
                    printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE);
 
139
                    printf("      Small buffer size (%i KB)\n", BLOCK_SIZE/1024);
 
140
                    printf("      More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err);
 
141
                }
 
142
                
 
143
                for (i = 0; i < err; i++) {
 
144
                    struct io_event *ep = &ev[events + i];
 
145
                    int doneio = (uintptr_t)ep->data;
 
146
                    if (ep->res2 || (ep->res != BLOCK_SIZE)) perror("Error in async IO");
 
147
                    done[doneio%(AIO_MODE + EXTRA_BUFFERS)] = 1;
 
148
//                  printf("done (%i): %i\n", i, doneio);
 
149
                }
 
150
                
 
151
                events += err;
 
152
                
 
153
                for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) {
 
154
                    struct iocb *newio = (struct iocb *)ev[i].obj;
 
155
                    memset(newio, 0, sizeof(struct iocb));
 
156
                    io_prep_pread(newio, fd, buffer + (schedio % (AIO_MODE + EXTRA_BUFFERS)) * BLOCK_SIZE, BLOCK_SIZE, schedio * BLOCK_SIZE);
 
157
                    io_set_callback(newio, (void*)(uintptr_t)schedio);
 
158
                    err = io_submit(aio, 1, &newio);
 
159
                    if (err != 1) perror("Failed to submit AIO jobs");
 
160
                    schedio++;
 
161
                }
 
162
                events = i + 1;
 
163
                
 
164
                if (events) {
 
165
                    printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events);
 
166
//                  printf("      curio (%zu), schedio (%zu)\n", curio, schedio);
 
167
                }
 
168
 
 
169
                ready = 1;
 
170
                continue;
 
171
            }
 
172
 
 
173
            done[curio%(AIO_MODE + EXTRA_BUFFERS)] = 0;
 
174
            curio++;
 
175
 
 
176
            res = BLOCK_SIZE;
 
177
#else /* AIO_MODE */
71
178
        res = read(fd, buffer, size);
72
179
        while (res > 0) {
 
180
#endif /* AIO_MODE */
 
181
 
73
182
            if (res != size) {
74
183
                printf("Incomplete read: %zu bytes read instead of %zu\n", res, size);
75
184
                exit(-1);
79
188
            gettimeofday(&tv, NULL);
80
189
            us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
81
190
            if ((us - last_write) > WRITE_INTERVAL * 1000000) {
 
191
                printf("Reading: %s (%lu GB),  Measured speed: %zu MB/s, Current speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us), (size_t)(mcoef * (total_size - last_size) / (us - last_write)));
82
192
                last_write = us;
83
 
                printf("Reading: %s (%lu GB),  Measured speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
 
193
                last_size = total_size;
84
194
            }
85
195
            
86
196
            if (total_size > max_size) {
88
198
                break;
89
199
            }
90
200
        
 
201
#ifndef AIO_MODE        
91
202
            res = read(fd, buffer, size);
 
203
#endif /* AIO_MODE */
92
204
        }
93
205
        
 
206
#ifdef AIO_MODE 
 
207
        io_queue_release(aio);
 
208
#endif /* AIO_MODE */
 
209
 
94
210
        close(fd);
95
211
 
96
212
        if (res < 0) {