Drizzled Public API Documentation

mf_iocache.cc
1 /* Copyright (C) 2000 MySQL AB
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 /*
17  Caching of files that only does (sequential) reads or writes of fixed-
18  length records. A read isn't allowed to go over file-length. A read is ok
19  if it ends at file-length and next read can try to read after file-length
20  (and get a EOF-error).
21  Possible use of asynchronous io.
22  macros for read and writes for faster io.
23  Used instead of FILE when reading or writing whole files.
24  This code makes mf_rec_cache obsolete (currently only used by ISAM)
25  One can change info->pos_in_file to a higher value to skip bytes in file if
26  also info->read_pos is set to info->read_end.
27  If called through open_cached_file(), then the temporary file will
28  only be created if a write exceeds the file buffer or if one calls
29  my_b_flush_io_cache().
30 
31  If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
32  reading and another for writing. Reads are first done from disk and
33  then done from the write buffer. This is an efficient way to read
34  from a log file when one is writing to it at the same time.
35  For this to work, the file has to be opened in append mode!
36  Note that when one uses SEQ_READ_APPEND, one MUST write using
37  my_b_append ! This is needed because we need to lock the mutex
38  every time we access the write buffer.
39 
40 TODO:
41  When one uses SEQ_READ_APPEND and we are reading and writing at the same time,
42  each time the write buffer gets full and it's written to disk, we will
43  always do a disk read to read a part of the buffer from disk to the
44  read buffer.
45  This should be fixed so that when we do a my_b_flush_io_cache() and
46  we have been reading the write buffer, we should transfer the rest of the
47  write buffer to the read buffer before we start to reuse it.
48 */
49 
50 #include <config.h>
51 
52 #include <drizzled/definitions.h>
53 #include <drizzled/error_t.h>
54 #include <drizzled/error.h>
55 #include <drizzled/internal/my_sys.h>
56 #include <drizzled/internal/m_string.h>
57 #include <drizzled/drizzled.h>
58 #include <drizzled/internal/iocache.h>
59 #include <errno.h>
60 #include <drizzled/util/test.h>
61 #include <stdlib.h>
62 #include <algorithm>
63 
64 using namespace std;
65 
66 namespace drizzled {
67 namespace internal {
68 
69 static int _my_b_read(io_cache_st *info, unsigned char *Buffer, size_t Count);
70 static int _my_b_write(io_cache_st *info, const unsigned char *Buffer, size_t Count);
71 
76 inline
77 static void lock_append_buffer(io_cache_st *, int )
78 {
79 }
80 
85 inline
86 static void unlock_append_buffer(io_cache_st *, int )
87 {
88 }
89 
94 inline
95 static size_t io_round_up(size_t x)
96 {
97  return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
98 }
99 
104 inline
105 static size_t io_round_dn(size_t x)
106 {
107  return (x & ~(IO_SIZE-1));
108 }
109 
110 
121 void io_cache_st::setup_io_cache()
122 {
123  /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
124  if (type == WRITE_CACHE)
125  {
126  current_pos= &write_pos;
127  current_end= &write_end;
128  }
129  else
130  {
131  current_pos= &read_pos;
132  current_end= &read_end;
133  }
134 }
135 
136 
137 void io_cache_st::init_functions()
138 {
139  switch (type) {
140  case READ_NET:
141  /*
142  Must be initialized by the caller. The problem is that
143  _my_b_net_read has to be defined in sql directory because of
144  the dependency on THD, and therefore cannot be visible to
145  programs that link against mysys but know nothing about THD, such
146  as myisamchk
147  */
148  break;
149  default:
150  read_function = _my_b_read;
151  write_function = _my_b_write;
152  }
153 
154  setup_io_cache();
155 }
156 
174 int io_cache_st::init_io_cache(int file_arg, size_t cachesize,
175  enum cache_type type_arg, my_off_t seek_offset,
176  bool use_async_io, myf cache_myflags)
177 {
178  size_t min_cache;
179  off_t pos;
180  my_off_t end_of_file_local= ~(my_off_t) 0;
181 
182  file= file_arg;
183  type= TYPE_NOT_SET; /* Don't set it until mutex are created */
184  pos_in_file= seek_offset;
185  pre_close = pre_read = post_read = 0;
186  arg = 0;
187  alloced_buffer = 0;
188  buffer=0;
189  seek_not_done= 0;
190 
191  if (file >= 0)
192  {
193  pos= lseek(file, 0, SEEK_CUR);
194  if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
195  {
196  /*
197  This kind of object doesn't support seek() or tell(). Don't set a
198  flag that will make us again try to seek() later and fail.
199  */
200  seek_not_done= 0;
201  /*
202  Additionally, if we're supposed to start somewhere other than the
203  the beginning of whatever this file is, then somebody made a bad
204  assumption.
205  */
206  assert(seek_offset == 0);
207  }
208  else
209  seek_not_done= test(seek_offset != (my_off_t)pos);
210  }
211 
212  if (!cachesize && !(cachesize= my_default_record_cache_size))
213  return 1; /* No cache requested */
214  min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
215  if (type_arg == READ_CACHE)
216  { /* Assume file isn't growing */
217  if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
218  {
219  /* Calculate end of file to avoid allocating oversized buffers */
220  end_of_file_local=lseek(file,0L,SEEK_END);
221  /* Need to reset seek_not_done now that we just did a seek. */
222  seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
223  if (end_of_file_local < seek_offset)
224  end_of_file_local=seek_offset;
225  /* Trim cache size if the file is very small */
226  if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
227  {
228  cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
229  use_async_io=0; /* No need to use async */
230  }
231  }
232  }
233  cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
234  if (type_arg != READ_NET && type_arg != WRITE_NET)
235  {
236  /* Retry allocating memory in smaller blocks until we get one */
237  cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
238  if (cachesize < min_cache)
239  cachesize = min_cache;
240  size_t buffer_block= cachesize;
241  if (type_arg == READ_CACHE && not global_read_buffer.add(buffer_block))
242  {
243  my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
244  return 2;
245  }
246 
247  buffer= (unsigned char*) malloc(buffer_block);
248  write_buffer=buffer;
249  alloced_buffer= true;
250  }
251 
252  read_length=buffer_length=cachesize;
253  myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
254  request_pos= read_pos= write_pos = buffer;
255 
256  if (type_arg == WRITE_CACHE)
257  write_end=
258  buffer+buffer_length- (seek_offset & (IO_SIZE-1));
259  else
260  read_end=buffer; /* Nothing in cache */
261 
262  /* End_of_file may be changed by user later */
263  end_of_file= end_of_file_local;
264  error= 0;
265  type= type_arg;
266  init_functions();
267  return 0;
268 } /* init_io_cache */
269 
280 bool io_cache_st::reinit_io_cache(enum cache_type type_arg,
281  my_off_t seek_offset,
282  bool,
283  bool clear_cache)
284 {
285  /* One can't do reinit with the following types */
286  assert(type_arg != READ_NET && type != READ_NET &&
287  type_arg != WRITE_NET && type != WRITE_NET);
288 
289  /* If the whole file is in memory, avoid flushing to disk */
290  if (! clear_cache &&
291  seek_offset >= pos_in_file &&
292  seek_offset <= tell())
293  {
294  /* Reuse current buffer without flushing it to disk */
295  unsigned char *pos;
296  if (type == WRITE_CACHE && type_arg == READ_CACHE)
297  {
298  read_end=write_pos;
299  end_of_file= tell();
300  /*
301  Trigger a new seek only if we have a valid
302  file handle.
303  */
304  seek_not_done= (file != -1);
305  }
306  else if (type_arg == WRITE_CACHE)
307  {
308  if (type == READ_CACHE)
309  {
310  write_end=write_buffer+buffer_length;
311  seek_not_done=1;
312  }
313  end_of_file = ~(my_off_t) 0;
314  }
315  pos=request_pos+(seek_offset-pos_in_file);
316  if (type_arg == WRITE_CACHE)
317  write_pos=pos;
318  else
319  read_pos= pos;
320  }
321  else
322  {
323  /*
324  If we change from WRITE_CACHE to READ_CACHE, assume that everything
325  after the current positions should be ignored
326  */
327  if (type == WRITE_CACHE && type_arg == READ_CACHE)
328  end_of_file= tell();
329  /* flush cache if we want to reuse it */
330  if (!clear_cache && flush(1))
331  return 1;
332  pos_in_file=seek_offset;
333  /* Better to do always do a seek */
334  seek_not_done=1;
335  request_pos=read_pos=write_pos=buffer;
336  if (type_arg == READ_CACHE)
337  {
338  read_end=buffer; /* Nothing in cache */
339  }
340  else
341  {
342  write_end= (buffer + buffer_length - (seek_offset & (IO_SIZE-1)));
343  end_of_file= ~(my_off_t) 0;
344  }
345  }
346  type= type_arg;
347  error=0;
348  init_functions();
349 
350  return 0;
351 } /* reinit_io_cache */
352 
373 static int _my_b_read(io_cache_st *info, unsigned char *Buffer, size_t Count)
374 {
375  size_t length_local,diff_length,left_length, max_length;
376  my_off_t pos_in_file_local;
377 
378  if ((left_length= (size_t) (info->read_end-info->read_pos)))
379  {
380  assert(Count >= left_length); /* User is not using my_b_read() */
381  memcpy(Buffer,info->read_pos, left_length);
382  Buffer+=left_length;
383  Count-=left_length;
384  }
385 
386  /* pos_in_file always point on where info->buffer was read */
387  pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
388 
389  /*
390  Whenever a function which operates on io_cache_st flushes/writes
391  some part of the io_cache_st to disk it will set the property
392  "seek_not_done" to indicate this to other functions operating
393  on the io_cache_st.
394  */
395  if (info->seek_not_done)
396  {
397  if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
398  {
399  /* No error, reset seek_not_done flag. */
400  info->seek_not_done= 0;
401  }
402  else
403  {
404  /*
405  If the seek failed and the error number is ESPIPE, it is because
406  info->file is a pipe or socket or FIFO. We never should have tried
407  to seek on that. See Bugs#25807 and #22828 for more info.
408  */
409  assert(errno != ESPIPE);
410  info->error= -1;
411  return 1;
412  }
413  }
414 
415  diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
416  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
417  { /* Fill first intern buffer */
418  size_t read_length;
419  if (info->end_of_file <= pos_in_file_local)
420  { /* End of file */
421  info->error= (int) left_length;
422  return 1;
423  }
424  length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
425  if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
426  {
427  info->error= (read_length == (size_t) -1 ? -1 :
428  (int) (read_length+left_length));
429  return 1;
430  }
431  Count-= length_local;
432  Buffer+= length_local;
433  pos_in_file_local+= length_local;
434  left_length+= length_local;
435  diff_length=0;
436  }
437 
438  max_length= info->read_length-diff_length;
439  if (info->type != READ_FIFO &&
440  max_length > (info->end_of_file - pos_in_file_local))
441  max_length= (size_t) (info->end_of_file - pos_in_file_local);
442  if (!max_length)
443  {
444  if (Count)
445  {
446  info->error= static_cast<int>(left_length); /* We only got this many char */
447  return 1;
448  }
449  length_local=0; /* Didn't read any chars */
450  }
451  else if (( length_local= my_read(info->file,info->buffer, max_length,
452  info->myflags)) < Count ||
453  length_local == (size_t) -1)
454  {
455  if ( length_local != (size_t) -1)
456  memcpy(Buffer, info->buffer, length_local);
457  info->pos_in_file= pos_in_file_local;
458  info->error= length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
459  info->read_pos=info->read_end=info->buffer;
460  return 1;
461  }
462  info->read_pos=info->buffer+Count;
463  info->read_end=info->buffer+ length_local;
464  info->pos_in_file=pos_in_file_local;
465  memcpy(Buffer, info->buffer, Count);
466  return 0;
467 }
468 
473 int io_cache_st::get()
474 {
475  if (read_pos != read_end)
476  return *read_pos++;
477 
478  if (pre_read)
479  pre_read(this);
480 
481  unsigned char buff;
482  if (read_function(this, &buff, 1))
483  return my_b_EOF;
484 
485  if (post_read)
486  post_read(this);
487 
488  return buff;
489 }
490 
499 int _my_b_write(io_cache_st *info, const unsigned char *Buffer, size_t Count)
500 {
501  size_t rest_length,length_local;
502 
503  if (info->pos_in_file+info->buffer_length > info->end_of_file)
504  {
505  errno=EFBIG;
506  return info->error = -1;
507  }
508 
509  rest_length= (size_t) (info->write_end - info->write_pos);
510  memcpy(info->write_pos,Buffer,(size_t) rest_length);
511  Buffer+=rest_length;
512  Count-=rest_length;
513  info->write_pos+=rest_length;
514 
515  if (info->flush(1))
516  return 1;
517  if (Count >= IO_SIZE)
518  { /* Fill first intern buffer */
519  length_local=Count & (size_t) ~(IO_SIZE-1);
520  if (info->seek_not_done)
521  {
522  /*
523  Whenever a function which operates on io_cache_st flushes/writes
524  some part of the io_cache_st to disk it will set the property
525  "seek_not_done" to indicate this to other functions operating
526  on the io_cache_st.
527  */
528  if (lseek(info->file,info->pos_in_file,SEEK_SET))
529  {
530  info->error= -1;
531  return (1);
532  }
533  info->seek_not_done=0;
534  }
535  if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
536  return info->error= -1;
537 
538  Count-=length_local;
539  Buffer+=length_local;
540  info->pos_in_file+=length_local;
541  }
542  memcpy(info->write_pos,Buffer,(size_t) Count);
543  info->write_pos+=Count;
544  return 0;
545 }
546 
553 static int my_block_write(io_cache_st *info, const unsigned char *Buffer, size_t Count, my_off_t pos)
554 {
555  size_t length_local;
556  int error=0;
557 
558  if (pos < info->pos_in_file)
559  {
560  /* Of no overlap, write everything without buffering */
561  if (pos + Count <= info->pos_in_file)
562  return (pwrite(info->file, Buffer, Count, pos) == 0);
563  /* Write the part of the block that is before buffer */
564  length_local= (uint32_t) (info->pos_in_file - pos);
565  if (pwrite(info->file, Buffer, length_local, pos) == 0)
566  info->error= error= -1;
567  Buffer+=length_local;
568  pos+= length_local;
569  Count-= length_local;
570  }
571 
572  /* Check if we want to write inside the used part of the buffer.*/
573  length_local= (size_t) (info->write_end - info->buffer);
574  if (pos < info->pos_in_file + length_local)
575  {
576  size_t offset= (size_t) (pos - info->pos_in_file);
577  length_local-=offset;
578  if (length_local > Count)
579  length_local=Count;
580  memcpy(info->buffer+offset, Buffer, length_local);
581  Buffer+=length_local;
582  Count-= length_local;
583  /* Fix length_local of buffer if the new data was larger */
584  if (info->buffer+length_local > info->write_pos)
585  info->write_pos=info->buffer+length_local;
586  if (!Count)
587  return (error);
588  }
589  /* Write at the end of the current buffer; This is the normal case */
590  if (_my_b_write(info, Buffer, Count))
591  error= -1;
592  return error;
593 }
594 
595 int io_cache_st::block_write(const void* Buffer, size_t Count, my_off_t pos)
596 {
597  return my_block_write(this, reinterpret_cast<const unsigned char*>(Buffer), Count, pos);
598 }
599 
604 static int my_b_flush_io_cache(io_cache_st *info, int need_append_buffer_lock)
605 {
606  size_t length_local;
607  bool append_cache= false;
608  my_off_t pos_in_file_local;
609 
610  if (info->type == WRITE_CACHE || append_cache)
611  {
612  if (info->file == -1)
613  {
614  if (info->real_open_cached_file())
615  return((info->error= -1));
616  }
617  lock_append_buffer(info, need_append_buffer_lock);
618 
619  if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
620  {
621  pos_in_file_local=info->pos_in_file;
622  /*
623  If we have append cache, we always open the file with
624  O_APPEND which moves the pos to EOF automatically on every write
625  */
626  if (!append_cache && info->seek_not_done)
627  { /* File touched, do seek */
628  if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
629  {
630  unlock_append_buffer(info, need_append_buffer_lock);
631  return((info->error= -1));
632  }
633  if (!append_cache)
634  info->seek_not_done=0;
635  }
636  if (!append_cache)
637  info->pos_in_file+=length_local;
638  info->write_end= (info->write_buffer+info->buffer_length-
639  ((pos_in_file_local+length_local) & (IO_SIZE-1)));
640 
641  if (my_write(info->file,info->write_buffer,length_local,
642  info->myflags | MY_NABP))
643  info->error= -1;
644  else
645  info->error= 0;
646  if (!append_cache)
647  {
648  set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
649  }
650  else
651  {
652  info->end_of_file+=(info->write_pos-info->append_read_pos);
653  my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
654  assert(info->end_of_file == tell_ret);
655  }
656 
657  info->append_read_pos=info->write_pos=info->write_buffer;
658  unlock_append_buffer(info, need_append_buffer_lock);
659  return(info->error);
660  }
661  }
662  unlock_append_buffer(info, need_append_buffer_lock);
663  return 0;
664 }
665 
666 int io_cache_st::flush(int need_append_buffer_lock)
667 {
668  return my_b_flush_io_cache(this, need_append_buffer_lock);
669 }
670 
685 int io_cache_st::end_io_cache()
686 {
687  int _error=0;
688 
689  if (pre_close)
690  {
691  (*pre_close)(this);
692  pre_close= 0;
693  }
694  if (alloced_buffer)
695  {
696  if (type == READ_CACHE)
697  global_read_buffer.sub(buffer_length);
698  alloced_buffer=0;
699  if (file != -1) /* File doesn't exist */
700  _error= my_b_flush_io_cache(this, 1);
701  free((unsigned char*) buffer);
702  buffer=read_pos=(unsigned char*) 0;
703  }
704 
705  return _error;
706 } /* end_io_cache */
707 
708 } /* namespace internal */
709 } /* namespace drizzled */