Drizzled Public API Documentation

mi_write.cc
00001 /* Copyright (C) 2000-2006 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00016 /* Write a row to a MyISAM table */
00017 
00018 #include "myisam_priv.h"
00019 
00020 #include <drizzled/internal/m_string.h>
00021 #include <drizzled/util/test.h>
00022 
00023 using namespace drizzled;
00024 
00025 #define MAX_POINTER_LENGTH 8
00026 
00027   /* Functions declared in this file */
00028 
00029 static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
00030         uint32_t comp_flag, unsigned char *key,
00031         uint32_t key_length, internal::my_off_t pos, unsigned char *father_buff,
00032         unsigned char *father_keypos, internal::my_off_t father_page,
00033         bool insert_last);
00034 static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,unsigned char *key,
00035           unsigned char *curr_buff,unsigned char *father_buff,
00036           unsigned char *father_keypos,internal::my_off_t father_page);
00037 static unsigned char *_mi_find_last_pos(MI_KEYDEF *keyinfo, unsigned char *page,
00038         unsigned char *key, uint32_t *return_key_length,
00039         unsigned char **after_key);
00040 int _mi_ck_write_tree(register MI_INFO *info, uint32_t keynr,unsigned char *key,
00041           uint32_t key_length);
00042 int _mi_ck_write_btree(register MI_INFO *info, uint32_t keynr,unsigned char *key,
00043            uint32_t key_length);
00044 
00045   /* Write new record to database */
00046 
00047 int mi_write(MI_INFO *info, unsigned char *record)
00048 {
00049   MYISAM_SHARE *share=info->s;
00050   uint32_t i;
00051   int save_errno;
00052   internal::my_off_t filepos;
00053   unsigned char *buff;
00054   bool lock_tree= share->concurrent_insert;
00055 
00056   if (share->options & HA_OPTION_READ_ONLY_DATA)
00057   {
00058     return(errno=EACCES);
00059   }
00060   if (_mi_readinfo(info,F_WRLCK,1))
00061     return(errno);
00062   filepos= ((share->state.dellink != HA_OFFSET_ERROR &&
00063              !info->append_insert_at_end) ?
00064       share->state.dellink :
00065       info->state->data_file_length);
00066 
00067   if (share->base.reloc == (ha_rows) 1 &&
00068       share->base.records == (ha_rows) 1 &&
00069       info->state->records == (ha_rows) 1)
00070   {           /* System file */
00071     errno=HA_ERR_RECORD_FILE_FULL;
00072     goto err2;
00073   }
00074   if (info->state->key_file_length >= share->base.margin_key_file_length)
00075   {
00076     errno=HA_ERR_INDEX_FILE_FULL;
00077     goto err2;
00078   }
00079   if (_mi_mark_file_changed(info))
00080     goto err2;
00081 
00082   /* Calculate and check all unique constraints */
00083   for (i=0 ; i < share->state.header.uniques ; i++)
00084   {
00085     if (mi_check_unique(info,share->uniqueinfo+i,record,
00086          mi_unique_hash(share->uniqueinfo+i,record),
00087          HA_OFFSET_ERROR))
00088       goto err2;
00089   }
00090 
00091   /* Write all keys to indextree */
00092 
00093   buff=info->lastkey2;
00094   for (i=0 ; i < share->base.keys ; i++)
00095   {
00096     if (mi_is_key_active(share->state.key_map, i))
00097     {
00098       bool local_lock_tree= (lock_tree &&
00099                                 !(info->bulk_insert &&
00100                                   is_tree_inited(&info->bulk_insert[i])));
00101       if (local_lock_tree)
00102       {
00103   share->keyinfo[i].version++;
00104       }
00105       {
00106         if (share->keyinfo[i].ck_insert(info,i,buff,
00107       _mi_make_key(info,i,buff,record,filepos)))
00108         {
00109           goto err;
00110         }
00111       }
00112 
00113       /* The above changed info->lastkey2. Inform mi_rnext_same(). */
00114       info->update&= ~HA_STATE_RNEXT_SAME;
00115     }
00116   }
00117   if (share->calc_checksum)
00118     info->checksum=(*share->calc_checksum)(info,record);
00119   if (!(info->opt_flag & OPT_NO_ROWS))
00120   {
00121     if ((*share->write_record)(info,record))
00122       goto err;
00123     info->state->checksum+=info->checksum;
00124   }
00125   if (share->base.auto_key)
00126     set_if_bigger(info->s->state.auto_increment,
00127                   retrieve_auto_increment(info, record));
00128   info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
00129      HA_STATE_ROW_CHANGED);
00130   info->state->records++;
00131   info->lastpos=filepos;
00132   _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
00133 
00134   /*
00135     Update status of the table. We need to do so after each row write
00136     for the log tables, as we want the new row to become visible to
00137     other threads as soon as possible. We don't lock mutex here
00138     (as it is required by pthread memory visibility rules) as (1) it's
00139     not critical to use outdated share->is_log_table value (2) locking
00140     mutex here for every write is too expensive.
00141   */
00142   if (share->is_log_table) // Log table do not exist in Drizzle
00143     assert(0);
00144 
00145   return(0);
00146 
00147 err:
00148   save_errno=errno;
00149   if (errno == HA_ERR_FOUND_DUPP_KEY || errno == HA_ERR_RECORD_FILE_FULL ||
00150       errno == HA_ERR_NULL_IN_SPATIAL || errno == HA_ERR_OUT_OF_MEM)
00151   {
00152     if (info->bulk_insert)
00153     {
00154       uint32_t j;
00155       for (j=0 ; j < share->base.keys ; j++)
00156         mi_flush_bulk_insert(info, j);
00157     }
00158     info->errkey= (int) i;
00159     while ( i-- > 0)
00160     {
00161       if (mi_is_key_active(share->state.key_map, i))
00162       {
00163   {
00164     uint32_t key_length=_mi_make_key(info,i,buff,record,filepos);
00165     if (_mi_ck_delete(info,i,buff,key_length))
00166     {
00167       break;
00168     }
00169   }
00170       }
00171     }
00172   }
00173   else
00174   {
00175     mi_print_error(info->s, HA_ERR_CRASHED);
00176     mi_mark_crashed(info);
00177   }
00178   info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
00179   errno=save_errno;
00180 err2:
00181   save_errno=errno;
00182   _mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
00183   return(errno=save_errno);
00184 } /* mi_write */
00185 
00186 
00187   /* Write one key to btree */
00188 
00189 int _mi_ck_write(MI_INFO *info, uint32_t keynr, unsigned char *key, uint32_t key_length)
00190 {
00191   if (info->bulk_insert && is_tree_inited(&info->bulk_insert[keynr]))
00192   {
00193     return(_mi_ck_write_tree(info, keynr, key, key_length));
00194   }
00195   else
00196   {
00197     return(_mi_ck_write_btree(info, keynr, key, key_length));
00198   }
00199 } /* _mi_ck_write */
00200 
00201 
00202 /**********************************************************************
00203  *                Normal insert code                                  *
00204  **********************************************************************/
00205 
00206 int _mi_ck_write_btree(register MI_INFO *info, uint32_t keynr, unsigned char *key,
00207            uint32_t key_length)
00208 {
00209   uint32_t error;
00210   uint32_t comp_flag;
00211   MI_KEYDEF *keyinfo=info->s->keyinfo+keynr;
00212   internal::my_off_t  *root=&info->s->state.key_root[keynr];
00213 
00214   if (keyinfo->flag & HA_SORT_ALLOWS_SAME)
00215     comp_flag=SEARCH_BIGGER;      /* Put after same key */
00216   else if (keyinfo->flag & (HA_NOSAME))
00217   {
00218     comp_flag=SEARCH_FIND | SEARCH_UPDATE;  /* No duplicates */
00219     if (keyinfo->flag & HA_NULL_ARE_EQUAL)
00220       comp_flag|= SEARCH_NULL_ARE_EQUAL;
00221   }
00222   else
00223     comp_flag=SEARCH_SAME;      /* Keys in rec-pos order */
00224 
00225   error=_mi_ck_real_write_btree(info, keyinfo, key, key_length,
00226                                 root, comp_flag);
00227   return(error);
00228 } /* _mi_ck_write_btree */
00229 
00230 int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
00231     unsigned char *key, uint32_t key_length, internal::my_off_t *root, uint32_t comp_flag)
00232 {
00233   int error;
00234   /* key_length parameter is used only if comp_flag is SEARCH_FIND */
00235   if (*root == HA_OFFSET_ERROR ||
00236       (error=w_search(info, keyinfo, comp_flag, key, key_length,
00237           *root, (unsigned char *) 0, (unsigned char*) 0,
00238           (internal::my_off_t) 0, 1)) > 0)
00239     error=_mi_enlarge_root(info,keyinfo,key,root);
00240   return(error);
00241 } /* _mi_ck_real_write_btree */
00242 
00243 
00244   /* Make a new root with key as only pointer */
00245 
00246 int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, unsigned char *key,
00247                      internal::my_off_t *root)
00248 {
00249   uint32_t t_length,nod_flag;
00250   MI_KEY_PARAM s_temp;
00251   MYISAM_SHARE *share=info->s;
00252 
00253   nod_flag= (*root != HA_OFFSET_ERROR) ?  share->base.key_reflength : 0;
00254   _mi_kpointer(info,info->buff+2,*root); /* if nod */
00255   t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(unsigned char*) 0,
00256         (unsigned char*) 0, (unsigned char*) 0, key,&s_temp);
00257   mi_putint(info->buff,t_length+2+nod_flag,nod_flag);
00258   (*keyinfo->store_key)(keyinfo,info->buff+2+nod_flag,&s_temp);
00259   info->buff_used=info->page_changed=1;   /* info->buff is used */
00260   if ((*root= _mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR ||
00261       _mi_write_keypage(info,keyinfo,*root,DFLT_INIT_HITS,info->buff))
00262     return(-1);
00263   return(0);
00264 } /* _mi_enlarge_root */
00265 
00266 
00267   /*
00268     Search after a position for a key and store it there
00269     Returns -1 = error
00270        0  = ok
00271        1  = key should be stored in higher tree
00272   */
00273 
00274 static int w_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
00275         uint32_t comp_flag, unsigned char *key, uint32_t key_length, internal::my_off_t page,
00276         unsigned char *father_buff, unsigned char *father_keypos,
00277         internal::my_off_t father_page, bool insert_last)
00278 {
00279   int error,flag;
00280   uint32_t nod_flag, search_key_length;
00281   unsigned char *temp_buff,*keypos;
00282   unsigned char keybuff[MI_MAX_KEY_BUFF];
00283   bool was_last_key;
00284   internal::my_off_t next_page, dupp_key_pos;
00285 
00286   search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
00287   if (!(temp_buff= (unsigned char*) malloc(keyinfo->block_length+
00288                    MI_MAX_KEY_BUFF*2)))
00289     return(-1);
00290   if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
00291     goto err;
00292 
00293   flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
00294             comp_flag, &keypos, keybuff, &was_last_key);
00295   nod_flag=mi_test_if_nod(temp_buff);
00296   if (flag == 0)
00297   {
00298     uint32_t tmp_key_length;
00299   /* get position to record with duplicated key */
00300     tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
00301     if (tmp_key_length)
00302       dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
00303     else
00304       dupp_key_pos= HA_OFFSET_ERROR;
00305 
00306     {
00307       info->dupp_key_pos= dupp_key_pos;
00308       free(temp_buff);
00309       errno=HA_ERR_FOUND_DUPP_KEY;
00310       return(-1);
00311     }
00312   }
00313   if (flag == MI_FOUND_WRONG_KEY)
00314     return(-1);
00315   if (!was_last_key)
00316     insert_last=0;
00317   next_page=_mi_kpos(nod_flag,keypos);
00318   if (next_page == HA_OFFSET_ERROR ||
00319       (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
00320           temp_buff, keypos, page, insert_last)) >0)
00321   {
00322     error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
00323          father_keypos,father_page, insert_last);
00324     if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
00325       goto err;
00326   }
00327   free(temp_buff);
00328   return(error);
00329 err:
00330   free(temp_buff);
00331   return (-1);
00332 } /* w_search */
00333 
00334 
00335 /*
00336   Insert new key.
00337 
00338   SYNOPSIS
00339     _mi_insert()
00340     info                        Open table information.
00341     keyinfo                     Key definition information.
00342     key                         New key.
00343     anc_buff                    Key page (beginning).
00344     key_pos                     Position in key page where to insert.
00345     key_buff                    Copy of previous key.
00346     father_buff                 parent key page for balancing.
00347     father_key_pos              position in parent key page for balancing.
00348     father_page                 position of parent key page in file.
00349     insert_last                 If to append at end of page.
00350 
00351   DESCRIPTION
00352     Insert new key at right of key_pos.
00353 
00354   RETURN
00355     2           if key contains key to upper level.
00356     0           OK.
00357     < 0         Error.
00358 */
00359 
00360 int _mi_insert(register MI_INFO *info, register MI_KEYDEF *keyinfo,
00361          unsigned char *key, unsigned char *anc_buff, unsigned char *key_pos, unsigned char *key_buff,
00362                unsigned char *father_buff, unsigned char *father_key_pos, internal::my_off_t father_page,
00363          bool insert_last)
00364 {
00365   uint32_t a_length,nod_flag;
00366   int t_length;
00367   unsigned char *endpos, *prev_key;
00368   MI_KEY_PARAM s_temp;
00369 
00370   nod_flag=mi_test_if_nod(anc_buff);
00371   a_length=mi_getint(anc_buff);
00372   endpos= anc_buff+ a_length;
00373   prev_key=(key_pos == anc_buff+2+nod_flag ? (unsigned char*) 0 : key_buff);
00374   t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
00375         (key_pos == endpos ? (unsigned char*) 0 : key_pos),
00376         prev_key, prev_key,
00377         key,&s_temp);
00378 
00379   if (t_length > 0)
00380   {
00381     if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
00382     {
00383       mi_print_error(info->s, HA_ERR_CRASHED);
00384       errno=HA_ERR_CRASHED;
00385       return(-1);
00386     }
00387     internal::bmove_upp((unsigned char*) endpos+t_length,(unsigned char*) endpos,(uint) (endpos-key_pos));
00388   }
00389   else
00390   {
00391     if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
00392     {
00393       mi_print_error(info->s, HA_ERR_CRASHED);
00394       errno=HA_ERR_CRASHED;
00395       return(-1);
00396     }
00397     memmove(key_pos, key_pos - t_length, endpos - key_pos + t_length);
00398   }
00399   (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
00400   a_length+=t_length;
00401   mi_putint(anc_buff,a_length,nod_flag);
00402   if (a_length <= keyinfo->block_length)
00403   {
00404     return(0);        /* There is room on page */
00405   }
00406   /* Page is full */
00407   if (nod_flag)
00408     insert_last=0;
00409   if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
00410       father_buff && !insert_last)
00411     return(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
00412          father_key_pos,father_page));
00413   return(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
00414 } /* _mi_insert */
00415 
00416 
00417   /* split a full page in two and assign emerging item to key */
00418 
00419 int _mi_split_page(register MI_INFO *info, register MI_KEYDEF *keyinfo,
00420        unsigned char *key, unsigned char *buff, unsigned char *key_buff,
00421        bool insert_last_key)
00422 {
00423   uint32_t length,a_length,key_ref_length,t_length,nod_flag,key_length;
00424   unsigned char *key_pos,*pos, *after_key= NULL;
00425   internal::my_off_t new_pos;
00426   MI_KEY_PARAM s_temp;
00427 
00428   if (info->s->keyinfo+info->lastinx == keyinfo)
00429     info->page_changed=1;     /* Info->buff is used */
00430   info->buff_used=1;
00431   nod_flag=mi_test_if_nod(buff);
00432   key_ref_length=2+nod_flag;
00433   if (insert_last_key)
00434     key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
00435   else
00436     key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
00437             &after_key);
00438   if (!key_pos)
00439     return(-1);
00440 
00441   length=(uint) (key_pos-buff);
00442   a_length=mi_getint(buff);
00443   mi_putint(buff,length,nod_flag);
00444 
00445   key_pos=after_key;
00446   if (nod_flag)
00447   {
00448     pos=key_pos-nod_flag;
00449     memcpy(info->buff + 2, pos, nod_flag);
00450   }
00451 
00452   /* Move middle item to key and pointer to new page */
00453   if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
00454     return(-1);
00455   _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);
00456 
00457   /* Store new page */
00458   if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
00459     return(-1);
00460 
00461   t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(unsigned char *) 0,
00462         (unsigned char*) 0, (unsigned char*) 0,
00463         key_buff, &s_temp);
00464   length=(uint) ((buff+a_length)-key_pos);
00465   memcpy(info->buff+key_ref_length+t_length, key_pos, length);
00466   (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
00467   mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);
00468 
00469   if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
00470     return(-1);
00471   return(2);        /* Middle key up */
00472 } /* _mi_split_page */
00473 
00474 
00475   /*
00476     Calculate how to much to move to split a page in two
00477     Returns pointer to start of key.
00478     key will contain the key.
00479     return_key_length will contain the length of key
00480     after_key will contain the position to where the next key starts
00481   */
00482 
00483 unsigned char *_mi_find_half_pos(uint32_t nod_flag, MI_KEYDEF *keyinfo, unsigned char *page,
00484        unsigned char *key, uint32_t *return_key_length,
00485        unsigned char **after_key)
00486 {
00487   uint32_t keys,length,key_ref_length;
00488   unsigned char *end,*lastpos;
00489 
00490   key_ref_length=2+nod_flag;
00491   length=mi_getint(page)-key_ref_length;
00492   page+=key_ref_length;
00493   if (!(keyinfo->flag &
00494   (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
00495    HA_BINARY_PACK_KEY)))
00496   {
00497     key_ref_length=keyinfo->keylength+nod_flag;
00498     keys=length/(key_ref_length*2);
00499     *return_key_length=keyinfo->keylength;
00500     end=page+keys*key_ref_length;
00501     *after_key=end+key_ref_length;
00502     memcpy(key,end,key_ref_length);
00503     return(end);
00504   }
00505 
00506   end=page+length/2-key_ref_length;   /* This is aprox. half */
00507   *key='\0';
00508   do
00509   {
00510     lastpos=page;
00511     if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&page,key)))
00512       return(0);
00513   } while (page < end);
00514   *return_key_length=length;
00515   *after_key=page;
00516   return(lastpos);
00517 } /* _mi_find_half_pos */
00518 
00519 
00520   /*
00521     Split buffer at last key
00522     Returns pointer to the start of the key before the last key
00523     key will contain the last key
00524   */
00525 
00526 static unsigned char *_mi_find_last_pos(MI_KEYDEF *keyinfo, unsigned char *page,
00527         unsigned char *key, uint32_t *return_key_length,
00528         unsigned char **after_key)
00529 {
00530   uint32_t keys;
00531   uint32_t length;
00532   uint32_t last_length= 0;
00533   uint32_t key_ref_length;
00534   unsigned char *end, *lastpos, *prevpos= NULL;
00535   unsigned char key_buff[MI_MAX_KEY_BUFF];
00536 
00537   key_ref_length=2;
00538   length=mi_getint(page)-key_ref_length;
00539   page+=key_ref_length;
00540   if (!(keyinfo->flag &
00541   (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
00542    HA_BINARY_PACK_KEY)))
00543   {
00544     keys=length/keyinfo->keylength-2;
00545     *return_key_length=length=keyinfo->keylength;
00546     end=page+keys*length;
00547     *after_key=end+length;
00548     memcpy(key,end,length);
00549     return(end);
00550   }
00551 
00552   end= page + length - key_ref_length;
00553   *key='\0';
00554   length=0;
00555   lastpos=page;
00556   while (page < end)
00557   {
00558     prevpos=lastpos; lastpos=page;
00559     last_length=length;
00560     memcpy(key, key_buff, length);    /* previous key */
00561     if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
00562     {
00563       mi_print_error(keyinfo->share, HA_ERR_CRASHED);
00564       errno=HA_ERR_CRASHED;
00565       return(0);
00566     }
00567   }
00568   *return_key_length=last_length;
00569   *after_key=lastpos;
00570   return(prevpos);
00571 } /* _mi_find_last_pos */
00572 
00573 
00574   /* Balance page with not packed keys with page on right/left */
00575   /* returns 0 if balance was done */
00576 
00577 static int _mi_balance_page(register MI_INFO *info, MI_KEYDEF *keyinfo,
00578           unsigned char *key, unsigned char *curr_buff, unsigned char *father_buff,
00579           unsigned char *father_key_pos, internal::my_off_t father_page)
00580 {
00581   bool right;
00582   uint32_t k_length,father_length,father_keylength,nod_flag,curr_keylength,
00583        right_length,left_length,new_right_length,new_left_length,extra_length,
00584        length,keys;
00585   unsigned char *pos,*buff,*extra_buff;
00586   internal::my_off_t next_page,new_pos;
00587   unsigned char tmp_part_key[MI_MAX_KEY_BUFF];
00588 
00589   k_length=keyinfo->keylength;
00590   father_length=mi_getint(father_buff);
00591   father_keylength=k_length+info->s->base.key_reflength;
00592   nod_flag=mi_test_if_nod(curr_buff);
00593   curr_keylength=k_length+nod_flag;
00594   info->page_changed=1;
00595 
00596   if ((father_key_pos != father_buff+father_length &&
00597        (info->state->records & 1)) ||
00598       father_key_pos == father_buff+2+info->s->base.key_reflength)
00599   {
00600     right=1;
00601     next_page= _mi_kpos(info->s->base.key_reflength,
00602       father_key_pos+father_keylength);
00603     buff=info->buff;
00604   }
00605   else
00606   {
00607     right=0;
00608     father_key_pos-=father_keylength;
00609     next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
00610           /* Fix that curr_buff is to left */
00611     buff=curr_buff; curr_buff=info->buff;
00612   }         /* father_key_pos ptr to parting key */
00613 
00614   if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
00615     goto err;
00616 
00617   /* Test if there is room to share keys */
00618 
00619   left_length=mi_getint(curr_buff);
00620   right_length=mi_getint(buff);
00621   keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
00622 
00623   if ((right ? right_length : left_length) + curr_keylength <=
00624       keyinfo->block_length)
00625   {           /* Merge buffs */
00626     new_left_length=2+nod_flag+(keys/2)*curr_keylength;
00627     new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
00628     mi_putint(curr_buff,new_left_length,nod_flag);
00629     mi_putint(buff,new_right_length,nod_flag);
00630 
00631     if (left_length < new_left_length)
00632     {           /* Move keys buff -> leaf */
00633       pos=curr_buff+left_length;
00634       memcpy(pos, father_key_pos, k_length);
00635       length= new_left_length - left_length - k_length;
00636       memcpy(pos+k_length, buff+2, length);
00637       pos=buff+2+length;
00638       memcpy(father_key_pos, pos, k_length);
00639       memmove(buff+2, pos+k_length, new_right_length);
00640     }
00641     else
00642     {           /* Move keys -> buff */
00643 
00644       internal::bmove_upp((unsigned char*) buff+new_right_length,(unsigned char*) buff+right_length,
00645     right_length-2);
00646       length=new_right_length-right_length-k_length;
00647       memcpy(buff+2+length,father_key_pos, k_length);
00648       pos=curr_buff+new_left_length;
00649       memcpy(father_key_pos, pos, k_length);
00650       memcpy(buff+2, pos+k_length, length);
00651     }
00652 
00653     if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
00654   _mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
00655       goto err;
00656     return(0);
00657   }
00658 
00659   /* curr_buff[] and buff[] are full, lets split and make new nod */
00660 
00661   extra_buff=info->buff+info->s->base.max_key_block_length;
00662   new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
00663   if (keys == 5)        /* Too few keys to balance */
00664     new_left_length-=curr_keylength;
00665   extra_length=nod_flag+left_length+right_length-
00666     new_left_length-new_right_length-curr_keylength;
00667   mi_putint(curr_buff,new_left_length,nod_flag);
00668   mi_putint(buff,new_right_length,nod_flag);
00669   mi_putint(extra_buff,extra_length+2,nod_flag);
00670 
00671   /* move first largest keys to new page  */
00672   pos=buff+right_length-extra_length;
00673   memcpy(extra_buff+2, pos, extra_length);
00674   /* Save new parting key */
00675   memcpy(tmp_part_key, pos-k_length,k_length);
00676   /* Make place for new keys */
00677   internal::bmove_upp((unsigned char*) buff+new_right_length,(unsigned char*) pos-k_length,
00678       right_length-extra_length-k_length-2);
00679   /* Copy keys from left page */
00680   pos= curr_buff+new_left_length;
00681   length= left_length - new_left_length - k_length;
00682   memcpy(buff+2, pos+k_length, length);
00683   /* Copy old parting key */
00684   memcpy(buff+2+length, father_key_pos, k_length);
00685 
00686   /* Move new parting keys up to caller */
00687   memcpy((right ? key : father_key_pos), pos, k_length);
00688   memcpy((right ? father_key_pos : key), tmp_part_key, k_length);
00689 
00690   if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
00691     goto err;
00692   _mi_kpointer(info,key+k_length,new_pos);
00693   if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
00694       DFLT_INIT_HITS,info->buff) ||
00695       _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
00696                         DFLT_INIT_HITS,extra_buff))
00697     goto err;
00698 
00699   return(1);        /* Middle key up */
00700 
00701 err:
00702   return(-1);
00703 } /* _mi_balance_page */
00704 
00705 /**********************************************************************
00706  *                Bulk insert code                                    *
00707  **********************************************************************/
00708 
00709 typedef struct {
00710   MI_INFO *info;
00711   uint32_t keynr;
00712 } bulk_insert_param;
00713 
00714 int _mi_ck_write_tree(register MI_INFO *info, uint32_t keynr, unsigned char *key,
00715           uint32_t key_length)
00716 {
00717   int error;
00718 
00719   error= tree_insert(&info->bulk_insert[keynr], key,
00720          key_length + info->s->rec_reflength,
00721          info->bulk_insert[keynr].custom_arg) ? 0 : HA_ERR_OUT_OF_MEM ;
00722 
00723   return(error);
00724 } /* _mi_ck_write_tree */
00725 
00726 
00727 /* typeof(_mi_keys_compare)=qsort_cmp2 */
00728 
00729 static int keys_compare(bulk_insert_param *param, unsigned char *key1, unsigned char *key2)
00730 {
00731   uint32_t not_used[2];
00732   return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
00733                     key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
00734                     not_used);
00735 }
00736 
00737 
00738 static int keys_free(unsigned char *key, TREE_FREE mode, bulk_insert_param *param)
00739 {
00740   /*
00741     Probably I can use info->lastkey here, but I'm not sure,
00742     and to be safe I'd better use local lastkey.
00743   */
00744   unsigned char lastkey[MI_MAX_KEY_BUFF];
00745   uint32_t keylen;
00746   MI_KEYDEF *keyinfo;
00747 
00748   switch (mode) {
00749   case free_init:
00750     if (param->info->s->concurrent_insert)
00751     {
00752       param->info->s->keyinfo[param->keynr].version++;
00753     }
00754     return 0;
00755   case free_free:
00756     keyinfo=param->info->s->keyinfo+param->keynr;
00757     keylen=_mi_keylength(keyinfo, key);
00758     memcpy(lastkey, key, keylen);
00759     return _mi_ck_write_btree(param->info,param->keynr,lastkey,
00760             keylen - param->info->s->rec_reflength);
00761   case free_end:
00762     return 0;
00763   }
00764   return -1;
00765 }
00766 
00767 
00768 int mi_init_bulk_insert(MI_INFO *info, uint32_t cache_size, ha_rows rows)
00769 {
00770   MYISAM_SHARE *share=info->s;
00771   MI_KEYDEF *key=share->keyinfo;
00772   bulk_insert_param *params;
00773   uint32_t i, num_keys, total_keylength;
00774   uint64_t key_map;
00775 
00776   assert(!info->bulk_insert &&
00777         (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));
00778 
00779   mi_clear_all_keys_active(key_map);
00780   for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
00781   {
00782     if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
00783         mi_is_key_active(share->state.key_map, i))
00784     {
00785       num_keys++;
00786       mi_set_key_active(key_map, i);
00787       total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
00788     }
00789   }
00790 
00791   if (num_keys==0 ||
00792       num_keys * MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
00793     return(0);
00794 
00795   if (rows && rows*total_keylength < cache_size)
00796     cache_size= (uint32_t)rows;
00797   else
00798     cache_size/=total_keylength*16;
00799 
00800   info->bulk_insert=(TREE *)
00801     malloc((sizeof(TREE)*share->base.keys+
00802            sizeof(bulk_insert_param)*num_keys));
00803 
00804   if (!info->bulk_insert)
00805     return(HA_ERR_OUT_OF_MEM);
00806 
00807   params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
00808   for (i=0 ; i < share->base.keys ; i++)
00809   {
00810     if (mi_is_key_active(key_map, i))
00811     {
00812       params->info=info;
00813       params->keynr=i;
00814       /* Only allocate a 16'th of the buffer at a time */
00815       init_tree(&info->bulk_insert[i],
00816                 cache_size * key[i].maxlength,
00817                 cache_size * key[i].maxlength, 0,
00818     (qsort_cmp2)keys_compare, false,
00819     (tree_element_free) keys_free, (void *)params++);
00820     }
00821     else
00822      info->bulk_insert[i].root=0;
00823   }
00824 
00825   return(0);
00826 }
00827 
00828 void mi_flush_bulk_insert(MI_INFO *info, uint32_t inx)
00829 {
00830   if (info->bulk_insert)
00831   {
00832     if (is_tree_inited(&info->bulk_insert[inx]))
00833       reset_tree(&info->bulk_insert[inx]);
00834   }
00835 }
00836 
00837 void mi_end_bulk_insert(MI_INFO *info)
00838 {
00839   if (info->bulk_insert)
00840   {
00841     uint32_t i;
00842     for (i=0 ; i < info->s->base.keys ; i++)
00843     {
00844       if (is_tree_inited(& info->bulk_insert[i]))
00845       {
00846         delete_tree(& info->bulk_insert[i]);
00847       }
00848     }
00849     free((void *)info->bulk_insert);
00850     info->bulk_insert=0;
00851   }
00852 }