00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "myisam_priv.h"
00022 #include <stddef.h>
00023 #include <queue>
00024 #include <algorithm>
00025
00026
00027
/*
  Tuning constants for the sort.  Names that may already have been defined
  by an included header are reset before being given the values used here.
*/

/* Step, in runs, used by merge_many_buff() when grouping runs for one pass. */
#define MERGEBUFF 15
/* merge_many_buff() keeps merging while at least this many runs remain. */
#define MERGEBUFF2 31

/* Smallest sort buffer the allocation loops will fall back to. */
#undef MIN_SORT_MEMORY
#define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)

/* Flags passed to my_pread() when reading runs back from the temp file. */
#undef MYF_RW
#define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)

/* Cache size requested when a temporary file is opened. */
#undef DISK_BUFFER_SIZE
#define DISK_BUFFER_SIZE (IO_SIZE*16)
00037
00038 using namespace std;
00039 using namespace drizzled;
00040
00041
00042
00043
00044
00045
00046 extern void print_error(const char *fmt,...);
00047
00048
00049
00050 static ha_rows find_all_keys(MI_SORT_PARAM *info,uint32_t keys,
00051 unsigned char **sort_keys,
00052 DYNAMIC_ARRAY *buffpek,
00053 size_t *maxbuffer,
00054 internal::IO_CACHE *tempfile,
00055 internal::IO_CACHE *tempfile_for_exceptions);
00056 static int write_keys(MI_SORT_PARAM *info,unsigned char **sort_keys,
00057 uint32_t count, BUFFPEK *buffpek,internal::IO_CACHE *tempfile);
00058 static int write_key(MI_SORT_PARAM *info, unsigned char *key,
00059 internal::IO_CACHE *tempfile);
00060 static int write_index(MI_SORT_PARAM *info,unsigned char * *sort_keys,
00061 uint32_t count);
00062 static int merge_many_buff(MI_SORT_PARAM *info,uint32_t keys,
00063 unsigned char * *sort_keys,
00064 BUFFPEK *buffpek,size_t *maxbuffer,
00065 internal::IO_CACHE *t_file);
00066 static uint32_t read_to_buffer(internal::IO_CACHE *fromfile,BUFFPEK *buffpek,
00067 uint32_t sort_length);
00068 static int merge_buffers(MI_SORT_PARAM *info,uint32_t keys,
00069 internal::IO_CACHE *from_file, internal::IO_CACHE *to_file,
00070 unsigned char * *sort_keys, BUFFPEK *lastbuff,
00071 BUFFPEK *Fb, BUFFPEK *Tb);
00072 static int merge_index(MI_SORT_PARAM *,uint,unsigned char **,BUFFPEK *, int,
00073 internal::IO_CACHE *);
00074 static int write_keys_varlen(MI_SORT_PARAM *info,unsigned char **sort_keys,
00075 uint32_t count, BUFFPEK *buffpek,
00076 internal::IO_CACHE *tempfile);
00077 static uint32_t read_to_buffer_varlen(internal::IO_CACHE *fromfile,BUFFPEK *buffpek,
00078 uint32_t sort_length);
00079 static int write_merge_key(MI_SORT_PARAM *info, internal::IO_CACHE *to_file,
00080 unsigned char *key, uint32_t sort_length, uint32_t count);
00081 static int write_merge_key_varlen(MI_SORT_PARAM *info,
00082 internal::IO_CACHE *to_file,
00083 unsigned char* key, uint32_t sort_length,
00084 uint32_t count);
00085
00086 inline int
00087 my_var_write(MI_SORT_PARAM *info, internal::IO_CACHE *to_file, unsigned char *bufs);
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103 int _create_index_by_sort(MI_SORT_PARAM *info,bool no_messages,
00104 size_t sortbuff_size)
00105 {
00106 int error;
00107 size_t maxbuffer, skr;
00108 uint32_t memavl,old_memavl,keys,sort_length;
00109 DYNAMIC_ARRAY buffpek;
00110 ha_rows records;
00111 unsigned char **sort_keys;
00112 internal::IO_CACHE tempfile, tempfile_for_exceptions;
00113
00114 if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
00115 {
00116 info->write_keys=write_keys_varlen;
00117 info->read_to_buffer=read_to_buffer_varlen;
00118 info->write_key= write_merge_key_varlen;
00119 }
00120 else
00121 {
00122 info->write_keys=write_keys;
00123 info->read_to_buffer=read_to_buffer;
00124 info->write_key=write_merge_key;
00125 }
00126
00127 my_b_clear(&tempfile);
00128 my_b_clear(&tempfile_for_exceptions);
00129 memset(&buffpek, 0, sizeof(buffpek));
00130 sort_keys= (unsigned char **) NULL; error= 1;
00131 maxbuffer=1;
00132
00133 memavl=max(sortbuff_size,(size_t)MIN_SORT_MEMORY);
00134 records= info->sort_info->max_records;
00135 sort_length= info->key_length;
00136
00137 while (memavl >= MIN_SORT_MEMORY)
00138 {
00139 if ((records < UINT32_MAX) &&
00140 ((internal::my_off_t) (records + 1) *
00141 (sort_length + sizeof(char*)) <= (internal::my_off_t) memavl))
00142 keys= (uint)records+1;
00143 else
00144 do
00145 {
00146 skr=maxbuffer;
00147 if (memavl < sizeof(BUFFPEK)* maxbuffer ||
00148 (keys=(memavl-sizeof(BUFFPEK)* maxbuffer)/
00149 (sort_length+sizeof(char*))) <= 1 ||
00150 keys < maxbuffer)
00151 {
00152 mi_check_print_error(info->sort_info->param,
00153 "myisam_sort_buffer_size is too small");
00154 goto err;
00155 }
00156 }
00157 while ((maxbuffer= (size_t)(records/(keys-1)+1)) != skr);
00158
00159 if ((sort_keys=(unsigned char **)malloc(keys*(sort_length+sizeof(char*)))))
00160 {
00161 if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
00162 maxbuffer/2))
00163 {
00164 free((unsigned char*) sort_keys);
00165 sort_keys= 0;
00166 }
00167 else
00168 break;
00169 }
00170 old_memavl=memavl;
00171 if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
00172 memavl=MIN_SORT_MEMORY;
00173 }
00174 if (memavl < MIN_SORT_MEMORY)
00175 {
00176 mi_check_print_error(info->sort_info->param,"MyISAM sort buffer too small");
00177 goto err;
00178 }
00179 (*info->lock_in_memory)(info->sort_info->param);
00180
00181 if (!no_messages)
00182 printf(" - Searching for keys, allocating buffer for %d keys\n",keys);
00183
00184 if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
00185 &tempfile,&tempfile_for_exceptions))
00186 == HA_POS_ERROR)
00187 goto err;
00188 if (maxbuffer == 0)
00189 {
00190 if (!no_messages)
00191 printf(" - Dumping %u keys\n", (uint32_t) records);
00192 if (write_index(info,sort_keys, (uint) records))
00193 goto err;
00194 }
00195 else
00196 {
00197 keys=(keys*(sort_length+sizeof(char*)))/sort_length;
00198 if (maxbuffer >= MERGEBUFF2)
00199 {
00200 if (!no_messages)
00201 printf(" - Merging %u keys\n", (uint32_t) records);
00202 if (merge_many_buff(info,keys,sort_keys, (BUFFPEK*)buffpek.buffer, &maxbuffer, &tempfile))
00203 goto err;
00204 }
00205 if (internal::flush_io_cache(&tempfile) ||
00206 tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
00207 goto err;
00208 if (!no_messages)
00209 printf(" - Last merge and dumping keys\n");
00210 if (merge_index(info,keys,sort_keys, (BUFFPEK*)buffpek.buffer, maxbuffer, &tempfile))
00211 goto err;
00212 }
00213
00214 if (flush_pending_blocks(info))
00215 goto err;
00216
00217 if (my_b_inited(&tempfile_for_exceptions))
00218 {
00219 MI_INFO *idx=info->sort_info->info;
00220 uint32_t keyno=info->key;
00221 uint32_t key_length, ref_length=idx->s->rec_reflength;
00222
00223 if (not no_messages)
00224 printf(" - Adding exceptions\n");
00225
00226 if (flush_io_cache(&tempfile_for_exceptions) || tempfile_for_exceptions.reinit_io_cache(internal::READ_CACHE,0L,0,0))
00227 {
00228 goto err;
00229 }
00230
00231 while (!my_b_read(&tempfile_for_exceptions,(unsigned char*)&key_length,
00232 sizeof(key_length))
00233 && !my_b_read(&tempfile_for_exceptions,(unsigned char*)sort_keys,
00234 (uint) key_length))
00235 {
00236 if (_mi_ck_write(idx,keyno,(unsigned char*) sort_keys,key_length-ref_length))
00237 goto err;
00238 }
00239 }
00240
00241 error =0;
00242
00243 err:
00244 if (sort_keys)
00245 free((unsigned char*) sort_keys);
00246 delete_dynamic(&buffpek);
00247 tempfile.close_cached_file();
00248 tempfile_for_exceptions.close_cached_file();
00249
00250 return(error ? -1 : 0);
00251 }
00252
00253
00254
00255
00256 static ha_rows find_all_keys(MI_SORT_PARAM *info, uint32_t keys,
00257 unsigned char **sort_keys,
00258 DYNAMIC_ARRAY *buffpek,
00259 size_t *maxbuffer, internal::IO_CACHE *tempfile,
00260 internal::IO_CACHE *tempfile_for_exceptions)
00261 {
00262 int error;
00263 uint32_t idx;
00264
00265 idx=error=0;
00266 sort_keys[0]=(unsigned char*) (sort_keys+keys);
00267
00268 while (!(error=(*info->key_read)(info,sort_keys[idx])))
00269 {
00270 if (info->real_key_length > info->key_length)
00271 {
00272 if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
00273 return(HA_POS_ERROR);
00274 continue;
00275 }
00276
00277 if (++idx == keys)
00278 {
00279 if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)alloc_dynamic(buffpek),
00280 tempfile))
00281 return(HA_POS_ERROR);
00282
00283 sort_keys[0]=(unsigned char*) (sort_keys+keys);
00284 memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
00285 idx=1;
00286 }
00287 sort_keys[idx]=sort_keys[idx-1]+info->key_length;
00288 }
00289 if (error > 0)
00290 return(HA_POS_ERROR);
00291 if (buffpek->size())
00292 {
00293 if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
00294 tempfile))
00295 return(HA_POS_ERROR);
00296 *maxbuffer=buffpek->size() - 1;
00297 }
00298 else
00299 *maxbuffer=0;
00300
00301 return((*maxbuffer)*(keys-1)+idx);
00302 }
00303
00304
00305 int thr_write_keys(MI_SORT_PARAM *sort_param)
00306 {
00307 SORT_INFO *sort_info= sort_param->sort_info;
00308 MI_CHECK *param= sort_info->param;
00309 uint32_t length= 0, keys;
00310 ulong *rec_per_key_part= param->rec_per_key_part;
00311 int got_error=sort_info->got_error;
00312 uint32_t i;
00313 MI_INFO *info=sort_info->info;
00314 MYISAM_SHARE *share=info->s;
00315 MI_SORT_PARAM *sinfo;
00316 unsigned char *mergebuf=0;
00317
00318 for (i= 0, sinfo= sort_param ;
00319 i < sort_info->total_keys ;
00320 i++, rec_per_key_part+=sinfo->keyinfo->keysegs, sinfo++)
00321 {
00322 if (!sinfo->sort_keys)
00323 {
00324 got_error=1;
00325 void * rec_buff_ptr= mi_get_rec_buff_ptr(info, sinfo->rec_buff);
00326 if (rec_buff_ptr != NULL)
00327 free(rec_buff_ptr);
00328 continue;
00329 }
00330 if (!got_error)
00331 {
00332 mi_set_key_active(share->state.key_map, sinfo->key);
00333 if (!sinfo->buffpek.size())
00334 {
00335 if (param->testflag & T_VERBOSE)
00336 {
00337 printf("Key %d - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
00338 fflush(stdout);
00339 }
00340 if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) || flush_pending_blocks(sinfo))
00341 got_error=1;
00342 }
00343 if (!got_error && param->testflag & T_STATISTICS)
00344 update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
00345 param->stats_method == MI_STATS_METHOD_IGNORE_NULLS?
00346 sinfo->notnull: NULL,
00347 (uint64_t) info->state->records);
00348 }
00349 free((unsigned char*) sinfo->sort_keys);
00350 void * rec_buff_ptr= mi_get_rec_buff_ptr(info, sinfo->rec_buff);
00351 if (rec_buff_ptr != NULL)
00352 free(rec_buff_ptr);
00353 sinfo->sort_keys=0;
00354 }
00355
00356 for (i= 0, sinfo= sort_param ;
00357 i < sort_info->total_keys ;
00358 i++,
00359 delete_dynamic(&sinfo->buffpek),
00360 sinfo->tempfile.close_cached_file(),
00361 sinfo->tempfile_for_exceptions.close_cached_file(),
00362 sinfo++)
00363 {
00364 if (got_error)
00365 continue;
00366 if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
00367 {
00368 sinfo->write_keys=write_keys_varlen;
00369 sinfo->read_to_buffer=read_to_buffer_varlen;
00370 sinfo->write_key=write_merge_key_varlen;
00371 }
00372 else
00373 {
00374 sinfo->write_keys=write_keys;
00375 sinfo->read_to_buffer=read_to_buffer;
00376 sinfo->write_key=write_merge_key;
00377 }
00378 if (sinfo->buffpek.size())
00379 {
00380 size_t maxbuffer=sinfo->buffpek.size() - 1;
00381 if (!mergebuf)
00382 {
00383 length=param->sort_buffer_length;
00384 while (length >= MIN_SORT_MEMORY)
00385 {
00386 if ((mergebuf= (unsigned char *)malloc(length)))
00387 break;
00388 length=length*3/4;
00389 }
00390 if (!mergebuf)
00391 {
00392 got_error=1;
00393 continue;
00394 }
00395 }
00396 keys=length/sinfo->key_length;
00397 if (maxbuffer >= MERGEBUFF2)
00398 {
00399 if (param->testflag & T_VERBOSE)
00400 printf("Key %d - Merging %u keys\n",sinfo->key+1, sinfo->keys);
00401 if (merge_many_buff(sinfo, keys, (unsigned char **)mergebuf, (BUFFPEK*)sinfo->buffpek.buffer,
00402 &maxbuffer, &sinfo->tempfile))
00403 {
00404 got_error=1;
00405 continue;
00406 }
00407 }
00408 if (flush_io_cache(&sinfo->tempfile) || sinfo->tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
00409 {
00410 got_error=1;
00411 continue;
00412 }
00413 if (param->testflag & T_VERBOSE)
00414 printf("Key %d - Last merge and dumping keys\n", sinfo->key+1);
00415 if (merge_index(sinfo, keys, (unsigned char **)mergebuf, (BUFFPEK*)sinfo->buffpek.buffer,
00416 maxbuffer,&sinfo->tempfile) ||
00417 flush_pending_blocks(sinfo))
00418 {
00419 got_error=1;
00420 continue;
00421 }
00422 }
00423 if (my_b_inited(&sinfo->tempfile_for_exceptions))
00424 {
00425 uint32_t key_length;
00426
00427 if (param->testflag & T_VERBOSE)
00428 printf("Key %d - Dumping 'long' keys\n", sinfo->key+1);
00429
00430 if (flush_io_cache(&sinfo->tempfile_for_exceptions) || sinfo->tempfile_for_exceptions.reinit_io_cache(internal::READ_CACHE,0L,0,0))
00431 {
00432 got_error=1;
00433 continue;
00434 }
00435
00436 while (!got_error &&
00437 !my_b_read(&sinfo->tempfile_for_exceptions,(unsigned char*)&key_length,
00438 sizeof(key_length)))
00439 {
00440 unsigned char ft_buf[10];
00441 if (key_length > sizeof(ft_buf) ||
00442 my_b_read(&sinfo->tempfile_for_exceptions, (unsigned char*)ft_buf,
00443 (uint)key_length) ||
00444 _mi_ck_write(info, sinfo->key, (unsigned char*)ft_buf,
00445 key_length - info->s->rec_reflength))
00446 got_error=1;
00447 }
00448 }
00449 }
00450 free((unsigned char*) mergebuf);
00451 return(got_error);
00452 }
00453
00454
00455
00456 static int write_keys(MI_SORT_PARAM *info, register unsigned char **sort_keys,
00457 uint32_t count, BUFFPEK *buffpek, internal::IO_CACHE *tempfile)
00458 {
00459 unsigned char **end;
00460 uint32_t sort_length=info->key_length;
00461
00462 internal::my_qsort2((unsigned char*) sort_keys,count,sizeof(unsigned char*),(qsort2_cmp) info->key_cmp,
00463 info);
00464 if (!my_b_inited(tempfile) && tempfile->open_cached_file(P_tmpdir, "ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
00465 return(1);
00466
00467 buffpek->file_pos=my_b_tell(tempfile);
00468 buffpek->count=count;
00469
00470 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
00471 {
00472 if (my_b_write(tempfile,(unsigned char*) *sort_keys,(uint) sort_length))
00473 return(1);
00474 }
00475 return(0);
00476 }
00477
00478
00479 inline int
00480 my_var_write(MI_SORT_PARAM *info, internal::IO_CACHE *to_file, unsigned char *bufs)
00481 {
00482 int err;
00483 uint16_t len = _mi_keylength(info->keyinfo, (unsigned char*) bufs);
00484
00485
00486 if ((err= my_b_write(to_file, (unsigned char*)&len, sizeof(len))))
00487 return (err);
00488 if ((err= my_b_write(to_file,bufs, (uint) len)))
00489 return (err);
00490 return (0);
00491 }
00492
00493
00494 static int write_keys_varlen(MI_SORT_PARAM *info,
00495 register unsigned char **sort_keys,
00496 uint32_t count, BUFFPEK *buffpek,
00497 internal::IO_CACHE *tempfile)
00498 {
00499 unsigned char **end;
00500 int err;
00501
00502 internal::my_qsort2((unsigned char*) sort_keys,count,sizeof(unsigned char*),(qsort2_cmp) info->key_cmp,
00503 info);
00504 if (!my_b_inited(tempfile) && tempfile->open_cached_file(P_tmpdir, "ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
00505 return(1);
00506
00507 buffpek->file_pos=my_b_tell(tempfile);
00508 buffpek->count=count;
00509 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
00510 {
00511 if ((err= my_var_write(info,tempfile, (unsigned char*) *sort_keys)))
00512 return(err);
00513 }
00514 return(0);
00515 }
00516
00517
00518 static int write_key(MI_SORT_PARAM *info, unsigned char *key,
00519 internal::IO_CACHE *tempfile)
00520 {
00521 uint32_t key_length=info->real_key_length;
00522
00523 if (!my_b_inited(tempfile) && tempfile->open_cached_file(P_tmpdir, "ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
00524 return(1);
00525
00526 if (my_b_write(tempfile,(unsigned char*)&key_length,sizeof(key_length)) ||
00527 my_b_write(tempfile,(unsigned char*)key,(uint) key_length))
00528 return(1);
00529 return(0);
00530 }
00531
00532
00533
00534
00535 static int write_index(MI_SORT_PARAM *info, register unsigned char **sort_keys,
00536 register uint32_t count)
00537 {
00538 internal::my_qsort2((unsigned char*) sort_keys,(size_t) count,sizeof(unsigned char*),
00539 (qsort2_cmp) info->key_cmp,info);
00540 while (count--)
00541 {
00542 if ((*info->key_write)(info,*sort_keys++))
00543 return(-1);
00544 }
00545 return(0);
00546 }
00547
00548
00549
00550
00551 static int merge_many_buff(MI_SORT_PARAM *info, uint32_t keys,
00552 unsigned char **sort_keys, BUFFPEK *buffpek,
00553 size_t *maxbuffer, internal::IO_CACHE *t_file)
00554 {
00555 uint32_t i;
00556 internal::IO_CACHE t_file2, *from_file, *to_file, *temp;
00557 BUFFPEK *lastbuff;
00558
00559 if (*maxbuffer < MERGEBUFF2)
00560 return(0);
00561 if (flush_io_cache(t_file) || t_file2.open_cached_file(P_tmpdir, "ST",
00562 DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
00563 return(1);
00564
00565 from_file= t_file ; to_file= &t_file2;
00566 while (*maxbuffer >= MERGEBUFF2)
00567 {
00568 from_file->reinit_io_cache(internal::READ_CACHE,0L,0,0);
00569 to_file->reinit_io_cache(internal::WRITE_CACHE,0L,0,0);
00570 lastbuff=buffpek;
00571 for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
00572 {
00573 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
00574 buffpek+i,buffpek+i+MERGEBUFF-1))
00575 goto cleanup;
00576 }
00577 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
00578 buffpek+i,buffpek+ *maxbuffer))
00579 break;
00580 if (flush_io_cache(to_file))
00581 break;
00582 temp=from_file; from_file=to_file; to_file=temp;
00583 *maxbuffer= (int) (lastbuff-buffpek)-1;
00584 }
00585 cleanup:
00586 to_file->close_cached_file();
00587 if (to_file == t_file)
00588 *t_file=t_file2;
00589
00590 return(*maxbuffer >= MERGEBUFF2);
00591 }
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607 static uint32_t read_to_buffer(internal::IO_CACHE *fromfile, BUFFPEK *buffpek,
00608 uint32_t sort_length)
00609 {
00610 register uint32_t count;
00611 uint32_t length;
00612
00613 if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
00614 {
00615 if (my_pread(fromfile->file,(unsigned char*) buffpek->base,
00616 (length= sort_length*count),buffpek->file_pos,MYF_RW))
00617 return((uint) -1);
00618 buffpek->key=buffpek->base;
00619 buffpek->file_pos+= length;
00620 buffpek->count-= count;
00621 buffpek->mem_count= count;
00622 }
00623 return (count*sort_length);
00624 }
00625
00626 static uint32_t read_to_buffer_varlen(internal::IO_CACHE *fromfile, BUFFPEK *buffpek,
00627 uint32_t sort_length)
00628 {
00629 register uint32_t count;
00630 uint16_t length_of_key = 0;
00631 uint32_t idx;
00632 unsigned char *buffp;
00633
00634 if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
00635 {
00636 buffp = buffpek->base;
00637
00638 for (idx=1;idx<=count;idx++)
00639 {
00640 if (my_pread(fromfile->file,(unsigned char*)&length_of_key,sizeof(length_of_key),
00641 buffpek->file_pos,MYF_RW))
00642 return((uint) -1);
00643 buffpek->file_pos+=sizeof(length_of_key);
00644 if (my_pread(fromfile->file,(unsigned char*) buffp,length_of_key,
00645 buffpek->file_pos,MYF_RW))
00646 return((uint) -1);
00647 buffpek->file_pos+=length_of_key;
00648 buffp = buffp + sort_length;
00649 }
00650 buffpek->key=buffpek->base;
00651 buffpek->count-= count;
00652 buffpek->mem_count= count;
00653 }
00654 return (count*sort_length);
00655 }
00656
00657
00658 static int write_merge_key_varlen(MI_SORT_PARAM *info,
00659 internal::IO_CACHE *to_file, unsigned char* key,
00660 uint32_t sort_length, uint32_t count)
00661 {
00662 uint32_t idx;
00663 unsigned char *bufs = key;
00664
00665 for (idx=1;idx<=count;idx++)
00666 {
00667 int err;
00668 if ((err= my_var_write(info, to_file, bufs)))
00669 return (err);
00670 bufs=bufs+sort_length;
00671 }
00672 return(0);
00673 }
00674
00675
00676 static int write_merge_key(MI_SORT_PARAM *info,
00677 internal::IO_CACHE *to_file, unsigned char *key,
00678 uint32_t sort_length, uint32_t count)
00679 {
00680 (void)info;
00681 return my_b_write(to_file, key, (size_t) sort_length*count);
00682 }
00683
00684
00685
00686
00687
00688 class compare_functor
00689 {
00690 qsort2_cmp key_compare;
00691 void *key_compare_arg;
00692 public:
00693 compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg)
00694 : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
00695 inline bool operator()(const BUFFPEK *i, const BUFFPEK *j) const
00696 {
00697 int val= key_compare(key_compare_arg,
00698 &i->key, &j->key);
00699 return (val >= 0);
00700 }
00701 };
00702
00703
00704
00705
00706
00707
00708 static int
00709 merge_buffers(MI_SORT_PARAM *info, uint32_t keys, internal::IO_CACHE *from_file,
00710 internal::IO_CACHE *to_file, unsigned char **sort_keys, BUFFPEK *lastbuff,
00711 BUFFPEK *Fb, BUFFPEK *Tb)
00712 {
00713 int error;
00714 uint32_t sort_length,maxcount;
00715 ha_rows count;
00716 internal::my_off_t to_start_filepos= 0;
00717 unsigned char *strpos;
00718 BUFFPEK *buffpek;
00719 priority_queue<BUFFPEK *, vector<BUFFPEK *>, compare_functor >
00720 queue(compare_functor((qsort2_cmp) info->key_cmp, static_cast<void *>(info)));
00721 volatile int *killed= killed_ptr(info->sort_info->param);
00722
00723 count=error=0;
00724 maxcount=keys/((uint) (Tb-Fb) +1);
00725 assert(maxcount > 0);
00726 if (to_file)
00727 to_start_filepos=my_b_tell(to_file);
00728 strpos=(unsigned char*) sort_keys;
00729 sort_length=info->key_length;
00730
00731 for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
00732 {
00733 count+= buffpek->count;
00734 buffpek->base= strpos;
00735 buffpek->max_keys=maxcount;
00736 strpos+= (uint) (error=(int) info->read_to_buffer(from_file,buffpek,
00737 sort_length));
00738 if (error == -1)
00739 goto err;
00740 queue.push(buffpek);
00741 }
00742
00743 while (queue.size() > 1)
00744 {
00745 for (;;)
00746 {
00747 if (*killed)
00748 {
00749 error=1; goto err;
00750 }
00751 buffpek= queue.top();
00752 if (to_file)
00753 {
00754 if (info->write_key(info,to_file,(unsigned char*) buffpek->key,
00755 (uint) sort_length,1))
00756 {
00757 error=1; goto err;
00758 }
00759 }
00760 else
00761 {
00762 if ((*info->key_write)(info,(void*) buffpek->key))
00763 {
00764 error=1; goto err;
00765 }
00766 }
00767 buffpek->key+=sort_length;
00768 if (! --buffpek->mem_count)
00769 {
00770 if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
00771 {
00772 queue.pop();
00773 break;
00774 }
00775 }
00776 else if (error == -1)
00777 goto err;
00778
00779 queue.pop();
00780 queue.push(buffpek);
00781 }
00782 }
00783 buffpek= queue.top();
00784 buffpek->base=(unsigned char *) sort_keys;
00785 buffpek->max_keys=keys;
00786 do
00787 {
00788 if (to_file)
00789 {
00790 if (info->write_key(info,to_file,(unsigned char*) buffpek->key,
00791 sort_length,buffpek->mem_count))
00792 {
00793 error=1; goto err;
00794 }
00795 }
00796 else
00797 {
00798 register unsigned char *end;
00799 strpos= buffpek->key;
00800 for (end=strpos+buffpek->mem_count*sort_length;
00801 strpos != end ;
00802 strpos+=sort_length)
00803 {
00804 if ((*info->key_write)(info,(void*) strpos))
00805 {
00806 error=1; goto err;
00807 }
00808 }
00809 }
00810 }
00811 while ((error=(int) info->read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
00812 error != 0);
00813
00814 lastbuff->count=count;
00815 if (to_file)
00816 lastbuff->file_pos=to_start_filepos;
00817 err:
00818 return(error);
00819 }
00820
00821
00822
00823
00824 static int
00825 merge_index(MI_SORT_PARAM *info, uint32_t keys, unsigned char **sort_keys,
00826 BUFFPEK *buffpek, int maxbuffer, internal::IO_CACHE *tempfile)
00827 {
00828 if (merge_buffers(info,keys,tempfile,(internal::IO_CACHE*) 0,sort_keys,buffpek,buffpek,
00829 buffpek+maxbuffer))
00830 return(1);
00831 return(0);
00832 }
00833