Drizzled Public API Documentation

statement_transform.cc
Go to the documentation of this file.
00001 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2009 Sun Microsystems, Inc.
00005  *  Copyright (C) 2010 Jay Pipes
00006  *
00007  *  Authors:
00008  *
00009  *    Jay Pipes <jaypipes@gmail.com>
00010  *
00011  *  This program is free software; you can redistribute it and/or modify
00012  *  it under the terms of the GNU General Public License as published by
00013  *  the Free Software Foundation; version 2 of the License.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU General Public License for more details.
00019  *
00020  *  You should have received a copy of the GNU General Public License
00021  *  along with this program; if not, write to the Free Software
00022  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00023  */
00024 
00032 #include <config.h>
00033 
00034 #include <boost/lexical_cast.hpp>
00035 
00036 #include <drizzled/charset.h>
00037 #include <drizzled/charset_info.h>
00038 #include <drizzled/global_charset_info.h>
00039 #include <drizzled/message.h>
00040 #include <drizzled/message/statement_transform.h>
00041 #include <drizzled/message/transaction.pb.h>
00042 
00043 #include <string>
00044 #include <vector>
00045 #include <sstream>
00046 #include <cstdio>
00047 
00048 using namespace std;
00049 
00050 namespace drizzled
00051 {
00052 
00053 namespace message
00054 {
00055 
00056 static void escapeEmbeddedQuotes(string &s, const char quote='\'')
00057 {
00058   string::iterator it;
00059 
00060   for (it= s.begin(); it != s.end(); ++it)
00061   {
00062     if (*it == quote)
00063     {
00064       it= s.insert(it, quote);
00065       ++it;  // advance back to the quote
00066     }
00067   }
00068 }
00069 
00070 /* Incredibly similar to append_unescaped() in table.cc, but for std::string */
00071 static void append_escaped_string(std::string *res, const std::string &input, const char quote='\'')
00072 {
00073   const char *pos= input.c_str();
00074   const char *end= input.c_str()+input.length();
00075   res->push_back(quote);
00076 
00077   for (; pos != end ; pos++)
00078   {
00079     uint32_t mblen;
00080     if (use_mb(default_charset_info) &&
00081         (mblen= my_ismbchar(default_charset_info, pos, end)))
00082     {
00083       res->append(pos, mblen);
00084       pos+= mblen - 1;
00085       if (pos >= end)
00086         break;
00087       continue;
00088     }
00089 
00090     switch (*pos) {
00091     case 0:       /* Must be escaped for 'mysql' */
00092       res->push_back('\\');
00093       res->push_back('0');
00094       break;
00095     case '\n':        /* Must be escaped for logs */
00096       res->push_back('\\');
00097       res->push_back('n');
00098       break;
00099     case '\r':
00100       res->push_back('\\');   /* This gives better readability */
00101       res->push_back('r');
00102       break;
00103     case '\\':
00104       res->push_back('\\');   /* Because of the sql syntax */
00105       res->push_back('\\');
00106       break;
00107     default:
00108       if (*pos == quote) /* SQL syntax for quoting a quote */
00109       {
00110         res->push_back(quote);
00111         res->push_back(quote);
00112       }
00113       else
00114         res->push_back(*pos);
00115       break;
00116     }
00117   }
00118   res->push_back(quote);
00119 }
00120 
00121 enum TransformSqlError
00122 transformStatementToSql(const Statement &source,
00123                         vector<string> &sql_strings,
00124                         enum TransformSqlVariant sql_variant,
00125                         bool already_in_transaction)
00126 {
00127   TransformSqlError error= NONE;
00128 
00129   switch (source.type())
00130   {
00131   case Statement::ROLLBACK_STATEMENT:
00132     {
00133       break;
00134     }
00135   case Statement::ROLLBACK:
00136     {
00137       sql_strings.push_back("ROLLBACK");
00138       break;
00139     }
00140   case Statement::INSERT:
00141     {
00142       if (! source.has_insert_header())
00143       {
00144         error= MISSING_HEADER;
00145         return error;
00146       }
00147       if (! source.has_insert_data())
00148       {
00149         error= MISSING_DATA;
00150         return error;
00151       }
00152 
00153       const InsertHeader &insert_header= source.insert_header();
00154       const InsertData &insert_data= source.insert_data();
00155       size_t num_keys= insert_data.record_size();
00156 
00157       if (num_keys > 1 && ! already_in_transaction)
00158         sql_strings.push_back("START TRANSACTION");
00159 
00160       for (size_t x= 0; x < num_keys; ++x)
00161       {
00162         string destination;
00163 
00164         error= transformInsertRecordToSql(insert_header,
00165                                           insert_data.record(x),
00166                                           destination,
00167                                           sql_variant);
00168         if (error != NONE)
00169           break;
00170 
00171         sql_strings.push_back(destination);
00172       }
00173 
00174       if (num_keys > 1 && ! already_in_transaction)
00175       {
00176         if (error == NONE)
00177           sql_strings.push_back("COMMIT");
00178         else
00179           sql_strings.push_back("ROLLBACK");
00180       }
00181     }
00182     break;
00183   case Statement::UPDATE:
00184     {
00185       if (! source.has_update_header())
00186       {
00187         error= MISSING_HEADER;
00188         return error;
00189       }
00190       if (! source.has_update_data())
00191       {
00192         error= MISSING_DATA;
00193         return error;
00194       }
00195 
00196       const UpdateHeader &update_header= source.update_header();
00197       const UpdateData &update_data= source.update_data();
00198       size_t num_keys= update_data.record_size();
00199       size_t x;
00200 
00201       if (num_keys > 1 && ! already_in_transaction)
00202         sql_strings.push_back("START TRANSACTION");
00203 
00204       for (x= 0; x < num_keys; ++x)
00205       {
00206         string destination;
00207 
00208         error= transformUpdateRecordToSql(update_header,
00209                                           update_data.record(x),
00210                                           destination,
00211                                           sql_variant);
00212         if (error != NONE)
00213           break;
00214 
00215         sql_strings.push_back(destination);
00216       }
00217 
00218       if (num_keys > 1 && ! already_in_transaction)
00219       {
00220         if (error == NONE)
00221           sql_strings.push_back("COMMIT");
00222         else
00223           sql_strings.push_back("ROLLBACK");
00224       }
00225     }
00226     break;
00227   case Statement::DELETE:
00228     {
00229       if (! source.has_delete_header())
00230       {
00231         error= MISSING_HEADER;
00232         return error;
00233       }
00234       if (! source.has_delete_data())
00235       {
00236         error= MISSING_DATA;
00237         return error;
00238       }
00239 
00240       const DeleteHeader &delete_header= source.delete_header();
00241       const DeleteData &delete_data= source.delete_data();
00242       size_t num_keys= delete_data.record_size();
00243       size_t x;
00244 
00245       if (num_keys > 1 && ! already_in_transaction)
00246         sql_strings.push_back("START TRANSACTION");
00247 
00248       for (x= 0; x < num_keys; ++x)
00249       {
00250         string destination;
00251 
00252         error= transformDeleteRecordToSql(delete_header,
00253                                           delete_data.record(x),
00254                                           destination,
00255                                           sql_variant);
00256         if (error != NONE)
00257           break;
00258 
00259         sql_strings.push_back(destination);
00260       }
00261 
00262       if (num_keys > 1 && ! already_in_transaction)
00263       {
00264         if (error == NONE)
00265           sql_strings.push_back("COMMIT");
00266         else
00267           sql_strings.push_back("ROLLBACK");
00268       }
00269     }
00270     break;
00271   case Statement::CREATE_TABLE:
00272     {
00273       assert(source.has_create_table_statement());
00274       string destination;
00275       error= transformCreateTableStatementToSql(source.create_table_statement(),
00276                                                 destination,
00277                                                 sql_variant);
00278       sql_strings.push_back(destination);
00279     }
00280     break;
00281   case Statement::TRUNCATE_TABLE:
00282     {
00283       assert(source.has_truncate_table_statement());
00284       string destination;
00285       error= transformTruncateTableStatementToSql(source.truncate_table_statement(),
00286                                                   destination,
00287                                                   sql_variant);
00288       sql_strings.push_back(destination);
00289     }
00290     break;
00291   case Statement::DROP_TABLE:
00292     {
00293       assert(source.has_drop_table_statement());
00294       string destination;
00295       error= transformDropTableStatementToSql(source.drop_table_statement(),
00296                                               destination,
00297                                               sql_variant);
00298       sql_strings.push_back(destination);
00299     }
00300     break;
00301   case Statement::CREATE_SCHEMA:
00302     {
00303       assert(source.has_create_schema_statement());
00304       string destination;
00305       error= transformCreateSchemaStatementToSql(source.create_schema_statement(),
00306                                                  destination,
00307                                                  sql_variant);
00308       sql_strings.push_back(destination);
00309     }
00310     break;
00311   case Statement::DROP_SCHEMA:
00312     {
00313       assert(source.has_drop_schema_statement());
00314       string destination;
00315       error= transformDropSchemaStatementToSql(source.drop_schema_statement(),
00316                                                destination,
00317                                                sql_variant);
00318       sql_strings.push_back(destination);
00319     }
00320     break;
00321   case Statement::ALTER_SCHEMA:
00322     {
00323       assert(source.has_alter_schema_statement());
00324       string destination;
00325       error= transformAlterSchemaStatementToSql(source.alter_schema_statement(),
00326                                                 destination,
00327                                                 sql_variant);
00328       sql_strings.push_back(destination);
00329     }
00330     break;
00331   case Statement::SET_VARIABLE:
00332     {
00333       assert(source.has_set_variable_statement());
00334       string destination;
00335       error= transformSetVariableStatementToSql(source.set_variable_statement(),
00336                                                 destination,
00337                                                 sql_variant);
00338       sql_strings.push_back(destination);
00339     }
00340     break;
00341   case Statement::RAW_SQL:
00342     {
00343       if (source.has_raw_sql_schema())
00344       {
00345         string destination("USE ");
00346         destination.append(source.raw_sql_schema());
00347         sql_strings.push_back(destination);
00348       }
00349       sql_strings.push_back(source.sql());
00350     }
00351     break;
00352   default:
00353     sql_strings.push_back(source.sql());
00354     break;
00355   }
00356   return error;
00357 }
00358 
00359 enum TransformSqlError
00360 transformInsertHeaderToSql(const InsertHeader &header,
00361                            string &destination,
00362                            enum TransformSqlVariant sql_variant)
00363 {
00364   char quoted_identifier= '`';
00365   if (sql_variant == ANSI)
00366     quoted_identifier= '"';
00367 
00368   destination.assign("INSERT INTO ", 12);
00369   destination.push_back(quoted_identifier);
00370   destination.append(header.table_metadata().schema_name());
00371   destination.push_back(quoted_identifier);
00372   destination.push_back('.');
00373   destination.push_back(quoted_identifier);
00374   destination.append(header.table_metadata().table_name());
00375   destination.push_back(quoted_identifier);
00376   destination.append(" (", 2);
00377 
00378   /* Add field list to SQL string... */
00379   size_t num_fields= header.field_metadata_size();
00380   size_t x;
00381 
00382   for (x= 0; x < num_fields; ++x)
00383   {
00384     const FieldMetadata &field_metadata= header.field_metadata(x);
00385     if (x != 0)
00386       destination.push_back(',');
00387     
00388     destination.push_back(quoted_identifier);
00389     destination.append(field_metadata.name());
00390     destination.push_back(quoted_identifier);
00391   }
00392 
00393   return NONE;
00394 }
00395 
00396 enum TransformSqlError
00397 transformInsertRecordToSql(const InsertHeader &header,
00398                            const InsertRecord &record,
00399                            string &destination,
00400                            enum TransformSqlVariant sql_variant)
00401 {
00402   enum TransformSqlError error= transformInsertHeaderToSql(header,
00403                                                            destination,
00404                                                            sql_variant);
00405 
00406   char quoted_identifier= '`';
00407   if (sql_variant == ANSI)
00408     quoted_identifier= '"';
00409 
00410   destination.append(") VALUES (");
00411 
00412   /* Add insert values */
00413   size_t num_fields= header.field_metadata_size();
00414   size_t x;
00415   bool should_quote_field_value= false;
00416   
00417   for (x= 0; x < num_fields; ++x)
00418   {
00419     if (x != 0)
00420       destination.push_back(',');
00421 
00422     const FieldMetadata &field_metadata= header.field_metadata(x);
00423 
00424     if (record.is_null(x))
00425     {
00426       should_quote_field_value= false;
00427     }
00428     else 
00429     {
00430       should_quote_field_value= shouldQuoteFieldValue(field_metadata.type());
00431     }
00432 
00433     if (should_quote_field_value)
00434       destination.push_back('\'');
00435 
00436     if (record.is_null(x))
00437     {
00438       destination.append("NULL");
00439     }
00440     else
00441     {
00442       if (field_metadata.type() == Table::Field::BLOB)
00443       {
00444         /*
00445          * We do this here because BLOB data is returned
00446          * in a string correctly, but calling append()
00447          * without a length will result in only the string
00448          * up to a \0 being output here.
00449          */
00450         string raw_data(record.insert_value(x));
00451         destination.append(raw_data.c_str(), raw_data.size());
00452       }
00453       else
00454       {
00455         string tmp(record.insert_value(x));
00456         escapeEmbeddedQuotes(tmp);
00457         destination.append(tmp);
00458       }
00459     }
00460 
00461     if (should_quote_field_value)
00462       destination.push_back('\'');
00463   }
00464   destination.push_back(')');
00465 
00466   return error;
00467 }
00468 
00469 enum TransformSqlError
00470 transformInsertStatementToSql(const InsertHeader &header,
00471                               const InsertData &data,
00472                               string &destination,
00473                               enum TransformSqlVariant sql_variant)
00474 {
00475   enum TransformSqlError error= transformInsertHeaderToSql(header,
00476                                                            destination,
00477                                                            sql_variant);
00478 
00479   char quoted_identifier= '`';
00480   if (sql_variant == ANSI)
00481     quoted_identifier= '"';
00482 
00483   destination.append(") VALUES (", 10);
00484 
00485   /* Add insert values */
00486   size_t num_records= data.record_size();
00487   size_t num_fields= header.field_metadata_size();
00488   size_t x, y;
00489   bool should_quote_field_value= false;
00490   
00491   for (x= 0; x < num_records; ++x)
00492   {
00493     if (x != 0)
00494       destination.append("),(", 3);
00495 
00496     for (y= 0; y < num_fields; ++y)
00497     {
00498       if (y != 0)
00499         destination.push_back(',');
00500 
00501       const FieldMetadata &field_metadata= header.field_metadata(y);
00502       
00503       should_quote_field_value= shouldQuoteFieldValue(field_metadata.type());
00504 
00505       if (should_quote_field_value)
00506         destination.push_back('\'');
00507 
00508       if (field_metadata.type() == Table::Field::BLOB)
00509       {
00510         /* 
00511          * We do this here because BLOB data is returned
00512          * in a string correctly, but calling append()
00513          * without a length will result in only the string
00514          * up to a \0 being output here.
00515          */
00516         string raw_data(data.record(x).insert_value(y));
00517         destination.append(raw_data.c_str(), raw_data.size());
00518       }
00519       else
00520       {
00521         string tmp(data.record(x).insert_value(y));
00522         escapeEmbeddedQuotes(tmp);
00523         destination.append(tmp);
00524       }
00525 
00526       if (should_quote_field_value)
00527         destination.push_back('\'');
00528     }
00529   }
00530   destination.push_back(')');
00531 
00532   return error;
00533 }
00534 
00535 enum TransformSqlError
00536 transformUpdateHeaderToSql(const UpdateHeader &header,
00537                            string &destination,
00538                            enum TransformSqlVariant sql_variant)
00539 {
00540   char quoted_identifier= '`';
00541   if (sql_variant == ANSI)
00542     quoted_identifier= '"';
00543 
00544   destination.assign("UPDATE ", 7);
00545   destination.push_back(quoted_identifier);
00546   destination.append(header.table_metadata().schema_name());
00547   destination.push_back(quoted_identifier);
00548   destination.push_back('.');
00549   destination.push_back(quoted_identifier);
00550   destination.append(header.table_metadata().table_name());
00551   destination.push_back(quoted_identifier);
00552   destination.append(" SET ", 5);
00553 
00554   return NONE;
00555 }
00556 
00557 enum TransformSqlError
00558 transformUpdateRecordToSql(const UpdateHeader &header,
00559                            const UpdateRecord &record,
00560                            string &destination,
00561                            enum TransformSqlVariant sql_variant)
00562 {
00563   enum TransformSqlError error= transformUpdateHeaderToSql(header,
00564                                                            destination,
00565                                                            sql_variant);
00566 
00567   char quoted_identifier= '`';
00568   if (sql_variant == ANSI)
00569     quoted_identifier= '"';
00570 
00571   /* Add field SET list to SQL string... */
00572   size_t num_set_fields= header.set_field_metadata_size();
00573   size_t x;
00574   bool should_quote_field_value= false;
00575 
00576   for (x= 0; x < num_set_fields; ++x)
00577   {
00578     const FieldMetadata &field_metadata= header.set_field_metadata(x);
00579     if (x != 0)
00580       destination.push_back(',');
00581     
00582     destination.push_back(quoted_identifier);
00583     destination.append(field_metadata.name());
00584     destination.push_back(quoted_identifier);
00585     destination.push_back('=');
00586 
00587     if (record.is_null(x))
00588     {
00589       should_quote_field_value= false;
00590     }
00591     else 
00592     {
00593       should_quote_field_value= shouldQuoteFieldValue(field_metadata.type());
00594     }    
00595 
00596     if (should_quote_field_value)
00597       destination.push_back('\'');
00598 
00599     if (record.is_null(x))
00600     {
00601       destination.append("NULL");
00602     }
00603     else 
00604     {
00605       if (field_metadata.type() == Table::Field::BLOB)
00606       {
00607         /*
00608          * We do this here because BLOB data is returned
00609          * in a string correctly, but calling append()
00610          * without a length will result in only the string
00611          * up to a \0 being output here.
00612          */
00613         string raw_data(record.after_value(x));
00614         destination.append(raw_data.c_str(), raw_data.size());
00615       }
00616       else 
00617       {
00618         string tmp(record.after_value(x));
00619         escapeEmbeddedQuotes(tmp);
00620         destination.append(tmp);
00621       }
00622     }
00623 
00624     if (should_quote_field_value)
00625       destination.push_back('\'');
00626   }
00627 
00628   size_t num_key_fields= header.key_field_metadata_size();
00629 
00630   destination.append(" WHERE ", 7);
00631   for (x= 0; x < num_key_fields; ++x) 
00632   {
00633     const FieldMetadata &field_metadata= header.key_field_metadata(x);
00634     
00635     if (x != 0)
00636       destination.append(" AND ", 5); /* Always AND condition with a multi-column PK */
00637 
00638     destination.push_back(quoted_identifier);
00639     destination.append(field_metadata.name());
00640     destination.push_back(quoted_identifier);
00641 
00642     destination.push_back('=');
00643 
00644     should_quote_field_value= shouldQuoteFieldValue(field_metadata.type());
00645 
00646     if (should_quote_field_value)
00647       destination.push_back('\'');
00648 
00649     if (field_metadata.type() == Table::Field::BLOB)
00650     {
00651       /* 
00652        * We do this here because BLOB data is returned
00653        * in a string correctly, but calling append()
00654        * without a length will result in only the string
00655        * up to a \0 being output here.
00656        */
00657       string raw_data(record.key_value(x));
00658       destination.append(raw_data.c_str(), raw_data.size());
00659     }
00660     else
00661     {
00662       destination.append(record.key_value(x));
00663     }
00664 
00665     if (should_quote_field_value)
00666       destination.push_back('\'');
00667   }
00668 
00669   return error;
00670 }
00671 
00672 enum TransformSqlError
00673 transformDeleteHeaderToSql(const DeleteHeader &header,
00674                            string &destination,
00675                            enum TransformSqlVariant sql_variant)
00676 {
00677   char quoted_identifier= '`';
00678   if (sql_variant == ANSI)
00679     quoted_identifier= '"';
00680 
00681   destination.assign("DELETE FROM ", 12);
00682   destination.push_back(quoted_identifier);
00683   destination.append(header.table_metadata().schema_name());
00684   destination.push_back(quoted_identifier);
00685   destination.push_back('.');
00686   destination.push_back(quoted_identifier);
00687   destination.append(header.table_metadata().table_name());
00688   destination.push_back(quoted_identifier);
00689 
00690   return NONE;
00691 }
00692 
00693 enum TransformSqlError
00694 transformDeleteRecordToSql(const DeleteHeader &header,
00695                            const DeleteRecord &record,
00696                            string &destination,
00697                            enum TransformSqlVariant sql_variant)
00698 {
00699   enum TransformSqlError error= transformDeleteHeaderToSql(header,
00700                                                            destination,
00701                                                            sql_variant);
00702   char quoted_identifier= '`';
00703   if (sql_variant == ANSI)
00704     quoted_identifier= '"';
00705 
00706   /* Add WHERE clause to SQL string... */
00707   uint32_t num_key_fields= header.key_field_metadata_size();
00708   uint32_t x;
00709   bool should_quote_field_value= false;
00710 
00711   destination.append(" WHERE ", 7);
00712   for (x= 0; x < num_key_fields; ++x) 
00713   {
00714     const FieldMetadata &field_metadata= header.key_field_metadata(x);
00715     
00716     if (x != 0)
00717       destination.append(" AND ", 5); /* Always AND condition with a multi-column PK */
00718 
00719     destination.push_back(quoted_identifier);
00720     destination.append(field_metadata.name());
00721     destination.push_back(quoted_identifier);
00722 
00723     destination.push_back('=');
00724 
00725     should_quote_field_value= shouldQuoteFieldValue(field_metadata.type());
00726 
00727     if (should_quote_field_value)
00728       destination.push_back('\'');
00729 
00730     if (field_metadata.type() == Table::Field::BLOB)
00731     {
00732       /* 
00733        * We do this here because BLOB data is returned
00734        * in a string correctly, but calling append()
00735        * without a length will result in only the string
00736        * up to a \0 being output here.
00737        */
00738       string raw_data(record.key_value(x));
00739       destination.append(raw_data.c_str(), raw_data.size());
00740     }
00741     else
00742     {
00743       string tmp(record.key_value(x));
00744       escapeEmbeddedQuotes(tmp);
00745       destination.append(tmp);
00746     }
00747 
00748     if (should_quote_field_value)
00749       destination.push_back('\'');
00750   }
00751 
00752   return error;
00753 }
00754 
00755 enum TransformSqlError
00756 transformDeleteStatementToSql(const DeleteHeader &header,
00757                               const DeleteData &data,
00758                               string &destination,
00759                               enum TransformSqlVariant sql_variant)
00760 {
00761   enum TransformSqlError error= transformDeleteHeaderToSql(header,
00762                                                            destination,
00763                                                            sql_variant);
00764   char quoted_identifier= '`';
00765   if (sql_variant == ANSI)
00766     quoted_identifier= '"';
00767 
00768   /* Add WHERE clause to SQL string... */
00769   uint32_t num_key_fields= header.key_field_metadata_size();
00770   uint32_t num_key_records= data.record_size();
00771   uint32_t x, y;
00772   bool should_quote_field_value= false;
00773 
00774   destination.append(" WHERE ", 7);
00775   for (x= 0; x < num_key_records; ++x)
00776   {
00777     if (x != 0)
00778       destination.append(" OR ", 4); /* Always OR condition for multiple key records */
00779 
00780     if (num_key_fields > 1)
00781       destination.push_back('(');
00782 
00783     for (y= 0; y < num_key_fields; ++y) 
00784     {
00785       const FieldMetadata &field_metadata= header.key_field_metadata(y);
00786       
00787       if (y != 0)
00788         destination.append(" AND ", 5); /* Always AND condition with a multi-column PK */
00789 
00790       destination.push_back(quoted_identifier);
00791       destination.append(field_metadata.name());
00792       destination.push_back(quoted_identifier);
00793 
00794       destination.push_back('=');
00795 
00796       should_quote_field_value= shouldQuoteFieldValue(field_metadata.type());
00797 
00798       if (should_quote_field_value)
00799         destination.push_back('\'');
00800 
00801       if (field_metadata.type() == Table::Field::BLOB)
00802       {
00803         /* 
00804          * We do this here because BLOB data is returned
00805          * in a string correctly, but calling append()
00806          * without a length will result in only the string
00807          * up to a \0 being output here.
00808          */
00809         string raw_data(data.record(x).key_value(y));
00810         destination.append(raw_data.c_str(), raw_data.size());
00811       }
00812       else
00813       {
00814         string tmp(data.record(x).key_value(y));
00815         escapeEmbeddedQuotes(tmp);
00816         destination.append(tmp);
00817       }
00818 
00819       if (should_quote_field_value)
00820         destination.push_back('\'');
00821     }
00822     if (num_key_fields > 1)
00823       destination.push_back(')');
00824   }
00825   return error;
00826 }
00827 
00828 enum TransformSqlError
00829 transformAlterSchemaStatementToSql(const AlterSchemaStatement &statement,
00830                                    string &destination,
00831                                    enum TransformSqlVariant sql_variant)
00832 {
00833   const Schema &before= statement.before();
00834   const Schema &after= statement.after();
00835 
00836   /* Make sure we are given the before and after for the same object */
00837   if (before.uuid() != after.uuid())
00838     return UUID_MISMATCH;
00839 
00840   char quoted_identifier= '`';
00841   if (sql_variant == ANSI)
00842     quoted_identifier= '"';
00843 
00844   destination.append("ALTER SCHEMA ");
00845   destination.push_back(quoted_identifier);
00846   destination.append(before.name());
00847   destination.push_back(quoted_identifier);
00848 
00849   /*
00850    * Diff our schemas. Currently, only collation can change so a
00851    * diff of the two structures is not really necessary.
00852    */
00853   destination.append(" COLLATE = ");
00854   destination.append(after.collation());
00855 
00856   return NONE;
00857 }
00858 
00859 enum TransformSqlError
00860 transformDropSchemaStatementToSql(const DropSchemaStatement &statement,
00861                                   string &destination,
00862                                   enum TransformSqlVariant sql_variant)
00863 {
00864   char quoted_identifier= '`';
00865   if (sql_variant == ANSI)
00866     quoted_identifier= '"';
00867 
00868   destination.append("DROP SCHEMA ", 12);
00869   destination.push_back(quoted_identifier);
00870   destination.append(statement.schema_name());
00871   destination.push_back(quoted_identifier);
00872 
00873   return NONE;
00874 }
00875 
00876 enum TransformSqlError
00877 transformCreateSchemaStatementToSql(const CreateSchemaStatement &statement,
00878                                     string &destination,
00879                                     enum TransformSqlVariant sql_variant)
00880 {
00881   char quoted_identifier= '`';
00882   if (sql_variant == ANSI)
00883     quoted_identifier= '"';
00884 
00885   const Schema &schema= statement.schema();
00886 
00887   destination.append("CREATE SCHEMA ");
00888   destination.push_back(quoted_identifier);
00889   destination.append(schema.name());
00890   destination.push_back(quoted_identifier);
00891 
00892   if (schema.has_collation())
00893   {
00894     destination.append(" COLLATE ");
00895     destination.append(schema.collation());
00896   }
00897 
00898   if (not message::is_replicated(schema))
00899   {
00900     destination.append(" REPLICATE = FALSE");
00901   }
00902 
00903   return NONE;
00904 }
00905 
00906 enum TransformSqlError
00907 transformDropTableStatementToSql(const DropTableStatement &statement,
00908                                  string &destination,
00909                                  enum TransformSqlVariant sql_variant)
00910 {
00911   char quoted_identifier= '`';
00912   if (sql_variant == ANSI)
00913     quoted_identifier= '"';
00914 
00915   const TableMetadata &table_metadata= statement.table_metadata();
00916 
00917   destination.append("DROP TABLE ");
00918 
00919   /* Add the IF EXISTS clause if necessary */
00920   if (statement.has_if_exists_clause() &&
00921       statement.if_exists_clause() == true)
00922   {
00923     destination.append("IF EXISTS ");
00924   }
00925 
00926   destination.push_back(quoted_identifier);
00927   destination.append(table_metadata.schema_name());
00928   destination.push_back(quoted_identifier);
00929   destination.push_back('.');
00930   destination.push_back(quoted_identifier);
00931   destination.append(table_metadata.table_name());
00932   destination.push_back(quoted_identifier);
00933 
00934   return NONE;
00935 }
00936 
00937 enum TransformSqlError
00938 transformTruncateTableStatementToSql(const TruncateTableStatement &statement,
00939                                      string &destination,
00940                                      enum TransformSqlVariant sql_variant)
00941 {
00942   char quoted_identifier= '`';
00943   if (sql_variant == ANSI)
00944     quoted_identifier= '"';
00945 
00946   const TableMetadata &table_metadata= statement.table_metadata();
00947 
00948   destination.append("TRUNCATE TABLE ");
00949   destination.push_back(quoted_identifier);
00950   destination.append(table_metadata.schema_name());
00951   destination.push_back(quoted_identifier);
00952   destination.push_back('.');
00953   destination.push_back(quoted_identifier);
00954   destination.append(table_metadata.table_name());
00955   destination.push_back(quoted_identifier);
00956 
00957   return NONE;
00958 }
00959 
00960 enum TransformSqlError
00961 transformSetVariableStatementToSql(const SetVariableStatement &statement,
00962                                    string &destination,
00963                                    enum TransformSqlVariant sql_variant)
00964 {
00965   (void) sql_variant;
00966   const FieldMetadata &variable_metadata= statement.variable_metadata();
00967   bool should_quote_field_value= shouldQuoteFieldValue(variable_metadata.type());
00968 
00969   destination.append("SET GLOBAL "); /* Only global variables are replicated */
00970   destination.append(variable_metadata.name());
00971   destination.push_back('=');
00972 
00973   if (should_quote_field_value)
00974     destination.push_back('\'');
00975   
00976   destination.append(statement.variable_value());
00977 
00978   if (should_quote_field_value)
00979     destination.push_back('\'');
00980 
00981   return NONE;
00982 }
00983 
00984 enum TransformSqlError
00985 transformCreateTableStatementToSql(const CreateTableStatement &statement,
00986                                    string &destination,
00987                                    enum TransformSqlVariant sql_variant)
00988 {
00989   return transformTableDefinitionToSql(statement.table(), destination, sql_variant);
00990 }
00991 
00992 enum TransformSqlError
00993 transformTableDefinitionToSql(const Table &table,
00994                               string &destination,
00995                               enum TransformSqlVariant sql_variant, bool with_schema)
00996 {
00997   char quoted_identifier= '`';
00998   if (sql_variant == ANSI)
00999     quoted_identifier= '"';
01000 
01001   destination.append("CREATE ");
01002 
01003   if (table.type() == Table::TEMPORARY)
01004     destination.append("TEMPORARY ");
01005   
01006   destination.append("TABLE ");
01007   if (with_schema)
01008   {
01009     append_escaped_string(&destination, table.schema(), quoted_identifier);
01010     destination.push_back('.');
01011   }
01012   append_escaped_string(&destination, table.name(), quoted_identifier);
01013   destination.append(" (\n");
01014 
01015   enum TransformSqlError result= NONE;
01016   size_t num_fields= table.field_size();
01017   for (size_t x= 0; x < num_fields; ++x)
01018   {
01019     const Table::Field &field= table.field(x);
01020 
01021     if (x != 0)
01022       destination.append(",\n");
01023 
01024     destination.append("  ");
01025 
01026     result= transformFieldDefinitionToSql(field, destination, sql_variant);
01027 
01028     if (result != NONE)
01029       return result;
01030   }
01031 
01032   size_t num_indexes= table.indexes_size();
01033   
01034   if (num_indexes > 0)
01035     destination.append(",\n");
01036 
01037   for (size_t x= 0; x < num_indexes; ++x)
01038   {
01039     const message::Table::Index &index= table.indexes(x);
01040 
01041     if (x != 0)
01042       destination.append(",\n");
01043 
01044     result= transformIndexDefinitionToSql(index, table, destination, sql_variant);
01045     
01046     if (result != NONE)
01047       return result;
01048   }
01049 
01050   size_t num_foreign_keys= table.fk_constraint_size();
01051 
01052   if (num_foreign_keys > 0)
01053     destination.append(",\n");
01054 
01055   for (size_t x= 0; x < num_foreign_keys; ++x)
01056   {
01057     const message::Table::ForeignKeyConstraint &fkey= table.fk_constraint(x);
01058 
01059     if (x != 0)
01060       destination.append(",\n");
01061 
01062     result= transformForeignKeyConstraintDefinitionToSql(fkey, table, destination, sql_variant);
01063 
01064     if (result != NONE)
01065       return result;
01066   }
01067 
01068   destination.append("\n)");
01069 
01070   /* Add ENGINE = " clause */
01071   if (table.has_engine())
01072   {
01073     destination.append(" ENGINE=");
01074     destination.append(table.engine().name());
01075 
01076     size_t num_engine_options= table.engine().options_size();
01077     if (num_engine_options > 0)
01078       destination.append(" ", 1);
01079     for (size_t x= 0; x < num_engine_options; ++x)
01080     {
01081       const Engine::Option &option= table.engine().options(x);
01082       destination.append(option.name());
01083       destination.append("='");
01084       destination.append(option.state());
01085       destination.append("'");
01086       if (x != num_engine_options-1)
01087       {
01088         destination.append(", ");
01089       }
01090     }
01091   }
01092 
01093   if (table.has_options())
01094     (void) transformTableOptionsToSql(table.options(), destination, sql_variant);
01095 
01096   if (not message::is_replicated(table))
01097   {
01098     destination.append(" REPLICATE = FALSE");
01099   }
01100 
01101   return NONE;
01102 }
01103 
01104 enum TransformSqlError
01105 transformTableOptionsToSql(const Table::TableOptions &options,
01106                            string &destination,
01107                            enum TransformSqlVariant sql_variant)
01108 {
01109   if (sql_variant == ANSI)
01110     return NONE; /* ANSI does not support table options... */
01111 
01112   if (options.has_comment())
01113   {
01114     destination.append(" COMMENT=");
01115     append_escaped_string(&destination, options.comment());
01116   }
01117 
01118   if (options.has_collation())
01119   {
01120     destination.append(" COLLATE = ");
01121     destination.append(options.collation());
01122   }
01123 
01124   if (options.has_data_file_name())
01125   {
01126     destination.append("\nDATA_FILE_NAME = '");
01127     destination.append(options.data_file_name());
01128     destination.push_back('\'');
01129   }
01130 
01131   if (options.has_index_file_name())
01132   {
01133     destination.append("\nINDEX_FILE_NAME = '");
01134     destination.append(options.index_file_name());
01135     destination.push_back('\'');
01136   }
01137 
01138   if (options.has_max_rows())
01139   {
01140     destination.append("\nMAX_ROWS = ");
01141     destination.append(boost::lexical_cast<string>(options.max_rows()));
01142   }
01143 
01144   if (options.has_min_rows())
01145   {
01146     destination.append("\nMIN_ROWS = ");
01147     destination.append(boost::lexical_cast<string>(options.min_rows()));
01148   }
01149 
01150   if (options.has_user_set_auto_increment_value()
01151       && options.has_auto_increment_value())
01152   {
01153     destination.append(" AUTO_INCREMENT=");
01154     destination.append(boost::lexical_cast<string>(options.auto_increment_value()));
01155   }
01156 
01157   if (options.has_avg_row_length())
01158   {
01159     destination.append("\nAVG_ROW_LENGTH = ");
01160     destination.append(boost::lexical_cast<string>(options.avg_row_length()));
01161   }
01162 
01163   if (options.has_checksum() && options.checksum())
01164     destination.append("\nCHECKSUM = TRUE");
01165 
01166   if (options.has_page_checksum() && options.page_checksum())
01167     destination.append("\nPAGE_CHECKSUM = TRUE");
01168 
01169   return NONE;
01170 }
01171 
01172 enum TransformSqlError
01173 transformIndexDefinitionToSql(const Table::Index &index,
01174                               const Table &table,
01175                               string &destination,
01176                               enum TransformSqlVariant sql_variant)
01177 {
01178   char quoted_identifier= '`';
01179   if (sql_variant == ANSI)
01180     quoted_identifier= '"';
01181 
01182   destination.append("  ", 2);
01183 
01184   if (index.is_primary())
01185     destination.append("PRIMARY ");
01186   else if (index.is_unique())
01187     destination.append("UNIQUE ");
01188 
01189   destination.append("KEY ", 4);
01190   if (! (index.is_primary() && index.name().compare("PRIMARY")==0))
01191   {
01192     destination.push_back(quoted_identifier);
01193     destination.append(index.name());
01194     destination.push_back(quoted_identifier);
01195     destination.append(" (", 2);
01196   }
01197   else
01198     destination.append("(", 1);
01199 
01200   size_t num_parts= index.index_part_size();
01201   for (size_t x= 0; x < num_parts; ++x)
01202   {
01203     const Table::Index::IndexPart &part= index.index_part(x);
01204     const Table::Field &field= table.field(part.fieldnr());
01205 
01206     if (x != 0)
01207       destination.push_back(',');
01208     
01209     destination.push_back(quoted_identifier);
01210     destination.append(field.name());
01211     destination.push_back(quoted_identifier);
01212 
01213     /* 
01214      * If the index part's field type is VARCHAR or TEXT
01215      * then check for a prefix length then is different
01216      * from the field's full length...
01217      */
01218     if (field.type() == Table::Field::VARCHAR ||
01219         field.type() == Table::Field::BLOB)
01220     {
01221       if (part.has_compare_length())
01222       {
01223         if (part.compare_length() != field.string_options().length())
01224         {
01225           destination.push_back('(');
01226           destination.append(boost::lexical_cast<string>(part.compare_length()));
01227           destination.push_back(')');
01228         }
01229       }
01230     }
01231   }
01232   destination.push_back(')');
01233 
01234   switch (index.type())
01235   {
01236   case Table::Index::UNKNOWN_INDEX:
01237     break;
01238   case Table::Index::BTREE:
01239     destination.append(" USING BTREE");
01240     break;
01241   case Table::Index::RTREE:
01242     destination.append(" USING RTREE");
01243     break;
01244   case Table::Index::HASH:
01245     destination.append(" USING HASH");
01246     break;
01247   case Table::Index::FULLTEXT:
01248     destination.append(" USING FULLTEXT");
01249     break;
01250   }
01251 
01252   if (index.has_comment())
01253   {
01254     destination.append(" COMMENT ");
01255     append_escaped_string(&destination, index.comment());
01256   }
01257 
01258   return NONE;
01259 }
01260 
01261 static void transformForeignKeyOptionToSql(Table::ForeignKeyConstraint::ForeignKeyOption opt, string &destination)
01262 {
01263   switch (opt)
01264   {
01265   case Table::ForeignKeyConstraint::OPTION_RESTRICT:
01266     destination.append("RESTRICT");
01267     break;
01268   case Table::ForeignKeyConstraint::OPTION_CASCADE:
01269     destination.append("CASCADE");
01270     break;
01271   case Table::ForeignKeyConstraint::OPTION_SET_NULL:
01272     destination.append("SET NULL");
01273     break;
01274   case Table::ForeignKeyConstraint::OPTION_UNDEF:
01275   case Table::ForeignKeyConstraint::OPTION_NO_ACTION:
01276     destination.append("NO ACTION");
01277     break;
01278   case Table::ForeignKeyConstraint::OPTION_SET_DEFAULT:
01279     destination.append("SET DEFAULT");
01280     break;
01281   }
01282 }
01283 
01284 enum TransformSqlError
01285 transformForeignKeyConstraintDefinitionToSql(const Table::ForeignKeyConstraint &fkey,
01286                                              const Table &,
01287                                              string &destination,
01288                                              enum TransformSqlVariant sql_variant)
01289 {
01290   char quoted_identifier= '`';
01291   if (sql_variant == ANSI)
01292     quoted_identifier= '"';
01293 
01294   destination.append("  ");
01295 
01296   if (fkey.has_name())
01297   {
01298     destination.append("CONSTRAINT ");
01299     append_escaped_string(&destination, fkey.name(), quoted_identifier);
01300     destination.append(" ", 1);
01301   }
01302 
01303   destination.append("FOREIGN KEY (");
01304 
01305   for (ssize_t x= 0; x < fkey.column_names_size(); ++x)
01306   {
01307     if (x != 0)
01308       destination.append(", ");
01309 
01310     append_escaped_string(&destination, fkey.column_names(x),
01311                           quoted_identifier);
01312   }
01313 
01314   destination.append(") REFERENCES ");
01315 
01316   append_escaped_string(&destination, fkey.references_table_name(),
01317                         quoted_identifier);
01318   destination.append(" (");
01319 
01320   for (ssize_t x= 0; x < fkey.references_columns_size(); ++x)
01321   {
01322     if (x != 0)
01323       destination.append(", ");
01324 
01325     append_escaped_string(&destination, fkey.references_columns(x),
01326                           quoted_identifier);
01327   }
01328 
01329   destination.push_back(')');
01330 
01331   if (fkey.has_update_option() and fkey.update_option() != Table::ForeignKeyConstraint::OPTION_UNDEF)
01332   {
01333     destination.append(" ON UPDATE ");
01334     transformForeignKeyOptionToSql(fkey.update_option(), destination);
01335   }
01336 
01337   if (fkey.has_delete_option() and fkey.delete_option() != Table::ForeignKeyConstraint::OPTION_UNDEF)
01338   {
01339     destination.append(" ON DELETE ");
01340     transformForeignKeyOptionToSql(fkey.delete_option(), destination);
01341   }
01342 
01343   return NONE;
01344 }
01345 
01346 enum TransformSqlError
01347 transformFieldDefinitionToSql(const Table::Field &field,
01348                               string &destination,
01349                               enum TransformSqlVariant sql_variant)
01350 {
01351   char quoted_identifier= '`';
01352   char quoted_default;
01353 
01354   if (sql_variant == ANSI)
01355     quoted_identifier= '"';
01356 
01357   if (sql_variant == DRIZZLE)
01358     quoted_default= '\'';
01359   else
01360     quoted_default= quoted_identifier;
01361 
01362   append_escaped_string(&destination, field.name(), quoted_identifier);
01363 
01364   Table::Field::FieldType field_type= field.type();
01365 
01366   switch (field_type)
01367   {
01368     case Table::Field::DOUBLE:
01369     destination.append(" DOUBLE");
01370     if (field.has_numeric_options()
01371         && field.numeric_options().has_precision())
01372     {
01373       stringstream ss;
01374       ss << "(" << field.numeric_options().precision() << ",";
01375       ss << field.numeric_options().scale() << ")";
01376       destination.append(ss.str());
01377     }
01378     break;
01379   case Table::Field::VARCHAR:
01380     {
01381       if (field.string_options().has_collation()
01382           && field.string_options().collation().compare("binary") == 0)
01383         destination.append(" VARBINARY(");
01384       else
01385         destination.append(" VARCHAR(");
01386 
01387       destination.append(boost::lexical_cast<string>(field.string_options().length()));
01388       destination.append(")");
01389     }
01390     break;
01391   case Table::Field::BLOB:
01392     {
01393       if (field.string_options().has_collation()
01394           && field.string_options().collation().compare("binary") == 0)
01395         destination.append(" BLOB");
01396       else
01397         destination.append(" TEXT");
01398     }
01399     break;
01400   case Table::Field::ENUM:
01401     {
01402       size_t num_field_values= field.enumeration_values().field_value_size();
01403       destination.append(" ENUM(");
01404       for (size_t x= 0; x < num_field_values; ++x)
01405       {
01406         const string &type= field.enumeration_values().field_value(x);
01407 
01408         if (x != 0)
01409           destination.push_back(',');
01410 
01411         destination.push_back('\'');
01412         destination.append(type);
01413         destination.push_back('\'');
01414       }
01415       destination.push_back(')');
01416       break;
01417     }
01418   case Table::Field::UUID:
01419     destination.append(" UUID");
01420     break;
01421   case Table::Field::BOOLEAN:
01422     destination.append(" BOOLEAN");
01423     break;
01424   case Table::Field::INTEGER:
01425     destination.append(" INT");
01426     break;
01427   case Table::Field::BIGINT:
01428     if (field.has_constraints() and
01429         field.constraints().is_unsigned())
01430     {
01431       destination.append(" BIGINT UNSIGNED");
01432     }
01433     else
01434     {
01435       destination.append(" BIGINT");
01436     }
01437     break;
01438   case Table::Field::DECIMAL:
01439     {
01440       destination.append(" DECIMAL(");
01441       stringstream ss;
01442       ss << field.numeric_options().precision() << ",";
01443       ss << field.numeric_options().scale() << ")";
01444       destination.append(ss.str());
01445     }
01446     break;
01447   case Table::Field::DATE:
01448     destination.append(" DATE");
01449     break;
01450 
01451   case Table::Field::EPOCH:
01452     if (field.time_options().microseconds())
01453     {
01454       destination.append(" TIMESTAMP(6)");
01455     }
01456     else
01457     {
01458       destination.append(" TIMESTAMP");
01459     }
01460     break;
01461 
01462   case Table::Field::DATETIME:
01463     destination.append(" DATETIME");
01464     break;
01465   case Table::Field::TIME:
01466     destination.append(" TIME");
01467     break;
01468   }
01469 
01470   if (field.type() == Table::Field::BLOB ||
01471       field.type() == Table::Field::VARCHAR)
01472   {
01473     if (field.string_options().has_collation()
01474         && field.string_options().collation().compare("binary"))
01475     {
01476       destination.append(" COLLATE ");
01477       destination.append(field.string_options().collation());
01478     }
01479   }
01480 
01481   if (field.has_constraints() and field.constraints().is_unique())
01482   {
01483     destination.append(" UNIQUE");
01484   }
01485 
01486   if (field.has_constraints() && field.constraints().is_notnull())
01487   {
01488     destination.append(" NOT NULL");
01489   }
01490   else if (field.type() == Table::Field::EPOCH)
01491   {
01492     destination.append(" NULL");
01493   }
01494 
01495   if (field.type() == Table::Field::INTEGER || 
01496       field.type() == Table::Field::BIGINT)
01497   {
01498     /* AUTO_INCREMENT must be after NOT NULL */
01499     if (field.has_numeric_options() &&
01500         field.numeric_options().is_autoincrement())
01501     {
01502       destination.append(" AUTO_INCREMENT");
01503     }
01504   }
01505 
01506   if (field.options().has_default_value())
01507   {
01508     destination.append(" DEFAULT ");
01509     append_escaped_string(&destination, field.options().default_value());
01510   }
01511   else if (field.options().has_default_expression())
01512   {
01513     destination.append(" DEFAULT ");
01514     destination.append(field.options().default_expression());
01515   }
01516   else if (field.options().has_default_bin_value())
01517   {
01518     const string &v= field.options().default_bin_value();
01519     if (v.length() == 0)
01520     {
01521       destination.append(" DEFAULT ''");
01522     }
01523     else
01524     {
01525       destination.append(" DEFAULT 0x");
01526       for (size_t x= 0; x < v.length(); x++)
01527       {
01528         char hex[3];
01529         snprintf(hex, sizeof(hex), "%.2X", *(v.c_str() + x));
01530         destination.append(hex, 2);
01531       }
01532     }
01533   }
01534   else if (field.options().has_default_null()
01535            && field.options().default_null()
01536            && field.type() != Table::Field::BLOB)
01537   {
01538     destination.append(" DEFAULT NULL");
01539   }
01540 
01541   if (field.has_options() && field.options().has_update_expression())
01542   {
01543     destination.append(" ON UPDATE ");
01544     destination.append(field.options().update_expression());
01545   }
01546 
01547   if (field.has_comment())
01548   {
01549     destination.append(" COMMENT ");
01550     append_escaped_string(&destination, field.comment(), quoted_default);
01551   }
01552   return NONE;
01553 }
01554 
01555 bool shouldQuoteFieldValue(Table::Field::FieldType in_type)
01556 {
01557   switch (in_type)
01558   {
01559   case Table::Field::DOUBLE:
01560   case Table::Field::DECIMAL:
01561   case Table::Field::INTEGER:
01562   case Table::Field::BIGINT:
01563     return false;
01564   default:
01565     return true;
01566   } 
01567 }
01568 
01569 Table::Field::FieldType internalFieldTypeToFieldProtoType(enum enum_field_types type)
01570 {
01571   switch (type) {
01572   case DRIZZLE_TYPE_LONG:
01573     return Table::Field::INTEGER;
01574   case DRIZZLE_TYPE_DOUBLE:
01575     return Table::Field::DOUBLE;
01576   case DRIZZLE_TYPE_NULL:
01577     assert(false); /* Not a user definable type */
01578     return Table::Field::INTEGER; /* unreachable */
01579   case DRIZZLE_TYPE_MICROTIME:
01580   case DRIZZLE_TYPE_TIMESTAMP:
01581     return Table::Field::EPOCH;
01582   case DRIZZLE_TYPE_LONGLONG:
01583     return Table::Field::BIGINT;
01584   case DRIZZLE_TYPE_DATETIME:
01585     return Table::Field::DATETIME;
01586   case DRIZZLE_TYPE_TIME:
01587     return Table::Field::TIME;
01588   case DRIZZLE_TYPE_DATE:
01589     return Table::Field::DATE;
01590   case DRIZZLE_TYPE_VARCHAR:
01591     return Table::Field::VARCHAR;
01592   case DRIZZLE_TYPE_DECIMAL:
01593     return Table::Field::DECIMAL;
01594   case DRIZZLE_TYPE_ENUM:
01595     return Table::Field::ENUM;
01596   case DRIZZLE_TYPE_BLOB:
01597     return Table::Field::BLOB;
01598   case DRIZZLE_TYPE_UUID:
01599     return Table::Field::UUID;
01600   case DRIZZLE_TYPE_BOOLEAN:
01601     return Table::Field::BOOLEAN;
01602   }
01603 
01604   assert(false);
01605   return Table::Field::INTEGER; /* unreachable */
01606 }
01607 
01608 bool transactionContainsBulkSegment(const Transaction &transaction)
01609 {
01610   size_t num_statements= transaction.statement_size();
01611   if (num_statements == 0)
01612     return false;
01613 
01614   /*
01615    * Only INSERT, UPDATE, and DELETE statements can possibly
01616    * have bulk segments.  So, we loop through the statements
01617    * checking for segment_id > 1 in those specific submessages.
01618    */
01619   size_t x;
01620   for (x= 0; x < num_statements; ++x)
01621   {
01622     const Statement &statement= transaction.statement(x);
01623     Statement::Type type= statement.type();
01624 
01625     switch (type)
01626     {
01627       case Statement::INSERT:
01628         if (statement.insert_data().segment_id() > 1)
01629           return true;
01630         break;
01631       case Statement::UPDATE:
01632         if (statement.update_data().segment_id() > 1)
01633           return true;
01634         break;
01635       case Statement::DELETE:
01636         if (statement.delete_data().segment_id() > 1)
01637           return true;
01638         break;
01639       default:
01640         break;
01641     }
01642   }
01643   return false;
01644 }
01645 
01646 } /* namespace message */
01647 } /* namespace drizzled */