Drizzled Public API Documentation

cursor.cc
00001 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2008 Sun Microsystems, Inc.
00005  *
00006  *  This program is free software; you can redistribute it and/or modify
00007  *  it under the terms of the GNU General Public License as published by
00008  *  the Free Software Foundation; version 2 of the License.
00009  *
00010  *  This program is distributed in the hope that it will be useful,
00011  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  *  GNU General Public License for more details.
00014  *
00015  *  You should have received a copy of the GNU General Public License
00016  *  along with this program; if not, write to the Free Software
00017  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00018  */
00019 
00026 #include <config.h>
00027 #include <fcntl.h>
00028 #include <drizzled/error.h>
00029 #include <drizzled/field/epoch.h>
00030 #include <drizzled/gettext.h>
00031 #include <drizzled/internal/my_sys.h>
00032 #include <drizzled/item/empty_string.h>
00033 #include <drizzled/item/int.h>
00034 #include <drizzled/lock.h>
00035 #include <drizzled/message/table.h>
00036 #include <drizzled/optimizer/cost_vector.h>
00037 #include <drizzled/plugin/client.h>
00038 #include <drizzled/plugin/event_observer.h>
00039 #include <drizzled/plugin/storage_engine.h>
00040 #include <drizzled/probes.h>
00041 #include <drizzled/session.h>
00042 #include <drizzled/sql_base.h>
00043 #include <drizzled/sql_parse.h>
00044 #include <drizzled/transaction_services.h>
00045 #include <drizzled/key.h>
00046 #include <drizzled/sql_lex.h>
00047 
00048 using namespace std;
00049 
00050 namespace drizzled {
00051 
00052 /****************************************************************************
00053 ** General Cursor functions
00054 ****************************************************************************/
00055 Cursor::Cursor(plugin::StorageEngine &engine_arg,
00056                Table &arg)
00057   : table(arg),
00058     engine(engine_arg),
00059     estimation_rows_to_insert(0),
00060     ref(0),
00061     key_used_on_scan(MAX_KEY), active_index(MAX_KEY),
00062     ref_length(sizeof(internal::my_off_t)),
00063     inited(NONE),
00064     locked(false),
00065     next_insert_id(0), insert_id_for_cur_row(0)
00066 { }
00067 
00068 Cursor::~Cursor(void)
00069 {
00070   assert(locked == false);
00071   /* TODO: assert(inited == NONE); */
00072 }
00073 
00074 
00075 /*
00076  * @note this only used in
00077  * optimizer::QuickRangeSelect::init_ror_merged_scan(bool reuse_handler) as
00078  * of the writing of this comment. -Brian
00079  */
00080 Cursor *Cursor::clone(memory::Root *mem_root)
00081 {
00082   Cursor *new_handler= getTable()->getMutableShare()->db_type()->getCursor(*getTable());
00083 
00084   /*
00085     Allocate Cursor->ref here because otherwise ha_open will allocate it
00086     on this->table->mem_root and we will not be able to reclaim that memory
00087     when the clone Cursor object is destroyed.
00088   */
00089   if (!(new_handler->ref= (unsigned char*) mem_root->alloc_root(ALIGN_SIZE(ref_length)*2)))
00090     return NULL;
00091 
00092   identifier::Table identifier(getTable()->getShare()->getSchemaName(),
00093                              getTable()->getShare()->getTableName(),
00094                              getTable()->getShare()->getType());
00095 
00096   if (new_handler && !new_handler->ha_open(identifier,
00097                                            getTable()->getDBStat(),
00098                                            HA_OPEN_IGNORE_IF_LOCKED))
00099     return new_handler;
00100   return NULL;
00101 }
00102 
00103 /*
00104   DESCRIPTION
00105     given a buffer with a key value, and a map of keyparts
00106     that are present in this value, returns the length of the value
00107 */
00108 uint32_t Cursor::calculate_key_len(uint32_t key_position, key_part_map keypart_map_arg)
00109 {
00110   /* works only with key prefixes */
00111   assert(((keypart_map_arg + 1) & keypart_map_arg) == 0);
00112 
00113   const KeyPartInfo *key_part_found= getTable()->getShare()->getKeyInfo(key_position).key_part;
00114   const KeyPartInfo *end_key_part_found= key_part_found + getTable()->getShare()->getKeyInfo(key_position).key_parts;
00115   uint32_t length= 0;
00116 
00117   while (key_part_found < end_key_part_found && keypart_map_arg)
00118   {
00119     length+= key_part_found->store_length;
00120     keypart_map_arg >>= 1;
00121     key_part_found++;
00122   }
00123   return length;
00124 }
00125 
00126 int Cursor::startIndexScan(uint32_t idx, bool sorted)
00127 {
00128   int result;
00129   assert(inited == NONE);
00130   if (!(result= doStartIndexScan(idx, sorted)))
00131     inited=INDEX;
00132   end_range= NULL;
00133   return result;
00134 }
00135 
00136 int Cursor::endIndexScan()
00137 {
00138   assert(inited==INDEX);
00139   inited=NONE;
00140   end_range= NULL;
00141   return(doEndIndexScan());
00142 }
00143 
00144 int Cursor::startTableScan(bool scan)
00145 {
00146   int result;
00147   assert(inited==NONE || (inited==RND && scan));
00148   inited= (result= doStartTableScan(scan)) ? NONE: RND;
00149 
00150   return result;
00151 }
00152 
00153 int Cursor::endTableScan()
00154 {
00155   assert(inited==RND);
00156   inited=NONE;
00157   return(doEndTableScan());
00158 }
00159 
00160 int Cursor::ha_index_or_rnd_end()
00161 {
00162   return inited == INDEX ? endIndexScan() : inited == RND ? endTableScan() : 0;
00163 }
00164 
00165 void Cursor::ha_start_bulk_insert(ha_rows rows)
00166 {
00167   estimation_rows_to_insert= rows;
00168   start_bulk_insert(rows);
00169 }
00170 
00171 int Cursor::ha_end_bulk_insert()
00172 {
00173   estimation_rows_to_insert= 0;
00174   return end_bulk_insert();
00175 }
00176 
00177 const key_map *Cursor::keys_to_use_for_scanning()
00178 {
00179   return &key_map_empty;
00180 }
00181 
00182 bool Cursor::has_transactions()
00183 {
00184   return (getTable()->getShare()->db_type()->check_flag(HTON_BIT_DOES_TRANSACTIONS));
00185 }
00186 
00187 void Cursor::ha_statistic_increment(uint64_t system_status_var::*offset) const
00188 {
00189   (getTable()->in_use->status_var.*offset)++;
00190 }
00191 
00192 void **Cursor::ha_data(Session *session) const
00193 {
00194   return session->getEngineData(getEngine());
00195 }
00196 
00197 bool Cursor::is_fatal_error(int error, uint32_t flags)
00198 {
00199   if (!error ||
00200       ((flags & HA_CHECK_DUP_KEY) &&
00201        (error == HA_ERR_FOUND_DUPP_KEY ||
00202         error == HA_ERR_FOUND_DUPP_UNIQUE)))
00203     return false;
00204   return true;
00205 }
00206 
00207 
00208 ha_rows Cursor::records() { return stats.records; }
00209 uint64_t Cursor::tableSize() { return stats.index_file_length + stats.data_file_length; }
00210 uint64_t Cursor::rowSize() { return getTable()->getRecordLength() + getTable()->sizeFields(); }
00211 
00212 int Cursor::doOpen(const identifier::Table &identifier, int mode, uint32_t test_if_locked)
00213 {
00214   return open(identifier.getPath().c_str(), mode, test_if_locked);
00215 }
00216 
00223 int Cursor::ha_open(const identifier::Table &identifier,
00224                     int mode,
00225                     int test_if_locked)
00226 {
00227   int error;
00228 
00229   if ((error= doOpen(identifier, mode, test_if_locked)))
00230   {
00231     if ((error == EACCES || error == EROFS) && mode == O_RDWR &&
00232         (getTable()->db_stat & HA_TRY_READ_ONLY))
00233     {
00234       getTable()->db_stat|=HA_READ_ONLY;
00235       error= doOpen(identifier, O_RDONLY,test_if_locked);
00236     }
00237   }
00238   if (error)
00239   {
00240     errno= error;                            /* Safeguard */
00241   }
00242   else
00243   {
00244     if (getTable()->getShare()->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
00245       getTable()->db_stat|=HA_READ_ONLY;
00246     (void) extra(HA_EXTRA_NO_READCHECK);  // Not needed in SQL
00247 
00248     /* ref is already allocated for us if we're called from Cursor::clone() */
00249     if (!ref && !(ref= (unsigned char*) getTable()->alloc_root(ALIGN_SIZE(ref_length)*2)))
00250     {
00251       close();
00252       error=HA_ERR_OUT_OF_MEM;
00253     }
00254     else
00255       dup_ref=ref+ALIGN_SIZE(ref_length);
00256   }
00257   return error;
00258 }
00259 
00266 int Cursor::read_first_row(unsigned char * buf, uint32_t primary_key)
00267 {
00268   int error;
00269 
00270   ha_statistic_increment(&system_status_var::ha_read_first_count);
00271 
00272   /*
00273     If there is very few deleted rows in the table, find the first row by
00274     scanning the table.
00275     @todo remove the test for HA_READ_ORDER
00276   */
00277   if (stats.deleted < 10 || primary_key >= MAX_KEY ||
00278       !(getTable()->index_flags(primary_key) & HA_READ_ORDER))
00279   {
00280     error= startTableScan(1);
00281     if (error == 0)
00282     {
00283       while ((error= rnd_next(buf)) == HA_ERR_RECORD_DELETED) ;
00284       (void) endTableScan();
00285     }
00286   }
00287   else
00288   {
00289     /* Find the first row through the primary key */
00290     error= startIndexScan(primary_key, 0);
00291     if (error == 0)
00292     {
00293       error=index_first(buf);
00294       (void) endIndexScan();
00295     }
00296   }
00297   return error;
00298 }
00299 
00311 inline uint64_t
00312 compute_next_insert_id(uint64_t nr, drizzle_system_variables *variables)
00313 {
00314   if (variables->auto_increment_increment == 1)
00315     return (nr+1); // optimization of the formula below
00316   nr= (((nr+ variables->auto_increment_increment -
00317          variables->auto_increment_offset)) /
00318        (uint64_t) variables->auto_increment_increment);
00319   return (nr* (uint64_t) variables->auto_increment_increment +
00320           variables->auto_increment_offset);
00321 }
00322 
00323 
00324 void Cursor::adjust_next_insert_id_after_explicit_value(uint64_t nr)
00325 {
00326   /*
00327     If we have set Session::next_insert_id previously and plan to insert an
00328     explicitely-specified value larger than this, we need to increase
00329     Session::next_insert_id to be greater than the explicit value.
00330   */
00331   if ((next_insert_id > 0) && (nr >= next_insert_id))
00332     set_next_insert_id(compute_next_insert_id(nr, &getTable()->in_use->variables));
00333 }
00334 
00335 
00351 inline uint64_t
00352 prev_insert_id(uint64_t nr, drizzle_system_variables *variables)
00353 {
00354   if (unlikely(nr < variables->auto_increment_offset))
00355   {
00356     /*
00357       There's nothing good we can do here. That is a pathological case, where
00358       the offset is larger than the column's max possible value, i.e. not even
00359       the first sequence value may be inserted. User will receive warning.
00360     */
00361     return nr;
00362   }
00363   if (variables->auto_increment_increment == 1)
00364     return nr; // optimization of the formula below
00365   nr= (((nr - variables->auto_increment_offset)) /
00366        (uint64_t) variables->auto_increment_increment);
00367   return (nr * (uint64_t) variables->auto_increment_increment +
00368           variables->auto_increment_offset);
00369 }
00370 
00371 
/*
  Sizing of auto-increment reservations made by update_auto_increment():
  the first default reservation asks for AUTO_INC_DEFAULT_NB_ROWS values,
  later ones double, and no request exceeds AUTO_INC_DEFAULT_NB_MAX
  (2^AUTO_INC_DEFAULT_NB_MAX_BITS - 1).
*/
00441 #define AUTO_INC_DEFAULT_NB_ROWS 1 // Some prefer 1024 here
00442 #define AUTO_INC_DEFAULT_NB_MAX_BITS 16
00443 #define AUTO_INC_DEFAULT_NB_MAX ((1 << AUTO_INC_DEFAULT_NB_MAX_BITS) - 1)
00444 
/*
  Chooses the auto-increment value for the row being written and stores it
  in getTable()->next_number_field.

  - If the field already holds a non-zero value, or
    auto_increment_field_not_null is set, nothing is generated: only
    next_insert_id is moved past the explicit value and
    insert_id_for_cur_row is reset to 0.
  - Otherwise next_insert_id is used. When it has reached the end of
    auto_inc_interval_for_cur_row, a new interval is taken from the
    session's forced intervals or requested via get_auto_increment().

  RETURN
    0                           success
    HA_ERR_AUTOINC_READ_FAILED  get_auto_increment() left nr == ~(uint64_t) 0
    HA_ERR_AUTOINC_ERANGE       storing the value failed and the session's
                                kill state is Session::KILL_BAD_DATA
*/
00445 int Cursor::update_auto_increment()
00446 {
00447   uint64_t nr, nb_reserved_values;
00448   bool append= false;
00449   Session *session= getTable()->in_use;
00450   drizzle_system_variables *variables= &session->variables;
00451 
00452   /*
00453     next_insert_id is a "cursor" into the reserved interval, it may go greater
00454     than the interval, but not smaller.
00455   */
00456   assert(next_insert_id >= auto_inc_interval_for_cur_row.minimum());
00457 
00458   /* We check if auto_increment_field_not_null is false
00459      for an auto increment column, not a magic value like NULL is.
00460      same as sql_mode=NO_AUTO_VALUE_ON_ZERO */
00461 
00462   if ((nr= getTable()->next_number_field->val_int()) != 0
00463       || getTable()->auto_increment_field_not_null)
00464   {
00465     /*
00466       Update next_insert_id if we had already generated a value in this
00467       statement (case of INSERT VALUES(null),(3763),(null):
00468       the last NULL needs to insert 3764, not the value of the first NULL plus
00469       1).
00470     */
00471     adjust_next_insert_id_after_explicit_value(nr);
00472     insert_id_for_cur_row= 0; // didn't generate anything
00473 
00474     return 0;
00475   }
00476 
00477   if ((nr= next_insert_id) >= auto_inc_interval_for_cur_row.maximum())
00478   {
00479     /* next_insert_id is beyond what is reserved, so we reserve more. */
00480     const Discrete_interval *forced=
00481       session->auto_inc_intervals_forced.get_next();
00482     if (forced != NULL)
00483     {
00484       nr= forced->minimum();
00485       nb_reserved_values= forced->values();
00486     }
00487     else
00488     {
00489       /*
00490         Cursor::estimation_rows_to_insert was set by
00491         Cursor::ha_start_bulk_insert(); if 0 it means "unknown".
00492       */
00493       uint32_t nb_already_reserved_intervals=
00494         session->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements();
00495       uint64_t nb_desired_values;
00496       /*
00497         If an estimation was given to the engine:
00498         - use it.
00499         - if we already reserved numbers, it means the estimation was
00500         not accurate, then we'll reserve 2*AUTO_INC_DEFAULT_NB_ROWS the 2nd
00501         time, twice that the 3rd time etc.
00502         If no estimation was given, use those increasing defaults from the
00503         start, starting from AUTO_INC_DEFAULT_NB_ROWS.
00504         Don't go beyond a max to not reserve "way too much" (because
00505         reservation means potentially losing unused values).
00506       */
00507       if (nb_already_reserved_intervals == 0 &&
00508           (estimation_rows_to_insert > 0))
00509         nb_desired_values= estimation_rows_to_insert;
00510       else /* go with the increasing defaults */
00511       {
00512         /* avoid overflow in formula, with this if() */
00513         if (nb_already_reserved_intervals <= AUTO_INC_DEFAULT_NB_MAX_BITS)
00514         {
00515           nb_desired_values= AUTO_INC_DEFAULT_NB_ROWS *
00516             (1 << nb_already_reserved_intervals);
00517           set_if_smaller(nb_desired_values, (uint64_t)AUTO_INC_DEFAULT_NB_MAX);
00518         }
00519         else
00520           nb_desired_values= AUTO_INC_DEFAULT_NB_MAX;
00521       }
00522       /* This call ignores all its parameters but nr, currently */
00523       get_auto_increment(variables->auto_increment_offset,
00524                          variables->auto_increment_increment,
00525                          nb_desired_values, &nr,
00526                          &nb_reserved_values);
      /* An all-ones nr is the failure marker checked for here. */
00527       if (nr == ~(uint64_t) 0)
00528         return HA_ERR_AUTOINC_READ_FAILED;  // Mark failure
00529 
00530       /*
00531         That rounding below should not be needed when all engines actually
00532         respect offset and increment in get_auto_increment(). But they don't
00533         so we still do it. Wonder if for the not-first-in-index we should do
00534         it. Hope that this rounding didn't push us out of the interval; even
00535         if it did we cannot do anything about it (calling the engine again
00536         will not help as we inserted no row).
00537       */
00538       nr= compute_next_insert_id(nr-1, variables);
00539     }
00540 
00541     if (getTable()->getShare()->next_number_keypart == 0)
00542     {
00543       /* We must defer the appending until "nr" has been possibly truncated */
00544       append= true;
00545     }
00546   }
00547 
00548   if (unlikely(getTable()->next_number_field->store((int64_t) nr, true)))
00549   {
00550     /*
00551       first test if the query was aborted due to strict mode constraints
00552     */
00553     if (session->getKilled() == Session::KILL_BAD_DATA)
00554       return HA_ERR_AUTOINC_ERANGE;
00555 
00556     /*
00557       field refused this value (overflow) and truncated it, use the result of
00558       the truncation (which is going to be inserted); however we try to
00559       decrease it to honour auto_increment_* variables.
00560       That will shift the left bound of the reserved interval, we don't
00561       bother shifting the right bound (anyway any other value from this
00562       interval will cause a duplicate key).
00563     */
00564     nr= prev_insert_id(getTable()->next_number_field->val_int(), variables);
00565     if (unlikely(getTable()->next_number_field->store((int64_t) nr, true)))
00566       nr= getTable()->next_number_field->val_int();
00567   }
      /*
        append is only set inside the reservation branch above, which is also
        the only place nb_reserved_values is assigned (directly, or as an
        out-argument of get_auto_increment()).
      */
00568   if (append)
00569   {
00570     auto_inc_interval_for_cur_row.replace(nr, nb_reserved_values,
00571                                           variables->auto_increment_increment);
00572   }
00573 
00574   /*
00575     Record this autogenerated value. If the caller then
00576     succeeds to insert this value, it will call
00577     record_first_successful_insert_id_in_cur_stmt()
00578     which will set first_successful_insert_id_in_cur_stmt if it's not
00579     already set.
00580   */
00581   insert_id_for_cur_row= nr;
00582   /*
00583     Set next insert id to point to next auto-increment value to be able to
00584     handle multi-row statements.
00585   */
00586   set_next_insert_id(compute_next_insert_id(nr, variables));
00587 
00588   return 0;
00589 }
00590 
00591 
00608 void Cursor::ha_release_auto_increment()
00609 {
00610   release_auto_increment();
00611   insert_id_for_cur_row= 0;
00612   auto_inc_interval_for_cur_row.replace(0, 0, 0);
00613   if (next_insert_id > 0)
00614   {
00615     next_insert_id= 0;
00616     /*
00617       this statement used forced auto_increment values if there were some,
00618       wipe them away for other statements.
00619     */
00620     getTable()->in_use->auto_inc_intervals_forced.empty();
00621   }
00622 }
00623 
00624 void Cursor::drop_table(const char *)
00625 {
00626   close();
00627 }
00628 
00629 
00645 int Cursor::ha_check(Session *, HA_CHECK_OPT *)
00646 {
00647   return HA_ADMIN_OK;
00648 }
00649 
00655 inline
00656 void
00657 Cursor::setTransactionReadWrite()
00658 {
00659   ResourceContext *resource_context;
00660 
00661   /*
00662    * If the cursor has not context for execution then there should be no
00663    * possible resource to gain (and if there is... then there is a bug such
00664    * that in_use should have been set.
00665  */
00666   if (not getTable()->in_use)
00667     return;
00668 
00669   resource_context= getTable()->in_use->getResourceContext(getEngine());
00670   /*
00671     When a storage engine method is called, the transaction must
00672     have been started, unless it's a DDL call, for which the
00673     storage engine starts the transaction internally, and commits
00674     it internally, without registering in the ha_list.
00675     Unfortunately here we can't know know for sure if the engine
00676     has registered the transaction or not, so we must check.
00677   */
00678   if (resource_context->isStarted())
00679   {
00680     resource_context->markModifiedData();
00681   }
00682 }
00683 
00684 
00695 int
00696 Cursor::ha_delete_all_rows()
00697 {
00698   setTransactionReadWrite();
00699 
00700   int result= delete_all_rows();
00701 
00702   if (result == 0)
00703   {
00710     Session *const session= getTable()->in_use;
00711     TransactionServices &transaction_services= TransactionServices::singleton();
00712     transaction_services.truncateTable(*session, *getTable());
00713   }
00714 
00715   return result;
00716 }
00717 
00718 
00725 int
00726 Cursor::ha_reset_auto_increment(uint64_t value)
00727 {
00728   setTransactionReadWrite();
00729 
00730   return reset_auto_increment(value);
00731 }
00732 
00733 
00740 int
00741 Cursor::ha_analyze(Session* session, HA_CHECK_OPT*)
00742 {
00743   setTransactionReadWrite();
00744 
00745   return analyze(session);
00746 }
00747 
00754 int
00755 Cursor::ha_disable_indexes(uint32_t mode)
00756 {
00757   setTransactionReadWrite();
00758 
00759   return disable_indexes(mode);
00760 }
00761 
00762 
00769 int
00770 Cursor::ha_enable_indexes(uint32_t mode)
00771 {
00772   setTransactionReadWrite();
00773 
00774   return enable_indexes(mode);
00775 }
00776 
00777 
00784 int
00785 Cursor::ha_discard_or_import_tablespace(bool discard)
00786 {
00787   setTransactionReadWrite();
00788 
00789   return discard_or_import_tablespace(discard);
00790 }
00791 
00798 void
00799 Cursor::closeMarkForDelete(const char *name)
00800 {
00801   setTransactionReadWrite();
00802 
00803   return drop_table(name);
00804 }
00805 
00806 int Cursor::index_next_same(unsigned char *buf, const unsigned char *key, uint32_t keylen)
00807 {
00808   int error;
00809   if (!(error=index_next(buf)))
00810   {
00811     ptrdiff_t ptrdiff= buf - getTable()->getInsertRecord();
00812     unsigned char *save_record_0= NULL;
00813     KeyInfo *key_info= NULL;
00814     KeyPartInfo *key_part;
00815     KeyPartInfo *key_part_end= NULL;
00816 
00817     /*
00818       key_cmp_if_same() compares table->getInsertRecord() against 'key'.
00819       In parts it uses table->getInsertRecord() directly, in parts it uses
00820       field objects with their local pointers into table->getInsertRecord().
00821       If 'buf' is distinct from table->getInsertRecord(), we need to move
00822       all record references. This is table->getInsertRecord() itself and
00823       the field pointers of the fields used in this key.
00824     */
00825     if (ptrdiff)
00826     {
00827       save_record_0= getTable()->getInsertRecord();
00828       getTable()->record[0]= buf;
00829       key_info= getTable()->key_info + active_index;
00830       key_part= key_info->key_part;
00831       key_part_end= key_part + key_info->key_parts;
00832       for (; key_part < key_part_end; key_part++)
00833       {
00834         assert(key_part->field);
00835         key_part->field->move_field_offset(ptrdiff);
00836       }
00837     }
00838 
00839     if (key_cmp_if_same(getTable(), key, active_index, keylen))
00840     {
00841       getTable()->status=STATUS_NOT_FOUND;
00842       error=HA_ERR_END_OF_FILE;
00843     }
00844 
00845     /* Move back if necessary. */
00846     if (ptrdiff)
00847     {
00848       getTable()->record[0]= save_record_0;
00849       for (key_part= key_info->key_part; key_part < key_part_end; key_part++)
00850         key_part->field->move_field_offset(-ptrdiff);
00851     }
00852   }
00853   return error;
00854 }
00855 
00856 
00857 /****************************************************************************
00858 ** Some general functions that aren't in the Cursor class
00859 ****************************************************************************/
00860 
00882 double Cursor::index_only_read_time(uint32_t keynr, double key_records)
00883 {
00884   uint32_t keys_per_block= (stats.block_size/2/
00885       (getTable()->key_info[keynr].key_length + ref_length) + 1);
00886   return ((double) (key_records + keys_per_block-1) /
00887           (double) keys_per_block);
00888 }
00889 
00890 
00891 /****************************************************************************
00892  * Default MRR implementation (MRR to non-MRR converter)
00893  ***************************************************************************/
00894 
00926 ha_rows
00927 Cursor::multi_range_read_info_const(uint32_t keyno, RANGE_SEQ_IF *seq,
00928                                      void *seq_init_param,
00929                                      uint32_t ,
00930                                      uint32_t *bufsz, uint32_t *flags, optimizer::CostVector *cost)
00931 {
00932   KEY_MULTI_RANGE range;
00933   range_seq_t seq_it;
00934   ha_rows rows, total_rows= 0;
00935   uint32_t n_ranges=0;
00936 
00937   /* Default MRR implementation doesn't need buffer */
00938   *bufsz= 0;
00939 
00940   seq_it= seq->init(seq_init_param, n_ranges, *flags);
00941   while (!seq->next(seq_it, &range))
00942   {
00943     n_ranges++;
00944     key_range *min_endp, *max_endp;
00945     {
00946       min_endp= range.start_key.length? &range.start_key : NULL;
00947       max_endp= range.end_key.length? &range.end_key : NULL;
00948     }
00949     if ((range.range_flag & UNIQUE_RANGE) && !(range.range_flag & NULL_RANGE))
00950       rows= 1; /* there can be at most one row */
00951     else
00952     {
00953       if (HA_POS_ERROR == (rows= this->records_in_range(keyno, min_endp,
00954                                                         max_endp)))
00955       {
00956         /* Can't scan one range => can't do MRR scan at all */
00957         total_rows= HA_POS_ERROR;
00958         break;
00959       }
00960     }
00961     total_rows += rows;
00962   }
00963 
00964   if (total_rows != HA_POS_ERROR)
00965   {
00966     /* The following calculation is the same as in multi_range_read_info(): */
00967     *flags |= HA_MRR_USE_DEFAULT_IMPL;
00968     cost->zero();
00969     cost->setAvgIOCost(1); /* assume random seeks */
00970     if ((*flags & HA_MRR_INDEX_ONLY) && total_rows > 2)
00971       cost->setIOCount(index_only_read_time(keyno, (uint32_t)total_rows));
00972     else
00973       cost->setIOCount(read_time(keyno, n_ranges, total_rows));
00974     cost->setCpuCost((double) total_rows / TIME_FOR_COMPARE + 0.01);
00975   }
00976   return total_rows;
00977 }
00978 
00979 
01014 int Cursor::multi_range_read_info(uint32_t keyno, uint32_t n_ranges, uint32_t n_rows,
01015                                    uint32_t *bufsz, uint32_t *flags, optimizer::CostVector *cost)
01016 {
01017   *bufsz= 0; /* Default implementation doesn't need a buffer */
01018 
01019   *flags |= HA_MRR_USE_DEFAULT_IMPL;
01020 
01021   cost->zero();
01022   cost->setAvgIOCost(1); /* assume random seeks */
01023 
01024   /* Produce the same cost as non-MRR code does */
01025   if (*flags & HA_MRR_INDEX_ONLY)
01026     cost->setIOCount(index_only_read_time(keyno, n_rows));
01027   else
01028     cost->setIOCount(read_time(keyno, n_ranges, n_rows));
01029   return 0;
01030 }
01031 
01032 
01074 int
01075 Cursor::multi_range_read_init(RANGE_SEQ_IF *seq_funcs, void *seq_init_param,
01076                                uint32_t n_ranges, uint32_t mode)
01077 {
01078   mrr_iter= seq_funcs->init(seq_init_param, n_ranges, mode);
01079   mrr_funcs= *seq_funcs;
01080   mrr_is_output_sorted= test(mode & HA_MRR_SORTED);
01081   mrr_have_range= false;
01082 
01083   return 0;
01084 }
01085 
01086 
/*
  Fetches the next row of a multi-range scan.

  On the first call after mrr_have_range was cleared, control jumps straight
  to fetching the first range from mrr_funcs.next(). On later calls the
  current range is continued with read_range_next(), except when its
  range_flag is exactly UNIQUE_RANGE | EQ_RANGE, where the range is treated
  as finished without another read. Whenever a range is exhausted, further
  ranges are tried with read_range_first() until one returns something other
  than HA_ERR_END_OF_FILE or the sequence has no more ranges.

  range_info  OUT  receives mrr_cur_range.ptr

  RETURN
    The result of the last read_range_first()/read_range_next() call, or
    HA_ERR_END_OF_FILE when a single-row range was skipped and no further
    range was read.
*/
01100 int Cursor::multi_range_read_next(char **range_info)
01101 {
01102   int result= 0;
01103   int range_res= 0;
01104 
      /* First call: there is no current range to continue yet. */
01105   if (not mrr_have_range)
01106   {
01107     mrr_have_range= true;
01108     goto start;
01109   }
01110 
01111   do
01112   {
01113     /* Save a call if there can be only one row in range. */
01114     if (mrr_cur_range.range_flag != (UNIQUE_RANGE | EQ_RANGE))
01115     {
01116       result= read_range_next();
01117       /* On success or non-EOF errors jump to the end. */
01118       if (result != HA_ERR_END_OF_FILE)
01119         break;
01120     }
01121     else
01122     {
          /* Re-run read_range_first() on the current range. */
01123       if (was_semi_consistent_read())
01124         goto scan_it_again;
01125       /*
01126         We need to set this for the last range only, but checking this
01127         condition is more expensive than just setting the result code.
01128       */
01129       result= HA_ERR_END_OF_FILE;
01130     }
01131 
01132 start:
01133     /* Try the next range(s) until one matches a record. */
01134     while (!(range_res= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
01135     {
01136 scan_it_again:
          /* An endpoint with an empty keypart_map is passed as "no bound". */
01137       result= read_range_first(mrr_cur_range.start_key.keypart_map ?
01138                                  &mrr_cur_range.start_key : 0,
01139                                mrr_cur_range.end_key.keypart_map ?
01140                                  &mrr_cur_range.end_key : 0,
01141                                test(mrr_cur_range.range_flag & EQ_RANGE),
01142                                mrr_is_output_sorted);
01143       if (result != HA_ERR_END_OF_FILE)
01144         break;
01145     }
01146   }
      /* Loop only while the last read hit EOF and the sequence is not exhausted. */
01147   while ((result == HA_ERR_END_OF_FILE) && !range_res);
01148 
01149   *range_info= mrr_cur_range.ptr;
01150   return result;
01151 }
01152 
01153 
01154 /* **************************************************************************
01155  * DS-MRR implementation ends
01156  ***************************************************************************/
01157 
01176 int Cursor::read_range_first(const key_range *start_key,
01177                              const key_range *end_key,
01178                              bool eq_range_arg,
01179                              bool )
01180 {
01181   int result;
01182 
01183   eq_range= eq_range_arg;
01184   end_range= 0;
01185   if (end_key)
01186   {
01187     end_range= &save_end_range;
01188     save_end_range= *end_key;
01189     key_compare_result_on_equal= ((end_key->flag == HA_READ_BEFORE_KEY) ? 1 :
01190           (end_key->flag == HA_READ_AFTER_KEY) ? -1 : 0);
01191   }
01192   range_key_part= getTable()->key_info[active_index].key_part;
01193 
01194   if (!start_key)     // Read first record
01195     result= index_first(getTable()->getInsertRecord());
01196   else
01197     result= index_read_map(getTable()->getInsertRecord(),
01198                            start_key->key,
01199                            start_key->keypart_map,
01200                            start_key->flag);
01201   if (result)
01202     return((result == HA_ERR_KEY_NOT_FOUND)
01203     ? HA_ERR_END_OF_FILE
01204     : result);
01205 
01206   return (compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
01207 }
01208 
01209 
01223 int Cursor::read_range_next()
01224 {
01225   int result;
01226 
01227   if (eq_range)
01228   {
01229     /* We trust that index_next_same always gives a row in range */
01230     return(index_next_same(getTable()->getInsertRecord(),
01231                                 end_range->key,
01232                                 end_range->length));
01233   }
01234   result= index_next(getTable()->getInsertRecord());
01235   if (result)
01236     return result;
01237   return(compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
01238 }
01239 
01240 
01256 int Cursor::compare_key(key_range *range)
01257 {
01258   int cmp;
01259   if (not range)
01260     return 0;         // No max range
01261   cmp= key_cmp(range_key_part, range->key, range->length);
01262   if (!cmp)
01263     cmp= key_compare_result_on_equal;
01264   return cmp;
01265 }
01266 
01267 int Cursor::index_read_idx_map(unsigned char * buf, uint32_t index,
01268                                 const unsigned char * key,
01269                                 key_part_map keypart_map,
01270                                 enum ha_rkey_function find_flag)
01271 {
01272   int error, error1;
01273   error= doStartIndexScan(index, 0);
01274   if (!error)
01275   {
01276     error= index_read_map(buf, key, keypart_map, find_flag);
01277     error1= doEndIndexScan();
01278   }
01279   return error ?  error : error1;
01280 }
01281 
01289 static bool log_row_for_replication(Table* table,
01290                                     const unsigned char *before_record,
01291                                     const unsigned char *after_record)
01292 {
01293   TransactionServices &transaction_services= TransactionServices::singleton();
01294   Session *const session= table->in_use;
01295 
01296   if (table->getShare()->getType() || not transaction_services.shouldConstructMessages())
01297     return false;
01298 
01299   bool result= false;
01300 
01301   switch (session->lex().sql_command)
01302   {
01303   case SQLCOM_CREATE_TABLE:
01304     /*
01305      * We are in a CREATE TABLE ... SELECT statement
01306      * and the kernel has already created the table
01307      * and put a CreateTableStatement in the active
01308      * Transaction message.  Here, we add a new InsertRecord
01309      * to a new Transaction message (because the above
01310      * CREATE TABLE will commit the transaction containing
01311      * it).
01312      */
01313     result= transaction_services.insertRecord(*session, *table);
01314     break;
01315   case SQLCOM_REPLACE:
01316   case SQLCOM_REPLACE_SELECT:
01317     /*
01318      * This is a total hack because of the code that is
01319      * in write_record() in sql_insert.cc. During
01320      * a REPLACE statement, a call to insertRecord() is
01321      * called.  If it fails, then a call to deleteRecord()
01322      * is called, followed by a repeat of the original
01323      * call to insertRecord().  So, log_row_for_replication
01324      * could be called multiple times for a REPLACE
01325      * statement.  The below looks at the values of before_record
01326      * and after_record to determine which call to this
01327      * function is for the delete or the insert, since NULL
01328      * is passed for after_record for the delete and NULL is
01329      * passed for before_record for the insert...
01330      *
01331      * In addition, there is an optimization that allows an
01332      * engine to convert the above delete + insert into an
01333      * update, so we must also check for this case below...
01334      */
01335     if (after_record == NULL)
01336     {
01337       /*
01338        * The storage engine is passed the record in table->record[1]
01339        * as the row to delete (this is the conflicting row), so
01340        * we need to notify TransactionService to use that row.
01341        */
01342       transaction_services.deleteRecord(*session, *table, true);
01343       /* 
01344        * We set the "current" statement message to NULL.  This triggers
01345        * the replication services component to generate a new statement
01346        * message for the inserted record which will come next.
01347        */
01348       transaction_services.finalizeStatementMessage(*session->getStatementMessage(), *session);
01349     }
01350     else
01351     {
01352       if (before_record == NULL)
01353         result= transaction_services.insertRecord(*session, *table);
01354       else
01355         transaction_services.updateRecord(*session, *table, before_record, after_record);
01356     }
01357     break;
01358   case SQLCOM_INSERT:
01359   case SQLCOM_INSERT_SELECT:
01360   case SQLCOM_LOAD:
01361     /*
01362      * The else block below represents an 
01363      * INSERT ... ON DUPLICATE KEY UPDATE that
01364      * has hit a key conflict and actually done
01365      * an update.
01366      */
01367     if (before_record == NULL)
01368       result= transaction_services.insertRecord(*session, *table);
01369     else
01370       transaction_services.updateRecord(*session, *table, before_record, after_record);
01371     break;
01372 
01373   case SQLCOM_UPDATE:
01374     transaction_services.updateRecord(*session, *table, before_record, after_record);
01375     break;
01376 
01377   case SQLCOM_DELETE:
01378     transaction_services.deleteRecord(*session, *table);
01379     break;
01380   default:
01381     break;
01382   }
01383 
01384   return result;
01385 }
01386 
01387 int Cursor::ha_external_lock(Session *session, int lock_type)
01388 {
01389   /*
01390     Whether this is lock or unlock, this should be true, and is to verify that
01391     if get_auto_increment() was called (thus may have reserved intervals or
01392     taken a table lock), ha_release_auto_increment() was too.
01393   */
01394   assert(next_insert_id == 0);
01395 
01396   if (DRIZZLE_CURSOR_RDLOCK_START_ENABLED() ||
01397       DRIZZLE_CURSOR_WRLOCK_START_ENABLED() ||
01398       DRIZZLE_CURSOR_UNLOCK_START_ENABLED())
01399   {
01400     if (lock_type == F_RDLCK)
01401     {
01402       DRIZZLE_CURSOR_RDLOCK_START(getTable()->getShare()->getSchemaName(),
01403                                   getTable()->getShare()->getTableName());
01404     }
01405     else if (lock_type == F_WRLCK)
01406     {
01407       DRIZZLE_CURSOR_WRLOCK_START(getTable()->getShare()->getSchemaName(),
01408                                   getTable()->getShare()->getTableName());
01409     }
01410     else if (lock_type == F_UNLCK)
01411     {
01412       DRIZZLE_CURSOR_UNLOCK_START(getTable()->getShare()->getSchemaName(),
01413                                   getTable()->getShare()->getTableName());
01414     }
01415   }
01416 
01417   /*
01418     We cache the table flags if the locking succeeded. Otherwise, we
01419     keep them as they were when they were fetched in ha_open().
01420   */
01421 
01422   int error= external_lock(session, lock_type);
01423 
01424   if (DRIZZLE_CURSOR_RDLOCK_DONE_ENABLED() ||
01425       DRIZZLE_CURSOR_WRLOCK_DONE_ENABLED() ||
01426       DRIZZLE_CURSOR_UNLOCK_DONE_ENABLED())
01427   {
01428     if (lock_type == F_RDLCK)
01429     {
01430       DRIZZLE_CURSOR_RDLOCK_DONE(error);
01431     }
01432     else if (lock_type == F_WRLCK)
01433     {
01434       DRIZZLE_CURSOR_WRLOCK_DONE(error);
01435     }
01436     else if (lock_type == F_UNLCK)
01437     {
01438       DRIZZLE_CURSOR_UNLOCK_DONE(error);
01439     }
01440   }
01441 
01442   return error;
01443 }
01444 
01445 
01449 int Cursor::ha_reset()
01450 {
01451   /* Check that we have called all proper deallocation functions */
01452   assert(! getTable()->getShare()->all_set.none());
01453   assert(getTable()->key_read == 0);
01454   /* ensure that ha_index_end / endTableScan has been called */
01455   assert(inited == NONE);
01456   /* Free cache used by filesort */
01457   getTable()->free_io_cache();
01458   /* reset the bitmaps to point to defaults */
01459   getTable()->default_column_bitmaps();
01460   return(reset());
01461 }
01462 
01463 
01464 int Cursor::insertRecord(unsigned char *buf)
01465 {
01466   int error;
01467 
01468   /*
01469    * If we have a timestamp column, update it to the current time
01470    *
01471    * @TODO Technically, the below two lines can be take even further out of the
01472    * Cursor interface and into the fill_record() method.
01473    */
01474   if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
01475   {
01476     getTable()->timestamp_field->set_time();
01477   }
01478 
01479   DRIZZLE_INSERT_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
01480   setTransactionReadWrite();
01481   
01482   if (unlikely(plugin::EventObserver::beforeInsertRecord(*getTable(), buf)))
01483   {
01484     error= ER_EVENT_OBSERVER_PLUGIN;
01485   }
01486   else
01487   {
01488     error= doInsertRecord(buf);
01489     if (unlikely(plugin::EventObserver::afterInsertRecord(*getTable(), buf, error))) 
01490     {
01491       error= ER_EVENT_OBSERVER_PLUGIN;
01492     }
01493   }
01494 
01495   ha_statistic_increment(&system_status_var::ha_write_count);
01496 
01497   DRIZZLE_INSERT_ROW_DONE(error);
01498 
01499   if (unlikely(error))
01500   {
01501     return error;
01502   }
01503 
01504   if (unlikely(log_row_for_replication(getTable(), NULL, buf)))
01505     return HA_ERR_RBR_LOGGING_FAILED;
01506 
01507   return 0;
01508 }
01509 
01510 
01511 int Cursor::updateRecord(const unsigned char *old_data, unsigned char *new_data)
01512 {
01513   int error;
01514 
01515   /*
01516     Some storage engines require that the new record is in getInsertRecord()
01517     (and the old record is in getUpdateRecord()).
01518    */
01519   assert(new_data == getTable()->getInsertRecord());
01520 
01521   DRIZZLE_UPDATE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
01522   setTransactionReadWrite();
01523   if (unlikely(plugin::EventObserver::beforeUpdateRecord(*getTable(), old_data, new_data)))
01524   {
01525     error= ER_EVENT_OBSERVER_PLUGIN;
01526   }
01527   else
01528   {
01529     if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
01530     {
01531       getTable()->timestamp_field->set_time();
01532     }
01533 
01534     error= doUpdateRecord(old_data, new_data);
01535     if (unlikely(plugin::EventObserver::afterUpdateRecord(*getTable(), old_data, new_data, error)))
01536     {
01537       error= ER_EVENT_OBSERVER_PLUGIN;
01538     }
01539   }
01540 
01541   ha_statistic_increment(&system_status_var::ha_update_count);
01542 
01543   DRIZZLE_UPDATE_ROW_DONE(error);
01544 
01545   if (unlikely(error))
01546   {
01547     return error;
01548   }
01549 
01550   if (unlikely(log_row_for_replication(getTable(), old_data, new_data)))
01551     return HA_ERR_RBR_LOGGING_FAILED;
01552 
01553   return 0;
01554 }
01555 TableShare *Cursor::getShare()
01556 {
01557   return getTable()->getMutableShare();
01558 }
01559 
01560 int Cursor::deleteRecord(const unsigned char *buf)
01561 {
01562   int error;
01563 
01564   DRIZZLE_DELETE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
01565   setTransactionReadWrite();
01566   if (unlikely(plugin::EventObserver::beforeDeleteRecord(*getTable(), buf)))
01567   {
01568     error= ER_EVENT_OBSERVER_PLUGIN;
01569   }
01570   else
01571   {
01572     error= doDeleteRecord(buf);
01573     if (unlikely(plugin::EventObserver::afterDeleteRecord(*getTable(), buf, error)))
01574     {
01575       error= ER_EVENT_OBSERVER_PLUGIN;
01576     }
01577   }
01578 
01579   ha_statistic_increment(&system_status_var::ha_delete_count);
01580 
01581   DRIZZLE_DELETE_ROW_DONE(error);
01582 
01583   if (unlikely(error))
01584     return error;
01585 
01586   if (unlikely(log_row_for_replication(getTable(), buf, NULL)))
01587     return HA_ERR_RBR_LOGGING_FAILED;
01588 
01589   return 0;
01590 }
01591 
01592 } /* namespace drizzled */