00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00022 #include <config.h>
00023
00024 #include <drizzled/drizzled.h>
00025 #include <drizzled/error.h>
00026 #include <drizzled/internal/iocache.h>
00027 #include <drizzled/internal/my_sys.h>
00028 #include <drizzled/optimizer/range.h>
00029 #include <drizzled/plugin/storage_engine.h>
00030 #include <drizzled/records.h>
00031 #include <drizzled/session.h>
00032 #include <drizzled/table.h>
00033
00034 namespace drizzled
00035 {
00036
00037 static int rr_sequential(ReadRecord *info);
00038 static int rr_quick(ReadRecord *info);
00039 static int rr_from_tempfile(ReadRecord *info);
00040 static int rr_unpack_from_tempfile(ReadRecord *info);
00041 static int rr_unpack_from_buffer(ReadRecord *info);
00042 static int rr_from_pointers(ReadRecord *info);
00043 static int rr_from_cache(ReadRecord *info);
00044 static int rr_cmp(unsigned char *a,unsigned char *b);
00045 static int rr_index_first(ReadRecord *info);
00046 static int rr_index(ReadRecord *info);
00047
00048 void ReadRecord::init_reard_record_sequential()
00049 {
00050 read_record= rr_sequential;
00051 }
00052
00053 int ReadRecord::init_read_record_idx(Session *,
00054 Table *table_arg,
00055 bool print_error_arg,
00056 uint32_t idx)
00057 {
00058 table_arg->emptyRecord();
00059 table= table_arg;
00060 cursor= table->cursor;
00061 record= table->getInsertRecord();
00062 print_error= print_error_arg;
00063
00064 table->status=0;
00065 if (not table->cursor->inited)
00066 {
00067 int error= table->cursor->startIndexScan(idx, 1);
00068 if (error != 0)
00069 return error;
00070 }
00071
00072 read_record= rr_index_first;
00073
00074 return 0;
00075 }
00076
00077
00078 int ReadRecord::init_read_record(Session *session_arg,
00079 Table *table_arg,
00080 optimizer::SqlSelect *select_arg,
00081 int use_record_cache,
00082 bool print_error_arg)
00083 {
00084 internal::IO_CACHE *tempfile;
00085 int error= 0;
00086
00087 session= session_arg;
00088 table= table_arg;
00089 cursor= table->cursor;
00090 forms= &table;
00091
00092 if (table->sort.addon_field)
00093 {
00094 rec_buf= table->sort.addon_buf;
00095 ref_length= table->sort.addon_length;
00096 }
00097 else
00098 {
00099 table->emptyRecord();
00100 record= table->getInsertRecord();
00101 ref_length= table->cursor->ref_length;
00102 }
00103 select= select_arg;
00104 print_error= print_error_arg;
00105 ignore_not_found_rows= 0;
00106 table->status=0;
00107
00108 if (select && my_b_inited(select->file))
00109 {
00110 tempfile= select->file;
00111 }
00112 else
00113 {
00114 tempfile= table->sort.io_cache;
00115 }
00116
00117 if (tempfile && my_b_inited(tempfile))
00118 {
00119 read_record= (table->sort.addon_field ?
00120 rr_unpack_from_tempfile : rr_from_tempfile);
00121
00122 io_cache=tempfile;
00123 io_cache->reinit_io_cache(internal::READ_CACHE,0L,0,0);
00124 ref_pos=table->cursor->ref;
00125 if (!table->cursor->inited)
00126 {
00127 error= table->cursor->startTableScan(0);
00128 if (error != 0)
00129 return error;
00130 }
00131
00132
00133
00134
00135
00136
00137 if (!table->sort.addon_field &&
00138 session->variables.read_rnd_buff_size &&
00139 !(table->cursor->getEngine()->check_flag(HTON_BIT_FAST_KEY_READ)) &&
00140 (table->db_stat & HA_READ_ONLY ||
00141 table->reginfo.lock_type <= TL_READ_NO_INSERT) &&
00142 (uint64_t) table->getShare()->getRecordLength() * (table->cursor->stats.records+
00143 table->cursor->stats.deleted) >
00144 (uint64_t) MIN_FILE_LENGTH_TO_USE_ROW_CACHE &&
00145 io_cache->end_of_file/ref_length * table->getShare()->getRecordLength() >
00146 (internal::my_off_t) MIN_ROWS_TO_USE_TABLE_CACHE &&
00147 !table->getShare()->blob_fields &&
00148 ref_length <= MAX_REFLENGTH)
00149 {
00150 if (init_rr_cache())
00151 {
00152 read_record= rr_from_cache;
00153 }
00154 }
00155 }
00156 else if (select && select->quick)
00157 {
00158 read_record= rr_quick;
00159 }
00160 else if (table->sort.record_pointers)
00161 {
00162 error= table->cursor->startTableScan(0);
00163 if (error != 0)
00164 return error;
00165
00166 cache_pos=table->sort.record_pointers;
00167 cache_end= cache_pos+ table->sort.found_records * ref_length;
00168 read_record= (table->sort.addon_field ? rr_unpack_from_buffer : rr_from_pointers);
00169 }
00170 else
00171 {
00172 read_record= rr_sequential;
00173 error= table->cursor->startTableScan(1);
00174 if (error != 0)
00175 return error;
00176
00177
00178 if (!table->no_cache &&
00179 (use_record_cache > 0 ||
00180 (int) table->reginfo.lock_type <= (int) TL_READ_WITH_SHARED_LOCKS ||
00181 !(table->getShare()->db_options_in_use & HA_OPTION_PACK_RECORD)))
00182 {
00183 table->cursor->extra_opt(HA_EXTRA_CACHE, session->variables.read_buff_size);
00184 }
00185 }
00186
00187 return 0;
00188 }
00189
00190
00191 void ReadRecord::end_read_record()
00192 {
00193 if (cache)
00194 {
00195 global_read_rnd_buffer.sub(session->variables.read_rnd_buff_size);
00196 free((char*) cache);
00197 cache= NULL;
00198 }
00199 if (table)
00200 {
00201 table->filesort_free_buffers();
00202 (void) cursor->extra(HA_EXTRA_NO_CACHE);
00203 if (read_record != rr_quick)
00204 (void) cursor->ha_index_or_rnd_end();
00205
00206 table= NULL;
00207 }
00208 }
00209
00210 static int rr_handle_error(ReadRecord *info, int error)
00211 {
00212 if (error == HA_ERR_END_OF_FILE)
00213 error= -1;
00214 else
00215 {
00216 if (info->print_error)
00217 info->table->print_error(error, MYF(0));
00218 if (error < 0)
00219 error= 1;
00220 }
00221 return error;
00222 }
00223
00225 static int rr_quick(ReadRecord *info)
00226 {
00227 int tmp;
00228 while ((tmp= info->select->quick->get_next()))
00229 {
00230 if (info->session->getKilled())
00231 {
00232 my_error(ER_SERVER_SHUTDOWN, MYF(0));
00233 return 1;
00234 }
00235 if (tmp != HA_ERR_RECORD_DELETED)
00236 {
00237 tmp= rr_handle_error(info, tmp);
00238 break;
00239 }
00240 }
00241
00242 return tmp;
00243 }
00244
00257 static int rr_index_first(ReadRecord *info)
00258 {
00259 int tmp= info->cursor->index_first(info->record);
00260 info->read_record= rr_index;
00261 if (tmp)
00262 tmp= rr_handle_error(info, tmp);
00263 return tmp;
00264 }
00265
00281 static int rr_index(ReadRecord *info)
00282 {
00283 int tmp= info->cursor->index_next(info->record);
00284 if (tmp)
00285 tmp= rr_handle_error(info, tmp);
00286 return tmp;
00287 }
00288
00289 int rr_sequential(ReadRecord *info)
00290 {
00291 int tmp;
00292 while ((tmp= info->cursor->rnd_next(info->record)))
00293 {
00294 if (info->session->getKilled())
00295 {
00296 info->session->send_kill_message();
00297 return 1;
00298 }
00299
00300
00301
00302
00303
00304 if (tmp != HA_ERR_RECORD_DELETED)
00305 {
00306 tmp= rr_handle_error(info, tmp);
00307 break;
00308 }
00309 }
00310
00311 return tmp;
00312 }
00313
00314 static int rr_from_tempfile(ReadRecord *info)
00315 {
00316 int tmp;
00317 for (;;)
00318 {
00319 if (my_b_read(info->io_cache,info->ref_pos,info->ref_length))
00320 return -1;
00321 if (!(tmp=info->cursor->rnd_pos(info->record,info->ref_pos)))
00322 break;
00323
00324 if (tmp == HA_ERR_RECORD_DELETED ||
00325 (tmp == HA_ERR_KEY_NOT_FOUND && info->ignore_not_found_rows))
00326 continue;
00327 tmp= rr_handle_error(info, tmp);
00328 break;
00329 }
00330 return tmp;
00331 }
00332
00348 static int rr_unpack_from_tempfile(ReadRecord *info)
00349 {
00350 if (my_b_read(info->io_cache, info->rec_buf, info->ref_length))
00351 return -1;
00352 Table *table= info->table;
00353 (*table->sort.unpack)(table->sort.addon_field, info->rec_buf);
00354
00355 return 0;
00356 }
00357
00358 static int rr_from_pointers(ReadRecord *info)
00359 {
00360 int tmp;
00361 unsigned char *cache_pos;
00362
00363
00364 for (;;)
00365 {
00366 if (info->cache_pos == info->cache_end)
00367 return -1;
00368 cache_pos= info->cache_pos;
00369 info->cache_pos+= info->ref_length;
00370
00371 if (!(tmp=info->cursor->rnd_pos(info->record,cache_pos)))
00372 break;
00373
00374
00375 if (tmp == HA_ERR_RECORD_DELETED ||
00376 (tmp == HA_ERR_KEY_NOT_FOUND && info->ignore_not_found_rows))
00377 continue;
00378 tmp= rr_handle_error(info, tmp);
00379 break;
00380 }
00381 return tmp;
00382 }
00383
00399 static int rr_unpack_from_buffer(ReadRecord *info)
00400 {
00401 if (info->cache_pos == info->cache_end)
00402 return -1;
00403 Table *table= info->table;
00404 (*table->sort.unpack)(table->sort.addon_field, info->cache_pos);
00405 info->cache_pos+= info->ref_length;
00406
00407 return 0;
00408 }
00409
00410
00411 bool ReadRecord::init_rr_cache()
00412 {
00413 uint32_t local_rec_cache_size;
00414
00415 struct_length= 3 + MAX_REFLENGTH;
00416 reclength= ALIGN_SIZE(table->getShare()->getRecordLength() + 1);
00417 if (reclength < struct_length)
00418 reclength= ALIGN_SIZE(struct_length);
00419
00420 error_offset= table->getShare()->getRecordLength();
00421 cache_records= (session->variables.read_rnd_buff_size /
00422 (reclength + struct_length));
00423 local_rec_cache_size= cache_records * reclength;
00424 rec_cache_size= cache_records * ref_length;
00425
00426 if (not global_read_rnd_buffer.add(session->variables.read_rnd_buff_size))
00427 {
00428 my_error(ER_OUT_OF_GLOBAL_READRNDMEMORY, MYF(ME_ERROR+ME_WAITTANG));
00429 return false;
00430 }
00431
00432
00433 if (cache_records <= 2 ||
00434 !(cache=(unsigned char*) malloc(local_rec_cache_size + cache_records * struct_length + 1)))
00435 {
00436 return false;
00437 }
00438 #ifdef HAVE_VALGRIND
00439
00440 memset(cache, 0, local_rec_cache_size + cache_records * struct_length + 1);
00441 #endif
00442 read_positions= cache + local_rec_cache_size;
00443 cache_pos= cache_end= cache;
00444
00445 return true;
00446 }
00447
00448 static int rr_from_cache(ReadRecord *info)
00449 {
00450 uint32_t length;
00451 internal::my_off_t rest_of_file;
00452 int16_t error;
00453 unsigned char *position,*ref_position,*record_pos;
00454 uint32_t record;
00455
00456 for (;;)
00457 {
00458 if (info->cache_pos != info->cache_end)
00459 {
00460 if (info->cache_pos[info->error_offset])
00461 {
00462 shortget(error,info->cache_pos);
00463 if (info->print_error)
00464 info->table->print_error(error,MYF(0));
00465 }
00466 else
00467 {
00468 error=0;
00469 memcpy(info->record,info->cache_pos, (size_t) info->table->getShare()->getRecordLength());
00470 }
00471 info->cache_pos+= info->reclength;
00472 return ((int) error);
00473 }
00474 length=info->rec_cache_size;
00475 rest_of_file= info->io_cache->end_of_file - my_b_tell(info->io_cache);
00476 if ((internal::my_off_t) length > rest_of_file)
00477 {
00478 length= (uint32_t) rest_of_file;
00479 }
00480
00481 if (!length || my_b_read(info->io_cache, info->getCache(), length))
00482 {
00483 return -1;
00484 }
00485
00486 length/=info->ref_length;
00487 position=info->getCache();
00488 ref_position=info->read_positions;
00489 for (uint32_t i= 0 ; i < length ; i++,position+=info->ref_length)
00490 {
00491 memcpy(ref_position,position,(size_t) info->ref_length);
00492 ref_position+=MAX_REFLENGTH;
00493 int3store(ref_position,(long) i);
00494 ref_position+=3;
00495 }
00496 internal::my_qsort(info->read_positions, length, info->struct_length,
00497 (qsort_cmp) rr_cmp);
00498
00499 position=info->read_positions;
00500 for (uint32_t i= 0 ; i < length ; i++)
00501 {
00502 memcpy(info->ref_pos, position, (size_t)info->ref_length);
00503 position+=MAX_REFLENGTH;
00504 record=uint3korr(position);
00505 position+=3;
00506 record_pos= info->getCache() + record * info->reclength;
00507 if ((error=(int16_t) info->cursor->rnd_pos(record_pos,info->ref_pos)))
00508 {
00509 record_pos[info->error_offset]=1;
00510 shortstore(record_pos,error);
00511 }
00512 else
00513 record_pos[info->error_offset]=0;
00514 }
00515 info->cache_end= (info->cache_pos= info->getCache())+length*info->reclength;
00516 }
00517 }
00518
/**
  Compare two row references byte by byte (lowest index first).

  Four bytes are compared when MAX_REFLENGTH is 4, eight bytes otherwise.

  @param a  first reference
  @param b  second reference

  @return the difference a[i] - b[i] of the first differing byte, or 0
          when all compared bytes are equal.
*/
static int rr_cmp(unsigned char *a,unsigned char *b)
{
#if MAX_REFLENGTH == 4
  const int compared_bytes= 4;
#else
  const int compared_bytes= 8;
#endif

  /*
    The same index is used on both sides for every byte; the unrolled
    version returned a[1] - b[5] when byte 5 differed.
  */
  for (int i= 0; i < compared_bytes; i++)
  {
    if (a[i] != b[i])
      return (int) a[i] - (int) b[i];
  }
  return 0;
}
00541
00542 }