00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00079 #include <config.h>
00080
00081 #include <fcntl.h>
00082
00083 #include <drizzled/error.h>
00084 #include <drizzled/thr_lock.h>
00085 #include <drizzled/session.h>
00086 #include <drizzled/sql_base.h>
00087 #include <drizzled/lock.h>
00088 #include <drizzled/pthread_globals.h>
00089 #include <drizzled/internal/my_sys.h>
00090 #include <drizzled/pthread_globals.h>
00091 #include <drizzled/refresh_version.h>
00092 #include <drizzled/plugin/storage_engine.h>
00093
00094 #include <set>
00095 #include <vector>
00096 #include <algorithm>
00097 #include <functional>
00098
00099 #include <boost/thread/shared_mutex.hpp>
00100 #include <boost/thread/condition_variable.hpp>
00101
00102 using namespace std;
00103
00104 namespace drizzled
00105 {
00106
00107 static boost::mutex LOCK_global_read_lock;
00108 static boost::condition_variable_any COND_global_read_lock;
00109
00115 static void print_lock_error(int error, const char *);
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139 static drizzled::error_t thr_lock_errno_to_mysql[]=
00140 { EE_OK, EE_ERROR_FIRST, ER_LOCK_WAIT_TIMEOUT, ER_LOCK_DEADLOCK };
00141
00142
00163 static void reset_lock_data_and_free(DrizzleLock **mysql_lock)
00164 {
00165 DrizzleLock *sql_lock= *mysql_lock;
00166 sql_lock->reset();
00167 delete sql_lock;
00168 *mysql_lock= 0;
00169 }
00170
00171 DrizzleLock *Session::lockTables(Table **tables, uint32_t count, uint32_t flags)
00172 {
00173 DrizzleLock *sql_lock;
00174 Table *write_lock_used;
00175 vector<plugin::StorageEngine *> involved_engines;
00176
00177 do
00178 {
00179 if (! (sql_lock= get_lock_data(tables, count, true, &write_lock_used)))
00180 break;
00181
00182 if (global_read_lock && write_lock_used and (not (flags & DRIZZLE_LOCK_IGNORE_GLOBAL_READ_LOCK)))
00183 {
00184
00185
00186
00187
00188 if (wait_if_global_read_lock(1, 1))
00189 {
00190
00191 reset_lock_data_and_free(&sql_lock);
00192 break;
00193 }
00194
00195 if (version != refresh_version)
00196 {
00197
00198 reset_lock_data_and_free(&sql_lock);
00199 break;
00200 }
00201 }
00202
00203 set_proc_info("Notify start statement");
00204
00205
00206
00207
00208 if (sql_lock->sizeTable())
00209 {
00210 size_t num_tables= sql_lock->sizeTable();
00211 plugin::StorageEngine *engine;
00212 std::set<size_t> involved_slots;
00213
00214 for (size_t x= 1; x <= num_tables; x++, tables++)
00215 {
00216 engine= (*tables)->cursor->getEngine();
00217
00218 if (involved_slots.count(engine->getId()) > 0)
00219 continue;
00220
00221 involved_engines.push_back(engine);
00222 involved_slots.insert(engine->getId());
00223 }
00224
00225 for_each(involved_engines.begin(),
00226 involved_engines.end(),
00227 bind2nd(mem_fun(&plugin::StorageEngine::startStatement), this));
00228 }
00229
00230 set_proc_info("External lock");
00231
00232
00233
00234
00235
00236
00237 if (sql_lock->sizeTable() && lock_external(sql_lock->getTable(), sql_lock->sizeTable()))
00238 {
00239
00240 reset_lock_data_and_free(&sql_lock);
00241 break;
00242 }
00243 set_proc_info("Table lock");
00244
00245 memcpy(sql_lock->getLocks() + sql_lock->sizeLock(),
00246 sql_lock->getLocks(),
00247 sql_lock->sizeLock() * sizeof(*sql_lock->getLocks()));
00248
00249
00250 drizzled::error_t rc;
00251 rc= thr_lock_errno_to_mysql[(int) thr_multi_lock(*this,
00252 sql_lock->getLocks() +
00253 sql_lock->sizeLock(),
00254 sql_lock->sizeLock(),
00255 this->lock_id)];
00256 if (rc)
00257 {
00258 if (sql_lock->sizeTable())
00259 unlock_external(sql_lock->getTable(), sql_lock->sizeTable());
00260 reset_lock_data_and_free(&sql_lock);
00261 my_error(rc, MYF(0));
00262 }
00263 } while(0);
00264
00265 set_proc_info(0);
00266 if (getKilled())
00267 {
00268 send_kill_message();
00269 if (sql_lock)
00270 {
00271 unlockTables(sql_lock);
00272 sql_lock= NULL;
00273 }
00274 }
00275
00276 set_time_after_lock();
00277
00278 return (sql_lock);
00279 }
00280
00281
00282 int Session::lock_external(Table **tables, uint32_t count)
00283 {
00284 int lock_type,error;
00285 for (uint32_t i= 1 ; i <= count ; i++, tables++)
00286 {
00287 assert((*tables)->reginfo.lock_type >= TL_READ);
00288 lock_type=F_WRLCK;
00289 if ((*tables)->db_stat & HA_READ_ONLY ||
00290 ((*tables)->reginfo.lock_type >= TL_READ &&
00291 (*tables)->reginfo.lock_type <= TL_READ_NO_INSERT))
00292 lock_type=F_RDLCK;
00293
00294 if ((error=(*tables)->cursor->ha_external_lock(this,lock_type)))
00295 {
00296 print_lock_error(error, (*tables)->cursor->getEngine()->getName().c_str());
00297 while (--i)
00298 {
00299 tables--;
00300 (*tables)->cursor->ha_external_lock(this, F_UNLCK);
00301 (*tables)->current_lock=F_UNLCK;
00302 }
00303 return error;
00304 }
00305 else
00306 {
00307 (*tables)->db_stat &= ~ HA_BLOCK_LOCK;
00308 (*tables)->current_lock= lock_type;
00309 }
00310 }
00311 return 0;
00312 }
00313
00314
00315 void Session::unlockTables(DrizzleLock *sql_lock)
00316 {
00317 if (sql_lock->sizeLock())
00318 sql_lock->unlock(sql_lock->sizeLock());
00319 if (sql_lock->sizeTable())
00320 unlock_external(sql_lock->getTable(), sql_lock->sizeTable());
00321 delete sql_lock;
00322 }
00323
00330 void Session::unlockSomeTables(Table **table, uint32_t count)
00331 {
00332 DrizzleLock *sql_lock;
00333 Table *write_lock_used;
00334 if ((sql_lock= get_lock_data(table, count, false,
00335 &write_lock_used)))
00336 unlockTables(sql_lock);
00337 }
00338
00339
00344 void Session::unlockReadTables(DrizzleLock *sql_lock)
00345 {
00346 uint32_t i,found;
00347
00348
00349 THR_LOCK_DATA **lock_local= sql_lock->getLocks();
00350 for (i=found=0 ; i < sql_lock->sizeLock(); i++)
00351 {
00352 if (sql_lock->getLocks()[i]->type >= TL_WRITE_ALLOW_READ)
00353 {
00354 std::swap(*lock_local, sql_lock->getLocks()[i]);
00355 lock_local++;
00356 found++;
00357 }
00358 }
00359
00360 if (i != found)
00361 {
00362 thr_multi_unlock(lock_local, i - found);
00363 sql_lock->setLock(found);
00364 }
00365
00366
00367
00368 Table **table= sql_lock->getTable();
00369 for (i=found=0 ; i < sql_lock->sizeTable() ; i++)
00370 {
00371 assert(sql_lock->getTable()[i]->lock_position == i);
00372 if ((uint32_t) sql_lock->getTable()[i]->reginfo.lock_type >= TL_WRITE_ALLOW_READ)
00373 {
00374 std::swap(*table, sql_lock->getTable()[i]);
00375 table++;
00376 found++;
00377 }
00378 }
00379
00380 if (i != found)
00381 {
00382 unlock_external(table, i - found);
00383 sql_lock->resizeTable(found);
00384 }
00385
00386 table= sql_lock->getTable();
00387 found= 0;
00388 for (i= 0; i < sql_lock->sizeTable(); i++)
00389 {
00390 Table *tbl= *table;
00391 tbl->lock_position= table - sql_lock->getTable();
00392 tbl->lock_data_start= found;
00393 found+= tbl->lock_count;
00394 table++;
00395 }
00396 }
00397
00398
00419 void Session::removeLock(Table *table)
00420 {
00421 unlockSomeTables(&table, 1);
00422 }
00423
00424
00427 void Session::abortLock(Table *table)
00428 {
00429 DrizzleLock *locked;
00430 Table *write_lock_used;
00431
00432 if ((locked= get_lock_data(&table, 1, false,
00433 &write_lock_used)))
00434 {
00435 for (uint32_t x= 0; x < locked->sizeLock(); x++)
00436 locked->getLocks()[x]->lock->abort_locks();
00437 delete locked;
00438 }
00439 }
00440
00441
00454 bool Session::abortLockForThread(Table *table)
00455 {
00456 DrizzleLock *locked;
00457 Table *write_lock_used;
00458 bool result= false;
00459
00460 if ((locked= get_lock_data(&table, 1, false,
00461 &write_lock_used)))
00462 {
00463 for (uint32_t i= 0; i < locked->sizeLock(); i++)
00464 {
00465 if (locked->getLocks()[i]->lock->abort_locks_for_thread(table->in_use->thread_id))
00466 result= true;
00467 }
00468 delete locked;
00469 }
00470 return result;
00471 }
00472
00475 int Session::unlock_external(Table **table, uint32_t count)
00476 {
00477 int error,error_code;
00478
00479 error_code=0;
00480 do
00481 {
00482 if ((*table)->current_lock != F_UNLCK)
00483 {
00484 (*table)->current_lock = F_UNLCK;
00485 if ((error=(*table)->cursor->ha_external_lock(this, F_UNLCK)))
00486 {
00487 error_code=error;
00488 print_lock_error(error_code, (*table)->cursor->getEngine()->getName().c_str());
00489 }
00490 }
00491 table++;
00492 } while (--count);
00493 return error_code;
00494 }
00495
00496
00508 DrizzleLock *Session::get_lock_data(Table **table_ptr, uint32_t count,
00509 bool should_lock, Table **write_lock_used)
00510 {
00511 uint32_t lock_count;
00512 THR_LOCK_DATA **locks, **locks_buf, **locks_start;
00513 Table **to, **table_buf;
00514
00515 *write_lock_used=0;
00516 for (uint32_t i= lock_count= 0 ; i < count ; i++)
00517 {
00518 Table *t= table_ptr[i];
00519
00520 if (! (t->getEngine()->check_flag(HTON_BIT_SKIP_STORE_LOCK)))
00521 {
00522 lock_count++;
00523 }
00524 }
00525
00526
00527
00528
00529
00530
00531
00532 DrizzleLock *sql_lock= new DrizzleLock(lock_count);
00533
00534 if (not sql_lock)
00535 return NULL;
00536
00537 locks= locks_buf= sql_lock->getLocks();
00538 to= table_buf= sql_lock->getTable();
00539
00540 for (uint32_t i= 0; i < count ; i++)
00541 {
00542 Table *table;
00543 enum thr_lock_type lock_type;
00544
00545 if (table_ptr[i]->getEngine()->check_flag(HTON_BIT_SKIP_STORE_LOCK))
00546 continue;
00547
00548 table= table_ptr[i];
00549 lock_type= table->reginfo.lock_type;
00550 assert (lock_type != TL_WRITE_DEFAULT);
00551 if (lock_type >= TL_WRITE_ALLOW_WRITE)
00552 {
00553 *write_lock_used=table;
00554 if (table->db_stat & HA_READ_ONLY)
00555 {
00556 my_error(ER_OPEN_AS_READONLY, MYF(0), table->getAlias());
00557
00558 sql_lock->setLock(locks - sql_lock->getLocks());
00559 reset_lock_data_and_free(&sql_lock);
00560 return NULL;
00561 }
00562 }
00563 locks_start= locks;
00564 locks= table->cursor->store_lock(this, locks, should_lock == false ? TL_IGNORE : lock_type);
00565 if (should_lock)
00566 {
00567 table->lock_position= (uint32_t) (to - table_buf);
00568 table->lock_data_start= (uint32_t) (locks_start - locks_buf);
00569 table->lock_count= (uint32_t) (locks - locks_start);
00570 assert(table->lock_count == 1);
00571 }
00572 *to++= table;
00573 }
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588 sql_lock->setLock(locks - locks_buf);
00589
00590 return sql_lock;
00591 }
00592
00593
00620 int Session::lock_table_name(TableList *table_list)
00621 {
00622 identifier::Table identifier(table_list->getSchemaName(), table_list->getTableName());
00623 const identifier::Table::Key &key(identifier.getKey());
00624
00625 {
00626
00627 table::CacheRange ppp;
00628
00629 ppp= table::getCache().equal_range(key);
00630
00631 for (table::CacheMap::const_iterator iter= ppp.first;
00632 iter != ppp.second; ++iter)
00633 {
00634 Table *table= iter->second;
00635 if (table->reginfo.lock_type < TL_WRITE)
00636 {
00637 continue;
00638 }
00639
00640 if (table->in_use == this)
00641 {
00642 table->getMutableShare()->resetVersion();
00643 table->locked_by_name= true;
00644 return 0;
00645 }
00646 }
00647 }
00648
00649 table::Placeholder *table= NULL;
00650 if (!(table= table_cache_insert_placeholder(identifier)))
00651 {
00652 return -1;
00653 }
00654
00655 table_list->table= reinterpret_cast<Table *>(table);
00656
00657
00658 return(test(table::Cache::singleton().removeTable(this, identifier, RTFC_NO_FLAG)));
00659 }
00660
00661
00662 void TableList::unlock_table_name()
00663 {
00664 if (table)
00665 {
00666 table::remove_table(static_cast<table::Concurrent *>(table));
00667 locking::broadcast_refresh();
00668 }
00669 }
00670
00671
00672 static bool locked_named_table(TableList *table_list)
00673 {
00674 for (; table_list ; table_list=table_list->next_local)
00675 {
00676 Table *table= table_list->table;
00677 if (table)
00678 {
00679 Table *save_next= table->getNext();
00680 bool result;
00681 table->setNext(NULL);
00682 result= table::Cache::singleton().areTablesUsed(table_list->table, 0);
00683 table->setNext(save_next);
00684 if (result)
00685 return 1;
00686 }
00687 }
00688 return 0;
00689 }
00690
00691
00692 bool Session::wait_for_locked_table_names(TableList *table_list)
00693 {
00694 bool result= false;
00695
00696 #if 0
00697 assert(ownership of table::Cache::singleton().mutex());
00698 #endif
00699
00700 while (locked_named_table(table_list))
00701 {
00702 if (getKilled())
00703 {
00704 result=1;
00705 break;
00706 }
00707 wait_for_condition(table::Cache::singleton().mutex(), COND_refresh);
00708 table::Cache::singleton().mutex().lock();
00709 }
00710 return result;
00711 }
00712
00713
00728 bool Session::lock_table_names(TableList *table_list)
00729 {
00730 bool got_all_locks= true;
00731 TableList *lock_table;
00732
00733 for (lock_table= table_list; lock_table; lock_table= lock_table->next_local)
00734 {
00735 int got_lock;
00736 if ((got_lock= lock_table_name(lock_table)) < 0)
00737 {
00738 table_list->unlock_table_names(table_list);
00739 return true;
00740 }
00741
00742 if (got_lock)
00743 got_all_locks= false;
00744 }
00745
00746
00747 if (not got_all_locks && wait_for_locked_table_names(table_list))
00748 {
00749 table_list->unlock_table_names(table_list);
00750
00751 return true;
00752 }
00753
00754 return false;
00755 }
00756
00757
00776 bool Session::lock_table_names_exclusively(TableList *table_list)
00777 {
00778 if (lock_table_names(table_list))
00779 return true;
00780
00781
00782
00783
00784 for (TableList *table= table_list; table; table= table->next_global)
00785 {
00786 if (table->table)
00787 table->table->open_placeholder= 1;
00788 }
00789 return false;
00790 }
00791
00792
00815 void TableList::unlock_table_names(TableList *last_table)
00816 {
00817 for (TableList *table_iter= this;
00818 table_iter != last_table;
00819 table_iter= table_iter->next_local)
00820 {
00821 table_iter->unlock_table_name();
00822 }
00823
00824 locking::broadcast_refresh();
00825 }
00826
00827
00828 static void print_lock_error(int error, const char *table)
00829 {
00830 drizzled::error_t textno;
00831
00832 switch (error) {
00833 case HA_ERR_LOCK_WAIT_TIMEOUT:
00834 textno=ER_LOCK_WAIT_TIMEOUT;
00835 break;
00836 case HA_ERR_READ_ONLY_TRANSACTION:
00837 textno=ER_READ_ONLY_TRANSACTION;
00838 break;
00839 case HA_ERR_LOCK_DEADLOCK:
00840 textno=ER_LOCK_DEADLOCK;
00841 break;
00842 case HA_ERR_WRONG_COMMAND:
00843 textno=ER_ILLEGAL_HA;
00844 break;
00845 default:
00846 textno=ER_CANT_LOCK;
00847 break;
00848 }
00849
00850 if ( textno == ER_ILLEGAL_HA )
00851 my_error(textno, MYF(ME_BELL+ME_OLDWIN+ME_WAITTANG), table);
00852 else
00853 my_error(textno, MYF(ME_BELL+ME_OLDWIN+ME_WAITTANG), error);
00854 }
00855
00856
00857
00858
00859
00860
00861
00862
00863
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881
00882
00883
00884
00885
00886
00887
00888
00889
00890
00891
00892
00893
00894
00895
00896
00897
00898
00899
00900
00901
00902
00903
00904
00905
00906
00907
00908
00909
00910
00911
00912
00913
00914
00915
00916
00917
00918
00919
00920
00921
00922
00923
00924
00925
00926
00927
00928
00929
00930
00931
00932 volatile uint32_t global_read_lock=0;
00933 volatile uint32_t global_read_lock_blocks_commit=0;
00934 static volatile uint32_t protect_against_global_read_lock=0;
00935 static volatile uint32_t waiting_for_read_lock=0;
00936
00937 bool Session::lockGlobalReadLock()
00938 {
00939 if (isGlobalReadLock() == Session::NONE)
00940 {
00941 const char *old_message;
00942 LOCK_global_read_lock.lock();
00943 old_message= enter_cond(COND_global_read_lock, LOCK_global_read_lock,
00944 "Waiting to get readlock");
00945
00946 waiting_for_read_lock++;
00947 boost_unique_lock_t scopedLock(LOCK_global_read_lock, boost::adopt_lock_t());
00948 while (protect_against_global_read_lock && not getKilled())
00949 COND_global_read_lock.wait(scopedLock);
00950 waiting_for_read_lock--;
00951 scopedLock.release();
00952 if (getKilled())
00953 {
00954 exit_cond(old_message);
00955 return true;
00956 }
00957 setGlobalReadLock(Session::GOT_GLOBAL_READ_LOCK);
00958 global_read_lock++;
00959 exit_cond(old_message);
00960 }
00961
00962
00963
00964
00965
00966
00967
00968
00969
00970 return false;
00971 }
00972
00973
00974 void Session::unlockGlobalReadLock(void)
00975 {
00976 uint32_t tmp;
00977
00978 if (not isGlobalReadLock())
00979 return;
00980
00981 {
00982 boost_unique_lock_t scopedLock(LOCK_global_read_lock);
00983 tmp= --global_read_lock;
00984 if (isGlobalReadLock() == Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT)
00985 --global_read_lock_blocks_commit;
00986 }
00987
00988 if (not tmp)
00989 {
00990 COND_global_read_lock.notify_all();
00991 }
00992 setGlobalReadLock(Session::NONE);
00993 }
00994
00995 static inline bool must_wait(bool is_not_commit)
00996 {
00997 return (global_read_lock &&
00998 (is_not_commit ||
00999 global_read_lock_blocks_commit));
01000 }
01001
01002 bool Session::wait_if_global_read_lock(bool abort_on_refresh, bool is_not_commit)
01003 {
01004 const char *old_message= NULL;
01005 bool result= 0, need_exit_cond;
01006
01007
01008
01009
01010
01011
01012 safe_mutex_assert_not_owner(table::Cache::singleton().mutex().native_handle());
01013
01014 LOCK_global_read_lock.lock();
01015 if ((need_exit_cond= must_wait(is_not_commit)))
01016 {
01017 if (isGlobalReadLock())
01018 {
01019 if (is_not_commit)
01020 my_message(ER_CANT_UPDATE_WITH_READLOCK,
01021 ER(ER_CANT_UPDATE_WITH_READLOCK), MYF(0));
01022 LOCK_global_read_lock.unlock();
01023
01024
01025
01026
01027
01028 return is_not_commit;
01029 }
01030 old_message= enter_cond(COND_global_read_lock, LOCK_global_read_lock,
01031 "Waiting for release of readlock");
01032
01033 while (must_wait(is_not_commit) && not getKilled() &&
01034 (!abort_on_refresh || version == refresh_version))
01035 {
01036 boost_unique_lock_t scoped(LOCK_global_read_lock, boost::adopt_lock_t());
01037 COND_global_read_lock.wait(scoped);
01038 scoped.release();
01039 }
01040 if (getKilled())
01041 result=1;
01042 }
01043
01044 if (not abort_on_refresh && not result)
01045 protect_against_global_read_lock++;
01046
01047
01048
01049
01050
01051 if (unlikely(need_exit_cond))
01052 {
01053 exit_cond(old_message);
01054 }
01055 else
01056 {
01057 LOCK_global_read_lock.unlock();
01058 }
01059
01060 return result;
01061 }
01062
01063
01064 void Session::startWaitingGlobalReadLock()
01065 {
01066 bool tmp;
01067 if (unlikely(isGlobalReadLock()))
01068 return;
01069
01070 LOCK_global_read_lock.lock();
01071 tmp= (!--protect_against_global_read_lock &&
01072 (waiting_for_read_lock || global_read_lock_blocks_commit));
01073 LOCK_global_read_lock.unlock();
01074
01075 if (tmp)
01076 COND_global_read_lock.notify_all();
01077 }
01078
01079
01080 bool Session::makeGlobalReadLockBlockCommit()
01081 {
01082 bool error;
01083 const char *old_message;
01084
01085
01086
01087
01088 if (isGlobalReadLock() != Session::GOT_GLOBAL_READ_LOCK)
01089 return false;
01090 LOCK_global_read_lock.lock();
01091
01092 global_read_lock_blocks_commit++;
01093 old_message= enter_cond(COND_global_read_lock, LOCK_global_read_lock,
01094 "Waiting for all running commits to finish");
01095 while (protect_against_global_read_lock && not getKilled())
01096 {
01097 boost_unique_lock_t scopedLock(LOCK_global_read_lock, boost::adopt_lock_t());
01098 COND_global_read_lock.wait(scopedLock);
01099 scopedLock.release();
01100 }
01101 if ((error= test(getKilled())))
01102 {
01103 global_read_lock_blocks_commit--;
01104 }
01105 else
01106 {
01107 setGlobalReadLock(Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT);
01108 }
01109
01110 exit_cond(old_message);
01111
01112 return error;
01113 }
01114
01115
01135 namespace locking {
01136
01137 void broadcast_refresh(void)
01138 {
01139 COND_refresh.notify_all();
01140 COND_global_read_lock.notify_all();
01141 }
01142
01143 }
01144
01145
01150 }