00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 #include <config.h>
00056 #include <drizzled/internal/my_sys.h>
00057 #include <drizzled/internal/thread_var.h>
00058 #include <drizzled/statistics_variables.h>
00059 #include <drizzled/pthread_globals.h>
00060
00061 #include <drizzled/session.h>
00062
00063 #include "thr_lock.h"
00064 #include <drizzled/internal/m_string.h>
00065 #include <errno.h>
00066 #include <list>
00067
00068 #if TIME_WITH_SYS_TIME
00069 # include <sys/time.h>
00070 # include <time.h>
00071 #else
00072 # if HAVE_SYS_TIME_H
00073 # include <sys/time.h>
00074 # else
00075 # include <time.h>
00076 # endif
00077 #endif
00078
00079 #include <drizzled/util/test.h>
00080
00081 #include <boost/interprocess/sync/lock_options.hpp>
00082
00083 using namespace std;
00084
00085 namespace drizzled
00086 {
00087
00088 uint64_t table_lock_wait_timeout;
00089 static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
00090
00091
00092 uint64_t max_write_lock_count= UINT64_MAX;
00093
00094
00095
00096
00097
00098 static inline bool
00099 thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
00100 {
00101 return rhs == lhs;
00102 }
00103
00104
00105
00106
00107 void thr_lock_init(THR_LOCK *lock)
00108 {
00109 lock->init();
00110 lock->read.last= &lock->read.data;
00111 lock->read_wait.last= &lock->read_wait.data;
00112 lock->write_wait.last= &lock->write_wait.data;
00113 lock->write.last= &lock->write.data;
00114 }
00115
00116
00117 void THR_LOCK_INFO::init()
00118 {
00119 internal::st_my_thread_var *tmp= my_thread_var;
00120 thread_id= tmp->id;
00121 n_cursors= 0;
00122 }
00123
00124
00125
00126 void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
00127 {
00128 lock= lock_arg;
00129 type= TL_UNLOCK;
00130 owner= NULL;
00131 status_param= param_arg;
00132 cond= NULL;
00133 }
00134
00135
00136 static inline bool
00137 have_old_read_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner)
00138 {
00139 for ( ; data ; data=data->next)
00140 {
00141 if (thr_lock_owner_equal(data->owner, owner))
00142 return true;
00143 }
00144 return false;
00145 }
00146
00147 static void wake_up_waiters(THR_LOCK *lock);
00148
00149
00150 static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
00151 {
00152 internal::st_my_thread_var *thread_var= session.getThreadVar();
00153
00154 boost::condition_variable_any *cond= &thread_var->suspend;
00155 enum enum_thr_lock_result result= THR_LOCK_ABORTED;
00156 bool can_deadlock= test(data->owner->info->n_cursors);
00157
00158 {
00159 (*wait->last)=data;
00160 data->prev= wait->last;
00161 wait->last= &data->next;
00162 }
00163
00164 current_global_counters.locks_waited++;
00165
00166
00167 thread_var->current_mutex= data->lock->native_handle();
00168 thread_var->current_cond= &thread_var->suspend;
00169 data->cond= &thread_var->suspend;;
00170
00171 while (not thread_var->abort)
00172 {
00173 boost_unique_lock_t scoped(*data->lock->native_handle(), boost::adopt_lock_t());
00174
00175 if (can_deadlock)
00176 {
00177 boost::xtime xt;
00178 xtime_get(&xt, boost::TIME_UTC);
00179 xt.sec += table_lock_wait_timeout;
00180 if (not cond->timed_wait(scoped, xt))
00181 {
00182 result= THR_LOCK_WAIT_TIMEOUT;
00183 scoped.release();
00184 break;
00185 }
00186 }
00187 else
00188 {
00189 cond->wait(scoped);
00190 }
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204 if (data->cond == NULL)
00205 {
00206 scoped.release();
00207 break;
00208 }
00209 scoped.release();
00210 }
00211 if (data->cond || data->type == TL_UNLOCK)
00212 {
00213 if (data->cond)
00214 {
00215 if (((*data->prev)=data->next))
00216 data->next->prev= data->prev;
00217 else
00218 wait->last=data->prev;
00219 data->type= TL_UNLOCK;
00220 wake_up_waiters(data->lock);
00221 }
00222 }
00223 else
00224 {
00225 result= THR_LOCK_SUCCESS;
00226 }
00227 data->lock->unlock();
00228
00229
00230 boost_unique_lock_t scopedLock(thread_var->mutex);
00231 thread_var->current_mutex= NULL;
00232 thread_var->current_cond= NULL;
00233 return(result);
00234 }
00235
00236
00237 static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
00238 {
00239 THR_LOCK *lock= data->lock;
00240 enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
00241 struct st_lock_list *wait_queue;
00242 THR_LOCK_DATA *lock_owner;
00243
00244 data->next=0;
00245 data->cond=0;
00246 data->type=lock_type;
00247 data->owner= owner;
00248 lock->lock();
00249 if ((int) lock_type <= (int) TL_READ_NO_INSERT)
00250 {
00251
00252 if (lock->write.data)
00253 {
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263 if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
00264 (lock->write.data->type <= TL_WRITE_CONCURRENT_INSERT &&
00265 (((int) lock_type <= (int) TL_READ_WITH_SHARED_LOCKS) ||
00266 (lock->write.data->type != TL_WRITE_CONCURRENT_INSERT &&
00267 lock->write.data->type != TL_WRITE_ALLOW_READ))))
00268 {
00269 (*lock->read.last)=data;
00270 data->prev=lock->read.last;
00271 lock->read.last= &data->next;
00272 if (lock_type == TL_READ_NO_INSERT)
00273 lock->read_no_write_count++;
00274 current_global_counters.locks_immediate++;
00275 goto end;
00276 }
00277 if (lock->write.data->type == TL_WRITE_ONLY)
00278 {
00279
00280 data->type=TL_UNLOCK;
00281 result= THR_LOCK_ABORTED;
00282 goto end;
00283 }
00284 }
00285 else if (!lock->write_wait.data ||
00286 lock->write_wait.data->type <= TL_WRITE_DEFAULT ||
00287 have_old_read_lock(lock->read.data, data->owner))
00288 {
00289 (*lock->read.last)=data;
00290 data->prev=lock->read.last;
00291 lock->read.last= &data->next;
00292 if (lock_type == TL_READ_NO_INSERT)
00293 lock->read_no_write_count++;
00294 current_global_counters.locks_immediate++;
00295 goto end;
00296 }
00297
00298
00299
00300
00301
00302 wait_queue= &lock->read_wait;
00303 }
00304 else
00305 {
00306 if (lock_type == TL_WRITE_CONCURRENT_INSERT)
00307 data->type=lock_type= thr_upgraded_concurrent_insert_lock;
00308
00309 if (lock->write.data)
00310 {
00311 if (lock->write.data->type == TL_WRITE_ONLY)
00312 {
00313
00314 if (!thr_lock_owner_equal(data->owner, lock->write.data->owner))
00315 {
00316
00317 data->type=TL_UNLOCK;
00318 result= THR_LOCK_ABORTED;
00319 goto end;
00320 }
00321 }
00322
00323
00324
00325
00326
00327
00328 if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
00329 (lock_type == TL_WRITE_ALLOW_WRITE &&
00330 !lock->write_wait.data &&
00331 lock->write.data->type == TL_WRITE_ALLOW_WRITE))
00332 {
00333
00334
00335
00336
00337
00338 (*lock->write.last)=data;
00339 data->prev=lock->write.last;
00340 lock->write.last= &data->next;
00341 current_global_counters.locks_immediate++;
00342 goto end;
00343 }
00344 }
00345 else
00346 {
00347 if (!lock->write_wait.data)
00348 {
00349 bool concurrent_insert= 0;
00350 if (lock_type == TL_WRITE_CONCURRENT_INSERT)
00351 {
00352 concurrent_insert= 1;
00353 }
00354
00355 if (!lock->read.data ||
00356 (lock_type <= TL_WRITE_CONCURRENT_INSERT &&
00357 ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
00358 lock_type != TL_WRITE_ALLOW_WRITE) ||
00359 !lock->read_no_write_count)))
00360 {
00361 (*lock->write.last)=data;
00362 data->prev=lock->write.last;
00363 lock->write.last= &data->next;
00364 current_global_counters.locks_immediate++;
00365 goto end;
00366 }
00367 }
00368 }
00369 wait_queue= &lock->write_wait;
00370 }
00371
00372
00373
00374
00375
00376
00377 lock_owner= lock->read.data ? lock->read.data : lock->write.data;
00378 if (lock_owner && lock_owner->owner->info == owner->info)
00379 {
00380 result= THR_LOCK_DEADLOCK;
00381 goto end;
00382 }
00383
00384
00385 return(wait_for_lock(session, wait_queue, data));
00386 end:
00387 lock->unlock();
00388
00389 return(result);
00390 }
00391
00392
00393 static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
00394 {
00395 THR_LOCK_DATA *data= lock->read_wait.data;
00396
00397
00398 (*lock->read.last)=data;
00399 data->prev=lock->read.last;
00400 lock->read.last=lock->read_wait.last;
00401
00402
00403 lock->read_wait.last= &lock->read_wait.data;
00404
00405 do
00406 {
00407 boost::condition_variable_any *cond= data->cond;
00408 if ((int) data->type == (int) TL_READ_NO_INSERT)
00409 {
00410 if (using_concurrent_insert)
00411 {
00412
00413
00414
00415
00416 if (((*data->prev)=data->next))
00417 data->next->prev=data->prev;
00418 else
00419 lock->read.last=data->prev;
00420 *lock->read_wait.last= data;
00421 data->prev= lock->read_wait.last;
00422 lock->read_wait.last= &data->next;
00423 continue;
00424 }
00425 lock->read_no_write_count++;
00426 }
00427 data->cond= NULL;
00428 cond->notify_one();
00429 } while ((data=data->next));
00430 *lock->read_wait.last=0;
00431 if (!lock->read_wait.data)
00432 lock->write_lock_count=0;
00433 }
00434
00435
00436
00437 static void thr_unlock(THR_LOCK_DATA *data)
00438 {
00439 THR_LOCK *lock=data->lock;
00440 enum thr_lock_type lock_type=data->type;
00441 lock->lock();
00442
00443 if (((*data->prev)=data->next))
00444 data->next->prev= data->prev;
00445 else if (lock_type <= TL_READ_NO_INSERT)
00446 lock->read.last=data->prev;
00447 else
00448 lock->write.last=data->prev;
00449 if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
00450 { }
00451 else
00452 { }
00453 if (lock_type == TL_READ_NO_INSERT)
00454 lock->read_no_write_count--;
00455 data->type=TL_UNLOCK;
00456 wake_up_waiters(lock);
00457 lock->unlock();
00458 }
00459
00460
00469 static void wake_up_waiters(THR_LOCK *lock)
00470 {
00471 THR_LOCK_DATA *data;
00472 enum thr_lock_type lock_type;
00473
00474 if (!lock->write.data)
00475 {
00476 data=lock->write_wait.data;
00477 if (!lock->read.data)
00478 {
00479
00480 if (data &&
00481 (!lock->read_wait.data || lock->read_wait.data->type <= TL_READ_WITH_SHARED_LOCKS))
00482 {
00483 if (lock->write_lock_count++ > max_write_lock_count)
00484 {
00485
00486 lock->write_lock_count=0;
00487 if (lock->read_wait.data)
00488 {
00489 free_all_read_locks(lock,0);
00490 goto end;
00491 }
00492 }
00493 for (;;)
00494 {
00495 if (((*data->prev)=data->next))
00496 data->next->prev= data->prev;
00497 else
00498 lock->write_wait.last=data->prev;
00499 (*lock->write.last)=data;
00500 data->prev=lock->write.last;
00501 data->next=0;
00502 lock->write.last= &data->next;
00503
00504 {
00505 boost::condition_variable_any *cond= data->cond;
00506 data->cond= NULL;
00507 cond->notify_one();
00508 }
00509 if (data->type != TL_WRITE_ALLOW_WRITE ||
00510 !lock->write_wait.data ||
00511 lock->write_wait.data->type != TL_WRITE_ALLOW_WRITE)
00512 break;
00513 data=lock->write_wait.data;
00514 }
00515 if (data->type >= TL_WRITE)
00516 goto end;
00517
00518 }
00519 if (lock->read_wait.data)
00520 free_all_read_locks(lock,
00521 data &&
00522 (data->type == TL_WRITE_CONCURRENT_INSERT ||
00523 data->type == TL_WRITE_ALLOW_WRITE));
00524 }
00525 else if (data &&
00526 (lock_type=data->type) <= TL_WRITE_CONCURRENT_INSERT &&
00527 ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
00528 lock_type != TL_WRITE_ALLOW_WRITE) ||
00529 !lock->read_no_write_count))
00530 {
00531 do {
00532 boost::condition_variable_any *cond= data->cond;
00533 if (((*data->prev)=data->next))
00534 data->next->prev= data->prev;
00535 else
00536 lock->write_wait.last=data->prev;
00537 (*lock->write.last)=data;
00538 data->prev=lock->write.last;
00539 lock->write.last= &data->next;
00540 data->next=0;
00541 data->cond= NULL;
00542 cond->notify_one();
00543 } while (lock_type == TL_WRITE_ALLOW_WRITE &&
00544 (data=lock->write_wait.data) &&
00545 data->type == TL_WRITE_ALLOW_WRITE);
00546 if (lock->read_wait.data)
00547 free_all_read_locks(lock,
00548 (lock_type == TL_WRITE_CONCURRENT_INSERT ||
00549 lock_type == TL_WRITE_ALLOW_WRITE));
00550 }
00551 else if (!data && lock->read_wait.data)
00552 {
00553 free_all_read_locks(lock,0);
00554 }
00555 }
00556 end:
00557 return;
00558 }
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568 #define LOCK_CMP(A,B) ((unsigned char*) (A->lock) - (uint32_t) ((A)->type) < (unsigned char*) (B->lock)- (uint32_t) ((B)->type))
00569
00570 static void sort_locks(THR_LOCK_DATA **data,uint32_t count)
00571 {
00572 THR_LOCK_DATA **pos,**end,**prev,*tmp;
00573
00574
00575
00576 for (pos=data+1,end=data+count; pos < end ; pos++)
00577 {
00578 tmp= *pos;
00579 if (LOCK_CMP(tmp,pos[-1]))
00580 {
00581 prev=pos;
00582 do {
00583 prev[0]=prev[-1];
00584 } while (--prev != data && LOCK_CMP(tmp,prev[-1]));
00585 prev[0]=tmp;
00586 }
00587 }
00588 }
00589
00590
00591 enum enum_thr_lock_result
00592 thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
00593 {
00594 THR_LOCK_DATA **pos,**end;
00595 if (count > 1)
00596 sort_locks(data,count);
00597
00598 for (pos=data,end=data+count; pos < end ; pos++)
00599 {
00600 enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
00601 if (result != THR_LOCK_SUCCESS)
00602 {
00603 thr_multi_unlock(data,(uint32_t) (pos-data));
00604 return(result);
00605 }
00606 }
00607
00608
00609
00610
00611
00612 #if !defined(DONT_USE_RW_LOCKS)
00613 if (count > 1)
00614 {
00615 THR_LOCK_DATA *last_lock= end[-1];
00616 pos=end-1;
00617 do
00618 {
00619 pos--;
00620 last_lock=(*pos);
00621 } while (pos != data);
00622 }
00623 #endif
00624 return(THR_LOCK_SUCCESS);
00625 }
00626
00627
00628
00629 void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count)
00630 {
00631 THR_LOCK_DATA **pos,**end;
00632
00633 for (pos=data,end=data+count; pos < end ; pos++)
00634 {
00635 if ((*pos)->type != TL_UNLOCK)
00636 thr_unlock(*pos);
00637 }
00638 return;
00639 }
00640
00641 void DrizzleLock::unlock(uint32_t count)
00642 {
00643 THR_LOCK_DATA **pos,**end;
00644
00645 for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
00646 {
00647 if ((*pos)->type != TL_UNLOCK)
00648 thr_unlock(*pos);
00649 }
00650 }
00651
00652
00653
00654
00655
00656
00657 void THR_LOCK::abort_locks()
00658 {
00659 boost_unique_lock_t scopedLock(mutex);
00660
00661 for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
00662 {
00663 local_data->type= TL_UNLOCK;
00664
00665 local_data->cond->notify_one();
00666 local_data->cond= NULL;
00667 }
00668 for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
00669 {
00670 local_data->type= TL_UNLOCK;
00671 local_data->cond->notify_one();
00672 local_data->cond= NULL;
00673 }
00674 read_wait.last= &read_wait.data;
00675 write_wait.last= &write_wait.data;
00676 read_wait.data= write_wait.data=0;
00677 if (write.data)
00678 write.data->type=TL_WRITE_ONLY;
00679 }
00680
00681
00682
00683
00684
00685
00686
00687
00688 bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
00689 {
00690 bool found= false;
00691
00692 boost_unique_lock_t scopedLock(mutex);
00693 for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
00694 {
00695 if (local_data->owner->info->thread_id == thread_id_arg)
00696 {
00697 local_data->type= TL_UNLOCK;
00698
00699 found= true;
00700 local_data->cond->notify_one();
00701 local_data->cond= 0;
00702
00703 if (((*local_data->prev)= local_data->next))
00704 local_data->next->prev= local_data->prev;
00705 else
00706 read_wait.last= local_data->prev;
00707 }
00708 }
00709 for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
00710 {
00711 if (local_data->owner->info->thread_id == thread_id_arg)
00712 {
00713 local_data->type= TL_UNLOCK;
00714 found= true;
00715 local_data->cond->notify_one();
00716 local_data->cond= NULL;
00717
00718 if (((*local_data->prev)= local_data->next))
00719 local_data->next->prev= local_data->prev;
00720 else
00721 write_wait.last= local_data->prev;
00722 }
00723 }
00724 wake_up_waiters(this);
00725
00726 return found;
00727 }
00728
00729 }