Drizzled Public API Documentation

sql_load.cc
00001 /* Copyright (C) 2000-2006 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00016 
00017 /* Copy data from a textfile to table */
00018 
00019 #include <config.h>
00020 
00021 #include <drizzled/sql_load.h>
00022 #include <drizzled/error.h>
00023 #include <drizzled/data_home.h>
00024 #include <drizzled/session.h>
00025 #include <drizzled/sql_base.h>
00026 #include <drizzled/field/epoch.h>
00027 #include <drizzled/internal/my_sys.h>
00028 #include <drizzled/internal/iocache.h>
00029 #include <drizzled/plugin/storage_engine.h>
00030 #include <drizzled/sql_lex.h>
00031 
00032 #include <sys/stat.h>
00033 #include <fcntl.h>
00034 #include <algorithm>
00035 #include <climits>
00036 #include <boost/filesystem.hpp>
00037 
00038 namespace fs=boost::filesystem;
00039 using namespace std;
00040 namespace drizzled
00041 {
00042 
00043 class READ_INFO {
00044   int cursor;
00045   unsigned char *buffer;                /* Buffer for read text */
00046   unsigned char *end_of_buff;           /* Data in bufferts ends here */
00047   size_t buff_length;                   /* Length of buffert */
00048   size_t max_length;                    /* Max length of row */
00049   char  *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
00050   uint  field_term_length,line_term_length,enclosed_length;
00051   int field_term_char,line_term_char,enclosed_char,escape_char;
00052   int *stack,*stack_pos;
00053   bool  found_end_of_line,start_of_line,eof;
00054   bool  need_end_io_cache;
00055   internal::IO_CACHE cache;
00056 
00057 public:
00058   bool error,line_cuted,found_null,enclosed;
00059   unsigned char *row_start,     /* Found row starts here */
00060   *row_end;     /* Found row ends here */
00061   const CHARSET_INFO *read_charset;
00062 
00063   READ_INFO(int cursor, size_t tot_length, const CHARSET_INFO * const cs,
00064       String &field_term,String &line_start,String &line_term,
00065       String &enclosed,int escape, bool is_fifo);
00066   ~READ_INFO();
00067   int read_field();
00068   int read_fixed_length(void);
00069   int next_line(void);
00070   char unescape(char chr);
00071   int terminator(char *ptr,uint32_t length);
00072   bool find_start_of_fields();
00073 
00074   /*
00075     We need to force cache close before destructor is invoked to log
00076     the last read block
00077   */
00078   void end_io_cache()
00079   {
00080     cache.end_io_cache();
00081     need_end_io_cache = 0;
00082   }
00083 
00084   /*
00085     Either this method, or we need to make cache public
00086     Arg must be set from load() since constructor does not see
00087     either the table or Session value
00088   */
00089   void set_io_cache_arg(void* arg) { cache.arg = arg; }
00090 };
00091 
00092 static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
00093                              List<Item> &fields_vars, List<Item> &set_fields,
00094                              List<Item> &set_values, READ_INFO &read_info,
00095            uint32_t skip_lines,
00096            bool ignore_check_option_errors);
00097 static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
00098                           List<Item> &fields_vars, List<Item> &set_fields,
00099                           List<Item> &set_values, READ_INFO &read_info,
00100         String &enclosed, uint32_t skip_lines,
00101         bool ignore_check_option_errors);
00102 
00103 
00104 /*
00105   Execute LOAD DATA query
00106 
00107   SYNOPSYS
00108     load()
00109       session - current thread
00110       ex  - file_exchange object representing source cursor and its parsing rules
00111       table_list  - list of tables to which we are loading data
00112       fields_vars - list of fields and variables to which we read
00113                     data from cursor
00114       set_fields  - list of fields mentioned in set clause
00115       set_values  - expressions to assign to fields in previous list
00116       handle_duplicates - indicates whenever we should emit error or
00117                           replace row if we will meet duplicates.
00118       ignore -          - indicates whenever we should ignore duplicates
00119 
00120   RETURN VALUES
00121     true - error / false - success
00122 */
00123 
00124 int load(Session *session,file_exchange *ex,TableList *table_list,
00125           List<Item> &fields_vars, List<Item> &set_fields,
00126                 List<Item> &set_values,
00127                 enum enum_duplicates handle_duplicates, bool ignore)
00128 {
00129   int file;
00130   Table *table= NULL;
00131   int error;
00132   String *field_term=ex->field_term,*escaped=ex->escaped;
00133   String *enclosed=ex->enclosed;
00134   bool is_fifo=0;
00135 
00136   assert(table_list->getSchemaName()); // This should never be null
00137 
00138   /*
00139     If path for cursor is not defined, we will use the current database.
00140     If this is not set, we will use the directory where the table to be
00141     loaded is located
00142   */
00143   util::string::const_shared_ptr schema(session->schema());
00144   const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
00145   assert(tdb);
00146   uint32_t skip_lines= ex->skip_lines;
00147   bool transactional_table;
00148   Session::killed_state_t killed_status= Session::NOT_KILLED;
00149 
00150   /* Escape and enclosed character may be a utf8 4-byte character */
00151   if (escaped->length() > 4 || enclosed->length() > 4)
00152   {
00153     my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
00154     return(true);
00155   }
00156 
00157   if (session->openTablesLock(table_list))
00158     return(true);
00159 
00160   if (setup_tables_and_check_access(session, &session->lex().select_lex.context,
00161                                     &session->lex().select_lex.top_join_list,
00162                                     table_list,
00163                                     &session->lex().select_lex.leaf_tables, true))
00164      return(-1);
00165 
00166   /*
00167     Let us emit an error if we are loading data to table which is used
00168     in subselect in SET clause like we do it for INSERT.
00169 
00170     The main thing to fix to remove this restriction is to ensure that the
00171     table is marked to be 'used for insert' in which case we should never
00172     mark this table as 'const table' (ie, one that has only one row).
00173   */
00174   if (unique_table(table_list, table_list->next_global))
00175   {
00176     my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
00177     return(true);
00178   }
00179 
00180   table= table_list->table;
00181   transactional_table= table->cursor->has_transactions();
00182 
00183   if (!fields_vars.size())
00184   {
00185     Field **field;
00186     for (field= table->getFields(); *field ; field++)
00187       fields_vars.push_back(new Item_field(*field));
00188     table->setWriteSet();
00189     table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
00190     /*
00191       Let us also prepare SET clause, altough it is probably empty
00192       in this case.
00193     */
00194     if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
00195         setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
00196       return(true);
00197   }
00198   else
00199   {           // Part field list
00200     /* TODO: use this conds for 'WITH CHECK OPTIONS' */
00201     if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
00202         setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
00203         check_that_all_fields_are_given_values(session, table, table_list))
00204       return(true);
00205     /*
00206       Check whenever TIMESTAMP field with auto-set feature specified
00207       explicitly.
00208     */
00209     if (table->timestamp_field)
00210     {
00211       if (table->isWriteSet(table->timestamp_field->position()))
00212       {
00213         table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
00214       }
00215       else
00216       {
00217         table->setWriteSet(table->timestamp_field->position());
00218       }
00219     }
00220     /* Fix the expressions in SET clause */
00221     if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
00222       return(true);
00223   }
00224 
00225   table->mark_columns_needed_for_insert();
00226 
00227   size_t tot_length=0;
00228   bool use_blobs= 0, use_vars= 0;
00229   List<Item>::iterator it(fields_vars.begin());
00230   Item *item;
00231 
00232   while ((item= it++))
00233   {
00234     Item *real_item= item->real_item();
00235 
00236     if (real_item->type() == Item::FIELD_ITEM)
00237     {
00238       Field *field= ((Item_field*)real_item)->field;
00239       if (field->flags & BLOB_FLAG)
00240       {
00241         use_blobs= 1;
00242         tot_length+= 256;     // Will be extended if needed
00243       }
00244       else
00245         tot_length+= field->field_length;
00246     }
00247     else if (item->type() == Item::STRING_ITEM)
00248       use_vars= 1;
00249   }
00250   if (use_blobs && !ex->line_term->length() && !field_term->length())
00251   {
00252     my_message(ER_BLOBS_AND_NO_TERMINATED,ER(ER_BLOBS_AND_NO_TERMINATED),
00253          MYF(0));
00254     return(true);
00255   }
00256   if (use_vars && !field_term->length() && !enclosed->length())
00257   {
00258     my_error(ER_LOAD_FROM_FIXED_SIZE_ROWS_TO_VAR, MYF(0));
00259     return(true);
00260   }
00261 
00262   fs::path to_file(ex->file_name);
00263   fs::path target_path(fs::system_complete(getDataHomeCatalog()));
00264   if (not to_file.has_root_directory())
00265   {
00266     int count_elements= 0;
00267     for (fs::path::iterator iter= to_file.begin();
00268          iter != to_file.end();
00269          ++iter, ++count_elements)
00270     { }
00271 
00272     if (count_elements == 1)
00273     {
00274       target_path /= tdb;
00275     }
00276     target_path /= to_file;
00277   }
00278   else
00279   {
00280     target_path= to_file;
00281   }
00282 
00283   if (not secure_file_priv.string().empty())
00284   {
00285     if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
00286     {
00287       /* Read only allowed from within dir specified by secure_file_priv */
00288       my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
00289       return(true);
00290     }
00291   }
00292 
00293   struct stat stat_info;
00294   if (stat(target_path.file_string().c_str(), &stat_info))
00295   {
00296     my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
00297     return(true);
00298   }
00299 
00300   // if we are not in slave thread, the cursor must be:
00301   if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&  // readable by others
00302         (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
00303         ((stat_info.st_mode & S_IFREG) == S_IFREG ||
00304          (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
00305   {
00306     my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
00307     return(true);
00308   }
00309   if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
00310     is_fifo = 1;
00311 
00312 
00313   if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
00314   {
00315     my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
00316     return(true);
00317   }
00318   CopyInfo info;
00319   memset(&info, 0, sizeof(info));
00320   info.ignore= ignore;
00321   info.handle_duplicates=handle_duplicates;
00322   info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
00323 
00324   identifier::Schema identifier(*schema);
00325   READ_INFO read_info(file, tot_length,
00326                       ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
00327           *field_term, *ex->line_start, *ex->line_term, *enclosed,
00328           info.escape_char, is_fifo);
00329   if (read_info.error)
00330   {
00331     if  (file >= 0)
00332       internal::my_close(file,MYF(0));      // no files in net reading
00333     return(true);       // Can't allocate buffers
00334   }
00335 
00336   /*
00337    * Per the SQL standard, inserting NULL into a NOT NULL
00338    * field requires an error to be thrown.
00339    *
00340    * @NOTE
00341    *
00342    * NULL check and handling occurs in field_conv.cc
00343    */
00344   session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
00345   session->cuted_fields=0L;
00346   /* Skip lines if there is a line terminator */
00347   if (ex->line_term->length())
00348   {
00349     /* ex->skip_lines needs to be preserved for logging */
00350     while (skip_lines > 0)
00351     {
00352       skip_lines--;
00353       if (read_info.next_line())
00354   break;
00355     }
00356   }
00357 
00358   if (!(error=test(read_info.error)))
00359   {
00360 
00361     table->next_number_field=table->found_next_number_field;
00362     if (ignore ||
00363   handle_duplicates == DUP_REPLACE)
00364       table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
00365     if (handle_duplicates == DUP_REPLACE)
00366         table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
00367     table->cursor->ha_start_bulk_insert((ha_rows) 0);
00368     table->copy_blobs=1;
00369 
00370     session->setAbortOnWarning(true);
00371 
00372     if (!field_term->length() && !enclosed->length())
00373       error= read_fixed_length(session, info, table_list, fields_vars,
00374                                set_fields, set_values, read_info,
00375              skip_lines, ignore);
00376     else
00377       error= read_sep_field(session, info, table_list, fields_vars,
00378                             set_fields, set_values, read_info,
00379           *enclosed, skip_lines, ignore);
00380     if (table->cursor->ha_end_bulk_insert() && !error)
00381     {
00382       table->print_error(errno, MYF(0));
00383       error= 1;
00384     }
00385     table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
00386     table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
00387     table->next_number_field=0;
00388   }
00389   if (file >= 0)
00390     internal::my_close(file,MYF(0));
00391   free_blobs(table);        /* if pack_blob was used */
00392   table->copy_blobs=0;
00393   session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
00394   /*
00395      simulated killing in the middle of per-row loop
00396      must be effective for binlogging
00397   */
00398   killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
00399   if (error)
00400   {
00401     error= -1;        // Error on read
00402     goto err;
00403   }
00404 
00405   char msg[FN_REFLEN];
00406   snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
00407      (info.records - info.copied), session->cuted_fields);
00408 
00409   if (session->transaction.stmt.hasModifiedNonTransData())
00410     session->transaction.all.markModifiedNonTransData();
00411 
00412   /* ok to client sent only after binlog write and engine commit */
00413   session->my_ok(info.copied + info.deleted, 0, 0L, msg);
00414 err:
00415   assert(transactional_table || !(info.copied || info.deleted) ||
00416               session->transaction.stmt.hasModifiedNonTransData());
00417   table->cursor->ha_release_auto_increment();
00418   table->auto_increment_field_not_null= false;
00419   session->setAbortOnWarning(false);
00420 
00421   return(error);
00422 }
00423 
00424 
00425 /****************************************************************************
00426 ** Read of rows of fixed size + optional garage + optonal newline
00427 ****************************************************************************/
00428 
00429 static int
00430 read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
00431                   List<Item> &fields_vars, List<Item> &set_fields,
00432                   List<Item> &set_values, READ_INFO &read_info,
00433                   uint32_t skip_lines, bool ignore_check_option_errors)
00434 {
00435   List<Item>::iterator it(fields_vars.begin());
00436   Item_field *sql_field;
00437   Table *table= table_list->table;
00438   uint64_t id;
00439   bool err;
00440 
00441   id= 0;
00442 
00443   while (!read_info.read_fixed_length())
00444   {
00445     if (session->getKilled())
00446     {
00447       session->send_kill_message();
00448       return(1);
00449     }
00450     if (skip_lines)
00451     {
00452       /*
00453   We could implement this with a simple seek if:
00454   - We are not using DATA INFILE LOCAL
00455   - escape character is  ""
00456   - line starting prefix is ""
00457       */
00458       skip_lines--;
00459       continue;
00460     }
00461     it= fields_vars.begin();
00462     unsigned char *pos=read_info.row_start;
00463 #ifdef HAVE_VALGRIND
00464     read_info.row_end[0]=0;
00465 #endif
00466 
00467     table->restoreRecordAsDefault();
00468     /*
00469       There is no variables in fields_vars list in this format so
00470       this conversion is safe.
00471     */
00472     while ((sql_field= (Item_field*) it++))
00473     {
00474       Field *field= sql_field->field;
00475       if (field == table->next_number_field)
00476         table->auto_increment_field_not_null= true;
00477       /*
00478         No fields specified in fields_vars list can be null in this format.
00479         Mark field as not null, we should do this for each row because of
00480         restore_record...
00481       */
00482       field->set_notnull();
00483 
00484       if (pos == read_info.row_end)
00485       {
00486         session->cuted_fields++;      /* Not enough fields */
00487         push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00488                             ER_WARN_TOO_FEW_RECORDS,
00489                             ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
00490 
00491         if (not field->maybe_null() and field->is_timestamp())
00492             ((field::Epoch::pointer) field)->set_time();
00493       }
00494       else
00495       {
00496   uint32_t length;
00497   unsigned char save_chr;
00498   if ((length=(uint32_t) (read_info.row_end-pos)) >
00499       field->field_length)
00500         {
00501     length=field->field_length;
00502         }
00503   save_chr=pos[length];
00504         pos[length]='\0'; // Add temp null terminator for store()
00505         field->store((char*) pos,length,read_info.read_charset);
00506   pos[length]=save_chr;
00507   if ((pos+=length) > read_info.row_end)
00508     pos= read_info.row_end; /* Fills rest with space */
00509       }
00510     }
00511     if (pos != read_info.row_end)
00512     {
00513       session->cuted_fields++;      /* To long row */
00514       push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00515                           ER_WARN_TOO_MANY_RECORDS,
00516                           ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
00517     }
00518 
00519     if (session->getKilled() ||
00520         fill_record(session, set_fields, set_values,
00521                     ignore_check_option_errors))
00522       return(1);
00523 
00524     err= write_record(session, table, &info);
00525     table->auto_increment_field_not_null= false;
00526     if (err)
00527       return(1);
00528 
00529     /*
00530       We don't need to reset auto-increment field since we are restoring
00531       its default value at the beginning of each loop iteration.
00532     */
00533     if (read_info.next_line())      // Skip to next line
00534       break;
00535     if (read_info.line_cuted)
00536     {
00537       session->cuted_fields++;      /* To long row */
00538       push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00539                           ER_WARN_TOO_MANY_RECORDS,
00540                           ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
00541     }
00542     session->row_count++;
00543   }
00544   return(test(read_info.error));
00545 }
00546 
00547 
00548 
00549 static int
00550 read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
00551                List<Item> &fields_vars, List<Item> &set_fields,
00552                List<Item> &set_values, READ_INFO &read_info,
00553          String &enclosed, uint32_t skip_lines,
00554          bool ignore_check_option_errors)
00555 {
00556   List<Item>::iterator it(fields_vars.begin());
00557   Item *item;
00558   Table *table= table_list->table;
00559   uint32_t enclosed_length;
00560   uint64_t id;
00561   bool err;
00562 
00563   enclosed_length=enclosed.length();
00564   id= 0;
00565 
00566   for (;;it= fields_vars.begin())
00567   {
00568     if (session->getKilled())
00569     {
00570       session->send_kill_message();
00571       return(1);
00572     }
00573 
00574     table->restoreRecordAsDefault();
00575 
00576     while ((item= it++))
00577     {
00578       uint32_t length;
00579       unsigned char *pos;
00580       Item *real_item;
00581 
00582       if (read_info.read_field())
00583   break;
00584 
00585       /* If this line is to be skipped we don't want to fill field or var */
00586       if (skip_lines)
00587         continue;
00588 
00589       pos=read_info.row_start;
00590       length=(uint32_t) (read_info.row_end-pos);
00591 
00592       real_item= item->real_item();
00593 
00594       if ((!read_info.enclosed && (enclosed_length && length == 4 && !memcmp(pos, STRING_WITH_LEN("NULL")))) ||
00595     (length == 1 && read_info.found_null))
00596       {
00597 
00598         if (real_item->type() == Item::FIELD_ITEM)
00599         {
00600           Field *field= ((Item_field *)real_item)->field;
00601           if (field->reset())
00602           {
00603             my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
00604                      session->row_count);
00605             return(1);
00606           }
00607           field->set_null();
00608           if (not field->maybe_null())
00609           {
00610             if (field->is_timestamp())
00611             {
00612               ((field::Epoch::pointer) field)->set_time();
00613             }
00614             else if (field != table->next_number_field)
00615             {
00616               field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
00617             }
00618           }
00619   }
00620         else if (item->type() == Item::STRING_ITEM)
00621         {
00622           ((Item_user_var_as_out_param *)item)->set_null_value(
00623                                                   read_info.read_charset);
00624         }
00625         else
00626         {
00627           my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
00628           return(1);
00629         }
00630 
00631   continue;
00632       }
00633 
00634       if (real_item->type() == Item::FIELD_ITEM)
00635       {
00636         Field *field= ((Item_field *)real_item)->field;
00637         field->set_notnull();
00638         read_info.row_end[0]=0;     // Safe to change end marker
00639         if (field == table->next_number_field)
00640           table->auto_increment_field_not_null= true;
00641         field->store((char*) pos, length, read_info.read_charset);
00642       }
00643       else if (item->type() == Item::STRING_ITEM)
00644       {
00645         ((Item_user_var_as_out_param *)item)->set_value((char*) pos, length,
00646                                                         read_info.read_charset);
00647       }
00648       else
00649       {
00650         my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
00651         return(1);
00652       }
00653     }
00654     if (read_info.error)
00655       break;
00656     if (skip_lines)
00657     {
00658       skip_lines--;
00659       continue;
00660     }
00661     if (item)
00662     {
00663       /* Have not read any field, thus input cursor is simply ended */
00664       if (item == &fields_vars.front())
00665   break;
00666       for (; item ; item= it++)
00667       {
00668         Item *real_item= item->real_item();
00669         if (real_item->type() == Item::FIELD_ITEM)
00670         {
00671           Field *field= ((Item_field *)real_item)->field;
00672           if (field->reset())
00673           {
00674             my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
00675                      session->row_count);
00676             return(1);
00677           }
00678           if (not field->maybe_null() and field->is_timestamp())
00679               ((field::Epoch::pointer) field)->set_time();
00680           /*
00681             QQ: We probably should not throw warning for each field.
00682             But how about intention to always have the same number
00683             of warnings in Session::cuted_fields (and get rid of cuted_fields
00684             in the end ?)
00685           */
00686           session->cuted_fields++;
00687           push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00688                               ER_WARN_TOO_FEW_RECORDS,
00689                               ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
00690         }
00691         else if (item->type() == Item::STRING_ITEM)
00692         {
00693           ((Item_user_var_as_out_param *)item)->set_null_value(
00694                                                   read_info.read_charset);
00695         }
00696         else
00697         {
00698           my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
00699           return(1);
00700         }
00701       }
00702     }
00703 
00704     if (session->getKilled() ||
00705         fill_record(session, set_fields, set_values,
00706                     ignore_check_option_errors))
00707       return(1);
00708 
00709     err= write_record(session, table, &info);
00710     table->auto_increment_field_not_null= false;
00711     if (err)
00712       return(1);
00713     /*
00714       We don't need to reset auto-increment field since we are restoring
00715       its default value at the beginning of each loop iteration.
00716     */
00717     if (read_info.next_line())      // Skip to next line
00718       break;
00719     if (read_info.line_cuted)
00720     {
00721       session->cuted_fields++;      /* To long row */
00722       push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00723                           ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
00724                           session->row_count);
00725       if (session->getKilled())
00726         return(1);
00727     }
00728     session->row_count++;
00729   }
00730   return(test(read_info.error));
00731 }
00732 
00733 
00734 /* Unescape all escape characters, mark \N as null */
00735 
00736 char
00737 READ_INFO::unescape(char chr)
00738 {
00739   /* keep this switch synchornous with the ESCAPE_CHARS macro */
00740   switch(chr) {
00741   case 'n': return '\n';
00742   case 't': return '\t';
00743   case 'r': return '\r';
00744   case 'b': return '\b';
00745   case '0': return 0;       // Ascii null
00746   case 'Z': return '\032';      // Win32 end of cursor
00747   case 'N': found_null=1;
00748 
00749     /* fall through */
00750   default:  return chr;
00751   }
00752 }
00753 
00754 
00755 /*
00756   Read a line using buffering
00757   If last line is empty (in line mode) then it isn't outputed
00758 */
00759 
00760 
00761 READ_INFO::READ_INFO(int file_par, size_t tot_length,
00762                      const CHARSET_INFO * const cs,
00763          String &field_term, String &line_start, String &line_term,
00764          String &enclosed_par, int escape, bool is_fifo)
00765   :cursor(file_par),escape_char(escape)
00766 {
00767   read_charset= cs;
00768   field_term_ptr=(char*) field_term.ptr();
00769   field_term_length= field_term.length();
00770   line_term_ptr=(char*) line_term.ptr();
00771   line_term_length= line_term.length();
00772   if (line_start.length() == 0)
00773   {
00774     line_start_ptr=0;
00775     start_of_line= 0;
00776   }
00777   else
00778   {
00779     line_start_ptr=(char*) line_start.ptr();
00780     line_start_end=line_start_ptr+line_start.length();
00781     start_of_line= 1;
00782   }
00783   /* If field_terminator == line_terminator, don't use line_terminator */
00784   if (field_term_length == line_term_length &&
00785       !memcmp(field_term_ptr,line_term_ptr,field_term_length))
00786   {
00787     line_term_length=0;
00788     line_term_ptr=(char*) "";
00789   }
00790   enclosed_char= (enclosed_length=enclosed_par.length()) ?
00791     (unsigned char) enclosed_par[0] : INT_MAX;
00792   field_term_char= field_term_length ? (unsigned char) field_term_ptr[0] : INT_MAX;
00793   line_term_char= line_term_length ? (unsigned char) line_term_ptr[0] : INT_MAX;
00794   error=eof=found_end_of_line=found_null=line_cuted=0;
00795   buff_length=tot_length;
00796 
00797 
00798   /* Set of a stack for unget if long terminators */
00799   size_t length= max(field_term_length,line_term_length)+1;
00800   set_if_bigger(length, line_start.length());
00801   stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
00802 
00803   if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
00804     error=1;
00805   else
00806   {
00807     end_of_buff=buffer+buff_length;
00808     if (cache.init_io_cache((false) ? -1 : cursor, 0,
00809                             (false) ? internal::READ_NET :
00810                             (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
00811                             MYF(MY_WME)))
00812     {
00813       free((unsigned char*) buffer);
00814       error=1;
00815     }
00816     else
00817     {
00818       /*
00819   init_io_cache() will not initialize read_function member
00820   if the cache is READ_NET. So we work around the problem with a
00821   manual assignment
00822       */
00823       need_end_io_cache = 1;
00824     }
00825   }
00826 }
00827 
00828 
00829 READ_INFO::~READ_INFO()
00830 {
00831   if (!error)
00832   {
00833     if (need_end_io_cache)
00834       cache.end_io_cache();
00835     free(buffer);
00836     error=1;
00837   }
00838 }
00839 
00840 
00841 #define GET (stack_pos != stack ? *--stack_pos : my_b_get(&cache))
00842 #define PUSH(A) *(stack_pos++)=(A)
00843 
00844 
00845 inline int READ_INFO::terminator(char *ptr,uint32_t length)
00846 {
00847   int chr=0;          // Keep gcc happy
00848   uint32_t i;
00849   for (i=1 ; i < length ; i++)
00850   {
00851     if ((chr=GET) != *++ptr)
00852     {
00853       break;
00854     }
00855   }
00856   if (i == length)
00857     return 1;
00858   PUSH(chr);
00859   while (i-- > 1)
00860     PUSH((unsigned char) *--ptr);
00861   return 0;
00862 }
00863 
00864 
/*
  Read the next delimited field of the current line into 'buffer'.

  On success row_start/row_end delimit the bytes of the field and
  'enclosed' tells whether the field was closed by the enclosing
  character. found_end_of_line is set when the field was ended by the
  line terminator or by end of input (eof is set as well in that case).

  RETURN
    0  a field was read
    1  no field was read: found_end_of_line was already set (next_line()
       must be called first), find_start_of_fields() failed, the input
       was at EOF, or the buffer could not be enlarged ('error' is set
       to 1 in that last case)

  NOTE(review): the destructor only frees 'buffer' while 'error' is 0, so
  the realloc failure path below looks like it leaves the old buffer and
  the IO cache unreleased -- confirm.
*/
00865 int READ_INFO::read_field()
00866 {
00867   int chr,found_enclosed_char;
00868   unsigned char *to,*new_buffer;
00869 
00870   found_null=0;
00871   if (found_end_of_line)
00872     return 1;         // Caller has to call next_line() first
00873 
00874   /* Skip until we find 'line_start' */
00875 
00876   if (start_of_line)
00877   {           // Skip until line_start
00878     start_of_line=0;
00879     if (find_start_of_fields())
00880       return 1;
00881   }
00882   if ((chr=GET) == my_b_EOF)
00883   {
00884     found_end_of_line=eof=1;
00885     return 1;
00886   }
00887   to=buffer;
00888   if (chr == enclosed_char)
00889   {
00890     found_enclosed_char=enclosed_char;
      /*
        Keep the opening enclosure character in buffer[0]. It is skipped
        (row_start=buffer+1) only when the field is properly closed below;
        on the other exits row_start=buffer, so it stays part of the data.
      */
00891     *to++=(unsigned char) chr;        // If error
00892   }
00893   else
00894   {
      /* INT_MAX marks "field is not enclosed"; it is tested for below */
00895     found_enclosed_char= INT_MAX;
00896     PUSH(chr);
00897   }
00898 
      /* Outer loop: one pass per buffer size; the buffer is enlarged at the bottom */
00899   for (;;)
00900   {
00901     while ( to < end_of_buff)
00902     {
00903       chr = GET;
          /*
            Lead byte of a multi-byte character that still fits into the
            buffer: copy all of its bytes without interpreting them.
          */
00904       if ((my_mbcharlen(read_charset, chr) > 1) &&
00905           to+my_mbcharlen(read_charset, chr) <= end_of_buff)
00906       {
00907         unsigned char* p = (unsigned char*)to;
00908         *to++ = chr;
00909         int ml = my_mbcharlen(read_charset, chr);
00910         int i;
00911         for (i=1; i<ml; i++) {
00912           chr = GET;
00913           if (chr == my_b_EOF)
00914             goto found_eof;
00915           *to++ = chr;
00916         }
00917         if (my_ismbchar(read_charset,
00918               (const char *)p,
00919               (const char *)to))
00920           continue;
            /*
              Not accepted by my_ismbchar(): take the ml bytes out of the
              buffer again, push them back and re-read the first one so it
              is handled as an ordinary character below.
            */
00921         for (i=0; i<ml; i++)
00922           PUSH((unsigned char) *--to);
00923         chr = GET;
00924       }
00925       if (chr == my_b_EOF)
00926         goto found_eof;
00927       if (chr == escape_char)
00928       {
00929         if ((chr=GET) == my_b_EOF)
00930         {
              /* The escape character was the last byte of input: keep it literally */
00931           *to++= (unsigned char) escape_char;
00932           goto found_eof;
00933         }
00934         /*
00935           When escape_char == enclosed_char, we treat it like we do for
00936           handling quotes in SQL parsing -- you can double-up the
00937           escape_char to include it literally, but it doesn't do escapes
00938           like \n. This allows: LOAD DATA ... ENCLOSED BY '"' ESCAPED BY '"'
00939           with data like: "fie""ld1", "field2"
00940          */
00941         if (escape_char != enclosed_char || chr == escape_char)
00942         {
00943           *to++ = (unsigned char) unescape((char) chr);
00944           continue;
00945         }
            /*
              escape_char == enclosed_char and it was not doubled: put the
              following character back and let the checks below handle the
              escape character itself.
            */
00946         PUSH(chr);
00947         chr= escape_char;
00948       }
          /*
            A line terminator ends the field; unless
            ALLOW_LINESEPARATOR_IN_STRINGS is defined this is only tried
            for fields that are not enclosed.
          */
00949 #ifdef ALLOW_LINESEPARATOR_IN_STRINGS
00950       if (chr == line_term_char)
00951 #else
00952         if (chr == line_term_char && found_enclosed_char == INT_MAX)
00953 #endif
00954         {
00955           if (terminator(line_term_ptr,line_term_length))
00956           {         // Maybe unexpected linefeed
00957             enclosed=0;
00958             found_end_of_line=1;
00959             row_start=buffer;
00960             row_end=  to;
00961             return 0;
00962           }
00963         }
00964       if (chr == found_enclosed_char)
00965       {
00966         if ((chr=GET) == found_enclosed_char)
00967         {         // Doubled enclosure character: store a single one
00968           *to++ = (unsigned char) chr;
00969           continue;
00970         }
00971         // End of enclosed field if followed by field_term or line_term
00972         if (chr == my_b_EOF ||
00973             (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
00974         {         // Maybe unexpected linefeed
00975           enclosed=1;
00976           found_end_of_line=1;
00977           row_start=buffer+1;
00978           row_end=  to;
00979           return 0;
00980         }
00981         if (chr == field_term_char &&
00982             terminator(field_term_ptr,field_term_length))
00983         {
00984           enclosed=1;
00985           row_start=buffer+1;
00986           row_end=  to;
00987           return 0;
00988         }
00989         /*
00990            The string didn't terminate yet.
00991            Store back next character for the loop
00992          */
00993         PUSH(chr);
00994         /* copy the found term character to 'to' */
00995         chr= found_enclosed_char;
00996       }
00997       else if (chr == field_term_char && found_enclosed_char == INT_MAX)
00998       {
            /* Field terminator of a field that is not enclosed */
00999         if (terminator(field_term_ptr,field_term_length))
01000         {
01001           enclosed=0;
01002           row_start=buffer;
01003           row_end=  to;
01004           return 0;
01005         }
01006       }
01007       *to++ = (unsigned char) chr;
01008     }
01009     /*
01010      ** We come here if buffer is too small. Enlarge it by IO_SIZE bytes,
       ** re-base 'to' onto the new allocation and continue
01011      */
01012     if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
01013       return (error=1);
01014     to=new_buffer + (to-buffer);
01015     buffer=new_buffer;
01016     buff_length+=IO_SIZE;
01017     end_of_buff=buffer+buff_length;
01018   }
01019 
      /* End of input inside a field: return what has been collected so far */
01020 found_eof:
01021   enclosed=0;
01022   found_end_of_line=eof=1;
01023   row_start=buffer;
01024   row_end=to;
01025   return 0;
01026 }
01027 
01028 /*
01029   Read a row with fixed length.
01030 
01031   NOTES
01032     The row may not be fixed size on disk if there are escape
01033     characters in the cursor.
01034 
01035   IMPLEMENTATION NOTE
01036     One can't use fixed length with multi-byte charset **
01037 
01038   RETURN
01039     0  ok
01040     1  error
01041 */
01042 
01043 int READ_INFO::read_fixed_length()
01044 {
01045   int chr;
01046   unsigned char *to;
01047   if (found_end_of_line)
01048     return 1;         // One have to call next_line
01049 
01050   if (start_of_line)
01051   {           // Skip until line_start
01052     start_of_line=0;
01053     if (find_start_of_fields())
01054       return 1;
01055   }
01056 
01057   to=row_start=buffer;
01058   while (to < end_of_buff)
01059   {
01060     if ((chr=GET) == my_b_EOF)
01061       goto found_eof;
01062     if (chr == escape_char)
01063     {
01064       if ((chr=GET) == my_b_EOF)
01065       {
01066   *to++= (unsigned char) escape_char;
01067   goto found_eof;
01068       }
01069       *to++ =(unsigned char) unescape((char) chr);
01070       continue;
01071     }
01072     if (chr == line_term_char)
01073     {
01074       if (terminator(line_term_ptr,line_term_length))
01075       {           // Maybe unexpected linefeed
01076   found_end_of_line=1;
01077   row_end=  to;
01078   return 0;
01079       }
01080     }
01081     *to++ = (unsigned char) chr;
01082   }
01083   row_end=to;         // Found full line
01084   return 0;
01085 
01086 found_eof:
01087   found_end_of_line=eof=1;
01088   row_start=buffer;
01089   row_end=to;
01090   return to == buffer ? 1 : 0;
01091 }
01092 
01093 
01094 int READ_INFO::next_line()
01095 {
01096   line_cuted=0;
01097   start_of_line= line_start_ptr != 0;
01098   if (found_end_of_line || eof)
01099   {
01100     found_end_of_line=0;
01101     return eof;
01102   }
01103   found_end_of_line=0;
01104   if (!line_term_length)
01105     return 0;         // No lines
01106   for (;;)
01107   {
01108     int chr = GET;
01109     if (my_mbcharlen(read_charset, chr) > 1)
01110     {
01111       for (uint32_t i=1;
01112           chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
01113           i++)
01114         chr = GET;
01115       if (chr == escape_char)
01116         continue;
01117     }
01118     if (chr == my_b_EOF)
01119     {
01120       eof=1;
01121       return 1;
01122     }
01123     if (chr == escape_char)
01124     {
01125       line_cuted=1;
01126       if (GET == my_b_EOF)
01127         return 1;
01128       continue;
01129     }
01130     if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
01131       return 0;
01132     line_cuted=1;
01133   }
01134 }
01135 
01136 
01137 bool READ_INFO::find_start_of_fields()
01138 {
01139   int chr;
01140  try_again:
01141   do
01142   {
01143     if ((chr=GET) == my_b_EOF)
01144     {
01145       found_end_of_line=eof=1;
01146       return 1;
01147     }
01148   } while ((char) chr != line_start_ptr[0]);
01149   for (char *ptr=line_start_ptr+1 ; ptr != line_start_end ; ptr++)
01150   {
01151     chr=GET;          // Eof will be checked later
01152     if ((char) chr != *ptr)
01153     {           // Can't be line_start
01154       PUSH(chr);
01155       while (--ptr != line_start_ptr)
01156       {           // Restart with next char
01157   PUSH((unsigned char) *ptr);
01158       }
01159       goto try_again;
01160     }
01161   }
01162   return 0;
01163 }
01164 
01165 
01166 } /* namespace drizzled */