Drizzled Public API Documentation

alter_table.cc
00001 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2009 Sun Microsystems, Inc.
00005  *
00006  *  This program is free software; you can redistribute it and/or modify
00007  *  it under the terms of the GNU General Public License as published by
00008  *  the Free Software Foundation; either version 2 of the License, or
00009  *  (at your option) any later version.
00010  *
00011  *  This program is distributed in the hope that it will be useful,
00012  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *  GNU General Public License for more details.
00015  *
00016  *  You should have received a copy of the GNU General Public License
00017  *  along with this program; if not, write to the Free Software
00018  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00019  */
00020 
00021 #include <config.h>
00022 
00023 #include <fcntl.h>
00024 
00025 #include <sstream>
00026 
00027 #include <drizzled/show.h>
00028 #include <drizzled/lock.h>
00029 #include <drizzled/session.h>
00030 #include <drizzled/statement/alter_table.h>
00031 #include <drizzled/global_charset_info.h>
00032 #include <drizzled/gettext.h>
00033 #include <drizzled/data_home.h>
00034 #include <drizzled/sql_table.h>
00035 #include <drizzled/table_proto.h>
00036 #include <drizzled/optimizer/range.h>
00037 #include <drizzled/time_functions.h>
00038 #include <drizzled/records.h>
00039 #include <drizzled/pthread_globals.h>
00040 #include <drizzled/internal/my_sys.h>
00041 #include <drizzled/internal/iocache.h>
00042 #include <drizzled/plugin/storage_engine.h>
00043 #include <drizzled/copy_field.h>
00044 #include <drizzled/transaction_services.h>
00045 #include <drizzled/filesort.h>
00046 #include <drizzled/message.h>
00047 #include <drizzled/alter_column.h>
00048 #include <drizzled/alter_drop.h>
00049 #include <drizzled/alter_info.h>
00050 
00051 using namespace std;
00052 
00053 namespace drizzled
00054 {
00055 
00056 extern pid_t current_pid;
00057 
00058 static int copy_data_between_tables(Session *session,
00059                                     Table *from,Table *to,
00060                                     List<CreateField> &create,
00061                                     bool ignore,
00062                                     uint32_t order_num,
00063                                     Order *order,
00064                                     ha_rows *copied,
00065                                     ha_rows *deleted,
00066                                     enum enum_enable_or_disable keys_onoff,
00067                                     bool error_if_not_empty);
00068 
00069 static bool prepare_alter_table(Session *session,
00070                                       Table *table,
00071                                       HA_CREATE_INFO *create_info,
00072                                       const message::Table &original_proto,
00073                                       message::Table &table_message,
00074                                       AlterInfo *alter_info);
00075 
00076 static Table *open_alter_table(Session *session, Table *table, identifier::Table &identifier);
00077 
00078 namespace statement {
00079 
00080 AlterTable::AlterTable(Session *in_session, Table_ident *, drizzled::ha_build_method build_arg) :
00081   CreateTable(in_session)
00082 {
00083   set_command(SQLCOM_ALTER_TABLE);
00084   alter_info.build_method= build_arg;
00085 }
00086 
00087 } // namespace statement
00088 
00089 bool statement::AlterTable::execute()
00090 {
00091   TableList *first_table= (TableList *) lex().select_lex.table_list.first;
00092   TableList *all_tables= lex().query_tables;
00093   assert(first_table == all_tables && first_table != 0);
00094   Select_Lex *select_lex= &lex().select_lex;
00095   bool need_start_waiting= false;
00096 
00097   is_engine_set= not createTableMessage().engine().name().empty();
00098 
00099   if (is_engine_set)
00100   {
00101     create_info().db_type=
00102       plugin::StorageEngine::findByName(session(), createTableMessage().engine().name());
00103 
00104     if (create_info().db_type == NULL)
00105     {
00106       my_error(createTableMessage().engine().name(), ER_UNKNOWN_STORAGE_ENGINE, MYF(0));
00107 
00108       return true;
00109     }
00110   }
00111 
00112   /* Must be set in the parser */
00113   assert(select_lex->db);
00114 
00115   /* Chicken/Egg... we need to search for the table, to know if the table exists, so we can build a full identifier from it */
00116   message::table::shared_ptr original_table_message;
00117   {
00118     identifier::Table identifier(first_table->getSchemaName(), first_table->getTableName());
00119     if (not (original_table_message= plugin::StorageEngine::getTableMessage(session(), identifier)))
00120     {
00121       my_error(ER_BAD_TABLE_ERROR, identifier);
00122       return true;
00123     }
00124 
00125     if (not  create_info().db_type)
00126     {
00127       create_info().db_type=
00128         plugin::StorageEngine::findByName(session(), original_table_message->engine().name());
00129 
00130       if (not create_info().db_type)
00131       {
00132         my_error(ER_BAD_TABLE_ERROR, identifier);
00133         return true;
00134       }
00135     }
00136   }
00137 
00138   if (not validateCreateTableOption())
00139     return true;
00140 
00141   if (session().inTransaction())
00142   {
00143     my_error(ER_TRANSACTIONAL_DDL_NOT_SUPPORTED, MYF(0));
00144     return true;
00145   }
00146 
00147   if (not (need_start_waiting= not session().wait_if_global_read_lock(0, 1)))
00148     return true;
00149 
00150   bool res;
00151   if (original_table_message->type() == message::Table::STANDARD )
00152   {
00153     identifier::Table identifier(first_table->getSchemaName(), first_table->getTableName());
00154     identifier::Table new_identifier(select_lex->db ? select_lex->db : first_table->getSchemaName(),
00155                                    lex().name.str ? lex().name.str : first_table->getTableName());
00156 
00157     res= alter_table(&session(),
00158                      identifier,
00159                      new_identifier,
00160                      &create_info(),
00161                      *original_table_message,
00162                      createTableMessage(),
00163                      first_table,
00164                      &alter_info,
00165                      select_lex->order_list.size(),
00166                      (Order *) select_lex->order_list.first,
00167                      lex().ignore);
00168   }
00169   else
00170   {
00171     identifier::Table catch22(first_table->getSchemaName(), first_table->getTableName());
00172     Table *table= session().find_temporary_table(catch22);
00173     assert(table);
00174     {
00175       identifier::Table identifier(first_table->getSchemaName(), first_table->getTableName(), table->getMutableShare()->getPath());
00176       identifier::Table new_identifier(select_lex->db ? select_lex->db : first_table->getSchemaName(),
00177                                        lex().name.str ? lex().name.str : first_table->getTableName(),
00178                                        table->getMutableShare()->getPath());
00179 
00180       res= alter_table(&session(),
00181                        identifier,
00182                        new_identifier,
00183                        &create_info(),
00184                        *original_table_message,
00185                        createTableMessage(),
00186                        first_table,
00187                        &alter_info,
00188                        select_lex->order_list.size(),
00189                        (Order *) select_lex->order_list.first,
00190                        lex().ignore);
00191     }
00192   }
00193 
00194   /*
00195      Release the protection against the global read lock and wake
00196      everyone, who might want to set a global read lock.
00197    */
00198   session().startWaitingGlobalReadLock();
00199 
00200   return res;
00201 }
00202 
00203 
00244 static bool prepare_alter_table(Session *session,
00245                                 Table *table,
00246                                 HA_CREATE_INFO *create_info,
00247                                 const message::Table &original_proto,
00248                                 message::Table &table_message,
00249                                 AlterInfo *alter_info)
00250 {
00251   uint32_t used_fields= create_info->used_fields;
00252 
00253   /* Let new create options override the old ones */
00254   message::Table::TableOptions *table_options= table_message.mutable_options();
00255 
00256   if (not (used_fields & HA_CREATE_USED_DEFAULT_CHARSET))
00257     create_info->default_table_charset= table->getShare()->table_charset;
00258 
00259   if (not (used_fields & HA_CREATE_USED_AUTO) && table->found_next_number_field)
00260   {
00261     /* Table has an autoincrement, copy value to new table */
00262     table->cursor->info(HA_STATUS_AUTO);
00263     create_info->auto_increment_value= table->cursor->stats.auto_increment_value;
00264     if (create_info->auto_increment_value != original_proto.options().auto_increment_value())
00265       table_options->set_has_user_set_auto_increment_value(false);
00266   }
00267 
00268   table->restoreRecordAsDefault(); /* Empty record for DEFAULT */
00269 
00270   List<CreateField> new_create_list;
00271   List<Key> new_key_list;
00272   /* First collect all fields from table which isn't in drop_list */
00273   Field *field;
00274   for (Field **f_ptr= table->getFields(); (field= *f_ptr); f_ptr++)
00275   {
00276     /* Check if field should be dropped */
00277     AlterInfo::drop_list_t::iterator drop(alter_info->drop_list.begin());
00278     for (; drop != alter_info->drop_list.end(); drop++)
00279     {
00280       if (drop->type == AlterDrop::COLUMN &&
00281           ! my_strcasecmp(system_charset_info, field->field_name, drop->name))
00282       {
00283         /* Reset auto_increment value if it was dropped */
00284         if (MTYP_TYPENR(field->unireg_check) == Field::NEXT_NUMBER &&
00285             not (used_fields & HA_CREATE_USED_AUTO))
00286         {
00287           create_info->auto_increment_value= 0;
00288           create_info->used_fields|= HA_CREATE_USED_AUTO;
00289         }
00290         break;
00291       }
00292     }
00293 
00294     if (drop != alter_info->drop_list.end())
00295     {
00296       alter_info->drop_list.erase(drop);
00297       continue;
00298     }
00299 
00300     /* Mark that we will read the field */
00301     field->setReadSet();
00302 
00303     CreateField *def;
00304     /* Check if field is changed */
00305     List<CreateField>::iterator def_it= alter_info->create_list.begin();
00306     while ((def= def_it++))
00307     {
00308       if (def->change &&
00309           ! my_strcasecmp(system_charset_info, field->field_name, def->change))
00310         break;
00311     }
00312 
00313     if (def)
00314     {
00315       /* Field is changed */
00316       def->field= field;
00317       if (! def->after)
00318       {
00319         new_create_list.push_back(def);
00320         def_it.remove();
00321       }
00322     }
00323     else
00324     {
00325       /*
00326         This field was not dropped and not changed, add it to the list
00327         for the new table.
00328       */
00329       def= new CreateField(field, field);
00330       new_create_list.push_back(def);
00331       AlterInfo::alter_list_t::iterator alter(alter_info->alter_list.begin());
00332 
00333       for (; alter != alter_info->alter_list.end(); alter++)
00334       {
00335         if (not my_strcasecmp(system_charset_info,field->field_name, alter->name))
00336           break;
00337       }
00338 
00339       if (alter != alter_info->alter_list.end())
00340       {
00341         if (def->sql_type == DRIZZLE_TYPE_BLOB)
00342         {
00343           my_error(ER_BLOB_CANT_HAVE_DEFAULT, MYF(0), def->change);
00344           return true;
00345         }
00346 
00347         if ((def->def= alter->def))
00348         {
00349           /* Use new default */
00350           def->flags&= ~NO_DEFAULT_VALUE_FLAG;
00351         }
00352         else
00353         {
00354           def->flags|= NO_DEFAULT_VALUE_FLAG;
00355         }
00356         alter_info->alter_list.erase(alter);
00357       }
00358     }
00359   }
00360 
00361   CreateField *def;
00362   List<CreateField>::iterator def_it= alter_info->create_list.begin();
00363   while ((def= def_it++)) /* Add new columns */
00364   {
00365     if (def->change && ! def->field)
00366     {
00367       my_error(ER_BAD_FIELD_ERROR, MYF(0), def->change, table->getMutableShare()->getTableName());
00368       return true;
00369     }
00370     /*
00371       If we have been given a field which has no default value, and is not null then we need to bail.
00372     */
00373     if (not (~def->flags & (NO_DEFAULT_VALUE_FLAG | NOT_NULL_FLAG)) and not def->change)
00374     {
00375       alter_info->error_if_not_empty= true;
00376     }
00377     if (! def->after)
00378     {
00379       new_create_list.push_back(def);
00380     }
00381     else if (def->after == first_keyword)
00382     {
00383       new_create_list.push_front(def);
00384     }
00385     else
00386     {
00387       CreateField *find;
00388       List<CreateField>::iterator find_it= new_create_list.begin();
00389 
00390       while ((find= find_it++)) /* Add new columns */
00391       {
00392         if (not my_strcasecmp(system_charset_info,def->after, find->field_name))
00393           break;
00394       }
00395 
00396       if (not find)
00397       {
00398         my_error(ER_BAD_FIELD_ERROR, MYF(0), def->after, table->getMutableShare()->getTableName());
00399         return true;
00400       }
00401 
00402       find_it.after(def); /* Put element after this */
00403 
00404       /*
00405         XXX: hack for Bug#28427.
00406         If column order has changed, force OFFLINE ALTER Table
00407         without querying engine capabilities.  If we ever have an
00408         engine that supports online ALTER Table CHANGE COLUMN
00409         <name> AFTER <name1> (Falcon?), this fix will effectively
00410         disable the capability.
00411         TODO: detect the situation in compare_tables, behave based
00412         on engine capabilities.
00413       */
00414       if (alter_info->build_method == HA_BUILD_ONLINE)
00415       {
00416         my_error(*session->getQueryString(), ER_NOT_SUPPORTED_YET);
00417         return true;
00418       }
00419       alter_info->build_method= HA_BUILD_OFFLINE;
00420     }
00421   }
00422 
00423   if (not alter_info->alter_list.empty())
00424   {
00425     my_error(ER_BAD_FIELD_ERROR, MYF(0), alter_info->alter_list.front().name, table->getMutableShare()->getTableName());
00426     return true;
00427   }
00428 
00429   if (new_create_list.is_empty())
00430   {
00431     my_message(ER_CANT_REMOVE_ALL_FIELDS, ER(ER_CANT_REMOVE_ALL_FIELDS), MYF(0));
00432     return true;
00433   }
00434 
00435   /*
00436     Collect all keys which isn't in drop list. Add only those
00437     for which some fields exists.
00438   */
00439   KeyInfo *key_info= table->key_info;
00440   for (uint32_t i= 0; i < table->getShare()->sizeKeys(); i++, key_info++)
00441   {
00442     char *key_name= key_info->name;
00443     AlterInfo::drop_list_t::iterator drop(alter_info->drop_list.begin());
00444     for (; drop != alter_info->drop_list.end(); drop++)
00445     {
00446       if (drop->type == AlterDrop::KEY &&
00447           not my_strcasecmp(system_charset_info, key_name, drop->name))
00448         break;
00449     }
00450 
00451     if (drop != alter_info->drop_list.end())
00452     {
00453       alter_info->drop_list.erase(drop);
00454       continue;
00455     }
00456 
00457     KeyPartInfo *key_part= key_info->key_part;
00458     List<Key_part_spec> key_parts;
00459     for (uint32_t j= 0; j < key_info->key_parts; j++, key_part++)
00460     {
00461       if (! key_part->field)
00462         continue; /* Wrong field (from UNIREG) */
00463 
00464       const char *key_part_name= key_part->field->field_name;
00465       CreateField *cfield;
00466       List<CreateField>::iterator field_it= new_create_list.begin();
00467       while ((cfield= field_it++))
00468       {
00469         if (cfield->change)
00470         {
00471           if (not my_strcasecmp(system_charset_info, key_part_name, cfield->change))
00472             break;
00473         }
00474         else if (not my_strcasecmp(system_charset_info, key_part_name, cfield->field_name))
00475           break;
00476       }
00477 
00478       if (not cfield)
00479         continue; /* Field is removed */
00480 
00481       uint32_t key_part_length= key_part->length;
00482       if (cfield->field) /* Not new field */
00483       {
00484         /*
00485           If the field can't have only a part used in a key according to its
00486           new type, or should not be used partially according to its
00487           previous type, or the field length is less than the key part
00488           length, unset the key part length.
00489 
00490           We also unset the key part length if it is the same as the
00491           old field's length, so the whole new field will be used.
00492 
00493           BLOBs may have cfield->length == 0, which is why we test it before
00494           checking whether cfield->length < key_part_length (in chars).
00495          */
00496         if (! Field::type_can_have_key_part(cfield->field->type()) ||
00497             ! Field::type_can_have_key_part(cfield->sql_type) ||
00498             (cfield->field->field_length == key_part_length) ||
00499             (cfield->length &&
00500              (cfield->length < key_part_length / key_part->field->charset()->mbmaxlen)))
00501           key_part_length= 0; /* Use whole field */
00502       }
00503       key_part_length/= key_part->field->charset()->mbmaxlen;
00504       key_parts.push_back(new Key_part_spec(cfield->field_name,
00505                                             strlen(cfield->field_name),
00506                                             key_part_length));
00507     }
00508     if (key_parts.size())
00509     {
00510       key_create_information_st key_create_info= default_key_create_info;
00511       Key *key;
00512       Key::Keytype key_type;
00513 
00514       key_create_info.algorithm= key_info->algorithm;
00515 
00516       if (key_info->flags & HA_USES_BLOCK_SIZE)
00517         key_create_info.block_size= key_info->block_size;
00518 
00519       if (key_info->flags & HA_USES_COMMENT)
00520         key_create_info.comment= key_info->comment;
00521 
00522       if (key_info->flags & HA_NOSAME)
00523       {
00524         if (is_primary_key_name(key_name))
00525           key_type= Key::PRIMARY;
00526         else
00527           key_type= Key::UNIQUE;
00528       }
00529       else
00530       {
00531         key_type= Key::MULTIPLE;
00532       }
00533 
00534       key= new Key(key_type,
00535                    key_name,
00536                    strlen(key_name),
00537                    &key_create_info,
00538                    test(key_info->flags & HA_GENERATED_KEY),
00539                    key_parts);
00540       new_key_list.push_back(key);
00541     }
00542   }
00543 
00544   /* Copy over existing foreign keys */
00545   for (int32_t j= 0; j < original_proto.fk_constraint_size(); j++)
00546   {
00547     AlterInfo::drop_list_t::iterator drop(alter_info->drop_list.begin());
00548     for (; drop != alter_info->drop_list.end(); drop++)
00549     {
00550       if (drop->type == AlterDrop::FOREIGN_KEY &&
00551           not my_strcasecmp(system_charset_info, original_proto.fk_constraint(j).name().c_str(), drop->name))
00552       {
00553         break;
00554       }
00555     }
00556     if (drop != alter_info->drop_list.end())
00557     {
00558       alter_info->drop_list.erase(drop);
00559       continue;
00560     }
00561 
00562     message::Table::ForeignKeyConstraint *pfkey= table_message.add_fk_constraint();
00563     *pfkey= original_proto.fk_constraint(j);
00564   }
00565 
00566   {
00567     Key *key;
00568     List<Key>::iterator key_it(alter_info->key_list.begin());
00569     while ((key= key_it++)) /* Add new keys */
00570     {
00571       if (key->type == Key::FOREIGN_KEY)
00572       {
00573         if (((Foreign_key *)key)->validate(new_create_list))
00574         {
00575           return true;
00576         }
00577 
00578         Foreign_key *fkey= (Foreign_key*)key;
00579         add_foreign_key_to_table_message(&table_message,
00580                                          fkey->name.str,
00581                                          fkey->columns,
00582                                          fkey->ref_table,
00583                                          fkey->ref_columns,
00584                                          fkey->delete_opt,
00585                                          fkey->update_opt,
00586                                          fkey->match_opt);
00587       }
00588 
00589       if (key->type != Key::FOREIGN_KEY)
00590         new_key_list.push_back(key);
00591 
00592       if (key->name.str && is_primary_key_name(key->name.str))
00593       {
00594         my_error(ER_WRONG_NAME_FOR_INDEX,
00595                  MYF(0),
00596                  key->name.str);
00597         return true;
00598       }
00599     }
00600   }
00601 
00602   /* Fix names of foreign keys being added */
00603   for (int j= 0; j < table_message.fk_constraint_size(); j++)
00604   {
00605     if (! table_message.fk_constraint(j).has_name())
00606     {
00607       std::string name(table->getMutableShare()->getTableName());
00608       char number[20];
00609 
00610       name.append("_ibfk_");
00611       snprintf(number, sizeof(number), "%d", j+1);
00612       name.append(number);
00613 
00614       message::Table::ForeignKeyConstraint *pfkey= table_message.mutable_fk_constraint(j);
00615       pfkey->set_name(name);
00616     }
00617   }
00618 
00619   if (not alter_info->drop_list.empty())
00620   {
00621     my_error(ER_CANT_DROP_FIELD_OR_KEY,
00622              MYF(0),
00623              alter_info->drop_list.front().name);
00624     return true;
00625   }
00626 
00627   if (not alter_info->alter_list.empty())
00628   {
00629     my_error(ER_CANT_DROP_FIELD_OR_KEY,
00630              MYF(0),
00631              alter_info->alter_list.front().name);
00632     return true;
00633   }
00634 
00635   if (not table_message.options().has_comment()
00636       && table->getMutableShare()->hasComment())
00637   {
00638     table_options->set_comment(table->getMutableShare()->getComment());
00639   }
00640 
00641   if (table->getShare()->getType())
00642   {
00643     table_message.set_type(message::Table::TEMPORARY);
00644   }
00645 
00646   table_message.set_creation_timestamp(table->getShare()->getTableMessage()->creation_timestamp());
00647   table_message.set_version(table->getShare()->getTableMessage()->version());
00648   table_message.set_uuid(table->getShare()->getTableMessage()->uuid());
00649 
00650   alter_info->create_list.swap(new_create_list);
00651   alter_info->key_list.swap(new_key_list);
00652 
00653   size_t num_engine_options= table_message.engine().options_size();
00654   size_t original_num_engine_options= original_proto.engine().options_size();
00655   for (size_t x= 0; x < original_num_engine_options; ++x)
00656   {
00657     bool found= false;
00658 
00659     for (size_t y= 0; y < num_engine_options; ++y)
00660     {
00661       found= not table_message.engine().options(y).name().compare(original_proto.engine().options(x).name());
00662 
00663       if (found)
00664         break;
00665     }
00666 
00667     if (not found)
00668     {
00669       message::Engine::Option *opt= table_message.mutable_engine()->add_options();
00670 
00671       opt->set_name(original_proto.engine().options(x).name());
00672       opt->set_state(original_proto.engine().options(x).state());
00673     }
00674   }
00675 
00676   drizzled::message::update(table_message);
00677 
00678   return false;
00679 }
00680 
00681 /* table_list should contain just one table */
00682 static int discard_or_import_tablespace(Session *session, TableList *table_list, tablespace_op_type tablespace_op)
00683 {
00684   /*
00685     Note that DISCARD/IMPORT TABLESPACE always is the only operation in an
00686     ALTER Table
00687   */
00688   TransactionServices &transaction_services= TransactionServices::singleton();
00689   session->set_proc_info("discard_or_import_tablespace");
00690 
00691  /*
00692    We set this flag so that ha_innobase::open and ::external_lock() do
00693    not complain when we lock the table
00694  */
00695   session->setDoingTablespaceOperation(true);
00696   Table* table= session->openTableLock(table_list, TL_WRITE);
00697   if (not table)
00698   {
00699     session->setDoingTablespaceOperation(false);
00700     return -1;
00701   }
00702 
00703   int error;
00704   do {
00705     error= table->cursor->ha_discard_or_import_tablespace(tablespace_op == DISCARD_TABLESPACE);
00706 
00707     session->set_proc_info("end");
00708 
00709     if (error)
00710       break;
00711 
00712     /* The ALTER Table is always in its own transaction */
00713     error= transaction_services.autocommitOrRollback(*session, false);
00714     if (not session->endActiveTransaction())
00715       error= 1;
00716 
00717     if (error)
00718       break;
00719 
00720     transaction_services.rawStatement(*session,
00721                                       *session->getQueryString(),
00722                                       *session->schema());
00723 
00724   } while(0);
00725 
00726   (void) transaction_services.autocommitOrRollback(*session, error);
00727   session->setDoingTablespaceOperation(false);
00728 
00729   if (error == 0)
00730   {
00731     session->my_ok();
00732     return 0;
00733   }
00734 
00735   table->print_error(error, MYF(0));
00736 
00737   return -1;
00738 }
00739 
00754 static bool alter_table_manage_keys(Session *session,
00755                                     Table *table, int indexes_were_disabled,
00756                                     enum enum_enable_or_disable keys_onoff)
00757 {
00758   int error= 0;
00759   switch (keys_onoff) {
00760   case ENABLE:
00761     error= table->cursor->ha_enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
00762     break;
00763   case LEAVE_AS_IS:
00764     if (not indexes_were_disabled)
00765       break;
00766     /* fall-through: disabled indexes */
00767   case DISABLE:
00768     error= table->cursor->ha_disable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
00769   }
00770 
00771   if (error == HA_ERR_WRONG_COMMAND)
00772   {
00773     push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
00774                         ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
00775                         table->getMutableShare()->getTableName());
00776     error= 0;
00777   }
00778   else if (error)
00779   {
00780     table->print_error(error, MYF(0));
00781   }
00782 
00783   return(error);
00784 }
00785 
00786 static bool lockTableIfDifferent(Session &session,
00787                                  identifier::Table &original_table_identifier,
00788                                  identifier::Table &new_table_identifier,
00789                                  Table *name_lock)
00790 {
00791   /* Check that we are not trying to rename to an existing table */
00792   if (not (original_table_identifier == new_table_identifier))
00793   {
00794     if (original_table_identifier.isTmp())
00795     {
00796 
00797       if (session.find_temporary_table(new_table_identifier))
00798       {
00799         my_error(ER_TABLE_EXISTS_ERROR, new_table_identifier);
00800         return false;
00801       }
00802     }
00803     else
00804     {
00805       if (session.lock_table_name_if_not_cached(new_table_identifier, &name_lock))
00806       {
00807         return false;
00808       }
00809 
00810       if (not name_lock)
00811       {
00812         my_error(ER_TABLE_EXISTS_ERROR, new_table_identifier);
00813         return false;
00814       }
00815 
00816       if (plugin::StorageEngine::doesTableExist(session, new_table_identifier))
00817       {
00818         /* Table will be closed by Session::executeCommand() */
00819         my_error(ER_TABLE_EXISTS_ERROR, new_table_identifier);
00820 
00821         {
00822           boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
00823           session.unlink_open_table(name_lock);
00824         }
00825 
00826         return false;
00827       }
00828     }
00829   }
00830 
00831   return true;
00832 }
00833 
00876 static bool internal_alter_table(Session *session,
00877                                  Table *table,
00878                                  identifier::Table &original_table_identifier,
00879                                  identifier::Table &new_table_identifier,
00880                                  HA_CREATE_INFO *create_info,
00881                                  const message::Table &original_proto,
00882                                  message::Table &create_proto,
00883                                  TableList *table_list,
00884                                  AlterInfo *alter_info,
00885                                  uint32_t order_num,
00886                                  Order *order,
00887                                  bool ignore)
00888 {
00889   int error= 0;
00890   char tmp_name[80];
00891   char old_name[32];
00892   ha_rows copied= 0;
00893   ha_rows deleted= 0;
00894 
00895   if (not original_table_identifier.isValid())
00896     return true;
00897 
00898   if (not new_table_identifier.isValid())
00899     return true;
00900 
00901   session->set_proc_info("init");
00902 
00903   table->use_all_columns();
00904 
00905   plugin::StorageEngine *new_engine;
00906   plugin::StorageEngine *original_engine;
00907 
00908   original_engine= table->getMutableShare()->getEngine();
00909 
00910   if (not create_info->db_type)
00911   {
00912     create_info->db_type= original_engine;
00913   }
00914   new_engine= create_info->db_type;
00915 
00916 
00917   create_proto.set_schema(new_table_identifier.getSchemaName());
00918   create_proto.set_type(new_table_identifier.getType());
00919 
00924   if (new_engine != original_engine &&
00925       not table->cursor->can_switch_engines())
00926   {
00927     assert(0);
00928     my_error(ER_ROW_IS_REFERENCED, MYF(0));
00929 
00930     return true;
00931   }
00932 
00933   if (original_engine->check_flag(HTON_BIT_ALTER_NOT_SUPPORTED) ||
00934       new_engine->check_flag(HTON_BIT_ALTER_NOT_SUPPORTED))
00935   {
00936     my_error(ER_ILLEGAL_HA, new_table_identifier);
00937 
00938     return true;
00939   }
00940 
00941   session->set_proc_info("setup");
00942 
00943   /*
00944    * test if no other bits except ALTER_RENAME and ALTER_KEYS_ONOFF are set
00945  */
00946   {
00947     bitset<32> tmp;
00948 
00949     tmp.set();
00950     tmp.reset(ALTER_RENAME);
00951     tmp.reset(ALTER_KEYS_ONOFF);
00952     tmp&= alter_info->flags;
00953 
00954     if (not (tmp.any()) && not table->getShare()->getType()) // no need to touch frm
00955     {
00956       switch (alter_info->keys_onoff)
00957       {
00958       case LEAVE_AS_IS:
00959         break;
00960 
00961       case ENABLE:
00962         /*
00963           wait_while_table_is_used() ensures that table being altered is
00964           opened only by this thread and that Table::TableShare::version
00965           of Table object corresponding to this table is 0.
00966           The latter guarantees that no DML statement will open this table
00967           until ALTER Table finishes (i.e. until close_thread_tables())
00968           while the fact that the table is still open gives us protection
00969           from concurrent DDL statements.
00970         */
00971         {
00972           boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex()); /* DDL wait for/blocker */
00973           wait_while_table_is_used(session, table, HA_EXTRA_FORCE_REOPEN);
00974         }
00975         error= table->cursor->ha_enable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
00976 
00977         /* COND_refresh will be signaled in close_thread_tables() */
00978         break;
00979 
00980       case DISABLE:
00981         {
00982           boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex()); /* DDL wait for/blocker */
00983           wait_while_table_is_used(session, table, HA_EXTRA_FORCE_REOPEN);
00984         }
00985         error= table->cursor->ha_disable_indexes(HA_KEY_SWITCH_NONUNIQ_SAVE);
00986 
00987         /* COND_refresh will be signaled in close_thread_tables() */
00988         break;
00989       }
00990 
00991       if (error == HA_ERR_WRONG_COMMAND)
00992       {
00993         error= EE_OK;
00994         push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
00995                             ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
00996                             table->getAlias());
00997       }
00998 
00999       boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex()); /* Lock to remove all instances of table from table cache before ALTER */
01000       /*
01001         Unlike to the above case close_cached_table() below will remove ALL
01002         instances of Table from table cache (it will also remove table lock
01003         held by this thread). So to make actual table renaming and writing
01004         to binlog atomic we have to put them into the same critical section
01005         protected by table::Cache::singleton().mutex() mutex. This also removes gap for races between
01006         access() and rename_table() calls.
01007       */
01008 
01009       if (not error &&  not (original_table_identifier == new_table_identifier))
01010       {
01011         session->set_proc_info("rename");
01012         /*
01013           Then do a 'simple' rename of the table. First we need to close all
01014           instances of 'source' table.
01015         */
01016         session->close_cached_table(table);
01017         /*
01018           Then, we want check once again that target table does not exist.
01019           Actually the order of these two steps does not matter since
01020           earlier we took name-lock on the target table, so we do them
01021           in this particular order only to be consistent with 5.0, in which
01022           we don't take this name-lock and where this order really matters.
01023           @todo Investigate if we need this access() check at all.
01024         */
01025         if (plugin::StorageEngine::doesTableExist(*session, new_table_identifier))
01026         {
01027           my_error(ER_TABLE_EXISTS_ERROR, new_table_identifier);
01028           error= -1;
01029         }
01030         else
01031         {
01032           if (rename_table(*session, original_engine, original_table_identifier, new_table_identifier))
01033           {
01034             error= -1;
01035           }
01036         }
01037       }
01038 
01039       if (error == HA_ERR_WRONG_COMMAND)
01040       {
01041         error= EE_OK;
01042         push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
01043                             ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
01044                             table->getAlias());
01045       }
01046 
01047       if (not error)
01048       {
01049         TransactionServices &transaction_services= TransactionServices::singleton();
01050         transaction_services.allocateNewTransactionId();
01051         transaction_services.rawStatement(*session,
01052                                           *session->getQueryString(),
01053                                           *session->schema());
01054         session->my_ok();
01055       }
01056       else if (error > EE_OK) // If we have already set the error, we pass along -1
01057       {
01058         table->print_error(error, MYF(0));
01059       }
01060 
01061       table_list->table= NULL;
01062 
01063       return error;
01064     }
01065   }
01066 
01067   /* We have to do full alter table. */
01068   new_engine= create_info->db_type;
01069 
01070   if (prepare_alter_table(session, table, create_info, original_proto, create_proto, alter_info))
01071   {
01072     return true;
01073   }
01074 
01075   set_table_default_charset(create_info, new_table_identifier.getSchemaName().c_str());
01076 
01077   alter_info->build_method= HA_BUILD_OFFLINE;
01078 
01079   snprintf(tmp_name, sizeof(tmp_name), "%s-%lx_%"PRIx64, TMP_FILE_PREFIX, (unsigned long) current_pid, session->thread_id);
01080 
01081   /* Create a temporary table with the new format */
01087   identifier::Table new_table_as_temporary(original_table_identifier.getSchemaName(),
01088                                          tmp_name,
01089                                          create_proto.type() != message::Table::TEMPORARY ? message::Table::INTERNAL :
01090                                          message::Table::TEMPORARY);
01091 
01092   /*
01093     Create a table with a temporary name.
01094     We don't log the statement, it will be logged later.
01095   */
01096   create_proto.set_name(new_table_as_temporary.getTableName());
01097   create_proto.mutable_engine()->set_name(create_info->db_type->getName());
01098 
01099   error= create_table(session,
01100                       new_table_as_temporary,
01101                       create_info, create_proto, alter_info, true, 0, false);
01102 
01103   if (error != 0)
01104   {
01105     return true;
01106   }
01107 
01108   /* Open the table so we need to copy the data to it. */
01109   Table *new_table= open_alter_table(session, table, new_table_as_temporary);
01110 
01111 
01112   if (not new_table)
01113   {
01114     plugin::StorageEngine::dropTable(*session, new_table_as_temporary);
01115     return true;
01116   }
01117 
01118   /* Copy the data if necessary. */
01119   {
01120     /* We must not ignore bad input! */
01121     session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;  // calc cuted fields
01122     session->cuted_fields= 0L;
01123     session->set_proc_info("copy to tmp table");
01124     copied= deleted= 0;
01125 
01126     /* We don't want update TIMESTAMP fields during ALTER Table. */
01127     new_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
01128     new_table->next_number_field= new_table->found_next_number_field;
01129     error= copy_data_between_tables(session,
01130                                     table,
01131                                     new_table,
01132                                     alter_info->create_list,
01133                                     ignore,
01134                                     order_num,
01135                                     order,
01136                                     &copied,
01137                                     &deleted,
01138                                     alter_info->keys_onoff,
01139                                     alter_info->error_if_not_empty);
01140 
01141     /* We must not ignore bad input! */
01142     assert(session->count_cuted_fields == CHECK_FIELD_ERROR_FOR_NULL);
01143   }
01144 
01145   /* Now we need to resolve what just happened with the data copy. */
01146 
01147   if (error)
01148   {
01149 
01150     /*
01151       No default value was provided for new fields.
01152     */
01153     if (alter_info->error_if_not_empty && session->row_count)
01154     {
01155       my_error(ER_INVALID_ALTER_TABLE_FOR_NOT_NULL, MYF(0));
01156     }
01157 
01158     if (original_table_identifier.isTmp())
01159     {
01160       if (new_table)
01161       {
01162         /* close_temporary_table() frees the new_table pointer. */
01163         session->close_temporary_table(new_table);
01164       }
01165       else
01166       {
01167         plugin::StorageEngine::dropTable(*session, new_table_as_temporary);
01168       }
01169 
01170       return true;
01171     }
01172     else
01173     {
01174       if (new_table)
01175       {
01176         /*
01177           Close the intermediate table that will be the new table.
01178           Note that MERGE tables do not have their children attached here.
01179         */
01180         new_table->intern_close_table();
01181         if (new_table->hasShare())
01182         {
01183           delete new_table->getMutableShare();
01184         }
01185 
01186         delete new_table;
01187       }
01188 
01189       boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
01190 
01191       plugin::StorageEngine::dropTable(*session, new_table_as_temporary);
01192 
01193       return true;
01194     }
01195   }
01196   // Temporary table and success
01197   else if (original_table_identifier.isTmp())
01198   {
01199     /* Close lock if this is a transactional table */
01200     if (session->lock)
01201     {
01202       session->unlockTables(session->lock);
01203       session->lock= 0;
01204     }
01205 
01206     /* Remove link to old table and rename the new one */
01207     session->close_temporary_table(table);
01208 
01209     /* Should pass the 'new_name' as we store table name in the cache */
01210     new_table->getMutableShare()->setIdentifier(new_table_identifier);
01211 
01212     new_table_identifier.setPath(new_table_as_temporary.getPath());
01213 
01214     if (rename_table(*session, new_engine, new_table_as_temporary, new_table_identifier) != 0)
01215     {
01216       return true;
01217     }
01218   }
01219   // Normal table success
01220   else
01221   {
01222     if (new_table)
01223     {
01224       /*
01225         Close the intermediate table that will be the new table.
01226         Note that MERGE tables do not have their children attached here.
01227       */
01228       new_table->intern_close_table();
01229 
01230       if (new_table->hasShare())
01231       {
01232         delete new_table->getMutableShare();
01233       }
01234 
01235       delete new_table;
01236     }
01237 
01238     {
01239       boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex()); /* ALTER TABLE */
01240       /*
01241         Data is copied. Now we:
01242         1) Wait until all other threads close old version of table.
01243         2) Close instances of table open by this thread and replace them
01244         with exclusive name-locks.
01245         3) Rename the old table to a temp name, rename the new one to the
01246         old name.
01247         4) If we are under LOCK TABLES and don't do ALTER Table ... RENAME
01248         we reopen new version of table.
01249         5) Write statement to the binary log.
01250         6) If we are under LOCK TABLES and do ALTER Table ... RENAME we
01251         remove name-locks from list of open tables and table cache.
01252         7) If we are not not under LOCK TABLES we rely on close_thread_tables()
01253         call to remove name-locks from table cache and list of open table.
01254       */
01255 
01256       session->set_proc_info("rename result table");
01257 
01258       snprintf(old_name, sizeof(old_name), "%s2-%lx-%"PRIx64, TMP_FILE_PREFIX, (unsigned long) current_pid, session->thread_id);
01259 
01260       my_casedn_str(files_charset_info, old_name);
01261 
01262       wait_while_table_is_used(session, table, HA_EXTRA_PREPARE_FOR_RENAME);
01263       session->close_data_files_and_morph_locks(original_table_identifier);
01264 
01265       assert(not error);
01266 
01267       /*
01268         This leads to the storage engine (SE) not being notified for renames in
01269         rename_table(), because we just juggle with the FRM and nothing
01270         more. If we have an intermediate table, then we notify the SE that
01271         it should become the actual table. Later, we will recycle the old table.
01272         However, in case of ALTER Table RENAME there might be no intermediate
01273         table. This is when the old and new tables are compatible, according to
01274         compare_table(). Then, we need one additional call to
01275       */
01276       identifier::Table original_table_to_drop(original_table_identifier.getSchemaName(),
01277                                              old_name, create_proto.type() != message::Table::TEMPORARY ? message::Table::INTERNAL :
01278                                              message::Table::TEMPORARY);
01279 
01280       drizzled::error_t rename_error= EE_OK;
01281       if (rename_table(*session, original_engine, original_table_identifier, original_table_to_drop))
01282       {
01283         error= ER_ERROR_ON_RENAME;
01284         plugin::StorageEngine::dropTable(*session, new_table_as_temporary);
01285       }
01286       else
01287       {
01288         if (rename_table(*session, new_engine, new_table_as_temporary, new_table_identifier) != 0)
01289         {
01290           /* Try to get everything back. */
01291           rename_error= ER_ERROR_ON_RENAME;
01292 
01293           plugin::StorageEngine::dropTable(*session, new_table_identifier);
01294 
01295           plugin::StorageEngine::dropTable(*session, new_table_as_temporary);
01296 
01297           rename_table(*session, original_engine, original_table_to_drop, original_table_identifier);
01298         }
01299         else
01300         {
01301           plugin::StorageEngine::dropTable(*session, original_table_to_drop);
01302         }
01303       }
01304 
01305       if (rename_error)
01306       {
01307         /*
01308           An error happened while we were holding exclusive name-lock on table
01309           being altered. To be safe under LOCK TABLES we should remove placeholders
01310           from list of open tables list and table cache.
01311         */
01312         session->unlink_open_table(table);
01313 
01314         return true;
01315       }
01316     }
01317 
01318     session->set_proc_info("end");
01319 
01320     TransactionServices &transaction_services= TransactionServices::singleton();
01321     transaction_services.rawStatement(*session,
01322                                       *session->getQueryString(),
01323                                       *session->schema());
01324     table_list->table= NULL;
01325   }
01326 
01327   /*
01328    * Field::store() may have called my_error().  If this is
01329    * the case, we must not send an ok packet, since
01330    * Diagnostics_area::is_set() will fail an assert.
01331  */
01332   if (session->is_error())
01333   {
01334     /* my_error() was called.  Return true (which means error...) */
01335     return true;
01336   }
01337 
01338   snprintf(tmp_name, sizeof(tmp_name), ER(ER_INSERT_INFO),
01339            (ulong) (copied + deleted), (ulong) deleted,
01340            (ulong) session->cuted_fields);
01341   session->my_ok(copied + deleted, 0, 0L, tmp_name);
01342   session->some_tables_deleted= false;
01343 
01344   return false;
01345 }
01346 
01347 bool alter_table(Session *session,
01348                  identifier::Table &original_table_identifier,
01349                  identifier::Table &new_table_identifier,
01350                  HA_CREATE_INFO *create_info,
01351                  const message::Table &original_proto,
01352                  message::Table &create_proto,
01353                  TableList *table_list,
01354                  AlterInfo *alter_info,
01355                  uint32_t order_num,
01356                  Order *order,
01357                  bool ignore)
01358 {
01359   bool error;
01360   Table *table;
01361 
01362   if (alter_info->tablespace_op != NO_TABLESPACE_OP)
01363   {
01364     /* DISCARD/IMPORT TABLESPACE is always alone in an ALTER Table */
01365     return discard_or_import_tablespace(session, table_list, alter_info->tablespace_op);
01366   }
01367 
01368   session->set_proc_info("init");
01369 
01370   if (not (table= session->openTableLock(table_list, TL_WRITE_ALLOW_READ)))
01371     return true;
01372 
01373   session->set_proc_info("gained write lock on table");
01374 
01375   /*
01376     Check that we are not trying to rename to an existing table,
01377     if one existed we get a lock, if we can't we error.
01378   */
01379   {
01380     Table *name_lock= NULL;
01381 
01382     if (not lockTableIfDifferent(*session, original_table_identifier, new_table_identifier, name_lock))
01383     {
01384       return true;
01385     }
01386 
01387     error= internal_alter_table(session,
01388                                 table,
01389                                 original_table_identifier,
01390                                 new_table_identifier,
01391                                 create_info,
01392                                 original_proto,
01393                                 create_proto,
01394                                 table_list,
01395                                 alter_info,
01396                                 order_num,
01397                                 order,
01398                                 ignore);
01399 
01400     if (name_lock)
01401     {
01402       boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
01403       session->unlink_open_table(name_lock);
01404     }
01405   }
01406 
01407   return error;
01408 }
01409 /* alter_table */
01410 
01411 static int
01412 copy_data_between_tables(Session *session,
01413                          Table *from, Table *to,
01414                          List<CreateField> &create,
01415                          bool ignore,
01416                          uint32_t order_num, Order *order,
01417                          ha_rows *copied,
01418                          ha_rows *deleted,
01419                          enum enum_enable_or_disable keys_onoff,
01420                          bool error_if_not_empty)
01421 {
01422   int error= 0;
01423   CopyField *copy,*copy_end;
01424   ulong found_count,delete_count;
01425   uint32_t length= 0;
01426   SortField *sortorder;
01427   ReadRecord info;
01428   TableList   tables;
01429   List<Item>   fields;
01430   List<Item>   all_fields;
01431   ha_rows examined_rows;
01432   bool auto_increment_field_copied= 0;
01433   uint64_t prev_insert_id;
01434 
01435   /*
01436     Turn off recovery logging since rollback of an alter table is to
01437     delete the new table so there is no need to log the changes to it.
01438 
01439     This needs to be done before external_lock
01440   */
01441   TransactionServices &transaction_services= TransactionServices::singleton();
01442 
01443   /*
01444    * LP Bug #552420
01445    *
01446    * Since open_temporary_table() doesn't invoke lockTables(), we
01447    * don't get the usual automatic call to StorageEngine::startStatement(), so
01448    * we manually call it here...
01449    */
01450   to->getMutableShare()->getEngine()->startStatement(session);
01451 
01452   if (!(copy= new CopyField[to->getShare()->sizeFields()]))
01453     return -1;
01454 
01455   if (to->cursor->ha_external_lock(session, F_WRLCK))
01456     return -1;
01457 
01458   /* We need external lock before we can disable/enable keys */
01459   alter_table_manage_keys(session, to, from->cursor->indexes_are_disabled(), keys_onoff);
01460 
01461   /* We can abort alter table for any table type */
01462   session->setAbortOnWarning(not ignore);
01463 
01464   from->cursor->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
01465   to->cursor->ha_start_bulk_insert(from->cursor->stats.records);
01466 
01467   List<CreateField>::iterator it(create.begin());
01468   copy_end= copy;
01469   for (Field **ptr= to->getFields(); *ptr ; ptr++)
01470   {
01471     CreateField* def=it++;
01472     if (def->field)
01473     {
01474       if (*ptr == to->next_number_field)
01475         auto_increment_field_copied= true;
01476 
01477       (copy_end++)->set(*ptr,def->field,0);
01478     }
01479 
01480   }
01481 
01482   found_count=delete_count=0;
01483 
01484   do
01485   {
01486     if (order)
01487     {
01488       if (to->getShare()->hasPrimaryKey() && to->cursor->primary_key_is_clustered())
01489       {
01490         char warn_buff[DRIZZLE_ERRMSG_SIZE];
01491         snprintf(warn_buff, sizeof(warn_buff),
01492                  _("order_st BY ignored because there is a user-defined clustered "
01493                    "index in the table '%-.192s'"),
01494                  from->getMutableShare()->getTableName());
01495         push_warning(session, DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
01496                      warn_buff);
01497       }
01498       else
01499       {
01500         FileSort filesort(*session);
01501         from->sort.io_cache= new internal::IO_CACHE;
01502 
01503         tables.table= from;
01504         tables.setTableName(from->getMutableShare()->getTableName());
01505         tables.alias= tables.getTableName();
01506         tables.setSchemaName(const_cast<char *>(from->getMutableShare()->getSchemaName()));
01507         error= 1;
01508 
01509         if (session->lex().select_lex.setup_ref_array(session, order_num) ||
01510             setup_order(session, session->lex().select_lex.ref_pointer_array,
01511                         &tables, fields, all_fields, order) ||
01512             !(sortorder= make_unireg_sortorder(order, &length, NULL)) ||
01513             (from->sort.found_records= filesort.run(from, sortorder, length,
01514                                                     (optimizer::SqlSelect *) 0, HA_POS_ERROR,
01515                                                     1, examined_rows)) == HA_POS_ERROR)
01516         {
01517           break;
01518         }
01519       }
01520     }
01521 
01522     /* Tell handler that we have values for all columns in the to table */
01523     to->use_all_columns();
01524 
01525     error= info.init_read_record(session, from, (optimizer::SqlSelect *) 0, 1, true);
01526     if (error)
01527     {
01528       to->print_error(errno, MYF(0));
01529 
01530       break;
01531     }
01532 
01533     if (ignore)
01534     {
01535       to->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
01536     }
01537 
01538     session->row_count= 0;
01539     to->restoreRecordAsDefault();        // Create empty record
01540     while (not (error=info.read_record(&info)))
01541     {
01542       if (session->getKilled())
01543       {
01544         session->send_kill_message();
01545         error= 1;
01546         break;
01547       }
01548       session->row_count++;
01549       /* Return error if source table isn't empty. */
01550       if (error_if_not_empty)
01551       {
01552         error= 1;
01553         break;
01554       }
01555       if (to->next_number_field)
01556       {
01557         if (auto_increment_field_copied)
01558           to->auto_increment_field_not_null= true;
01559         else
01560           to->next_number_field->reset();
01561       }
01562 
01563       for (CopyField *copy_ptr= copy; copy_ptr != copy_end ; copy_ptr++)
01564       {
01565         if (not copy->to_field->hasDefault() and copy->from_null_ptr and  *copy->from_null_ptr & copy->from_bit)
01566         {
01567           copy->to_field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN,
01568                                       ER_WARN_DATA_TRUNCATED, 1);
01569           copy->to_field->reset();
01570           error= 1;
01571           break;
01572         }
01573 
01574         copy_ptr->do_copy(copy_ptr);
01575       }
01576 
01577       if (error)
01578       {
01579         break;
01580       }
01581 
01582       prev_insert_id= to->cursor->next_insert_id;
01583       error= to->cursor->insertRecord(to->record[0]);
01584       to->auto_increment_field_not_null= false;
01585 
01586       if (error)
01587       {
01588         if (!ignore || to->cursor->is_fatal_error(error, HA_CHECK_DUP))
01589         {
01590           to->print_error(error, MYF(0));
01591           break;
01592         }
01593         to->cursor->restore_auto_increment(prev_insert_id);
01594         delete_count++;
01595       }
01596       else
01597       {
01598         found_count++;
01599       }
01600     }
01601 
01602     info.end_read_record();
01603     from->free_io_cache();
01604     delete [] copy;       // This is never 0
01605 
01606     if (to->cursor->ha_end_bulk_insert() && error <= 0)
01607     {
01608       to->print_error(errno, MYF(0));
01609       error= 1;
01610     }
01611     to->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
01612 
01613     /*
01614       Ensure that the new table is saved properly to disk so that we
01615       can do a rename
01616     */
01617     if (transaction_services.autocommitOrRollback(*session, false))
01618       error= 1;
01619 
01620     if (not session->endActiveTransaction())
01621       error= 1;
01622 
01623   } while (0);
01624 
01625   session->setAbortOnWarning(false);
01626   from->free_io_cache();
01627   *copied= found_count;
01628   *deleted=delete_count;
01629   to->cursor->ha_release_auto_increment();
01630 
01631   if (to->cursor->ha_external_lock(session, F_UNLCK))
01632   {
01633     error=1;
01634   }
01635 
01636   return(error > 0 ? -1 : 0);
01637 }
01638 
01639 static Table *open_alter_table(Session *session, Table *table, identifier::Table &identifier)
01640 {
01641   /* Open the table so we need to copy the data to it. */
01642   if (table->getShare()->getType())
01643   {
01644     TableList tbl;
01645     tbl.setSchemaName(const_cast<char *>(identifier.getSchemaName().c_str()));
01646     tbl.alias= const_cast<char *>(identifier.getTableName().c_str());
01647     tbl.setTableName(const_cast<char *>(identifier.getTableName().c_str()));
01648 
01649     /* Table is in session->temporary_tables */
01650     return session->openTable(&tbl, (bool*) 0, DRIZZLE_LOCK_IGNORE_FLUSH);
01651   }
01652   else
01653   {
01654     /* Open our intermediate table */
01655     return session->open_temporary_table(identifier, false);
01656   }
01657 }
01658 
01659 } /* namespace drizzled */