Drizzled Public API Documentation

table.cc
00001 /* Copyright (C) 2000-2006 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00016 
00017 /* Some general useful functions */
00018 
00019 #include <config.h>
00020 
00021 #include <float.h>
00022 #include <fcntl.h>
00023 
00024 #include <string>
00025 #include <vector>
00026 #include <algorithm>
00027 
00028 #include <drizzled/error.h>
00029 #include <drizzled/gettext.h>
00030 
00031 #include <drizzled/plugin/transactional_storage_engine.h>
00032 #include <drizzled/plugin/authorization.h>
00033 #include <drizzled/nested_join.h>
00034 #include <drizzled/sql_parse.h>
00035 #include <drizzled/item/sum.h>
00036 #include <drizzled/table_list.h>
00037 #include <drizzled/session.h>
00038 #include <drizzled/sql_base.h>
00039 #include <drizzled/sql_select.h>
00040 #include <drizzled/field/blob.h>
00041 #include <drizzled/field/varstring.h>
00042 #include <drizzled/field/double.h>
00043 #include <drizzled/unireg.h>
00044 #include <drizzled/message/table.pb.h>
00045 #include <drizzled/sql_table.h>
00046 #include <drizzled/charset.h>
00047 #include <drizzled/internal/m_string.h>
00048 #include <plugin/myisam/myisam.h>
00049 #include <drizzled/plugin/storage_engine.h>
00050 
00051 #include <drizzled/item/string.h>
00052 #include <drizzled/item/int.h>
00053 #include <drizzled/item/decimal.h>
00054 #include <drizzled/item/float.h>
00055 #include <drizzled/item/null.h>
00056 #include <drizzled/temporal.h>
00057 
00058 #include <drizzled/refresh_version.h>
00059 
00060 #include <drizzled/table/singular.h>
00061 
00062 #include <drizzled/table_proto.h>
00063 #include <drizzled/typelib.h>
00064 
00065 using namespace std;
00066 
00067 namespace drizzled
00068 {
00069 
00070 extern plugin::StorageEngine *heap_engine;
00071 extern plugin::StorageEngine *myisam_engine;
00072 
00073 /* Functions defined in this cursor */
00074 
00075 /*************************************************************************/
00076 
00077 // @note this should all be the destructor
00078 int Table::delete_table(bool free_share)
00079 {
00080   int error= 0;
00081 
00082   if (db_stat)
00083     error= cursor->close();
00084   _alias.clear();
00085 
00086   if (field)
00087   {
00088     for (Field **ptr=field ; *ptr ; ptr++)
00089     {
00090       delete *ptr;
00091     }
00092     field= 0;
00093   }
00094   safe_delete(cursor);
00095 
00096   if (free_share)
00097   {
00098     release();
00099   }
00100 
00101   return error;
00102 }
00103 
00104 Table::~Table()
00105 {
00106   mem_root.free_root(MYF(0));
00107 }
00108 
00109 
00110 void Table::resetTable(Session *session,
00111                        TableShare *share,
00112                        uint32_t db_stat_arg)
00113 {
00114   setShare(share);
00115   in_use= session;
00116 
00117   field= NULL;
00118 
00119   cursor= NULL;
00120   next= NULL;
00121   prev= NULL;
00122 
00123   read_set= NULL;
00124   write_set= NULL;
00125 
00126   tablenr= 0;
00127   db_stat= db_stat_arg;
00128 
00129   record[0]= (unsigned char *) NULL;
00130   record[1]= (unsigned char *) NULL;
00131 
00132   insert_values.clear();
00133   key_info= NULL;
00134   next_number_field= NULL;
00135   found_next_number_field= NULL;
00136   timestamp_field= NULL;
00137 
00138   pos_in_table_list= NULL;
00139   group= NULL;
00140   _alias.clear();
00141   null_flags= NULL;
00142 
00143   lock_position= 0;
00144   lock_data_start= 0;
00145   lock_count= 0;
00146   used_fields= 0;
00147   status= 0;
00148   derived_select_number= 0;
00149   current_lock= F_UNLCK;
00150   copy_blobs= false;
00151 
00152   maybe_null= false;
00153 
00154   null_row= false;
00155 
00156   force_index= false;
00157   distinct= false;
00158   const_table= false;
00159   no_rows= false;
00160   key_read= false;
00161   no_keyread= false;
00162 
00163   open_placeholder= false;
00164   locked_by_name= false;
00165   no_cache= false;
00166 
00167   auto_increment_field_not_null= false;
00168   alias_name_used= false;
00169 
00170   query_id= 0;
00171   quick_condition_rows= 0;
00172 
00173   timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
00174   map= 0;
00175 
00176   reginfo.reset();
00177 
00178   covering_keys.reset();
00179 
00180   quick_keys.reset();
00181   merge_keys.reset();
00182 
00183   keys_in_use_for_query.reset();
00184   keys_in_use_for_group_by.reset();
00185   keys_in_use_for_order_by.reset();
00186 
00187   memset(quick_rows, 0, sizeof(ha_rows) * MAX_KEY);
00188   memset(const_key_parts, 0, sizeof(ha_rows) * MAX_KEY);
00189 
00190   memset(quick_key_parts, 0, sizeof(unsigned int) * MAX_KEY);
00191   memset(quick_n_ranges, 0, sizeof(unsigned int) * MAX_KEY);
00192 
00193   memory::init_sql_alloc(&mem_root, TABLE_ALLOC_BLOCK_SIZE, 0);
00194 }
00195 
00196 
00197 
00198 /* Deallocate temporary blob storage */
00199 
00200 void free_blobs(Table *table)
00201 {
00202   uint32_t *ptr, *end;
00203   for (ptr= table->getBlobField(), end=ptr + table->sizeBlobFields();
00204        ptr != end ;
00205        ptr++)
00206   {
00207     ((Field_blob*) table->getField(*ptr))->free();
00208   }
00209 }
00210 
00211 
00212 TYPELIB *typelib(memory::Root *mem_root, List<String> &strings)
00213 {
00214   TYPELIB *result= (TYPELIB*) mem_root->alloc_root(sizeof(TYPELIB));
00215   if (!result)
00216     return 0;
00217   result->count= strings.size();
00218   result->name= "";
00219   uint32_t nbytes= (sizeof(char*) + sizeof(uint32_t)) * (result->count + 1);
00220   
00221   if (!(result->type_names= (const char**) mem_root->alloc_root(nbytes)))
00222     return 0;
00223     
00224   result->type_lengths= (uint*) (result->type_names + result->count + 1);
00225 
00226   List<String>::iterator it(strings.begin());
00227   String *tmp;
00228   for (uint32_t i= 0; (tmp= it++); i++)
00229   {
00230     result->type_names[i]= tmp->ptr();
00231     result->type_lengths[i]= tmp->length();
00232   }
00233 
00234   result->type_names[result->count]= 0;   // End marker
00235   result->type_lengths[result->count]= 0;
00236 
00237   return result;
00238 }
00239 
00240   /* Check that the integer is in the internal */
00241 
00242 int set_zone(int nr, int min_zone, int max_zone)
00243 {
00244   if (nr<=min_zone)
00245     return (min_zone);
00246   if (nr>=max_zone)
00247     return (max_zone);
00248   return (nr);
00249 } /* set_zone */
00250 
00251 
00252 /*
00253   Store an SQL quoted string.
00254 
00255   SYNOPSIS
00256     append_unescaped()
00257     res   result String
00258     pos   string to be quoted
00259     length  it's length
00260 
00261   NOTE
00262     This function works correctly with utf8 or single-byte charset strings.
00263     May fail with some multibyte charsets though.
00264 */
00265 
00266 void append_unescaped(String *res, const char *pos, uint32_t length)
00267 {
00268   const char *end= pos+length;
00269   res->append('\'');
00270 
00271   for (; pos != end ; pos++)
00272   {
00273     uint32_t mblen;
00274     if (use_mb(default_charset_info) &&
00275         (mblen= my_ismbchar(default_charset_info, pos, end)))
00276     {
00277       res->append(pos, mblen);
00278       pos+= mblen - 1;
00279       if (pos >= end)
00280         break;
00281       continue;
00282     }
00283 
00284     switch (*pos) {
00285     case 0:       /* Must be escaped for 'mysql' */
00286       res->append('\\');
00287       res->append('0');
00288       break;
00289     case '\n':        /* Must be escaped for logs */
00290       res->append('\\');
00291       res->append('n');
00292       break;
00293     case '\r':
00294       res->append('\\');    /* This gives better readability */
00295       res->append('r');
00296       break;
00297     case '\\':
00298       res->append('\\');    /* Because of the sql syntax */
00299       res->append('\\');
00300       break;
00301     case '\'':
00302       res->append('\'');    /* Because of the sql syntax */
00303       res->append('\'');
00304       break;
00305     default:
00306       res->append(*pos);
00307       break;
00308     }
00309   }
00310   res->append('\'');
00311 }
00312 
00313 
00314 int rename_file_ext(const char * from,const char * to,const char * ext)
00315 {
00316   string from_s, to_s;
00317 
00318   from_s.append(from);
00319   from_s.append(ext);
00320   to_s.append(to);
00321   to_s.append(ext);
00322   return (internal::my_rename(from_s.c_str(),to_s.c_str(),MYF(MY_WME)));
00323 }
00324 
00325 /*
00326   Allow anything as a table name, as long as it doesn't contain an
00327   ' ' at the end
00328   returns 1 on error
00329 */
00330 bool check_table_name(const char *name, uint32_t length)
00331 {
00332   if (!length || length > NAME_LEN || name[length - 1] == ' ')
00333     return 1;
00334   LEX_STRING ident;
00335   ident.str= (char*) name;
00336   ident.length= length;
00337   return check_identifier_name(&ident);
00338 }
00339 
00340 
00341 /*
00342   Eventually, a "length" argument should be added
00343   to this function, and the inner loop changed to
00344   check_identifier_name() call.
00345 */
00346 bool check_column_name(const char *name)
00347 {
00348   uint32_t name_length= 0;  // name length in symbols
00349   bool last_char_is_space= true;
00350 
00351   while (*name)
00352   {
00353     last_char_is_space= my_isspace(system_charset_info, *name);
00354     if (use_mb(system_charset_info))
00355     {
00356       int len=my_ismbchar(system_charset_info, name,
00357                           name+system_charset_info->mbmaxlen);
00358       if (len)
00359       {
00360         if (len > 3) /* Disallow non-BMP characters */
00361           return 1;
00362         name += len;
00363         name_length++;
00364         continue;
00365       }
00366     }
00367     /*
00368       NAMES_SEP_CHAR is used in FRM format to separate SET and ENUM values.
00369       It is defined as 0xFF, which is a not valid byte in utf8.
00370       This assert is to catch use of this byte if we decide to
00371       use non-utf8 as system_character_set.
00372     */
00373     assert(*name != NAMES_SEP_CHAR);
00374     name++;
00375     name_length++;
00376   }
00377   /* Error if empty or too long column name */
00378   return last_char_is_space || (uint32_t) name_length > NAME_CHAR_LEN;
00379 }
00380 
00381 
00382 /*****************************************************************************
00383   Functions to handle column usage bitmaps (read_set, write_set etc...)
00384 *****************************************************************************/
00385 
00386 /* Reset all columns bitmaps */
00387 
00388 void Table::clear_column_bitmaps()
00389 {
00390   /*
00391     Reset column read/write usage. It's identical to:
00392     bitmap_clear_all(&table->def_read_set);
00393     bitmap_clear_all(&table->def_write_set);
00394   */
00395   def_read_set.reset();
00396   def_write_set.reset();
00397   column_bitmaps_set(def_read_set, def_write_set);
00398 }
00399 
00400 
00401 /*
00402   Tell Cursor we are going to call position() and rnd_pos() later.
00403 
00404   NOTES:
00405   This is needed for handlers that uses the primary key to find the
00406   row. In this case we have to extend the read bitmap with the primary
00407   key fields.
00408 */
00409 
00410 void Table::prepare_for_position()
00411 {
00412 
00413   if ((cursor->getEngine()->check_flag(HTON_BIT_PRIMARY_KEY_IN_READ_INDEX)) &&
00414       getShare()->hasPrimaryKey())
00415   {
00416     mark_columns_used_by_index_no_reset(getShare()->getPrimaryKey());
00417   }
00418   return;
00419 }
00420 
00421 
00422 /*
00423   Mark that only fields from one key is used
00424 
00425   NOTE:
00426     This changes the bitmap to use the tmp bitmap
00427     After this, you can't access any other columns in the table until
00428     bitmaps are reset, for example with Table::clear_column_bitmaps()
00429     or Table::restore_column_maps_after_mark_index()
00430 */
00431 
00432 void Table::mark_columns_used_by_index(uint32_t index)
00433 {
00434   boost::dynamic_bitset<> *bitmap= &tmp_set;
00435 
00436   (void) cursor->extra(HA_EXTRA_KEYREAD);
00437   bitmap->reset();
00438   mark_columns_used_by_index_no_reset(index, *bitmap);
00439   column_bitmaps_set(*bitmap, *bitmap);
00440   return;
00441 }
00442 
00443 
00444 /*
00445   Restore to use normal column maps after key read
00446 
00447   NOTES
00448     This reverse the change done by mark_columns_used_by_index
00449 
00450   WARNING
00451     For this to work, one must have the normal table maps in place
00452     when calling mark_columns_used_by_index
00453 */
00454 
00455 void Table::restore_column_maps_after_mark_index()
00456 {
00457 
00458   key_read= 0;
00459   (void) cursor->extra(HA_EXTRA_NO_KEYREAD);
00460   default_column_bitmaps();
00461   return;
00462 }
00463 
00464 
00465 /*
00466   mark columns used by key, but don't reset other fields
00467 */
00468 
00469 void Table::mark_columns_used_by_index_no_reset(uint32_t index)
00470 {
00471     mark_columns_used_by_index_no_reset(index, *read_set);
00472 }
00473 
00474 
00475 void Table::mark_columns_used_by_index_no_reset(uint32_t index,
00476                                                 boost::dynamic_bitset<>& bitmap)
00477 {
00478   KeyPartInfo *key_part= key_info[index].key_part;
00479   KeyPartInfo *key_part_end= (key_part + key_info[index].key_parts);
00480   for (; key_part != key_part_end; key_part++)
00481   {
00482     if (! bitmap.empty())
00483       bitmap.set(key_part->fieldnr-1);
00484   }
00485 }
00486 
00487 
00488 /*
00489   Mark auto-increment fields as used fields in both read and write maps
00490 
00491   NOTES
00492     This is needed in insert & update as the auto-increment field is
00493     always set and sometimes read.
00494 */
00495 
00496 void Table::mark_auto_increment_column()
00497 {
00498   assert(found_next_number_field);
00499   /*
00500     We must set bit in read set as update_auto_increment() is using the
00501     store() to check overflow of auto_increment values
00502   */
00503   setReadSet(found_next_number_field->position());
00504   setWriteSet(found_next_number_field->position());
00505   if (getShare()->next_number_keypart)
00506     mark_columns_used_by_index_no_reset(getShare()->next_number_index);
00507 }
00508 
00509 
00510 /*
00511   Mark columns needed for doing an delete of a row
00512 
00513   DESCRIPTON
00514     Some table engines don't have a cursor on the retrieve rows
00515     so they need either to use the primary key or all columns to
00516     be able to delete a row.
00517 
00518     If the engine needs this, the function works as follows:
00519     - If primary key exits, mark the primary key columns to be read.
00520     - If not, mark all columns to be read
00521 
00522     If the engine has HA_REQUIRES_KEY_COLUMNS_FOR_DELETE, we will
00523     mark all key columns as 'to-be-read'. This allows the engine to
00524     loop over the given record to find all keys and doesn't have to
00525     retrieve the row again.
00526 */
00527 
00528 void Table::mark_columns_needed_for_delete()
00529 {
00530   /*
00531     If the Cursor has no cursor capabilites, or we have row-based
00532     replication active for the current statement, we have to read
00533     either the primary key, the hidden primary key or all columns to
00534     be able to do an delete
00535 
00536   */
00537   if (not getShare()->hasPrimaryKey())
00538   {
00539     /* fallback to use all columns in the table to identify row */
00540     use_all_columns();
00541     return;
00542   }
00543   else
00544     mark_columns_used_by_index_no_reset(getShare()->getPrimaryKey());
00545 
00546   /* If we the engine wants all predicates we mark all keys */
00547   if (cursor->getEngine()->check_flag(HTON_BIT_REQUIRES_KEY_COLUMNS_FOR_DELETE))
00548   {
00549     Field **reg_field;
00550     for (reg_field= field ; *reg_field ; reg_field++)
00551     {
00552       if ((*reg_field)->flags & PART_KEY_FLAG)
00553         setReadSet((*reg_field)->position());
00554     }
00555   }
00556 }
00557 
00558 
00559 /*
00560   Mark columns needed for doing an update of a row
00561 
00562   DESCRIPTON
00563     Some engines needs to have all columns in an update (to be able to
00564     build a complete row). If this is the case, we mark all not
00565     updated columns to be read.
00566 
00567     If this is no the case, we do like in the delete case and mark
00568     if neeed, either the primary key column or all columns to be read.
00569     (see mark_columns_needed_for_delete() for details)
00570 
00571     If the engine has HTON_BIT_REQUIRES_KEY_COLUMNS_FOR_DELETE, we will
00572     mark all USED key columns as 'to-be-read'. This allows the engine to
00573     loop over the given record to find all changed keys and doesn't have to
00574     retrieve the row again.
00575 */
00576 
00577 void Table::mark_columns_needed_for_update()
00578 {
00579   /*
00580     If the Cursor has no cursor capabilites, or we have row-based
00581     logging active for the current statement, we have to read either
00582     the primary key, the hidden primary key or all columns to be
00583     able to do an update
00584   */
00585   if (not getShare()->hasPrimaryKey())
00586   {
00587     /* fallback to use all columns in the table to identify row */
00588     use_all_columns();
00589     return;
00590   }
00591   else
00592     mark_columns_used_by_index_no_reset(getShare()->getPrimaryKey());
00593 
00594   if (cursor->getEngine()->check_flag(HTON_BIT_REQUIRES_KEY_COLUMNS_FOR_DELETE))
00595   {
00596     /* Mark all used key columns for read */
00597     Field **reg_field;
00598     for (reg_field= field ; *reg_field ; reg_field++)
00599     {
00600       /* Merge keys is all keys that had a column refered to in the query */
00601       if (is_overlapping(merge_keys, (*reg_field)->part_of_key))
00602         setReadSet((*reg_field)->position());
00603     }
00604   }
00605 
00606 }
00607 
00608 
00609 /*
00610   Mark columns the Cursor needs for doing an insert
00611 
00612   For now, this is used to mark fields used by the trigger
00613   as changed.
00614 */
00615 
00616 void Table::mark_columns_needed_for_insert()
00617 {
00618   if (found_next_number_field)
00619     mark_auto_increment_column();
00620 }
00621 
00622 
00623 
00624 size_t Table::max_row_length(const unsigned char *data)
00625 {
00626   size_t length= getRecordLength() + 2 * sizeFields();
00627   uint32_t *const beg= getBlobField();
00628   uint32_t *const end= beg + sizeBlobFields();
00629 
00630   for (uint32_t *ptr= beg ; ptr != end ; ++ptr)
00631   {
00632     Field_blob* const blob= (Field_blob*) field[*ptr];
00633     length+= blob->get_length((const unsigned char*)
00634                               (data + blob->offset(getInsertRecord()))) +
00635       HA_KEY_BLOB_LENGTH;
00636   }
00637   return length;
00638 }
00639 
00640 void Table::setVariableWidth(void)
00641 {
00642   assert(in_use);
00643   if (in_use && in_use->lex().sql_command == SQLCOM_CREATE_TABLE)
00644   {
00645     getMutableShare()->setVariableWidth();
00646     return;
00647   }
00648 
00649   assert(0); // Programming error, you can't set this on a plain old Table.
00650 }
00651 
00652 /****************************************************************************
00653  Functions for creating temporary tables.
00654 ****************************************************************************/
00677 Field *create_tmp_field_from_field(Session *session, Field *org_field,
00678                                    const char *name, Table *table,
00679                                    Item_field *item, uint32_t convert_blob_length)
00680 {
00681   Field *new_field;
00682 
00683   /*
00684     Make sure that the blob fits into a Field_varstring which has
00685     2-byte lenght.
00686   */
00687   if (convert_blob_length && convert_blob_length <= Field_varstring::MAX_SIZE &&
00688       (org_field->flags & BLOB_FLAG))
00689   {
00690     table->setVariableWidth();
00691     new_field= new Field_varstring(convert_blob_length,
00692                                    org_field->maybe_null(),
00693                                    org_field->field_name,
00694                                    org_field->charset());
00695   }
00696   else
00697   {
00698     new_field= org_field->new_field(session->mem_root, table,
00699                                     table == org_field->getTable());
00700   }
00701   if (new_field)
00702   {
00703     new_field->init(table);
00704     new_field->orig_table= org_field->orig_table;
00705     if (item)
00706       item->result_field= new_field;
00707     else
00708       new_field->field_name= name;
00709     new_field->flags|= (org_field->flags & NO_DEFAULT_VALUE_FLAG);
00710     if (org_field->maybe_null() || (item && item->maybe_null))
00711       new_field->flags&= ~NOT_NULL_FLAG;  // Because of outer join
00712     if (org_field->type() == DRIZZLE_TYPE_VARCHAR)
00713       table->getMutableShare()->db_create_options|= HA_OPTION_PACK_RECORD;
00714     else if (org_field->type() == DRIZZLE_TYPE_DOUBLE)
00715       ((Field_double *) new_field)->not_fixed= true;
00716   }
00717   return new_field;
00718 }
00719 
00720 
00747 #define STRING_TOTAL_LENGTH_TO_PACK_ROWS 128
00748 #define AVG_STRING_LENGTH_TO_PACK_ROWS   64
00749 #define RATIO_TO_PACK_ROWS         2
00750 
00751 Table *
00752 create_tmp_table(Session *session,Tmp_Table_Param *param,List<Item> &fields,
00753      Order *group, bool distinct, bool save_sum_fields,
00754      uint64_t select_options, ha_rows rows_limit,
00755      const char *table_alias)
00756 {
00757   memory::Root *mem_root_save;
00758   uint  i,field_count,null_count,null_pack_length;
00759   uint32_t  copy_func_count= param->func_count;
00760   uint32_t  hidden_null_count, hidden_null_pack_length, hidden_field_count;
00761   uint32_t  blob_count,group_null_items, string_count;
00762   uint32_t fieldnr= 0;
00763   ulong reclength, string_total_length;
00764   bool  using_unique_constraint= false;
00765   bool  use_packed_rows= true;
00766   bool  not_all_columns= !(select_options & TMP_TABLE_ALL_COLUMNS);
00767   unsigned char *pos, *group_buff;
00768   unsigned char *null_flags;
00769   Field **reg_field, **from_field, **default_field;
00770   CopyField *copy= 0;
00771   KeyInfo *keyinfo;
00772   KeyPartInfo *key_part_info;
00773   Item **copy_func;
00774   MI_COLUMNDEF *recinfo;
00775   uint32_t total_uneven_bit_length= 0;
00776   bool force_copy_fields= param->force_copy_fields;
00777   uint64_t max_rows= 0;
00778 
00779   session->status_var.created_tmp_tables++;
00780 
00781   if (group)
00782   {
00783     if (! param->quick_group)
00784     {
00785       group= 0;         // Can't use group key
00786     }
00787     else for (Order *tmp=group ; tmp ; tmp=tmp->next)
00788     {
00789       /*
00790         marker == 4 means two things:
00791         - store NULLs in the key, and
00792         - convert BIT fields to 64-bit long, needed because MEMORY tables
00793           can't index BIT fields.
00794       */
00795       (*tmp->item)->marker= 4;
00796       if ((*tmp->item)->max_length >= CONVERT_IF_BIGGER_TO_BLOB)
00797   using_unique_constraint= true;
00798     }
00799     if (param->group_length >= MAX_BLOB_WIDTH)
00800       using_unique_constraint= true;
00801     if (group)
00802       distinct= 0;        // Can't use distinct
00803   }
00804 
00805   field_count=param->field_count+param->func_count+param->sum_func_count;
00806   hidden_field_count=param->hidden_field_count;
00807 
00808   /*
00809     When loose index scan is employed as access method, it already
00810     computes all groups and the result of all aggregate functions. We
00811     make space for the items of the aggregate function in the list of
00812     functions Tmp_Table_Param::items_to_copy, so that the values of
00813     these items are stored in the temporary table.
00814   */
00815   if (param->precomputed_group_by)
00816   {
00817     copy_func_count+= param->sum_func_count;
00818   }
00819 
00820   table::Singular *table;
00821   table= session->getInstanceTable(); // This will not go into the tableshare cache, so no key is used.
00822 
00823   if (not table->getMemRoot()->multi_alloc_root(0,
00824                                                 &default_field, sizeof(Field*) * (field_count),
00825                                                 &from_field, sizeof(Field*)*field_count,
00826                                                 &copy_func, sizeof(*copy_func)*(copy_func_count+1),
00827                                                 &param->keyinfo, sizeof(*param->keyinfo),
00828                                                 &key_part_info, sizeof(*key_part_info)*(param->group_parts+1),
00829                                                 &param->start_recinfo, sizeof(*param->recinfo)*(field_count*2+4),
00830                                                 &group_buff, (group && ! using_unique_constraint ?
00831                                                               param->group_length : 0),
00832                                                 NULL))
00833   {
00834     return NULL;
00835   }
00836   /* CopyField belongs to Tmp_Table_Param, allocate it in Session mem_root */
00837   if (!(param->copy_field= copy= new (session->mem_root) CopyField[field_count]))
00838   {
00839     return NULL;
00840   }
00841   param->items_to_copy= copy_func;
00842   /* make table according to fields */
00843 
00844   memset(default_field, 0, sizeof(Field*) * (field_count));
00845   memset(from_field, 0, sizeof(Field*)*field_count);
00846 
00847   mem_root_save= session->mem_root;
00848   session->mem_root= table->getMemRoot();
00849 
00850   table->getMutableShare()->setFields(field_count+1);
00851   table->setFields(table->getMutableShare()->getFields(true));
00852   reg_field= table->getMutableShare()->getFields(true);
00853   table->setAlias(table_alias);
00854   table->reginfo.lock_type=TL_WRITE;  /* Will be updated */
00855   table->db_stat=HA_OPEN_KEYFILE+HA_OPEN_RNDFILE;
00856   table->map=1;
00857   table->copy_blobs= 1;
00858   assert(session);
00859   table->in_use= session;
00860   table->quick_keys.reset();
00861   table->covering_keys.reset();
00862   table->keys_in_use_for_query.reset();
00863 
00864   table->getMutableShare()->blob_field.resize(field_count+1);
00865   uint32_t *blob_field= &table->getMutableShare()->blob_field[0];
00866   table->getMutableShare()->db_low_byte_first=1;                // True for HEAP and MyISAM
00867   table->getMutableShare()->table_charset= param->table_charset;
00868   table->getMutableShare()->keys_for_keyread.reset();
00869   table->getMutableShare()->keys_in_use.reset();
00870 
00871   /* Calculate which type of fields we will store in the temporary table */
00872 
00873   reclength= string_total_length= 0;
00874   blob_count= string_count= null_count= hidden_null_count= group_null_items= 0;
00875   param->using_indirect_summary_function= 0;
00876 
00877   List<Item>::iterator li(fields.begin());
00878   Item *item;
00879   Field **tmp_from_field=from_field;
00880   while ((item=li++))
00881   {
00882     Item::Type type=item->type();
00883     if (not_all_columns)
00884     {
00885       if (item->with_sum_func && type != Item::SUM_FUNC_ITEM)
00886       {
00887         if (item->used_tables() & OUTER_REF_TABLE_BIT)
00888           item->update_used_tables();
00889         if (type == Item::SUBSELECT_ITEM ||
00890             (item->used_tables() & ~OUTER_REF_TABLE_BIT))
00891         {
00892     /*
00893       Mark that the we have ignored an item that refers to a summary
00894       function. We need to know this if someone is going to use
00895       DISTINCT on the result.
00896     */
00897     param->using_indirect_summary_function=1;
00898     continue;
00899         }
00900       }
00901       if (item->const_item() && (int) hidden_field_count <= 0)
00902         continue; // We don't have to store this
00903     }
00904     if (type == Item::SUM_FUNC_ITEM && !group && !save_sum_fields)
00905     {           /* Can't calc group yet */
00906       ((Item_sum*) item)->result_field= 0;
00907       for (i= 0 ; i < ((Item_sum*) item)->arg_count ; i++)
00908       {
00909   Item **argp= ((Item_sum*) item)->args + i;
00910   Item *arg= *argp;
00911   if (!arg->const_item())
00912   {
00913     Field *new_field=
00914             create_tmp_field(session, table, arg, arg->type(), &copy_func,
00915                              tmp_from_field, &default_field[fieldnr],
00916                              group != 0,not_all_columns,
00917                              false,
00918                              param->convert_blob_length);
00919     if (!new_field)
00920       goto err;         // Should be OOM
00921     tmp_from_field++;
00922     reclength+=new_field->pack_length();
00923     if (new_field->flags & BLOB_FLAG)
00924     {
00925       *blob_field++= fieldnr;
00926       blob_count++;
00927     }
00928     *(reg_field++)= new_field;
00929           if (new_field->real_type() == DRIZZLE_TYPE_VARCHAR)
00930           {
00931             string_count++;
00932             string_total_length+= new_field->pack_length();
00933           }
00934           session->mem_root= mem_root_save;
00935           *argp= new Item_field(new_field);
00936           session->mem_root= table->getMemRoot();
00937     if (!(new_field->flags & NOT_NULL_FLAG))
00938           {
00939       null_count++;
00940             /*
00941               new_field->maybe_null() is still false, it will be
00942               changed below. But we have to setup Item_field correctly
00943             */
00944             (*argp)->maybe_null=1;
00945           }
00946           new_field->setPosition(fieldnr++);
00947   }
00948       }
00949     }
00950     else
00951     {
00952       /*
00953   The last parameter to create_tmp_field() is a bit tricky:
00954 
00955   We need to set it to 0 in union, to get fill_record() to modify the
00956   temporary table.
00957   We need to set it to 1 on multi-table-update and in select to
00958   write rows to the temporary table.
00959   We here distinguish between UNION and multi-table-updates by the fact
00960   that in the later case group is set to the row pointer.
00961       */
00962       Field *new_field=
00963         create_tmp_field(session, table, item, type, &copy_func,
00964                          tmp_from_field, &default_field[fieldnr],
00965                          group != 0,
00966                          !force_copy_fields &&
00967                            (not_all_columns || group != 0),
00968                          force_copy_fields,
00969                          param->convert_blob_length);
00970 
00971       if (!new_field)
00972       {
00973   if (session->is_fatal_error)
00974     goto err;       // Got OOM
00975   continue;       // Some kindf of const item
00976       }
00977       if (type == Item::SUM_FUNC_ITEM)
00978   ((Item_sum *) item)->result_field= new_field;
00979       tmp_from_field++;
00980       reclength+=new_field->pack_length();
00981       if (!(new_field->flags & NOT_NULL_FLAG))
00982   null_count++;
00983       if (new_field->flags & BLOB_FLAG)
00984       {
00985         *blob_field++= fieldnr;
00986   blob_count++;
00987       }
00988       if (item->marker == 4 && item->maybe_null)
00989       {
00990   group_null_items++;
00991   new_field->flags|= GROUP_FLAG;
00992       }
00993       new_field->setPosition(fieldnr++);
00994       *(reg_field++)= new_field;
00995     }
00996     if (!--hidden_field_count)
00997     {
00998       /*
00999         This was the last hidden field; Remember how many hidden fields could
01000         have null
01001       */
01002       hidden_null_count=null_count;
01003       /*
01004   We need to update hidden_field_count as we may have stored group
01005   functions with constant arguments
01006       */
01007       param->hidden_field_count= fieldnr;
01008       null_count= 0;
01009     }
01010   }
01011   assert(fieldnr == (uint32_t) (reg_field - table->getFields()));
01012   assert(field_count >= (uint32_t) (reg_field - table->getFields()));
01013   field_count= fieldnr;
01014   *reg_field= 0;
01015   *blob_field= 0;       // End marker
01016   table->getMutableShare()->setFieldSize(field_count);
01017 
01018   /* If result table is small; use a heap */
01019   /* future: storage engine selection can be made dynamic? */
01020   if (blob_count || using_unique_constraint || 
01021       (session->lex().select_lex.options & SELECT_BIG_RESULT) ||
01022       (session->lex().current_select->olap == ROLLUP_TYPE) ||
01023       (select_options & (OPTION_BIG_TABLES | SELECT_SMALL_RESULT)) == OPTION_BIG_TABLES)
01024   {
01025     table->getMutableShare()->storage_engine= myisam_engine;
01026     table->cursor= table->getMutableShare()->db_type()->getCursor(*table);
01027     if (group &&
01028   (param->group_parts > table->cursor->getEngine()->max_key_parts() ||
01029    param->group_length > table->cursor->getEngine()->max_key_length()))
01030     {
01031       using_unique_constraint= true;
01032     }
01033   }
01034   else
01035   {
01036     table->getMutableShare()->storage_engine= heap_engine;
01037     table->cursor= table->getMutableShare()->db_type()->getCursor(*table);
01038   }
01039   if (! table->cursor)
01040     goto err;
01041 
01042 
01043   if (! using_unique_constraint)
01044     reclength+= group_null_items; // null flag is stored separately
01045 
01046   table->getMutableShare()->blob_fields= blob_count;
01047   if (blob_count == 0)
01048   {
01049     /* We need to ensure that first byte is not 0 for the delete link */
01050     if (param->hidden_field_count)
01051       hidden_null_count++;
01052     else
01053       null_count++;
01054   }
01055   hidden_null_pack_length=(hidden_null_count+7)/8;
01056   null_pack_length= (hidden_null_pack_length +
01057                      (null_count + total_uneven_bit_length + 7) / 8);
01058   reclength+=null_pack_length;
01059   if (!reclength)
01060     reclength=1;        // Dummy select
01061   /* Use packed rows if there is blobs or a lot of space to gain */
01062   if (blob_count || ((string_total_length >= STRING_TOTAL_LENGTH_TO_PACK_ROWS) && (reclength / string_total_length <= RATIO_TO_PACK_ROWS || (string_total_length / string_count) >= AVG_STRING_LENGTH_TO_PACK_ROWS)))
01063     use_packed_rows= 1;
01064 
01065   table->getMutableShare()->setRecordLength(reclength);
01066   {
01067     uint32_t alloc_length=ALIGN_SIZE(reclength+MI_UNIQUE_HASH_LENGTH+1);
01068     table->getMutableShare()->rec_buff_length= alloc_length;
01069     if (!(table->record[0]= (unsigned char*) table->alloc_root(alloc_length*2)))
01070     {
01071       goto err;
01072     }
01073     table->record[1]= table->getInsertRecord()+alloc_length;
01074     table->getMutableShare()->resizeDefaultValues(alloc_length);
01075   }
01076   copy_func[0]= 0;        // End marker
01077   param->func_count= copy_func - param->items_to_copy;
01078 
01079   table->setup_tmp_table_column_bitmaps();
01080 
01081   recinfo=param->start_recinfo;
01082   null_flags=(unsigned char*) table->getInsertRecord();
01083   pos=table->getInsertRecord()+ null_pack_length;
01084   if (null_pack_length)
01085   {
01086     memset(recinfo, 0, sizeof(*recinfo));
01087     recinfo->type=FIELD_NORMAL;
01088     recinfo->length=null_pack_length;
01089     recinfo++;
01090     memset(null_flags, 255, null_pack_length);  // Set null fields
01091 
01092     table->null_flags= (unsigned char*) table->getInsertRecord();
01093     table->getMutableShare()->null_fields= null_count+ hidden_null_count;
01094     table->getMutableShare()->null_bytes= null_pack_length;
01095   }
01096   null_count= (blob_count == 0) ? 1 : 0;
01097   hidden_field_count=param->hidden_field_count;
01098   for (i= 0,reg_field= table->getFields(); i < field_count; i++,reg_field++,recinfo++)
01099   {
01100     Field *field= *reg_field;
01101     uint32_t length;
01102     memset(recinfo, 0, sizeof(*recinfo));
01103 
01104     if (!(field->flags & NOT_NULL_FLAG))
01105     {
01106       if (field->flags & GROUP_FLAG && !using_unique_constraint)
01107       {
01108   /*
01109     We have to reserve one byte here for NULL bits,
01110     as this is updated by 'end_update()'
01111   */
01112   *pos++= '\0';       // Null is stored here
01113   recinfo->length= 1;
01114   recinfo->type=FIELD_NORMAL;
01115   recinfo++;
01116   memset(recinfo, 0, sizeof(*recinfo));
01117       }
01118       else
01119       {
01120   recinfo->null_bit= 1 << (null_count & 7);
01121   recinfo->null_pos= null_count/8;
01122       }
01123       field->move_field(pos,null_flags+null_count/8,
01124       1 << (null_count & 7));
01125       null_count++;
01126     }
01127     else
01128       field->move_field(pos,(unsigned char*) 0,0);
01129     field->reset();
01130 
01131     /*
01132       Test if there is a default field value. The test for ->ptr is to skip
01133       'offset' fields generated by initalize_tables
01134     */
01135     if (default_field[i] && default_field[i]->ptr)
01136     {
01137       /*
01138          default_field[i] is set only in the cases  when 'field' can
01139          inherit the default value that is defined for the field referred
01140          by the Item_field object from which 'field' has been created.
01141       */
01142       ptrdiff_t diff;
01143       Field *orig_field= default_field[i];
01144       /* Get the value from default_values */
01145       diff= (ptrdiff_t) (orig_field->getTable()->getDefaultValues() - orig_field->getTable()->getInsertRecord());
01146       orig_field->move_field_offset(diff);      // Points now at default_values
01147       if (orig_field->is_real_null())
01148         field->set_null();
01149       else
01150       {
01151         field->set_notnull();
01152         memcpy(field->ptr, orig_field->ptr, field->pack_length());
01153       }
01154       orig_field->move_field_offset(-diff);     // Back to getInsertRecord()
01155     }
01156 
01157     if (from_field[i])
01158     {           /* Not a table Item */
01159       copy->set(field,from_field[i],save_sum_fields);
01160       copy++;
01161     }
01162     length=field->pack_length();
01163     pos+= length;
01164 
01165     /* Make entry for create table */
01166     recinfo->length=length;
01167     if (field->flags & BLOB_FLAG)
01168       recinfo->type= (int) FIELD_BLOB;
01169     else
01170       recinfo->type=FIELD_NORMAL;
01171     if (!--hidden_field_count)
01172       null_count=(null_count+7) & ~7;   // move to next byte
01173   }
01174 
01175   param->copy_field_end=copy;
01176   param->recinfo=recinfo;
01177   table->storeRecordAsDefault();        // Make empty default record
01178 
01179   if (session->variables.tmp_table_size == ~ (uint64_t) 0)    // No limit
01180   {
01181     max_rows= ~(uint64_t) 0;
01182   }
01183   else
01184   {
01185     max_rows= (uint64_t) (((table->getMutableShare()->db_type() == heap_engine) ?
01186                            min(session->variables.tmp_table_size,
01187                                session->variables.max_heap_table_size) :
01188                            session->variables.tmp_table_size) /
01189                           table->getMutableShare()->getRecordLength());
01190   }
01191 
01192   set_if_bigger(max_rows, (uint64_t)1); // For dummy start options
01193   /*
01194     Push the LIMIT clause to the temporary table creation, so that we
01195     materialize only up to 'rows_limit' records instead of all result records.
01196   */
01197   set_if_smaller(max_rows, rows_limit);
01198 
01199   table->getMutableShare()->setMaxRows(max_rows);
01200 
01201   param->end_write_records= rows_limit;
01202 
01203   keyinfo= param->keyinfo;
01204 
01205   if (group)
01206   {
01207     table->group=group;       /* Table is grouped by key */
01208     param->group_buff=group_buff;
01209     table->getMutableShare()->keys=1;
01210     table->getMutableShare()->uniques= test(using_unique_constraint);
01211     table->key_info=keyinfo;
01212     keyinfo->key_part=key_part_info;
01213     keyinfo->flags=HA_NOSAME;
01214     keyinfo->usable_key_parts=keyinfo->key_parts= param->group_parts;
01215     keyinfo->key_length= 0;
01216     keyinfo->rec_per_key= 0;
01217     keyinfo->algorithm= HA_KEY_ALG_UNDEF;
01218     keyinfo->name= (char*) "group_key";
01219     Order *cur_group= group;
01220     for (; cur_group ; cur_group= cur_group->next, key_part_info++)
01221     {
01222       Field *field=(*cur_group->item)->get_tmp_table_field();
01223       bool maybe_null=(*cur_group->item)->maybe_null;
01224       key_part_info->null_bit= 0;
01225       key_part_info->field=  field;
01226       key_part_info->offset= field->offset(table->getInsertRecord());
01227       key_part_info->length= (uint16_t) field->key_length();
01228       key_part_info->type=   (uint8_t) field->key_type();
01229       key_part_info->key_type= 
01230   ((ha_base_keytype) key_part_info->type == HA_KEYTYPE_TEXT ||
01231    (ha_base_keytype) key_part_info->type == HA_KEYTYPE_VARTEXT1 ||
01232    (ha_base_keytype) key_part_info->type == HA_KEYTYPE_VARTEXT2) ?
01233   0 : 1;
01234       if (!using_unique_constraint)
01235       {
01236   cur_group->buff=(char*) group_buff;
01237   if (!(cur_group->field= field->new_key_field(session->mem_root,table,
01238                                                      group_buff +
01239                                                      test(maybe_null),
01240                                                      field->null_ptr,
01241                                                      field->null_bit)))
01242     goto err;
01243   if (maybe_null)
01244   {
01245     /*
01246       To be able to group on NULL, we reserved place in group_buff
01247       for the NULL flag just before the column. (see above).
01248       The field data is after this flag.
01249       The NULL flag is updated in 'end_update()' and 'end_write()'
01250     */
01251     keyinfo->flags|= HA_NULL_ARE_EQUAL; // def. that NULL == NULL
01252     key_part_info->null_bit=field->null_bit;
01253     key_part_info->null_offset= (uint32_t) (field->null_ptr -
01254                 (unsigned char*) table->getInsertRecord());
01255           cur_group->buff++;                        // Pointer to field data
01256     group_buff++;                         // Skipp null flag
01257   }
01258         /* In GROUP BY 'a' and 'a ' are equal for VARCHAR fields */
01259         key_part_info->key_part_flag|= HA_END_SPACE_ARE_EQUAL;
01260   group_buff+= cur_group->field->pack_length();
01261       }
01262       keyinfo->key_length+=  key_part_info->length;
01263     }
01264   }
01265 
01266   if (distinct && field_count != param->hidden_field_count)
01267   {
01268     /*
01269       Create an unique key or an unique constraint over all columns
01270       that should be in the result.  In the temporary table, there are
01271       'param->hidden_field_count' extra columns, whose null bits are stored
01272       in the first 'hidden_null_pack_length' bytes of the row.
01273     */
01274     if (blob_count)
01275     {
01276       /*
01277         Special mode for index creation in MyISAM used to support unique
01278         indexes on blobs with arbitrary length. Such indexes cannot be
01279         used for lookups.
01280       */
01281       table->getMutableShare()->uniques= 1;
01282     }
01283     null_pack_length-=hidden_null_pack_length;
01284     keyinfo->key_parts= ((field_count-param->hidden_field_count)+
01285        (table->getMutableShare()->uniques ? test(null_pack_length) : 0));
01286     table->distinct= 1;
01287     table->getMutableShare()->keys= 1;
01288     if (!(key_part_info= (KeyPartInfo*)
01289          table->alloc_root(keyinfo->key_parts * sizeof(KeyPartInfo))))
01290       goto err;
01291     memset(key_part_info, 0, keyinfo->key_parts * sizeof(KeyPartInfo));
01292     table->key_info=keyinfo;
01293     keyinfo->key_part=key_part_info;
01294     keyinfo->flags=HA_NOSAME | HA_NULL_ARE_EQUAL;
01295     keyinfo->key_length=(uint16_t) reclength;
01296     keyinfo->name= (char*) "distinct_key";
01297     keyinfo->algorithm= HA_KEY_ALG_UNDEF;
01298     keyinfo->rec_per_key= 0;
01299 
01300     /*
01301       Create an extra field to hold NULL bits so that unique indexes on
01302       blobs can distinguish NULL from 0. This extra field is not needed
01303       when we do not use UNIQUE indexes for blobs.
01304     */
01305     if (null_pack_length && table->getMutableShare()->uniques)
01306     {
01307       key_part_info->null_bit= 0;
01308       key_part_info->offset=hidden_null_pack_length;
01309       key_part_info->length=null_pack_length;
01310       table->setVariableWidth();
01311       key_part_info->field= new Field_varstring(table->getInsertRecord(),
01312                                                 (uint32_t) key_part_info->length,
01313                                                 0,
01314                                                 (unsigned char*) 0,
01315                                                 (uint32_t) 0,
01316                                                 NULL,
01317                                                 &my_charset_bin);
01318       if (!key_part_info->field)
01319         goto err;
01320       key_part_info->field->init(table);
01321       key_part_info->key_type= 1; /* binary comparison */
01322       key_part_info->type=    HA_KEYTYPE_BINARY;
01323       key_part_info++;
01324     }
01325     /* Create a distinct key over the columns we are going to return */
01326     for (i=param->hidden_field_count, reg_field=table->getFields() + i ;
01327    i < field_count;
01328    i++, reg_field++, key_part_info++)
01329     {
01330       key_part_info->null_bit= 0;
01331       key_part_info->field=    *reg_field;
01332       key_part_info->offset=   (*reg_field)->offset(table->getInsertRecord());
01333       key_part_info->length=   (uint16_t) (*reg_field)->pack_length();
01334       /* @todo The below method of computing the key format length of the
01335         key part is a copy/paste from optimizer/range.cc, and table.cc.
01336         This should be factored out, e.g. as a method of Field.
01337         In addition it is not clear if any of the Field::*_length
01338         methods is supposed to compute the same length. If so, it
01339         might be reused.
01340       */
01341       key_part_info->store_length= key_part_info->length;
01342 
01343       if ((*reg_field)->real_maybe_null())
01344         key_part_info->store_length+= HA_KEY_NULL_LENGTH;
01345       if ((*reg_field)->type() == DRIZZLE_TYPE_BLOB ||
01346           (*reg_field)->real_type() == DRIZZLE_TYPE_VARCHAR)
01347         key_part_info->store_length+= HA_KEY_BLOB_LENGTH;
01348 
01349       key_part_info->type=     (uint8_t) (*reg_field)->key_type();
01350       key_part_info->key_type =
01351   ((ha_base_keytype) key_part_info->type == HA_KEYTYPE_TEXT ||
01352    (ha_base_keytype) key_part_info->type == HA_KEYTYPE_VARTEXT1 ||
01353    (ha_base_keytype) key_part_info->type == HA_KEYTYPE_VARTEXT2) ?
01354   0 : 1;
01355     }
01356   }
01357 
01358   if (session->is_fatal_error)        // If end of memory
01359     goto err;
01360   table->getMutableShare()->db_record_offset= 1;
01361   if (table->getShare()->db_type() == myisam_engine)
01362   {
01363     if (table->create_myisam_tmp_table(param->keyinfo, param->start_recinfo,
01364                &param->recinfo, select_options))
01365       goto err;
01366   }
01367   assert(table->in_use);
01368   if (table->open_tmp_table())
01369     goto err;
01370 
01371   session->mem_root= mem_root_save;
01372 
01373   return(table);
01374 
01375 err:
01376   session->mem_root= mem_root_save;
01377   table= NULL;
01378 
01379   return NULL;
01380 }
01381 
01382 /****************************************************************************/
01383 
01384 void Table::column_bitmaps_set(boost::dynamic_bitset<>& read_set_arg,
01385                                boost::dynamic_bitset<>& write_set_arg)
01386 {
01387   read_set= &read_set_arg;
01388   write_set= &write_set_arg;
01389 }
01390 
01391 
01392 const boost::dynamic_bitset<> Table::use_all_columns(boost::dynamic_bitset<>& in_map)
01393 {
01394   const boost::dynamic_bitset<> old= in_map;
01395   in_map= getShare()->all_set;
01396   return old;
01397 }
01398 
01399 void Table::restore_column_map(const boost::dynamic_bitset<>& old)
01400 {
01401   for (boost::dynamic_bitset<>::size_type i= 0; i < old.size(); i++)
01402   {
01403     if (old.test(i))
01404     {
01405       read_set->set(i);
01406     }
01407     else
01408     {
01409       read_set->reset(i);
01410     }
01411   }
01412 }
01413 
01414 uint32_t Table::find_shortest_key(const key_map *usable_keys)
01415 {
01416   uint32_t min_length= UINT32_MAX;
01417   uint32_t best= MAX_KEY;
01418   if (usable_keys->any())
01419   {
01420     for (uint32_t nr= 0; nr < getShare()->sizeKeys() ; nr++)
01421     {
01422       if (usable_keys->test(nr))
01423       {
01424         if (key_info[nr].key_length < min_length)
01425         {
01426           min_length= key_info[nr].key_length;
01427           best=nr;
01428         }
01429       }
01430     }
01431   }
01432   return best;
01433 }
01434 
01435 /*****************************************************************************
01436   Remove duplicates from tmp table
01437   This should be recoded to add a unique index to the table and remove
01438   duplicates
01439   Table is a locked single thread table
01440   fields is the number of fields to check (from the end)
01441 *****************************************************************************/
01442 
01443 bool Table::compare_record(Field **ptr)
01444 {
01445   for (; *ptr ; ptr++)
01446   {
01447     if ((*ptr)->cmp_offset(getShare()->rec_buff_length))
01448       return true;
01449   }
01450   return false;
01451 }
01452 
01457 bool Table::records_are_comparable()
01458 {
01459   return ((getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) == 0) ||
01460           write_set->is_subset_of(*read_set));
01461 }
01462 
01476 bool Table::compare_records()
01477 {
01478   if (getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) != 0)
01479   {
01480     /*
01481       Storage engine may not have read all columns of the record.  Fields
01482       (including NULL bits) not in the write_set may not have been read and
01483       can therefore not be compared.
01484     */
01485     for (Field **ptr= this->field ; *ptr != NULL; ptr++)
01486     {
01487       Field *f= *ptr;
01488       if (write_set->test(f->position()))
01489       {
01490         if (f->real_maybe_null())
01491         {
01492           unsigned char null_byte_index= f->null_ptr - record[0];
01493 
01494           if (((record[0][null_byte_index]) & f->null_bit) !=
01495               ((record[1][null_byte_index]) & f->null_bit))
01496             return true;
01497         }
01498         if (f->cmp_binary_offset(getShare()->rec_buff_length))
01499           return true;
01500       }
01501     }
01502     return false;
01503   }
01504 
01505   /*
01506     The storage engine has read all columns, so it's safe to compare all bits
01507     including those not in the write_set. This is cheaper than the
01508     field-by-field comparison done above.
01509   */
01510   if (not getShare()->blob_fields + getShare()->hasVariableWidth())
01511     // Fixed-size record: do bitwise comparison of the records
01512     return memcmp(this->getInsertRecord(), this->getUpdateRecord(), (size_t) getShare()->getRecordLength());
01513 
01514   /* Compare null bits */
01515   if (memcmp(null_flags, null_flags + getShare()->rec_buff_length, getShare()->null_bytes))
01516     return true; /* Diff in NULL value */
01517 
01518   /* Compare updated fields */
01519   for (Field **ptr= field ; *ptr ; ptr++)
01520   {
01521     if (isWriteSet((*ptr)->position()) &&
01522   (*ptr)->cmp_binary_offset(getShare()->rec_buff_length))
01523       return true;
01524   }
01525   return false;
01526 }
01527 
01528 /*
01529  * Store a record from previous record into next
01530  *
01531  */
01532 void Table::storeRecord()
01533 {
01534   memcpy(getUpdateRecord(), getInsertRecord(), (size_t) getShare()->getRecordLength());
01535 }
01536 
01537 /*
01538  * Store a record as an insert
01539  *
01540  */
01541 void Table::storeRecordAsInsert()
01542 {
01543   assert(insert_values.size() >= getShare()->getRecordLength());
01544   memcpy(&insert_values[0], getInsertRecord(), (size_t) getShare()->getRecordLength());
01545 }
01546 
01547 /*
01548  * Store a record with default values
01549  *
01550  */
01551 void Table::storeRecordAsDefault()
01552 {
01553   memcpy(getMutableShare()->getDefaultValues(), getInsertRecord(), (size_t) getShare()->getRecordLength());
01554 }
01555 
01556 /*
01557  * Restore a record from previous record into next
01558  *
01559  */
01560 void Table::restoreRecord()
01561 {
01562   memcpy(getInsertRecord(), getUpdateRecord(), (size_t) getShare()->getRecordLength());
01563 }
01564 
01565 /*
01566  * Restore a record with default values
01567  *
01568  */
01569 void Table::restoreRecordAsDefault()
01570 {
01571   memcpy(getInsertRecord(), getMutableShare()->getDefaultValues(), (size_t) getShare()->getRecordLength());
01572 }
01573 
01574 /*
01575  * Empty a record
01576  *
01577  */
01578 void Table::emptyRecord()
01579 {
01580   restoreRecordAsDefault();
01581   memset(null_flags, 255, getShare()->null_bytes);
01582 }
01583 
01584 Table::Table() : 
01585   field(NULL),
01586   cursor(NULL),
01587   next(NULL),
01588   prev(NULL),
01589   read_set(NULL),
01590   write_set(NULL),
01591   tablenr(0),
01592   db_stat(0),
01593   def_read_set(),
01594   def_write_set(),
01595   tmp_set(),
01596   in_use(NULL),
01597   key_info(NULL),
01598   next_number_field(NULL),
01599   found_next_number_field(NULL),
01600   timestamp_field(NULL),
01601   pos_in_table_list(NULL),
01602   group(NULL),
01603   null_flags(NULL),
01604   lock_position(0),
01605   lock_data_start(0),
01606   lock_count(0),
01607   used_fields(0),
01608   status(0),
01609   derived_select_number(0),
01610   current_lock(F_UNLCK),
01611   copy_blobs(false),
01612   maybe_null(false),
01613   null_row(false),
01614   force_index(false),
01615   distinct(false),
01616   const_table(false),
01617   no_rows(false),
01618   key_read(false),
01619   no_keyread(false),
01620   open_placeholder(false),
01621   locked_by_name(false),
01622   no_cache(false),
01623   auto_increment_field_not_null(false),
01624   alias_name_used(false),
01625   query_id(0),
01626   quick_condition_rows(0),
01627   timestamp_field_type(TIMESTAMP_NO_AUTO_SET),
01628   map(0),
01629   quick_rows(),
01630   const_key_parts(),
01631   quick_key_parts(),
01632   quick_n_ranges()
01633 {
01634   record[0]= (unsigned char *) 0;
01635   record[1]= (unsigned char *) 0;
01636 }
01637 
01638 /*****************************************************************************
01639   The different ways to read a record
01640   Returns -1 if row was not found, 0 if row was found and 1 on errors
01641 *****************************************************************************/
01642 
01645 int Table::report_error(int error)
01646 {
01647   if (error == HA_ERR_END_OF_FILE || error == HA_ERR_KEY_NOT_FOUND)
01648   {
01649     status= STATUS_GARBAGE;
01650     return -1;          // key not found; ok
01651   }
01652   /*
01653     Locking reads can legally return also these errors, do not
01654     print them to the .err log
01655   */
01656   if (error != HA_ERR_LOCK_DEADLOCK && error != HA_ERR_LOCK_WAIT_TIMEOUT)
01657     errmsg_printf(error::ERROR, _("Got error %d when reading table '%s'"),
01658                   error, getShare()->getPath());
01659   print_error(error, MYF(0));
01660 
01661   return 1;
01662 }
01663 
01664 
01665 void Table::setup_table_map(TableList *table_list, uint32_t table_number)
01666 {
01667   used_fields= 0;
01668   const_table= 0;
01669   null_row= 0;
01670   status= STATUS_NO_RECORD;
01671   maybe_null= table_list->outer_join;
01672   TableList *embedding= table_list->getEmbedding();
01673   while (!maybe_null && embedding)
01674   {
01675     maybe_null= embedding->outer_join;
01676     embedding= embedding->getEmbedding();
01677   }
01678   tablenr= table_number;
01679   map= (table_map) 1 << table_number;
01680   force_index= table_list->force_index;
01681   covering_keys= getShare()->keys_for_keyread;
01682   merge_keys.reset();
01683 }
01684 
01685 
01686 bool Table::fill_item_list(List<Item> *item_list) const
01687 {
01688   /*
01689     All Item_field's created using a direct pointer to a field
01690     are fixed in Item_field constructor.
01691   */
01692   for (Field **ptr= field; *ptr; ptr++)
01693   {
01694     Item_field *item= new Item_field(*ptr);
01695     if (!item || item_list->push_back(item))
01696       return true;
01697   }
01698   return false;
01699 }
01700 
01701 
01702 void Table::filesort_free_buffers(bool full)
01703 {
01704   if (sort.record_pointers)
01705   {
01706     free((unsigned char*) sort.record_pointers);
01707     sort.record_pointers=0;
01708   }
01709   if (full)
01710   {
01711     if (sort.sort_keys )
01712     {
01713       if ((unsigned char*) sort.sort_keys)
01714         free((unsigned char*) sort.sort_keys);
01715       sort.sort_keys= 0;
01716     }
01717     if (sort.buffpek)
01718     {
01719       if ((unsigned char*) sort.buffpek)
01720         free((unsigned char*) sort.buffpek);
01721       sort.buffpek= 0;
01722       sort.buffpek_len= 0;
01723     }
01724   }
01725 
01726   if (sort.addon_buf)
01727   {
01728     free((char *) sort.addon_buf);
01729     free((char *) sort.addon_field);
01730     sort.addon_buf=0;
01731     sort.addon_field=0;
01732   }
01733 }
01734 
01735 /*
01736   Is this instance of the table should be reopen or represents a name-lock?
01737 */
01738 bool Table::needs_reopen_or_name_lock() const
01739 { 
01740   return getShare()->getVersion() != refresh_version;
01741 }
01742 
01743 uint32_t Table::index_flags(uint32_t idx) const
01744 {
01745   return getShare()->getEngine()->index_flags(getShare()->getKeyInfo(idx).algorithm);
01746 }
01747 
01748 void Table::print_error(int error, myf errflag) const
01749 {
01750   getShare()->getEngine()->print_error(error, errflag, *this);
01751 }
01752 
01753 } /* namespace drizzled */