Drizzled Public API Documentation

sql_table.cc
00001 /* Copyright (C) 2000-2004 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00016 /* drop and alter of tables */
00017 
00018 #include <config.h>
00019 #include <plugin/myisam/myisam.h>
00020 #include <drizzled/show.h>
00021 #include <drizzled/error.h>
00022 #include <drizzled/gettext.h>
00023 #include <drizzled/data_home.h>
00024 #include <drizzled/sql_parse.h>
00025 #include <drizzled/sql_lex.h>
00026 #include <drizzled/session.h>
00027 #include <drizzled/sql_base.h>
00028 #include <drizzled/lock.h>
00029 #include <drizzled/unireg.h>
00030 #include <drizzled/item/int.h>
00031 #include <drizzled/item/empty_string.h>
00032 #include <drizzled/transaction_services.h>
00033 #include <drizzled/transaction_services.h>
00034 #include <drizzled/table_proto.h>
00035 #include <drizzled/plugin/client.h>
00036 #include <drizzled/identifier.h>
00037 #include <drizzled/internal/m_string.h>
00038 #include <drizzled/global_charset_info.h>
00039 #include <drizzled/charset.h>
00040 
00041 #include <drizzled/definition/cache.h>
00042 
00043 #include <drizzled/statement/alter_table.h>
00044 #include <drizzled/sql_table.h>
00045 #include <drizzled/pthread_globals.h>
00046 #include <drizzled/typelib.h>
00047 #include <drizzled/plugin/storage_engine.h>
00048 
00049 #include <algorithm>
00050 #include <sstream>
00051 
00052 #include <boost/unordered_set.hpp>
00053 
00054 using namespace std;
00055 
00056 namespace drizzled
00057 {
00058 
00059 bool is_primary_key(KeyInfo *key_info)
00060 {
00061   static const char * primary_key_name="PRIMARY";
00062   return (strcmp(key_info->name, primary_key_name)==0);
00063 }
00064 
/**
  Check whether a key name is the reserved primary key name.

  @param key_name  NUL-terminated key name to test.

  @return key_name itself when it is exactly "PRIMARY" (case-sensitive),
          NULL otherwise.
*/
const char* is_primary_key_name(const char* key_name)
{
  const bool is_reserved= (strcmp(key_name, "PRIMARY") == 0);
  return is_reserved ? key_name : NULL;
}
00073 
00074 static bool check_if_keyname_exists(const char *name,KeyInfo *start, KeyInfo *end);
00075 static char *make_unique_key_name(const char *field_name,KeyInfo *start,KeyInfo *end);
00076 
00077 static bool prepare_blob_field(Session *session, CreateField *sql_field);
00078 
00079 void set_table_default_charset(HA_CREATE_INFO *create_info, const char *db)
00080 {
00081   /*
00082     If the table character set was not given explicitly,
00083     let's fetch the database default character set and
00084     apply it to the table.
00085   */
00086   identifier::Schema identifier(db);
00087   if (create_info->default_table_charset == NULL)
00088     create_info->default_table_charset= plugin::StorageEngine::getSchemaCollation(identifier);
00089 }
00090 
00091 /*
00092   Execute the drop of a normal or temporary table
00093 
00094   SYNOPSIS
00095     rm_table_part2()
00096     session     Thread Cursor
00097     tables    Tables to drop
00098     if_exists   If set, don't give an error if table doesn't exists.
00099       In this case we give an warning of level 'NOTE'
00100     drop_temporary  Only drop temporary tables
00101 
00102   @todo
00103     When logging to the binary log, we should log
00104     tmp_tables and transactional tables as separate statements if we
00105     are in a transaction;  This is needed to get these tables into the
00106     cached binary log that is only written on COMMIT.
00107 
00108    The current code only writes DROP statements that only uses temporary
00109    tables to the cache binary log.  This should be ok on most cases, but
00110    not all.
00111 
00112  RETURN
00113    0  ok
00114    1  Error
00115    -1 Thread was killed
00116 */
00117 
00118 int rm_table_part2(Session *session, TableList *tables, bool if_exists,
00119                    bool drop_temporary)
00120 {
00121   TableList *table;
00122   util::string::vector wrong_tables;
00123   int error= 0;
00124   bool foreign_key_error= false;
00125 
00126   do
00127   {
00128     boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
00129 
00130     if (not drop_temporary && session->lock_table_names_exclusively(tables))
00131     {
00132       return 1;
00133     }
00134 
00135     /* Don't give warnings for not found errors, as we already generate notes */
00136     session->no_warnings_for_error= 1;
00137 
00138     for (table= tables; table; table= table->next_local)
00139     {
00140       identifier::Table tmp_identifier(table->getSchemaName(), table->getTableName());
00141 
00142       error= session->drop_temporary_table(tmp_identifier);
00143 
00144       switch (error) {
00145       case  0:
00146         // removed temporary table
00147         continue;
00148       case -1:
00149         error= 1;
00150         break;
00151       default:
00152         // temporary table not found
00153         error= 0;
00154       }
00155 
00156       if (drop_temporary == false)
00157       {
00158         Table *locked_table;
00159         abort_locked_tables(session, tmp_identifier);
00160         table::Cache::singleton().removeTable(session, tmp_identifier,
00161                                               RTFC_WAIT_OTHER_THREAD_FLAG |
00162                                               RTFC_CHECK_KILLED_FLAG);
00163         /*
00164           If the table was used in lock tables, remember it so that
00165           unlock_table_names can free it
00166         */
00167         if ((locked_table= drop_locked_tables(session, tmp_identifier)))
00168           table->table= locked_table;
00169 
00170         if (session->getKilled())
00171         {
00172           error= -1;
00173           break;
00174         }
00175       }
00176       identifier::Table identifier(table->getSchemaName(), table->getTableName(), table->getInternalTmpTable() ? message::Table::INTERNAL : message::Table::STANDARD);
00177 
00178       message::table::shared_ptr message= plugin::StorageEngine::getTableMessage(*session, identifier, true);
00179 
00180       if (drop_temporary || not plugin::StorageEngine::doesTableExist(*session, identifier))
00181       {
00182         // Table was not found on disk and table can't be created from engine
00183         if (if_exists)
00184         {
00185           push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
00186                               ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR),
00187                               table->getTableName());
00188         }
00189         else
00190         {
00191           error= 1;
00192         }
00193       }
00194       else
00195       {
00196         drizzled::error_t local_error;
00197 
00198         /* Generate transaction event ONLY when we successfully drop */ 
00199         if (plugin::StorageEngine::dropTable(*session, identifier, local_error))
00200         {
00201           if (message) // If we have no definition, we don't know if the table should have been replicated
00202           {
00203             TransactionServices &transaction_services= TransactionServices::singleton();
00204             transaction_services.dropTable(*session, identifier, *message, if_exists);
00205           }
00206         }
00207         else
00208         {
00209           if (local_error == HA_ERR_NO_SUCH_TABLE and if_exists)
00210           {
00211             error= 0;
00212             session->clear_error();
00213           }
00214 
00215           if (local_error == HA_ERR_ROW_IS_REFERENCED)
00216           {
00217             /* the table is referenced by a foreign key constraint */
00218             foreign_key_error= true;
00219           }
00220           error= local_error;
00221         }
00222       }
00223 
00224       if (error)
00225       {
00226         wrong_tables.push_back(table->getTableName());
00227       }
00228     }
00229 
00230     tables->unlock_table_names();
00231 
00232   } while (0);
00233 
00234   if (wrong_tables.size())
00235   {
00236     if (not foreign_key_error)
00237     {
00238       std::string table_error;
00239 
00240       for (util::string::vector::iterator iter= wrong_tables.begin();
00241            iter != wrong_tables.end();
00242            iter++)
00243       {
00244         table_error+= *iter;
00245         table_error+= ',';
00246       }
00247       table_error.resize(table_error.size() -1);
00248 
00249       my_printf_error(ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR), MYF(0),
00250                       table_error.c_str());
00251     }
00252     else
00253     {
00254       my_message(ER_ROW_IS_REFERENCED, ER(ER_ROW_IS_REFERENCED), MYF(0));
00255     }
00256     error= 1;
00257   }
00258 
00259   session->no_warnings_for_error= 0;
00260 
00261   return error;
00262 }
00263 
00264 /*
00265   Sort keys in the following order:
00266   - PRIMARY KEY
00267   - UNIQUE keys where all column are NOT NULL
00268   - UNIQUE keys that don't contain partial segments
00269   - Other UNIQUE keys
00270   - Normal keys
00271   - Fulltext keys
00272 
00273   This will make checking for duplicated keys faster and ensure that
00274   PRIMARY keys are prioritized.
00275 */
00276 
00277 static int sort_keys(KeyInfo *a, KeyInfo *b)
00278 {
00279   ulong a_flags= a->flags, b_flags= b->flags;
00280 
00281   if (a_flags & HA_NOSAME)
00282   {
00283     if (!(b_flags & HA_NOSAME))
00284       return -1;
00285     if ((a_flags ^ b_flags) & (HA_NULL_PART_KEY))
00286     {
00287       /* Sort NOT NULL keys before other keys */
00288       return (a_flags & (HA_NULL_PART_KEY)) ? 1 : -1;
00289     }
00290     if (is_primary_key(a))
00291       return -1;
00292     if (is_primary_key(b))
00293       return 1;
00294     /* Sort keys don't containing partial segments before others */
00295     if ((a_flags ^ b_flags) & HA_KEY_HAS_PART_KEY_SEG)
00296       return (a_flags & HA_KEY_HAS_PART_KEY_SEG) ? 1 : -1;
00297   }
00298   else if (b_flags & HA_NOSAME)
00299     return 1;         // Prefer b
00300 
00301   /*
00302     Prefer original key order.  usable_key_parts contains here
00303     the original key position.
00304   */
00305   return ((a->usable_key_parts < b->usable_key_parts) ? -1 :
00306           (a->usable_key_parts > b->usable_key_parts) ? 1 :
00307           0);
00308 }
00309 
00310 /*
00311   Check TYPELIB (set or enum) for duplicates
00312 
00313   SYNOPSIS
00314     check_duplicates_in_interval()
00315     set_or_name   "SET" or "ENUM" string for warning message
00316     name    name of the checked column
00317     typelib   list of values for the column
00318     dup_val_count  reset to 0; not otherwise updated by this function
00319 
00320   DESCRIPTION
00321     This function reports an error for the first value in the list
00322     that duplicates a value appearing earlier in the list
00323 
00324   RETURN VALUES
00325     0             ok
00326     1             Error
00327 */
00328 
00329 class typelib_set_member
00330 {
00331 public:
00332   string s;
00333   const CHARSET_INFO * const cs;
00334 
00335   typelib_set_member(const char* value, unsigned int length,
00336                      const CHARSET_INFO * const charset)
00337     : s(value, length),
00338       cs(charset)
00339   {}
00340 };
00341 
00342 static bool operator==(typelib_set_member const& a, typelib_set_member const& b)
00343 {
00344   return (my_strnncoll(a.cs,
00345                        (const unsigned char*)a.s.c_str(), a.s.length(),
00346                        (const unsigned char*)b.s.c_str(), b.s.length())==0);
00347 }
00348 
00349 
00350 namespace
00351 {
00352 class typelib_set_member_hasher
00353 {
00354   boost::hash<string> hasher;
00355 public:
00356   std::size_t operator()(const typelib_set_member& t) const
00357   {
00358     return hasher(t.s);
00359   }
00360 };
00361 }
00362 
00363 static bool check_duplicates_in_interval(const char *set_or_name,
00364                                          const char *name, TYPELIB *typelib,
00365                                          const CHARSET_INFO * const cs,
00366                                          unsigned int *dup_val_count)
00367 {
00368   TYPELIB tmp= *typelib;
00369   const char **cur_value= typelib->type_names;
00370   unsigned int *cur_length= typelib->type_lengths;
00371   *dup_val_count= 0;
00372 
00373   boost::unordered_set<typelib_set_member, typelib_set_member_hasher> interval_set;
00374 
00375   for ( ; tmp.count > 0; cur_value++, cur_length++)
00376   {
00377     tmp.type_names++;
00378     tmp.type_lengths++;
00379     tmp.count--;
00380     if (interval_set.count(typelib_set_member(*cur_value, *cur_length, cs)))
00381     {
00382       my_error(ER_DUPLICATED_VALUE_IN_TYPE, MYF(0),
00383                name,*cur_value,set_or_name);
00384       return 1;
00385     }
00386     else
00387       interval_set.insert(typelib_set_member(*cur_value, *cur_length, cs));
00388   }
00389   return 0;
00390 }
00391 
00392 
00393 /*
00394   Check TYPELIB (set or enum) max and total lengths
00395 
00396   SYNOPSIS
00397     calculate_interval_lengths()
00398     cs            charset+collation pair of the interval
00399     typelib       list of values for the column
00400     max_length    length of the longest item
00401     tot_length    sum of the item lengths
00402 
00403   DESCRIPTION
00404     After this function call:
00405     - ENUM uses max_length
00406     - SET uses tot_length.
00407 
00408   RETURN VALUES
00409     void
00410 */
00411 static void calculate_interval_lengths(const CHARSET_INFO * const cs,
00412                                        TYPELIB *interval,
00413                                        uint32_t *max_length,
00414                                        uint32_t *tot_length)
00415 {
00416   const char **pos;
00417   uint32_t *len;
00418   *max_length= *tot_length= 0;
00419   for (pos= interval->type_names, len= interval->type_lengths;
00420        *pos ; pos++, len++)
00421   {
00422     uint32_t length= cs->cset->numchars(cs, *pos, *pos + *len);
00423     *tot_length+= length;
00424     set_if_bigger(*max_length, (uint32_t)length);
00425   }
00426 }
00427 
00428 /*
00429   Prepare a create_table instance for packing
00430 
00431   SYNOPSIS
00432     prepare_create_field()
00433     sql_field     field to prepare for packing
00434     blob_columns  count for BLOBs
00435     timestamps    count for timestamps
00436 
00437   DESCRIPTION
00438     This function prepares a CreateField instance.
00439     Fields such as pack_flag are valid after this call.
00440 
00441   RETURN VALUES
00442    0  ok
00443    1  Error
00444 */
00445 int prepare_create_field(CreateField *sql_field,
00446                          uint32_t *blob_columns,
00447                          int *timestamps,
00448                          int *timestamps_with_niladic)
00449 {
00450   unsigned int dup_val_count;
00451 
00452   /*
00453     This code came from mysql_prepare_create_table.
00454     Indent preserved to make patching easier
00455   */
00456   assert(sql_field->charset);
00457 
00458   switch (sql_field->sql_type) {
00459   case DRIZZLE_TYPE_BLOB:
00460     sql_field->length= 8; // Unireg field length
00461     (*blob_columns)++;
00462     break;
00463 
00464   case DRIZZLE_TYPE_ENUM:
00465     {
00466       if (check_duplicates_in_interval("ENUM",
00467                sql_field->field_name,
00468                sql_field->interval,
00469                sql_field->charset,
00470                &dup_val_count))
00471       {
00472   return 1;
00473       }
00474     }
00475     break;
00476 
00477   case DRIZZLE_TYPE_MICROTIME:
00478   case DRIZZLE_TYPE_TIMESTAMP:
00479     /* We should replace old TIMESTAMP fields with their newer analogs */
00480     if (sql_field->unireg_check == Field::TIMESTAMP_OLD_FIELD)
00481     {
00482       if (!*timestamps)
00483       {
00484         sql_field->unireg_check= Field::TIMESTAMP_DNUN_FIELD;
00485         (*timestamps_with_niladic)++;
00486       }
00487       else
00488       {
00489         sql_field->unireg_check= Field::NONE;
00490       }
00491     }
00492     else if (sql_field->unireg_check != Field::NONE)
00493     {
00494       (*timestamps_with_niladic)++;
00495     }
00496 
00497     (*timestamps)++;
00498 
00499     break;
00500 
00501   case DRIZZLE_TYPE_BOOLEAN:
00502   case DRIZZLE_TYPE_DATE:  // Rest of string types
00503   case DRIZZLE_TYPE_DATETIME:
00504   case DRIZZLE_TYPE_DECIMAL:
00505   case DRIZZLE_TYPE_DOUBLE:
00506   case DRIZZLE_TYPE_LONG:
00507   case DRIZZLE_TYPE_LONGLONG:
00508   case DRIZZLE_TYPE_NULL:
00509   case DRIZZLE_TYPE_TIME:
00510   case DRIZZLE_TYPE_UUID:
00511   case DRIZZLE_TYPE_VARCHAR:
00512     break;
00513   }
00514 
00515   return 0;
00516 }
00517 
00518 static int prepare_create_table(Session *session,
00519                                 HA_CREATE_INFO *create_info,
00520                                 message::Table &create_proto,
00521                                 AlterInfo *alter_info,
00522                                 bool tmp_table,
00523                                 uint32_t *db_options,
00524                                 KeyInfo **key_info_buffer,
00525                                 uint32_t *key_count,
00526                                 int select_field_count)
00527 {
00528   const char  *key_name;
00529   CreateField *sql_field,*dup_field;
00530   uint    field,null_fields,blob_columns,max_key_length;
00531   ulong   record_offset= 0;
00532   KeyInfo   *key_info;
00533   KeyPartInfo *key_part_info;
00534   int   timestamps= 0, timestamps_with_niladic= 0;
00535   int   dup_no;
00536   int   select_field_pos,auto_increment=0;
00537   List<CreateField>::iterator it(alter_info->create_list.begin());
00538   List<CreateField>::iterator it2(alter_info->create_list.begin());
00539   uint32_t total_uneven_bit_length= 0;
00540 
00541   plugin::StorageEngine *engine= plugin::StorageEngine::findByName(create_proto.engine().name());
00542 
00543   select_field_pos= alter_info->create_list.size() - select_field_count;
00544   null_fields=blob_columns=0;
00545   max_key_length= engine->max_key_length();
00546 
00547   for (int32_t field_no=0; (sql_field=it++) ; field_no++)
00548   {
00549     const CHARSET_INFO *save_cs;
00550 
00551     /*
00552       Initialize length from its original value (number of characters),
00553       which was set in the parser. This is necessary if we're
00554       executing a prepared statement for the second time.
00555     */
00556     sql_field->length= sql_field->char_length;
00557 
00558     if (!sql_field->charset)
00559       sql_field->charset= create_info->default_table_charset;
00560 
00561     /*
00562       table_charset is set in ALTER Table if we want change character set
00563       for all varchar/char columns.
00564       But the table charset must not affect the BLOB fields, so don't
00565       allow to change my_charset_bin to somethig else.
00566     */
00567     if (create_info->table_charset && sql_field->charset != &my_charset_bin)
00568       sql_field->charset= create_info->table_charset;
00569 
00570     save_cs= sql_field->charset;
00571     if ((sql_field->flags & BINCMP_FLAG) &&
00572         !(sql_field->charset= get_charset_by_csname(sql_field->charset->csname, MY_CS_BINSORT)))
00573     {
00574       char tmp[64];
00575       char *tmp_pos= tmp;
00576       strncpy(tmp_pos, save_cs->csname, sizeof(tmp)-4);
00577       tmp_pos+= strlen(tmp);
00578       strncpy(tmp_pos, STRING_WITH_LEN("_bin"));
00579       my_error(ER_UNKNOWN_COLLATION, MYF(0), tmp);
00580       return(true);
00581     }
00582 
00583     /*
00584       Convert the default value from client character
00585       set into the column character set if necessary.
00586     */
00587     if (sql_field->def &&
00588         save_cs != sql_field->def->collation.collation &&
00589         (sql_field->sql_type == DRIZZLE_TYPE_ENUM))
00590     {
00591       /*
00592         Starting from 5.1 we work here with a copy of CreateField
00593         created by the caller, not with the instance that was
00594         originally created during parsing. It's OK to create
00595         a temporary item and initialize with it a member of the
00596         copy -- this item will be thrown away along with the copy
00597         at the end of execution, and thus not introduce a dangling
00598         pointer in the parsed tree of a prepared statement or a
00599         stored procedure statement.
00600       */
00601       sql_field->def= sql_field->def->safe_charset_converter(save_cs);
00602 
00603       if (sql_field->def == NULL)
00604       {
00605         /* Could not convert */
00606         my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
00607         return(true);
00608       }
00609     }
00610 
00611     if (sql_field->sql_type == DRIZZLE_TYPE_ENUM)
00612     {
00613       size_t dummy;
00614       const CHARSET_INFO * const cs= sql_field->charset;
00615       TYPELIB *interval= sql_field->interval;
00616 
00617       /*
00618         Create typelib from interval_list, and if necessary
00619         convert strings from client character set to the
00620         column character set.
00621       */
00622       if (!interval)
00623       {
00624         /*
00625           Create the typelib in runtime memory - we will free the
00626           occupied memory at the same time when we free this
00627           sql_field -- at the end of execution.
00628         */
00629         interval= sql_field->interval= typelib(session->mem_root,
00630                                                sql_field->interval_list);
00631 
00632         List<String>::iterator int_it(sql_field->interval_list.begin());
00633         String conv, *tmp;
00634         char comma_buf[4];
00635         int comma_length= cs->cset->wc_mb(cs, ',', (unsigned char*) comma_buf,
00636                                           (unsigned char*) comma_buf +
00637                                           sizeof(comma_buf));
00638         assert(comma_length > 0);
00639 
00640         for (uint32_t i= 0; (tmp= int_it++); i++)
00641         {
00642           uint32_t lengthsp;
00643           if (String::needs_conversion(tmp->length(), tmp->charset(),
00644                                        cs, &dummy))
00645           {
00646             size_t cnv_errs;
00647             conv.copy(tmp->ptr(), tmp->length(), tmp->charset(), cs, &cnv_errs);
00648             interval->type_names[i]= session->mem_root->strmake_root(conv.ptr(), conv.length());
00649             interval->type_lengths[i]= conv.length();
00650           }
00651 
00652           // Strip trailing spaces.
00653           lengthsp= cs->cset->lengthsp(cs, interval->type_names[i],
00654                                        interval->type_lengths[i]);
00655           interval->type_lengths[i]= lengthsp;
00656           ((unsigned char *)interval->type_names[i])[lengthsp]= '\0';
00657         }
00658         sql_field->interval_list.clear(); // Don't need interval_list anymore
00659       }
00660 
00661       /* DRIZZLE_TYPE_ENUM */
00662       {
00663         uint32_t field_length;
00664         assert(sql_field->sql_type == DRIZZLE_TYPE_ENUM);
00665         if (sql_field->def != NULL)
00666         {
00667           String str, *def= sql_field->def->val_str(&str);
00668           if (def == NULL) /* SQL "NULL" maps to NULL */
00669           {
00670             if ((sql_field->flags & NOT_NULL_FLAG) != 0)
00671             {
00672               my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
00673               return(true);
00674             }
00675 
00676             /* else, the defaults yield the correct length for NULLs. */
00677           }
00678           else /* not NULL */
00679           {
00680             def->length(cs->cset->lengthsp(cs, def->ptr(), def->length()));
00681             if (interval->find_type2(def->ptr(), def->length(), cs) == 0) /* not found */
00682             {
00683               my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
00684               return(true);
00685             }
00686           }
00687         }
00688         uint32_t new_dummy;
00689         calculate_interval_lengths(cs, interval, &field_length, &new_dummy);
00690         sql_field->length= field_length;
00691       }
00692       set_if_smaller(sql_field->length, (uint32_t)MAX_FIELD_WIDTH-1);
00693     }
00694 
00695     sql_field->create_length_to_internal_length();
00696     if (prepare_blob_field(session, sql_field))
00697       return(true);
00698 
00699     if (!(sql_field->flags & NOT_NULL_FLAG))
00700       null_fields++;
00701 
00702     if (check_column_name(sql_field->field_name))
00703     {
00704       my_error(ER_WRONG_COLUMN_NAME, MYF(0), sql_field->field_name);
00705       return(true);
00706     }
00707 
00708     /* Check if we have used the same field name before */
00709     for (dup_no=0; (dup_field=it2++) != sql_field; dup_no++)
00710     {
00711       if (my_strcasecmp(system_charset_info,
00712                         sql_field->field_name,
00713                         dup_field->field_name) == 0)
00714       {
00715   /*
00716     If this was a CREATE ... SELECT statement, accept a field
00717     redefinition if we are changing a field in the SELECT part
00718   */
00719   if (field_no < select_field_pos || dup_no >= select_field_pos)
00720   {
00721     my_error(ER_DUP_FIELDNAME, MYF(0), sql_field->field_name);
00722     return(true);
00723   }
00724   else
00725   {
00726     /* Field redefined */
00727     sql_field->def=   dup_field->def;
00728     sql_field->sql_type=    dup_field->sql_type;
00729     sql_field->charset=   (dup_field->charset ?
00730            dup_field->charset :
00731            create_info->default_table_charset);
00732     sql_field->length=    dup_field->char_length;
00733           sql_field->pack_length= dup_field->pack_length;
00734           sql_field->key_length=  dup_field->key_length;
00735     sql_field->decimals=    dup_field->decimals;
00736     sql_field->create_length_to_internal_length();
00737     sql_field->unireg_check=  dup_field->unireg_check;
00738           /*
00739             We're making one field from two, the result field will have
00740             dup_field->flags as flags. If we've incremented null_fields
00741             because of sql_field->flags, decrement it back.
00742           */
00743           if (!(sql_field->flags & NOT_NULL_FLAG))
00744             null_fields--;
00745     sql_field->flags=   dup_field->flags;
00746           sql_field->interval=          dup_field->interval;
00747     it2.remove();     // Remove first (create) definition
00748     select_field_pos--;
00749     break;
00750   }
00751       }
00752     }
00753 
00755     if (not create_proto.engine().name().compare("MyISAM") &&
00756         ((sql_field->flags & BLOB_FLAG) ||
00757          (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR)))
00758     {
00759       (*db_options)|= HA_OPTION_PACK_RECORD;
00760     }
00761 
00762     it2= alter_info->create_list.begin();
00763   }
00764 
00765   /* record_offset will be increased with 'length-of-null-bits' later */
00766   record_offset= 0;
00767   null_fields+= total_uneven_bit_length;
00768 
00769   it= alter_info->create_list.begin();
00770   while ((sql_field=it++))
00771   {
00772     assert(sql_field->charset != 0);
00773 
00774     if (prepare_create_field(sql_field, &blob_columns,
00775            &timestamps, &timestamps_with_niladic))
00776       return(true);
00777     sql_field->offset= record_offset;
00778     if (MTYP_TYPENR(sql_field->unireg_check) == Field::NEXT_NUMBER)
00779       auto_increment++;
00780   }
00781   if (timestamps_with_niladic > 1)
00782   {
00783     my_message(ER_TOO_MUCH_AUTO_TIMESTAMP_COLS,
00784                ER(ER_TOO_MUCH_AUTO_TIMESTAMP_COLS), MYF(0));
00785     return(true);
00786   }
00787   if (auto_increment > 1)
00788   {
00789     my_message(ER_WRONG_AUTO_KEY, ER(ER_WRONG_AUTO_KEY), MYF(0));
00790     return(true);
00791   }
00792   if (auto_increment &&
00793       (engine->check_flag(HTON_BIT_NO_AUTO_INCREMENT)))
00794   {
00795     my_message(ER_TABLE_CANT_HANDLE_AUTO_INCREMENT,
00796                ER(ER_TABLE_CANT_HANDLE_AUTO_INCREMENT), MYF(0));
00797     return(true);
00798   }
00799 
00800   if (blob_columns && (engine->check_flag(HTON_BIT_NO_BLOBS)))
00801   {
00802     my_message(ER_TABLE_CANT_HANDLE_BLOB, ER(ER_TABLE_CANT_HANDLE_BLOB),
00803                MYF(0));
00804     return(true);
00805   }
00806 
00807   /* Create keys */
00808 
00809   List<Key>::iterator key_iterator(alter_info->key_list.begin());
00810   List<Key>::iterator key_iterator2(alter_info->key_list.begin());
00811   uint32_t key_parts=0, fk_key_count=0;
00812   bool primary_key=0,unique_key=0;
00813   Key *key, *key2;
00814   uint32_t tmp, key_number;
00815   /* special marker for keys to be ignored */
00816   static char ignore_key[1];
00817 
00818   /* Calculate number of key segements */
00819   *key_count= 0;
00820 
00821   while ((key=key_iterator++))
00822   {
00823     if (key->type == Key::FOREIGN_KEY)
00824     {
00825       fk_key_count++;
00826       if (((Foreign_key *)key)->validate(alter_info->create_list))
00827         return true;
00828 
00829       Foreign_key *fk_key= (Foreign_key*) key;
00830 
00831       add_foreign_key_to_table_message(&create_proto,
00832                                        fk_key->name.str,
00833                                        fk_key->columns,
00834                                        fk_key->ref_table,
00835                                        fk_key->ref_columns,
00836                                        fk_key->delete_opt,
00837                                        fk_key->update_opt,
00838                                        fk_key->match_opt);
00839 
00840       if (fk_key->ref_columns.size() &&
00841     fk_key->ref_columns.size() != fk_key->columns.size())
00842       {
00843         my_error(ER_WRONG_FK_DEF, MYF(0),
00844                  (fk_key->name.str ? fk_key->name.str :
00845                                      "foreign key without name"),
00846                  ER(ER_KEY_REF_DO_NOT_MATCH_TABLE_REF));
00847   return(true);
00848       }
00849       continue;
00850     }
00851     (*key_count)++;
00852     tmp= engine->max_key_parts();
00853     if (key->columns.size() > tmp)
00854     {
00855       my_error(ER_TOO_MANY_KEY_PARTS,MYF(0),tmp);
00856       return(true);
00857     }
00858     if (check_identifier_name(&key->name, ER_TOO_LONG_IDENT))
00859       return(true);
00860     key_iterator2= alter_info->key_list.begin();
00861     if (key->type != Key::FOREIGN_KEY)
00862     {
00863       while ((key2 = key_iterator2++) != key)
00864       {
00865   /*
00866           foreign_key_prefix(key, key2) returns 0 if key or key2, or both, is
00867           'generated', and a generated key is a prefix of the other key.
00868           Then we do not need the generated shorter key.
00869         */
00870         if ((key2->type != Key::FOREIGN_KEY &&
00871              key2->name.str != ignore_key &&
00872              !foreign_key_prefix(key, key2)))
00873         {
00874           /* @todo issue warning message */
00875           /* mark that the generated key should be ignored */
00876           if (!key2->generated ||
00877               (key->generated && key->columns.size() <
00878                key2->columns.size()))
00879             key->name.str= ignore_key;
00880           else
00881           {
00882             key2->name.str= ignore_key;
00883             key_parts-= key2->columns.size();
00884             (*key_count)--;
00885           }
00886           break;
00887         }
00888       }
00889     }
00890     if (key->name.str != ignore_key)
00891       key_parts+=key->columns.size();
00892     else
00893       (*key_count)--;
00894     if (key->name.str && !tmp_table && (key->type != Key::PRIMARY) &&
00895         is_primary_key_name(key->name.str))
00896     {
00897       my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key->name.str);
00898       return(true);
00899     }
00900   }
00901   tmp= engine->max_keys();
00902   if (*key_count > tmp)
00903   {
00904     my_error(ER_TOO_MANY_KEYS,MYF(0),tmp);
00905     return(true);
00906   }
00907 
00908   (*key_info_buffer)= key_info= (KeyInfo*) memory::sql_calloc(sizeof(KeyInfo) * (*key_count));
00909   key_part_info=(KeyPartInfo*) memory::sql_calloc(sizeof(KeyPartInfo)*key_parts);
00910   if (!*key_info_buffer || ! key_part_info)
00911     return(true);       // Out of memory
00912 
00913   key_iterator= alter_info->key_list.begin();
00914   key_number=0;
00915   for (; (key=key_iterator++) ; key_number++)
00916   {
00917     uint32_t key_length=0;
00918     Key_part_spec *column;
00919 
00920     if (key->name.str == ignore_key)
00921     {
00922       /* ignore redundant keys */
00923       do
00924   key=key_iterator++;
00925       while (key && key->name.str == ignore_key);
00926       if (!key)
00927   break;
00928     }
00929 
00930     switch (key->type) {
00931     case Key::MULTIPLE:
00932   key_info->flags= 0;
00933   break;
00934     case Key::FOREIGN_KEY:
00935       key_number--;       // Skip this key
00936       continue;
00937     default:
00938       key_info->flags = HA_NOSAME;
00939       break;
00940     }
00941     if (key->generated)
00942       key_info->flags|= HA_GENERATED_KEY;
00943 
00944     key_info->key_parts=(uint8_t) key->columns.size();
00945     key_info->key_part=key_part_info;
00946     key_info->usable_key_parts= key_number;
00947     key_info->algorithm= key->key_create_info.algorithm;
00948 
00949     uint32_t tmp_len= system_charset_info->cset->charpos(system_charset_info,
00950                                            key->key_create_info.comment.str,
00951                                            key->key_create_info.comment.str +
00952                                            key->key_create_info.comment.length,
00953                                            INDEX_COMMENT_MAXLEN);
00954 
00955     if (tmp_len < key->key_create_info.comment.length)
00956     {
00957       my_error(ER_WRONG_STRING_LENGTH, MYF(0),
00958                key->key_create_info.comment.str,"INDEX COMMENT",
00959                (uint32_t) INDEX_COMMENT_MAXLEN);
00960       return -1;
00961     }
00962 
00963     key_info->comment.length= key->key_create_info.comment.length;
00964     if (key_info->comment.length > 0)
00965     {
00966       key_info->flags|= HA_USES_COMMENT;
00967       key_info->comment.str= key->key_create_info.comment.str;
00968     }
00969 
00970     message::Table::Field *protofield= NULL;
00971 
00972     List<Key_part_spec>::iterator cols(key->columns.begin());
00973     List<Key_part_spec>::iterator cols2(key->columns.begin());
00974     for (uint32_t column_nr=0 ; (column=cols++) ; column_nr++)
00975     {
00976       uint32_t length;
00977       Key_part_spec *dup_column;
00978       int proto_field_nr= 0;
00979 
00980       it= alter_info->create_list.begin();
00981       field=0;
00982       while ((sql_field=it++) && ++proto_field_nr &&
00983        my_strcasecmp(system_charset_info,
00984          column->field_name.str,
00985          sql_field->field_name))
00986       {
00987   field++;
00988       }
00989 
00990       if (!sql_field)
00991       {
00992   my_error(ER_KEY_COLUMN_DOES_NOT_EXITS, MYF(0), column->field_name.str);
00993   return(true);
00994       }
00995 
00996       while ((dup_column= cols2++) != column)
00997       {
00998         if (!my_strcasecmp(system_charset_info,
00999                            column->field_name.str, dup_column->field_name.str))
01000   {
01001     my_printf_error(ER_DUP_FIELDNAME,
01002         ER(ER_DUP_FIELDNAME),MYF(0),
01003         column->field_name.str);
01004     return(true);
01005   }
01006       }
01007       cols2= key->columns.begin();
01008 
01009       if (create_proto.field_size() > 0)
01010         protofield= create_proto.mutable_field(proto_field_nr - 1);
01011 
01012       {
01013         column->length*= sql_field->charset->mbmaxlen;
01014 
01015         if (sql_field->sql_type == DRIZZLE_TYPE_BLOB)
01016         {
01017           if (! (engine->check_flag(HTON_BIT_CAN_INDEX_BLOBS)))
01018           {
01019             my_error(ER_BLOB_USED_AS_KEY, MYF(0), column->field_name.str);
01020             return true;
01021           }
01022           if (! column->length)
01023           {
01024             my_error(ER_BLOB_KEY_WITHOUT_LENGTH, MYF(0), column->field_name.str);
01025             return true;
01026           }
01027         }
01028 
01029         if (! (sql_field->flags & NOT_NULL_FLAG))
01030         {
01031           if (key->type == Key::PRIMARY)
01032           {
01033             /* Implicitly set primary key fields to NOT NULL for ISO conf. */
01034             sql_field->flags|= NOT_NULL_FLAG;
01035             null_fields--;
01036 
01037             if (protofield)
01038             {
01039               message::Table::Field::FieldConstraints *constraints;
01040               constraints= protofield->mutable_constraints();
01041               constraints->set_is_notnull(true);
01042             }
01043 
01044           }
01045           else
01046           {
01047             key_info->flags|= HA_NULL_PART_KEY;
01048             if (! (engine->check_flag(HTON_BIT_NULL_IN_KEY)))
01049             {
01050               my_error(ER_NULL_COLUMN_IN_INDEX, MYF(0), column->field_name.str);
01051               return true;
01052             }
01053           }
01054         }
01055 
01056         if (MTYP_TYPENR(sql_field->unireg_check) == Field::NEXT_NUMBER)
01057         {
01058           if (column_nr == 0 || (engine->check_flag(HTON_BIT_AUTO_PART_KEY)))
01059             auto_increment--;     // Field is used
01060         }
01061       }
01062 
01063       key_part_info->fieldnr= field;
01064       key_part_info->offset=  (uint16_t) sql_field->offset;
01065       key_part_info->key_type= 0;
01066       length= sql_field->key_length;
01067 
01068       if (column->length)
01069       {
01070   if (sql_field->sql_type == DRIZZLE_TYPE_BLOB)
01071   {
01072     if ((length=column->length) > max_key_length ||
01073         length > engine->max_key_part_length())
01074     {
01075       length= min(max_key_length, engine->max_key_part_length());
01076       if (key->type == Key::MULTIPLE)
01077       {
01078         /* not a critical problem */
01079         char warn_buff[DRIZZLE_ERRMSG_SIZE];
01080         snprintf(warn_buff, sizeof(warn_buff), ER(ER_TOO_LONG_KEY),
01081                        length);
01082         push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
01083          ER_TOO_LONG_KEY, warn_buff);
01084               /* Align key length to multibyte char boundary */
01085               length-= length % sql_field->charset->mbmaxlen;
01086       }
01087       else
01088       {
01089         my_error(ER_TOO_LONG_KEY,MYF(0),length);
01090         return(true);
01091       }
01092     }
01093   }
01094   else if ((column->length > length ||
01095             ! Field::type_can_have_key_part(sql_field->sql_type)))
01096   {
01097     my_message(ER_WRONG_SUB_KEY, ER(ER_WRONG_SUB_KEY), MYF(0));
01098     return(true);
01099   }
01100   else if (! (engine->check_flag(HTON_BIT_NO_PREFIX_CHAR_KEYS)))
01101         {
01102     length=column->length;
01103         }
01104       }
01105       else if (length == 0)
01106       {
01107   my_error(ER_WRONG_KEY_COLUMN, MYF(0), column->field_name.str);
01108     return(true);
01109       }
01110       if (length > engine->max_key_part_length())
01111       {
01112         length= engine->max_key_part_length();
01113   if (key->type == Key::MULTIPLE)
01114   {
01115     /* not a critical problem */
01116     char warn_buff[DRIZZLE_ERRMSG_SIZE];
01117     snprintf(warn_buff, sizeof(warn_buff), ER(ER_TOO_LONG_KEY),
01118                    length);
01119     push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
01120            ER_TOO_LONG_KEY, warn_buff);
01121           /* Align key length to multibyte char boundary */
01122           length-= length % sql_field->charset->mbmaxlen;
01123   }
01124   else
01125   {
01126     my_error(ER_TOO_LONG_KEY,MYF(0),length);
01127     return(true);
01128   }
01129       }
01130       key_part_info->length=(uint16_t) length;
01131       /* Use packed keys for long strings on the first column */
01132       if (!((*db_options) & HA_OPTION_NO_PACK_KEYS) &&
01133           (length >= KEY_DEFAULT_PACK_LENGTH &&
01134            (sql_field->sql_type == DRIZZLE_TYPE_VARCHAR ||
01135             sql_field->sql_type == DRIZZLE_TYPE_BLOB)))
01136       {
01137         if ((column_nr == 0 && sql_field->sql_type == DRIZZLE_TYPE_BLOB) ||
01138             sql_field->sql_type == DRIZZLE_TYPE_VARCHAR)
01139         {
01140           key_info->flags|= HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY;
01141         }
01142         else
01143         {
01144           key_info->flags|= HA_PACK_KEY;
01145         }
01146       }
01147       /* Check if the key segment is partial, set the key flag accordingly */
01148       if (length != sql_field->key_length)
01149         key_info->flags|= HA_KEY_HAS_PART_KEY_SEG;
01150 
01151       key_length+=length;
01152       key_part_info++;
01153 
01154       /* Create the key name based on the first column (if not given) */
01155       if (column_nr == 0)
01156       {
01157   if (key->type == Key::PRIMARY)
01158   {
01159     if (primary_key)
01160     {
01161       my_message(ER_MULTIPLE_PRI_KEY, ER(ER_MULTIPLE_PRI_KEY),
01162                        MYF(0));
01163       return(true);
01164     }
01165           static const char pkey_name[]= "PRIMARY";
01166     key_name=pkey_name;
01167     primary_key=1;
01168   }
01169   else if (!(key_name= key->name.str))
01170     key_name=make_unique_key_name(sql_field->field_name,
01171           *key_info_buffer, key_info);
01172   if (check_if_keyname_exists(key_name, *key_info_buffer, key_info))
01173   {
01174     my_error(ER_DUP_KEYNAME, MYF(0), key_name);
01175     return(true);
01176   }
01177   key_info->name=(char*) key_name;
01178       }
01179     }
01180 
01181     if (!key_info->name || check_column_name(key_info->name))
01182     {
01183       my_error(ER_WRONG_NAME_FOR_INDEX, MYF(0), key_info->name);
01184       return(true);
01185     }
01186 
01187     if (!(key_info->flags & HA_NULL_PART_KEY))
01188     {
01189       unique_key=1;
01190     }
01191 
01192     key_info->key_length=(uint16_t) key_length;
01193 
01194     if (key_length > max_key_length)
01195     {
01196       my_error(ER_TOO_LONG_KEY,MYF(0),max_key_length);
01197       return(true);
01198     }
01199 
01200     key_info++;
01201   }
01202 
01203   if (!unique_key && !primary_key &&
01204       (engine->check_flag(HTON_BIT_REQUIRE_PRIMARY_KEY)))
01205   {
01206     my_message(ER_REQUIRES_PRIMARY_KEY, ER(ER_REQUIRES_PRIMARY_KEY), MYF(0));
01207     return(true);
01208   }
01209 
01210   if (auto_increment > 0)
01211   {
01212     my_message(ER_WRONG_AUTO_KEY, ER(ER_WRONG_AUTO_KEY), MYF(0));
01213     return(true);
01214   }
01215   /* Sort keys in optimized order */
01216   internal::my_qsort((unsigned char*) *key_info_buffer, *key_count, sizeof(KeyInfo),
01217                (qsort_cmp) sort_keys);
01218 
01219   /* Check fields. */
01220   it= alter_info->create_list.begin();
01221   while ((sql_field=it++))
01222   {
01223     Field::utype type= (Field::utype) MTYP_TYPENR(sql_field->unireg_check);
01224 
01225     if (session->variables.sql_mode & MODE_NO_ZERO_DATE &&
01226         !sql_field->def &&
01227         (sql_field->sql_type == DRIZZLE_TYPE_TIMESTAMP  or sql_field->sql_type == DRIZZLE_TYPE_MICROTIME) &&
01228         (sql_field->flags & NOT_NULL_FLAG) &&
01229         (type == Field::NONE || type == Field::TIMESTAMP_UN_FIELD))
01230     {
01231       /*
01232         An error should be reported if:
01233           - NO_ZERO_DATE SQL mode is active;
01234           - there is no explicit DEFAULT clause (default column value);
01235           - this is a TIMESTAMP column;
01236           - the column is not NULL;
01237           - this is not the DEFAULT CURRENT_TIMESTAMP column.
01238 
01239         In other words, an error should be reported if
01240           - NO_ZERO_DATE SQL mode is active;
01241           - the column definition is equivalent to
01242             'column_name TIMESTAMP DEFAULT 0'.
01243       */
01244 
01245       my_error(ER_INVALID_DEFAULT, MYF(0), sql_field->field_name);
01246       return(true);
01247     }
01248   }
01249 
01250   return(false);
01251 }
01252 
01253 /*
01254   Extend long VARCHAR fields to blob & prepare field if it's a blob
01255 
01256   SYNOPSIS
01257     prepare_blob_field()
01258     sql_field   Field to check
01259 
01260   RETURN
01261     0 ok
01262     1 Error (sql_field can't be converted to blob)
01263         In this case the error is given
01264 */
01265 
01266 static bool prepare_blob_field(Session *,
01267                                CreateField *sql_field)
01268 {
01269 
01270   if (sql_field->length > MAX_FIELD_VARCHARLENGTH &&
01271       !(sql_field->flags & BLOB_FLAG))
01272   {
01273     my_error(ER_TOO_BIG_FIELDLENGTH, MYF(0), sql_field->field_name,
01274              MAX_FIELD_VARCHARLENGTH / sql_field->charset->mbmaxlen);
01275     return 1;
01276   }
01277 
01278   if ((sql_field->flags & BLOB_FLAG) && sql_field->length)
01279   {
01280     if (sql_field->sql_type == DRIZZLE_TYPE_BLOB)
01281     {
01282       /* The user has given a length to the blob column */
01283       sql_field->pack_length= calc_pack_length(sql_field->sql_type, 0);
01284     }
01285     sql_field->length= 0;
01286   }
01287   return 0;
01288 }
01289 
01290 static bool locked_create_event(Session *session,
01291                                 const identifier::Table &identifier,
01292                                 HA_CREATE_INFO *create_info,
01293                                 message::Table &table_proto,
01294                                 AlterInfo *alter_info,
01295                                 bool is_if_not_exists,
01296                                 bool internal_tmp_table,
01297                                 uint db_options,
01298                                 uint key_count,
01299                                 KeyInfo *key_info_buffer)
01300 {
01301   bool error= true;
01302 
01303   {
01304 
01305     /*
01306       @note if we are building a temp table we need to check to see if a temp table
01307       already exists, otherwise we just need to find out if a normal table exists (aka it is fine
01308       to create a table under a temporary table.
01309     */
01310     bool exists= 
01311       plugin::StorageEngine::doesTableExist(*session, identifier, 
01312                                             identifier.getType() != message::Table::STANDARD );
01313 
01314     if (exists)
01315     {
01316       if (is_if_not_exists)
01317       {
01318         error= false;
01319         push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
01320                             ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
01321                             identifier.getTableName().c_str());
01322         create_info->table_existed= 1;    // Mark that table existed
01323         return error;
01324       }
01325 
01326       my_error(ER_TABLE_EXISTS_ERROR, identifier);
01327 
01328       return error;
01329     }
01330 
01331     if (identifier.getType() == message::Table::STANDARD) // We have a real table
01332     {
01333       /*
01334         We don't assert here, but check the result, because the table could be
01335         in the table definition cache and in the same time the .frm could be
01336         missing from the disk, in case of manual intervention which deletes
01337         the .frm cursor. The user has to use FLUSH TABLES; to clear the cache.
01338         Then she could create the table. This case is pretty obscure and
01339         therefore we don't introduce a new error message only for it.
01340       */
01341       /*
01342         @todo improve this error condition.
01343       */
01344       if (definition::Cache::singleton().find(identifier.getKey()))
01345       {
01346         my_error(ER_TABLE_EXISTS_ERROR, identifier);
01347 
01348         return error;
01349       }
01350     }
01351   }
01352 
01353   session->set_proc_info("creating table");
01354   create_info->table_existed= 0;    // Mark that table is created
01355 
01356   create_info->table_options= db_options;
01357 
01358   if (not rea_create_table(session, identifier,
01359                            table_proto,
01360                            create_info, alter_info->create_list,
01361                            key_count, key_info_buffer))
01362   {
01363     return error;
01364   }
01365 
01366   if (identifier.getType() == message::Table::TEMPORARY)
01367   {
01368     /* Open table and put in temporary table list */
01369     if (not (session->open_temporary_table(identifier)))
01370     {
01371       (void) session->rm_temporary_table(identifier);
01372       return error;
01373     }
01374   }
01375 
01376   /* 
01377     We keep this behind the lock to make sure ordering is correct for a table.
01378     This is a very unlikely problem where before we would write out to the
01379     trans log, someone would do a delete/create operation.
01380   */
01381 
01382   if (table_proto.type() == message::Table::STANDARD && not internal_tmp_table)
01383   {
01384     TransactionServices &transaction_services= TransactionServices::singleton();
01385     transaction_services.createTable(*session, table_proto);
01386   }
01387 
01388   return false;
01389 }
01390 
01391 
01392 /*
01393   Ignore the name of this function... it locks :(
01394 
01395   Create a table
01396 
01397   SYNOPSIS
01398     create_table_no_lock()
01399     session     Thread object
01400     db      Database
01401     table_name    Table name
01402     create_info         Create information (like MAX_ROWS)
01403     fields    List of fields to create
01404     keys    List of keys to create
01405     internal_tmp_table  Set to 1 if this is an internal temporary table
01406       (From ALTER Table)
01407     select_field_count
01408 
01409   DESCRIPTION
01410     If one creates a temporary table, this is automatically opened
01411 
01412     Note that this function assumes that caller already have taken
01413     name-lock on table being created or used some other way to ensure
01414     that concurrent operations won't intervene. create_table()
01415     is a wrapper that can be used for this.
01416 
01417   RETURN VALUES
01418     false OK
01419     true  error
01420 */
01421 
01422 bool create_table_no_lock(Session *session,
01423                                 const identifier::Table &identifier,
01424                                 HA_CREATE_INFO *create_info,
01425         message::Table &table_proto,
01426                                 AlterInfo *alter_info,
01427                                 bool internal_tmp_table,
01428                                 uint32_t select_field_count,
01429                                 bool is_if_not_exists)
01430 {
01431   uint    db_options, key_count;
01432   KeyInfo   *key_info_buffer;
01433   bool    error= true;
01434 
01435   /* Check for duplicate fields and check type of table to create */
01436   if (not alter_info->create_list.size())
01437   {
01438     my_message(ER_TABLE_MUST_HAVE_COLUMNS, ER(ER_TABLE_MUST_HAVE_COLUMNS),
01439                MYF(0));
01440     return true;
01441   }
01442   assert(identifier.getTableName() == table_proto.name());
01443   db_options= create_info->table_options;
01444 
01445   set_table_default_charset(create_info, identifier.getSchemaName().c_str());
01446 
01447   /* Build a Table object to pass down to the engine, and the do the actual create. */
01448   if (not prepare_create_table(session, create_info, table_proto, alter_info,
01449                                internal_tmp_table,
01450                                &db_options,
01451                                &key_info_buffer, &key_count,
01452                                select_field_count))
01453   {
01454     boost_unique_lock_t lock(table::Cache::singleton().mutex()); /* CREATE TABLE (some confussion on naming, double check) */
01455     error= locked_create_event(session,
01456                                identifier,
01457                                create_info,
01458                                table_proto,
01459                                alter_info,
01460                                is_if_not_exists,
01461                                internal_tmp_table,
01462                                db_options, key_count,
01463                                key_info_buffer);
01464   }
01465 
01466   session->set_proc_info("After create");
01467 
01468   return(error);
01469 }
01470 
01474 static bool drizzle_create_table(Session *session,
01475                                  const identifier::Table &identifier,
01476                                  HA_CREATE_INFO *create_info,
01477                                  message::Table &table_proto,
01478                                  AlterInfo *alter_info,
01479                                  bool internal_tmp_table,
01480                                  uint32_t select_field_count,
01481                                  bool is_if_not_exists)
01482 {
01483   Table *name_lock= NULL;
01484   bool result;
01485 
01486   if (session->lock_table_name_if_not_cached(identifier, &name_lock))
01487   {
01488     result= true;
01489   }
01490   else if (name_lock == NULL)
01491   {
01492     if (is_if_not_exists)
01493     {
01494       push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
01495                           ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
01496                           identifier.getTableName().c_str());
01497       create_info->table_existed= 1;
01498       result= false;
01499     }
01500     else
01501     {
01502       my_error(ER_TABLE_EXISTS_ERROR, identifier);
01503       result= true;
01504     }
01505   }
01506   else
01507   {
01508     result= create_table_no_lock(session,
01509                                        identifier,
01510                                        create_info,
01511                                        table_proto,
01512                                        alter_info,
01513                                        internal_tmp_table,
01514                                        select_field_count,
01515                                        is_if_not_exists);
01516   }
01517 
01518   if (name_lock)
01519   {
01520     boost_unique_lock_t lock(table::Cache::singleton().mutex()); /* Lock for removing name_lock during table create */
01521     session->unlink_open_table(name_lock);
01522   }
01523 
01524   return(result);
01525 }
01526 
01527 
01528 /*
01529   Database locking aware wrapper for create_table_no_lock(),
01530 */
01531 bool create_table(Session *session,
01532                         const identifier::Table &identifier,
01533                         HA_CREATE_INFO *create_info,
01534       message::Table &table_proto,
01535                         AlterInfo *alter_info,
01536                         bool internal_tmp_table,
01537                         uint32_t select_field_count,
01538                         bool is_if_not_exists)
01539 {
01540   if (identifier.isTmp())
01541   {
01542     return create_table_no_lock(session,
01543                                       identifier,
01544                                       create_info,
01545                                       table_proto,
01546                                       alter_info,
01547                                       internal_tmp_table,
01548                                       select_field_count,
01549                                       is_if_not_exists);
01550   }
01551 
01552   return drizzle_create_table(session,
01553                               identifier,
01554                               create_info,
01555                               table_proto,
01556                               alter_info,
01557                               internal_tmp_table,
01558                               select_field_count,
01559                               is_if_not_exists);
01560 }
01561 
01562 
01563 /*
01564 ** Give the key name after the first field with an optional '_#' after
01565 **/
01566 
01567 static bool
01568 check_if_keyname_exists(const char *name, KeyInfo *start, KeyInfo *end)
01569 {
01570   for (KeyInfo *key=start ; key != end ; key++)
01571     if (!my_strcasecmp(system_charset_info,name,key->name))
01572       return 1;
01573   return 0;
01574 }
01575 
01576 
01577 static char *
01578 make_unique_key_name(const char *field_name,KeyInfo *start,KeyInfo *end)
01579 {
01580   char buff[MAX_FIELD_NAME],*buff_end;
01581 
01582   if (!check_if_keyname_exists(field_name,start,end) &&
01583       !is_primary_key_name(field_name))
01584     return (char*) field_name;      // Use fieldname
01585 
01586   buff_end= strncpy(buff, field_name, sizeof(buff)-4);
01587   buff_end+= strlen(buff);
01588 
01589   /*
01590     Only 3 chars + '\0' left, so need to limit to 2 digit
01591     This is ok as we can't have more than 100 keys anyway
01592   */
01593   for (uint32_t i=2 ; i< 100; i++)
01594   {
01595     *buff_end= '_';
01596     internal::int10_to_str(i, buff_end+1, 10);
01597     if (!check_if_keyname_exists(buff,start,end))
01598       return memory::sql_strdup(buff);
01599   }
01600   return (char*) "not_specified";   // Should never happen
01601 }
01602 
01603 
01604 /****************************************************************************
01605 ** Alter a table definition
01606 ****************************************************************************/
01607 
01608 /*
01609   Rename a table.
01610 
01611   SYNOPSIS
01612     rename_table()
01613       session
01614       base                      The plugin::StorageEngine handle.
01615       old_db                    The old database name.
01616       old_name                  The old table name.
01617       new_db                    The new database name.
01618       new_name                  The new table name.
01619 
01620   RETURN
01621     false   OK
01622     true    Error
01623 */
01624 
01625 bool
01626 rename_table(Session &session,
01627                    plugin::StorageEngine *base,
01628                    const identifier::Table &from,
01629                    const identifier::Table &to)
01630 {
01631   int error= 0;
01632 
01633   assert(base);
01634 
01635   if (not plugin::StorageEngine::doesSchemaExist(to))
01636   {
01637     my_error(ER_NO_DB_ERROR, MYF(0), to.getSchemaName().c_str());
01638     return true;
01639   }
01640 
01641   error= base->renameTable(session, from, to);
01642 
01643   if (error == HA_ERR_WRONG_COMMAND)
01644   {
01645     my_error(ER_NOT_SUPPORTED_YET, MYF(0), "ALTER Table");
01646   }
01647   else if (error)
01648   {
01649     std::string from_path;
01650     std::string to_path;
01651 
01652     from.getSQLPath(from_path);
01653     to.getSQLPath(to_path);
01654 
01655     const char *from_identifier= from.isTmp() ? "#sql-temporary" : from_path.c_str();
01656     const char *to_identifier= to.isTmp() ? "#sql-temporary" : to_path.c_str();
01657 
01658     my_error(ER_ERROR_ON_RENAME, MYF(0), from_identifier, to_identifier, error);
01659   }
01660 
01661   return error ? true : false; 
01662 }
01663 
01664 
01665 /*
01666   Force all other threads to stop using the table
01667 
01668   SYNOPSIS
01669     wait_while_table_is_used()
01670     session     Thread Cursor
01671     table   Table to remove from cache
01672     function            HA_EXTRA_PREPARE_FOR_DROP if table is to be deleted
01673                         HA_EXTRA_FORCE_REOPEN if table is not be used
01674                         HA_EXTRA_PREPARE_FOR_RENAME if table is to be renamed
01675   NOTES
01676    When returning, the table will be unusable for other threads until
01677    the table is closed.
01678 
01679   PREREQUISITES
01680     Lock on table::Cache::singleton().mutex()
01681     Win32 clients must also have a WRITE LOCK on the table !
01682 */
01683 
01684 void wait_while_table_is_used(Session *session, Table *table,
01685                               enum ha_extra_function function)
01686 {
01687 
01688   safe_mutex_assert_owner(table::Cache::singleton().mutex().native_handle());
01689 
01690   table->cursor->extra(function);
01691   /* Mark all tables that are in use as 'old' */
01692   session->abortLock(table);  /* end threads waiting on lock */
01693 
01694   /* Wait until all there are no other threads that has this table open */
01695   identifier::Table identifier(table->getShare()->getSchemaName(), table->getShare()->getTableName());
01696   table::Cache::singleton().removeTable(session, identifier, RTFC_WAIT_OTHER_THREAD_FLAG);
01697 }
01698 
01699 /*
01700   Close a cached table
01701 
01702   SYNOPSIS
01703     close_cached_table()
01704     session     Thread Cursor
01705     table   Table to remove from cache
01706 
01707   NOTES
01708     Function ends by signaling threads waiting for the table to try to
01709     reopen the table.
01710 
01711   PREREQUISITES
01712     Lock on table::Cache::singleton().mutex()
01713     Win32 clients must also have a WRITE LOCK on the table !
01714 */
01715 
01716 void Session::close_cached_table(Table *table)
01717 {
01718 
01719   wait_while_table_is_used(this, table, HA_EXTRA_FORCE_REOPEN);
01720   /* Close lock if this is not got with LOCK TABLES */
01721   if (lock)
01722   {
01723     unlockTables(lock);
01724     lock= NULL;     // Start locked threads
01725   }
01726   /* Close all copies of 'table'.  This also frees all LOCK TABLES lock */
01727   unlink_open_table(table);
01728 
01729   /* When lock on table::Cache::singleton().mutex() is freed other threads can continue */
01730   locking::broadcast_refresh();
01731 }
01732 
01733 /*
01734   RETURN VALUES
01735     false Message sent to net (admin operation went ok)
01736     true  Message should be sent by caller
01737           (admin operation or network communication failed)
01738 */
01739 static bool admin_table(Session* session, TableList* tables,
01740                               HA_CHECK_OPT* check_opt,
01741                               const char *operator_name,
01742                               thr_lock_type lock_type,
01743                               bool open_for_modify,
01744                               int (Cursor::*operator_func)(Session *,
01745                                                             HA_CHECK_OPT *))
01746 {
01747   TableList *table;
01748   Select_Lex *select= &session->lex().select_lex;
01749   List<Item> field_list;
01750   Item *item;
01751   int result_code= 0;
01752   TransactionServices &transaction_services= TransactionServices::singleton();
01753   const CHARSET_INFO * const cs= system_charset_info;
01754 
01755   if (! session->endActiveTransaction())
01756     return 1;
01757 
01758   field_list.push_back(item = new Item_empty_string("Table",
01759                                                     NAME_CHAR_LEN * 2,
01760                                                     cs));
01761   item->maybe_null = 1;
01762   field_list.push_back(item = new Item_empty_string("Op", 10, cs));
01763   item->maybe_null = 1;
01764   field_list.push_back(item = new Item_empty_string("Msg_type", 10, cs));
01765   item->maybe_null = 1;
01766   field_list.push_back(item = new Item_empty_string("Msg_text", 255, cs));
01767   item->maybe_null = 1;
01768   if (session->getClient()->sendFields(&field_list))
01769     return true;
01770 
01771   for (table= tables; table; table= table->next_local)
01772   {
01773     identifier::Table table_identifier(table->getSchemaName(), table->getTableName());
01774     std::string table_name;
01775     bool fatal_error=0;
01776 
01777     table_identifier.getSQLPath(table_name);
01778 
01779     table->lock_type= lock_type;
01780     /* open only one table from local list of command */
01781     {
01782       TableList *save_next_global, *save_next_local;
01783       save_next_global= table->next_global;
01784       table->next_global= 0;
01785       save_next_local= table->next_local;
01786       table->next_local= 0;
01787       select->table_list.first= (unsigned char*)table;
01788       /*
01789         Time zone tables and SP tables can be add to lex->query_tables list,
01790         so it have to be prepared.
01791         @todo Investigate if we can put extra tables into argument instead of using lex->query_tables
01792       */
01793       session->lex().query_tables= table;
01794       session->lex().query_tables_last= &table->next_global;
01795       session->lex().query_tables_own_last= 0;
01796       session->no_warnings_for_error= 0;
01797 
01798       session->openTablesLock(table);
01799       session->no_warnings_for_error= 0;
01800       table->next_global= save_next_global;
01801       table->next_local= save_next_local;
01802     }
01803 
01804     /*
01805       CHECK Table command is only command where VIEW allowed here and this
01806       command use only temporary teble method for VIEWs resolving => there
01807       can't be VIEW tree substitition of join view => if opening table
01808       succeed then table->table will have real Table pointer as value (in
01809       case of join view substitution table->table can be 0, but here it is
01810       impossible)
01811     */
01812     if (!table->table)
01813     {
01814       if (!session->warn_list.size())
01815         push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_ERROR,
01816                      ER_CHECK_NO_SUCH_TABLE, ER(ER_CHECK_NO_SUCH_TABLE));
01817       result_code= HA_ADMIN_CORRUPT;
01818       goto send_result;
01819     }
01820 
01821     if ((table->table->db_stat & HA_READ_ONLY) && open_for_modify)
01822     {
01823       char buff[FN_REFLEN + DRIZZLE_ERRMSG_SIZE];
01824       uint32_t length;
01825       session->getClient()->store(table_name.c_str());
01826       session->getClient()->store(operator_name);
01827       session->getClient()->store(STRING_WITH_LEN("error"));
01828       length= snprintf(buff, sizeof(buff), ER(ER_OPEN_AS_READONLY),
01829                        table_name.c_str());
01830       session->getClient()->store(buff, length);
01831       transaction_services.autocommitOrRollback(*session, false);
01832       session->endTransaction(COMMIT);
01833       session->close_thread_tables();
01834       session->lex().reset_query_tables_list(false);
01835       table->table=0;       // For query cache
01836       if (session->getClient()->flush())
01837   goto err;
01838       continue;
01839     }
01840 
01841     /* Close all instances of the table to allow repair to rename files */
01842     if (lock_type == TL_WRITE && table->table->getShare()->getVersion())
01843     {
01844       table::Cache::singleton().mutex().lock(); /* Lock type is TL_WRITE and we lock to repair the table */
01845       const char *old_message=session->enter_cond(COND_refresh, table::Cache::singleton().mutex(),
01846                                                   "Waiting to get writelock");
01847       session->abortLock(table->table);
01848       identifier::Table identifier(table->table->getShare()->getSchemaName(), table->table->getShare()->getTableName());
01849       table::Cache::singleton().removeTable(session, identifier, RTFC_WAIT_OTHER_THREAD_FLAG | RTFC_CHECK_KILLED_FLAG);
01850       session->exit_cond(old_message);
01851       if (session->getKilled())
01852   goto err;
01853       open_for_modify= 0;
01854     }
01855 
01856     result_code = (table->table->cursor->*operator_func)(session, check_opt);
01857 
01858 send_result:
01859 
01860     session->lex().cleanup_after_one_table_open();
01861     session->clear_error();  // these errors shouldn't get client
01862     {
01863       List<DRIZZLE_ERROR>::iterator it(session->warn_list.begin());
01864       DRIZZLE_ERROR *err;
01865       while ((err= it++))
01866       {
01867         session->getClient()->store(table_name.c_str());
01868         session->getClient()->store(operator_name);
01869         session->getClient()->store(warning_level_names[err->level].str,
01870                                warning_level_names[err->level].length);
01871         session->getClient()->store(err->msg);
01872         if (session->getClient()->flush())
01873           goto err;
01874       }
01875       drizzle_reset_errors(session, true);
01876     }
01877     session->getClient()->store(table_name.c_str());
01878     session->getClient()->store(operator_name);
01879 
01880     switch (result_code) {
01881     case HA_ADMIN_NOT_IMPLEMENTED:
01882       {
01883   char buf[ERRMSGSIZE+20];
01884   uint32_t length=snprintf(buf, ERRMSGSIZE,
01885                              ER(ER_CHECK_NOT_IMPLEMENTED), operator_name);
01886   session->getClient()->store(STRING_WITH_LEN("note"));
01887   session->getClient()->store(buf, length);
01888       }
01889       break;
01890 
01891     case HA_ADMIN_OK:
01892       session->getClient()->store(STRING_WITH_LEN("status"));
01893       session->getClient()->store(STRING_WITH_LEN("OK"));
01894       break;
01895 
01896     case HA_ADMIN_FAILED:
01897       session->getClient()->store(STRING_WITH_LEN("status"));
01898       session->getClient()->store(STRING_WITH_LEN("Operation failed"));
01899       break;
01900 
01901     case HA_ADMIN_REJECT:
01902       session->getClient()->store(STRING_WITH_LEN("status"));
01903       session->getClient()->store(STRING_WITH_LEN("Operation need committed state"));
01904       open_for_modify= false;
01905       break;
01906 
01907     case HA_ADMIN_ALREADY_DONE:
01908       session->getClient()->store(STRING_WITH_LEN("status"));
01909       session->getClient()->store(STRING_WITH_LEN("Table is already up to date"));
01910       break;
01911 
01912     case HA_ADMIN_CORRUPT:
01913       session->getClient()->store(STRING_WITH_LEN("error"));
01914       session->getClient()->store(STRING_WITH_LEN("Corrupt"));
01915       fatal_error=1;
01916       break;
01917 
01918     case HA_ADMIN_INVALID:
01919       session->getClient()->store(STRING_WITH_LEN("error"));
01920       session->getClient()->store(STRING_WITH_LEN("Invalid argument"));
01921       break;
01922 
01923     default:        // Probably HA_ADMIN_INTERNAL_ERROR
01924       {
01925         char buf[ERRMSGSIZE+20];
01926         uint32_t length=snprintf(buf, ERRMSGSIZE,
01927                              _("Unknown - internal error %d during operation"),
01928                              result_code);
01929         session->getClient()->store(STRING_WITH_LEN("error"));
01930         session->getClient()->store(buf, length);
01931         fatal_error=1;
01932         break;
01933       }
01934     }
01935     if (table->table)
01936     {
01937       if (fatal_error)
01938       {
01939         table->table->getMutableShare()->resetVersion();               // Force close of table
01940       }
01941       else if (open_for_modify)
01942       {
01943         if (table->table->getShare()->getType())
01944         {
01945           table->table->cursor->info(HA_STATUS_CONST);
01946         }
01947         else
01948         {
01949           boost::unique_lock<boost::mutex> lock(table::Cache::singleton().mutex());
01950     identifier::Table identifier(table->table->getShare()->getSchemaName(), table->table->getShare()->getTableName());
01951           table::Cache::singleton().removeTable(session, identifier, RTFC_NO_FLAG);
01952         }
01953       }
01954     }
01955     transaction_services.autocommitOrRollback(*session, false);
01956     session->endTransaction(COMMIT);
01957     session->close_thread_tables();
01958     table->table=0;       // For query cache
01959     if (session->getClient()->flush())
01960       goto err;
01961   }
01962 
01963   session->my_eof();
01964   return(false);
01965 
01966 err:
01967   transaction_services.autocommitOrRollback(*session, true);
01968   session->endTransaction(ROLLBACK);
01969   session->close_thread_tables();     // Shouldn't be needed
01970   if (table)
01971     table->table=0;
01972   return(true);
01973 }
01974 
01975   /*
01976     Create a new table by copying from source table
01977 
01978     Altough exclusive name-lock on target table protects us from concurrent
01979     DML and DDL operations on it we still want to wrap .FRM creation and call
01980     to plugin::StorageEngine::createTable() in critical section protected by
01981     table::Cache::singleton().mutex() in order to provide minimal atomicity against operations which
01982     disregard name-locks, like I_S implementation, for example. This is a
01983     temporary and should not be copied. Instead we should fix our code to
01984     always honor name-locks.
01985 
01986     Also some engines (e.g. NDB cluster) require that table::Cache::singleton().mutex() should be held
01987     during the call to plugin::StorageEngine::createTable().
01988     See bug #28614 for more info.
01989   */
01990 static bool create_table_wrapper(Session &session,
01991                                  const message::Table& create_table_proto,
01992                                  identifier::Table::const_reference destination_identifier,
01993                                  identifier::Table::const_reference source_identifier,
01994                                  bool is_engine_set)
01995 {
01996   // We require an additional table message because during parsing we used
01997   // a "new" message and it will not have all of the information that the
01998   // source table message would have.
01999   message::Table new_table_message;
02000 
02001   message::table::shared_ptr source_table_message= plugin::StorageEngine::getTableMessage(session, source_identifier);
02002 
02003   if (not source_table_message)
02004   {
02005     my_error(ER_TABLE_UNKNOWN, source_identifier);
02006     return false;
02007   }
02008 
02009   new_table_message.CopyFrom(*source_table_message);
02010 
02011   if (destination_identifier.isTmp())
02012   {
02013     new_table_message.set_type(message::Table::TEMPORARY);
02014   }
02015   else
02016   {
02017     new_table_message.set_type(message::Table::STANDARD);
02018   }
02019 
02020   if (is_engine_set)
02021   {
02022     new_table_message.mutable_engine()->set_name(create_table_proto.engine().name());
02023   }
02024 
02025   { // We now do a selective copy of elements on to the new table.
02026     new_table_message.set_name(create_table_proto.name());
02027     new_table_message.set_schema(create_table_proto.schema());
02028     new_table_message.set_catalog(create_table_proto.catalog());
02029   }
02030 
02031   /* Fix names of foreign keys being added */
02032   for (int32_t j= 0; j < new_table_message.fk_constraint_size(); j++)
02033   {
02034     if (new_table_message.fk_constraint(j).has_name())
02035     {
02036       std::string name(new_table_message.name());
02037       char number[20];
02038 
02039       name.append("_ibfk_");
02040       snprintf(number, sizeof(number), "%d", j+1);
02041       name.append(number);
02042 
02043       message::Table::ForeignKeyConstraint *pfkey= new_table_message.mutable_fk_constraint(j);
02044       pfkey->set_name(name);
02045     }
02046   }
02047 
02048   /*
02049     As mysql_truncate don't work on a new table at this stage of
02050     creation, instead create the table directly (for both normal and temporary tables).
02051   */
02052   bool success= plugin::StorageEngine::createTable(session,
02053                                                    destination_identifier,
02054                                                    new_table_message);
02055 
02056   if (success && not destination_identifier.isTmp())
02057   {
02058     TransactionServices &transaction_services= TransactionServices::singleton();
02059     transaction_services.createTable(session, new_table_message);
02060   }
02061 
02062   return success;
02063 }
02064 
02065 /*
02066   Create a table identical to the specified table
02067 
02068   SYNOPSIS
02069     create_like_table()
02070     session   Thread object
02071     table       Table list element for target table
02072     src_table   Table list element for source table
02073     create_info Create info
02074 
02075   RETURN VALUES
02076     false OK
02077     true  error
02078 */
02079 
02080 bool create_like_table(Session* session,
02081                        identifier::Table::const_reference destination_identifier,
02082                        identifier::Table::const_reference source_identifier,
02083                        message::Table &create_table_proto,
02084                        bool is_if_not_exists,
02085                        bool is_engine_set)
02086 {
02087   bool res= true;
02088   bool table_exists= false;
02089 
02090   /*
02091     Check that destination tables does not exist. Note that its name
02092     was already checked when it was added to the table list.
02093 
02094     For temporary tables we don't aim to grab locks.
02095   */
02096   if (destination_identifier.isTmp())
02097   {
02098     if (session->find_temporary_table(destination_identifier))
02099     {
02100       table_exists= true;
02101     }
02102     else
02103     {
02104       bool was_created= create_table_wrapper(*session,
02105                                              create_table_proto,
02106                                              destination_identifier,
02107                                              source_identifier,
02108                                              is_engine_set);
02109       if (not was_created) // This is pretty paranoid, but we assume something might not clean up after itself
02110       {
02111         (void) session->rm_temporary_table(destination_identifier, true);
02112       }
02113       else if (not session->open_temporary_table(destination_identifier))
02114       {
02115         // We created, but we can't open... also, a hack.
02116         (void) session->rm_temporary_table(destination_identifier, true);
02117       }
02118       else
02119       {
02120         res= false;
02121       }
02122     }
02123   }
02124   else // Standard table which will require locks.
02125   {
02126     Table *name_lock= 0;
02127 
02128     if (session->lock_table_name_if_not_cached(destination_identifier, &name_lock))
02129     {
02130       if (name_lock)
02131       {
02132         boost_unique_lock_t lock(table::Cache::singleton().mutex()); /* unlink open tables for create table like*/
02133         session->unlink_open_table(name_lock);
02134       }
02135 
02136       return res;
02137     }
02138 
02139     if (not name_lock)
02140     {
02141       table_exists= true;
02142     }
02143     else if (plugin::StorageEngine::doesTableExist(*session, destination_identifier))
02144     {
02145       table_exists= true;
02146     }
02147     else // Otherwise we create the table
02148     {
02149       bool was_created;
02150       {
02151         boost_unique_lock_t lock(table::Cache::singleton().mutex()); /* We lock for CREATE TABLE LIKE to copy table definition */
02152         was_created= create_table_wrapper(*session, create_table_proto, destination_identifier,
02153                                           source_identifier, is_engine_set);
02154       }
02155 
02156       // So we blew the creation of the table, and we scramble to clean up
02157       // anything that might have been created (read... it is a hack)
02158       if (not was_created)
02159       {
02160         plugin::StorageEngine::dropTable(*session, destination_identifier);
02161       } 
02162       else
02163       {
02164         res= false;
02165       }
02166     }
02167 
02168     if (name_lock)
02169     {
02170       boost_unique_lock_t lock(table::Cache::singleton().mutex()); /* unlink open tables for create table like*/
02171       session->unlink_open_table(name_lock);
02172     }
02173   }
02174 
02175   if (table_exists)
02176   {
02177     if (is_if_not_exists)
02178     {
02179       char warn_buff[DRIZZLE_ERRMSG_SIZE];
02180       snprintf(warn_buff, sizeof(warn_buff),
02181                ER(ER_TABLE_EXISTS_ERROR), destination_identifier.getTableName().c_str());
02182       push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
02183                    ER_TABLE_EXISTS_ERROR, warn_buff);
02184       return false;
02185     }
02186 
02187     my_error(ER_TABLE_EXISTS_ERROR, destination_identifier);
02188 
02189     return true;
02190   }
02191 
02192   return res;
02193 }
02194 
02195 
02196 bool analyze_table(Session* session, TableList* tables, HA_CHECK_OPT* check_opt)
02197 {
02198   thr_lock_type lock_type = TL_READ_NO_INSERT;
02199 
02200   return(admin_table(session, tables, check_opt,
02201         "analyze", lock_type, true,
02202         &Cursor::ha_analyze));
02203 }
02204 
02205 
02206 bool check_table(Session* session, TableList* tables,HA_CHECK_OPT* check_opt)
02207 {
02208   thr_lock_type lock_type = TL_READ_NO_INSERT;
02209 
02210   return(admin_table(session, tables, check_opt,
02211         "check", lock_type,
02212         false,
02213         &Cursor::ha_check));
02214 }
02215 
02216 } /* namespace drizzled */