Drizzled Public API Documentation

connection_handler_ms.cc
00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Original author: Paul McCullagh
00020  * Continued development: Barry Leslie
00021  *
00022  * 2007-05-25
00023  *
00024  * H&G2JCtL
00025  *
00026  * Network interface.
00027  *
00028  */
00029 
00030 #include "cslib/CSConfig.h"
00031 #include <inttypes.h>
00032 
00033 #include "defs_ms.h"
00034 
00035 #include "cslib/CSGlobal.h"
00036 #include "cslib/CSSocket.h"
00037 #include "cslib/CSStrUtil.h"
00038 #include "cslib/CSHTTPStream.h"
00039 
00040 #include "connection_handler_ms.h"
00041 #include "network_ms.h"
00042 #include "open_table_ms.h"
00043 #include "engine_ms.h"
00044 #include "version_ms.h"
00045 
00046 //#include "mysql_ms.h"
00047 
/* Storage for the static class member MSConnectionHandler::gMaxKeepAlive.
 * Nothing in this listing reads or writes it; presumably a keep-alive limit
 * that is configured elsewhere -- TODO confirm its units and who sets it. */
00048 u_long MSConnectionHandler::gMaxKeepAlive;
00049 
00050 MSConnectionHandler::MSConnectionHandler(CSThreadList *list):
00051   CSDaemon(list),
00052   amWaitingToListen(false),
00053   shuttingDown(false),
00054   lastUse(0),
00055   replyPending(false),
00056   iInputStream(NULL),
00057   iOutputStream(NULL),
00058   iTableURI(NULL)
00059 {
00060 }
00061 
00062 void MSConnectionHandler::close()
00063 {
00064   closeStream();
00065   freeRequestURI();
00066 }
00067 
00068 MSConnectionHandler *MSConnectionHandler::newHandler(CSThreadList *list)
00069 {
00070   return new MSConnectionHandler(list);
00071 }
00072 
00073 /* Return false if not connection was openned, and the thread must quit. */
00074 bool MSConnectionHandler::openStream()
00075 {
00076   CSSocket    *sock;
00077   CSInputStream *in;
00078   CSOutputStream  *out;
00079   
00080   enter_();
00081   if (!(sock = MSNetwork::openConnection(this)))
00082     return_(false);
00083   push_(sock);
00084   in = sock->getInputStream();
00085   in = CSBufferedInputStream::newStream(in);
00086   iInputStream = CSHTTPInputStream::newStream(in);
00087 
00088   out = sock->getOutputStream();
00089   out = CSBufferedOutputStream::newStream(out);
00090   iOutputStream = CSHTTPOutputStream::newStream(out);
00091   release_(sock);
00092   return_(true);
00093 }
00094 
00095 int MSConnectionHandler::getHTTPStatus(int err)
00096 {
00097   int code;
00098 
00099   switch (err) {
00100     case MS_OK:           code = 200; break;
00101     case MS_ERR_ENGINE:       code = 500; break;
00102     case MS_ERR_UNKNOWN_TABLE:    code = 404; break;
00103     case MS_ERR_UNKNOWN_DB:     code = 404; break;
00104     case MS_ERR_DATABASE_DELETED: code = 404; break;
00105     case MS_ERR_NOT_FOUND:      code = 404; break;
00106     case MS_ERR_REMOVING_REPO:    code = 404; break;
00107     case MS_ERR_TABLE_LOCKED:   code = 412; break; // Precondition Failed
00108     case MS_ERR_INCORRECT_URL:    code = 404; break;
00109     case MS_ERR_AUTH_FAILED:    code = 403; break; // Forbidden
00110     default:            code = 500; break;
00111   }
00112   return code;
00113 }
00114 
00115 void MSConnectionHandler::writeException(const char *qualifier)
00116 {
00117   int code;
00118 
00119   enter_();
00120   iOutputStream->clearHeaders();
00121   iOutputStream->clearBody();
00122   code = getHTTPStatus(myException.getErrorCode());
00123   iOutputStream->setStatus(code);
00124   iOutputStream->appendBody("<HTML><HEAD><TITLE>HTTP Error ");
00125   iOutputStream->appendBody(code);
00126   iOutputStream->appendBody(": ");
00127   iOutputStream->appendBody(CSHTTPOutputStream::getReasonPhrase(code));
00128   iOutputStream->appendBody("</TITLE></HEAD>");
00129   iOutputStream->appendBody("<BODY><H2>HTTP Error ");
00130   iOutputStream->appendBody(code);
00131   iOutputStream->appendBody(": ");
00132   iOutputStream->appendBody(CSHTTPOutputStream::getReasonPhrase(code));
00133   iOutputStream->appendBody("</H2>");
00134   if (qualifier)
00135     iOutputStream->appendBody(qualifier);
00136   iOutputStream->appendBody(EXCEPTION_REPLY_MESSAGE_PREFIX_TAG);
00137   iOutputStream->appendBody(myException.getMessage());
00138   iOutputStream->appendBody(EXCEPTION_REPLY_MESSAGE_SUFFIX_TAG);
00139   iOutputStream->appendBody(myException.getStackTrace());
00140   iOutputStream->appendBody(EXCEPTION_REPLY_STACK_TRACE_SUFFIX_TAG);
00141   iOutputStream->appendBody("MySQL ");
00142   iOutputStream->appendBody(PBMSVersion::getCString());
00143   iOutputStream->appendBody(", PBMS ");
00144   iOutputStream->appendBody(PBMSVersion::getCString());
00145   iOutputStream->appendBody("<br>Copyright &#169; 2009, PrimeBase Technologies GmbH</font></P></BODY></HTML>");
00146 
00147   replyPending = false;
00148   iOutputStream->writeHead();
00149   iOutputStream->writeBody();
00150   iOutputStream->flush();
00151   exit_();
00152 }
00153 
00154 void MSConnectionHandler::writeException()
00155 {
00156   writeException(NULL);
00157 }
00158 
00159 void MSConnectionHandler::closeStream()
00160 {
00161   enter_();
00162   if (iOutputStream) {
00163     if (replyPending) {
00164       try_(a) {
00165         writeException();
00166       }
00167       catch_(a) {
00168       }
00169       cont_(a);
00170     }
00171     iOutputStream->release();
00172     iOutputStream = NULL;
00173   }
00174   if (iInputStream) {
00175     iInputStream->release();
00176     iInputStream = NULL;
00177   }
00178   exit_();
00179 }
00180 
00181 void MSConnectionHandler::parseRequestURI()
00182 {
00183   CSString  *uri = iInputStream->getRequestURI();
00184   uint32_t    pos = 0, end;
00185   enter_();
00186   
00187   freeRequestURI();
00188   pos = uri->locate(0, "://");
00189   if (pos < uri->length())
00190     pos += 3;
00191   else
00192     pos = uri->skip(0, '/');
00193 
00194   // Table URI
00195   end = uri->locate(pos, '/');
00196   //end = uri->locate(uri->nextPos(end), '/'); I am not sure why this was done.
00197   //iTableURI = uri->substr(pos, end - pos);
00198   iTableURI = uri->substr(pos);
00199 
00200   exit_();
00201 }
00202 
00203 void MSConnectionHandler::freeRequestURI()
00204 {
00205   if (iTableURI)
00206     iTableURI->release();
00207   iTableURI = NULL;
00208 }
00209 
00210 void MSConnectionHandler::writeFile(CSString *file_path)
00211 {
00212   CSPath      *path;
00213   CSFile      *file;
00214 
00215   enter_();
00216   push_(file_path);
00217 
00218   path = CSPath::newPath(RETAIN(file_path));
00219   pop_(file_path);
00220   push_(path);
00221   if (path->exists()) {
00222     file = path->openFile(CSFile::READONLY);
00223     push_(file);
00224 
00225     iOutputStream->setContentLength((uint64_t) path->getSize());
00226     replyPending = false;
00227     iOutputStream->writeHead();
00228     
00229     CSStream::pipe(RETAIN(iOutputStream), file->getInputStream());
00230 
00231     release_(file);
00232   }
00233   else {
00234     myException.initFileError(CS_CONTEXT, path->getCString(), ENOENT);
00235     writeException();
00236   }
00237   release_(path);
00238 
00239   exit_();
00240 }
00241 
00242 /*
00243  * Request URI: /<blob URL>
00244 * OR
00245  * Request URI: /<database>/<blob alias>
00246  */
00247 void MSConnectionHandler::handleGet(bool info_only)
00248 {
00249   const char  *bad_url_comment = "Incorrect URL: ";
00250   MSOpenTable *otab;
00251   CSString  *info_request;
00252   CSString  *ping_request;
00253 
00254   enter_();
00255   self->myException.setErrorCode(0);
00256 
00257   iOutputStream->clearHeaders();
00258   iOutputStream->clearBody();
00259   //iOutputStream->setStatus(200); This is done in the send now.
00260   
00261   parseRequestURI();
00262 
00263   ping_request = iInputStream->getHeaderValue(MS_PING_REQUEST);
00264   if (ping_request) {
00265     MSDatabase *db;
00266     
00267     db = MSDatabase::getDatabase(ping_request, false);
00268     if (db) {
00269       push_(db);
00270       if (db->myBlobCloud->cl_getDefaultCloudRef()) {
00271         MSCloudInfo *info = db->myBlobCloud->cl_getCloudInfo();
00272         push_(info);
00273         iOutputStream->addHeader(MS_CLOUD_SERVER, info->getServer());
00274         release_(info);
00275       }
00276       release_(db);
00277     }
00278     
00279     iOutputStream->setStatus(200); 
00280     iOutputStream->writeHead();
00281     iOutputStream->flush();
00282     exit_();
00283 
00284   }
00285   
00286   info_request = iInputStream->getHeaderValue(MS_BLOB_INFO_REQUEST);
00287   if (info_request) {
00288     info_only = (info_request->compare("yes") == 0);
00289     info_request->release();
00290   }
00291   
00292   
00293   
00294   if (iTableURI->length() == 0)
00295     goto bad_url;
00296   
00297 
00298   MSBlobURLRec blob;
00299 
00300   if (iTableURI->equals("favicon.ico")) {
00301     iOutputStream->setStatus(200); 
00302     writeFile(iTableURI);
00303   } else if (PBMSBlobURLTools::couldBeURL(iTableURI->getCString(), &blob)) {  
00304     uint64_t size, offset;
00305     
00306     if ((! info_only) && iInputStream->getRange(&size, &offset)) { 
00307       if (offset >= blob.bu_blob_size) {
00308         iOutputStream->setStatus(416); // Requested range not satisfiable.
00309         iOutputStream->writeHead();
00310         iOutputStream->flush();
00311         exit_();
00312       }
00313       
00314       if (size > (blob.bu_blob_size - offset))
00315         size = blob.bu_blob_size - offset;        
00316 
00317       iOutputStream->setRange(size, offset, blob.bu_blob_size);
00318     } else {
00319       size = blob.bu_blob_size;
00320       offset = 0;
00321     }
00322  
00323     if (blob.bu_type == MS_URL_TYPE_BLOB) {
00324       otab = MSTableList::getOpenTableByID(blob.bu_db_id, blob.bu_tab_id);
00325       frompool_(otab);
00326       otab->sendRepoBlob(blob.bu_blob_id, offset, size, blob.bu_auth_code, info_only, iOutputStream);
00327       backtopool_(otab);
00328     } else {
00329       MSRepoFile  *repo_file;
00330 
00331       if (!(otab = MSTableList::getOpenTableForDB(blob.bu_db_id))) {
00332         char buffer[CS_EXC_MESSAGE_SIZE];
00333         char id_str[12];
00334         
00335         snprintf(id_str, 12, "%"PRIu32"", blob.bu_db_id);
00336 
00337         cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database ID # ");
00338         cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, id_str);
00339         CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
00340       }
00341       frompool_(otab);
00342       repo_file = otab->getDB()->getRepoFileFromPool(blob.bu_tab_id, false);
00343       frompool_(repo_file);
00344       repo_file->sendBlob(otab, blob.bu_blob_id, offset, size, blob.bu_auth_code, true, info_only, iOutputStream);
00345       backtopool_(repo_file);
00346       backtopool_(otab);
00347     }
00348   } 
00349   else { 
00350 #ifdef HAVE_ALIAS_SUPPORT
00351 
00352     CSString  *db_name;
00353     CSString  *alias;
00354     MSDatabase * db;
00355     uint32_t repo_id;
00356     uint64_t blob_id;
00357     
00358     db_name = iTableURI->left("/");
00359     push_(db_name);
00360     alias = iTableURI->right("/");
00361     push_(alias);
00362 
00363     if (db_name->length() == 0 || alias->length() == 0 || alias->length() > BLOB_ALIAS_LENGTH) 
00364       goto bad_url;
00365     
00366     if (!(otab = MSTableList::getOpenTableForDB(MSDatabase::getDatabaseID(db_name->getCString(), true)))) {
00367       char buffer[CS_EXC_MESSAGE_SIZE];
00368 
00369       cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database: ");
00370       cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, db_name->getCString());
00371       CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
00372     }
00373     frompool_(otab);
00374 
00375     db = otab->getDB();
00376     
00377     // lookup the blob alias in the database.
00378     if (!db->findBlobWithAlias(alias->getCString(), &repo_id, &blob_id)) {
00379       char buffer[CS_EXC_MESSAGE_SIZE];
00380 
00381       cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown alias: ");
00382       cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, alias->getCString());
00383       CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
00384     }
00385       
00386     MSRepoFile *repo_file = db->getRepoFileFromPool(repo_id, false);
00387     
00388     frompool_(repo_file);
00389     repo_file->sendBlob(otab, blob_id, 0, false, info_only, iOutputStream);
00390     backtopool_(repo_file); 
00391       
00392     backtopool_(otab);    
00393     
00394     release_(alias);
00395     release_(db_name);
00396 
00397 #else
00398     char buffer[CS_EXC_MESSAGE_SIZE];
00399 
00400     cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Bad PBMS BLOB URL: ");
00401     cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, iTableURI->getCString());
00402     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
00403 #endif
00404   }
00405   
00406 
00407   exit_();
00408 
00409   bad_url:
00410   char buffer[CS_EXC_MESSAGE_SIZE];
00411 
00412   cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, bad_url_comment);
00413   cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, iInputStream->getRequestURI()->getCString());
00414   CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
00415   exit_();
00416 }
00417 
00418 void MSConnectionHandler::handlePut()
00419 {
00420   MSOpenTable *otab = NULL;
00421   uint32_t  db_id = 0, tab_id;
00422 
00423   enter_();
00424   self->myException.setErrorCode(0);
00425 
00426   iOutputStream->clearHeaders();
00427   iOutputStream->clearBody();
00428   iOutputStream->setStatus(200);
00429   
00430   parseRequestURI();
00431   if (iTableURI->length() != 0)
00432     MSDatabase::convertTablePathToIDs(iTableURI->getCString(), &db_id, &tab_id, true);
00433 
00434 
00435   if ((!db_id) || !(otab = MSTableList::getOpenTableByID(db_id, tab_id))) {
00436     char buffer[CS_EXC_MESSAGE_SIZE];
00437 
00438     cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
00439     cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, iInputStream->getRequestURI()->getCString());
00440     CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
00441   }
00442   frompool_(otab);
00443   
00444   uint64_t      blob_len, cloud_blob_len = 0;
00445   PBMSBlobURLRec  bh;
00446   size_t      handle_len;
00447   uint16_t      metadata_size = 0; 
00448   CSStringBuffer  *metadata;
00449   
00450   new_(metadata, CSStringBuffer(80));
00451   push_(metadata);
00452   
00453    if (! iInputStream->getContentLength(&blob_len)) {
00454     CSException::throwException(CS_CONTEXT, CS_ERR_MISSING_HTTP_HEADER, "Missing content length header");
00455    }
00456    
00457   
00458   // Collect the meta data.
00459   for (uint32_t i = 0; i < iInputStream->numHeaders(); i++) {
00460     CSHeader *header = iInputStream->getHeader(i);
00461     const char *name = header->getNameCString();
00462     
00463     push_(header);
00464     
00465     if (!strcmp(name, MS_BLOB_SIZE)) { // The actual BLOB data size if it is being stored in a cloud.
00466       sscanf(header->getValueCString(), "%"PRIu64"", &cloud_blob_len);
00467     }
00468       
00469     if (name && otab->getDB()->isValidHeaderField(name)) {
00470       uint16_t rec_size, name_size, value_size;
00471       const char *value = header->getValueCString();
00472       char *buf;
00473       if (!value)
00474         value = "";
00475         
00476       name_size = strlen(name);
00477       value_size = strlen(value);
00478       
00479       rec_size = name_size + value_size + 2;
00480       metadata->setLength(metadata_size + rec_size);
00481       
00482       buf = metadata->getBuffer(metadata_size);
00483       metadata_size += rec_size;
00484       
00485       memcpy(buf, name, name_size);
00486       buf += name_size;
00487       *buf = 0; buf++;
00488       
00489       memcpy(buf, value, value_size);
00490       buf += value_size;
00491       *buf = 0;
00492     }
00493     
00494     release_(header);
00495   }
00496   
00497   if (blob_len) {
00498     char hex_checksum[33];
00499     Md5Digest checksum;
00500     
00501     otab->createBlob(&bh, blob_len, metadata->getBuffer(0), metadata_size, RETAIN(iInputStream), NULL, &checksum);
00502 
00503     cs_bin_to_hex(33, hex_checksum, 16, checksum.val);
00504     iOutputStream->addHeader(MS_CHECKSUM_TAG, hex_checksum);
00505   } else { // If there is no BLOB data then the client will send it to the cloud server themselves.
00506     if (!cloud_blob_len)
00507       CSException::throwException(CS_CONTEXT, CS_ERR_MISSING_HTTP_HEADER, "Missing BLOB length header for cloud BLOB.");
00508     if (otab->getDB()->myBlobType == MS_CLOUD_STORAGE) {
00509       CloudKeyRec cloud_key;
00510       uint32_t signature_time;
00511       char time_str[20];
00512       CloudDB *cloud = otab->getDB()->myBlobCloud;
00513       MSCloudInfo *info;
00514       
00515       
00516       cloud->cl_getNewKey(&cloud_key);
00517       otab->createBlob(&bh, cloud_blob_len, metadata->getBuffer(0), metadata_size, NULL, &cloud_key);
00518       
00519       CSString *signature;
00520       signature = cloud->cl_getSignature(&cloud_key, iInputStream->getHeaderValue("Content-Type"), &signature_time);
00521       push_(signature);
00522       
00523       info = cloud->cl_getCloudInfo(cloud_key.cloud_ref);
00524       push_(info);
00525       iOutputStream->addHeader(MS_CLOUD_SERVER, info->getServer());
00526       iOutputStream->addHeader(MS_CLOUD_BUCKET, info->getBucket());
00527       iOutputStream->addHeader(MS_CLOUD_KEY, info->getPublicKey());
00528       iOutputStream->addHeader(MS_CLOUD_OBJECT_KEY, cloud->cl_getObjectKey(&cloud_key));
00529       iOutputStream->addHeader(MS_BLOB_SIGNATURE, signature->getCString());
00530       release_(info);
00531       
00532       release_(signature);
00533       snprintf(time_str, 20, "%"PRIu32"", signature_time);
00534       iOutputStream->addHeader(MS_BLOB_DATE, time_str);
00535       
00536     } else {
00537       // If the database is not using cloud storage then the client will
00538       // resend the BLOB data as a normal BLOB when it fails to get the
00539       // expected cloud server infor headers back.
00540       bh.bu_data[0] = 0;
00541     }
00542   }
00543   handle_len = strlen(bh.bu_data);
00544   iOutputStream->setContentLength(handle_len);
00545 
00546   replyPending = false;
00547   iOutputStream->writeHead();
00548   iOutputStream->write(bh.bu_data, handle_len);
00549   iOutputStream->flush();
00550 
00551   release_(metadata);
00552 
00553   backtopool_(otab);
00554 
00555   exit_();
00556 }
00557 
00558 void MSConnectionHandler::serviceConnection()
00559 {
00560   const char  *method;
00561   bool    threadStarted = false;
00562 
00563   for (;;) {
00564     iInputStream->readHead();
00565     if (iInputStream->expect100Continue()) {      
00566       iOutputStream->clearHeaders();
00567       iOutputStream->clearBody();
00568       iOutputStream->setStatus(100);
00569       iOutputStream->setContentLength(0);
00570       iOutputStream->writeHead();
00571       iOutputStream->flush();
00572     }
00573     
00574     if (!(method = iInputStream->getMethod()))
00575       break;
00576     if (!threadStarted /* && iInputStream->keepAlive() */) { // Ignore keepalive: Never trust the client!
00577       /* Start another service handler if no threads
00578        * are waiting to listen!
00579        */
00580       threadStarted = true;
00581       if (!MSNetwork::gWaitingToListen)
00582         MSNetwork::startConnectionHandler();
00583     }
00584     replyPending = true;
00585     if (strcmp(method, "GET") == 0)
00586       handleGet(false);
00587     else if (strcmp(method, "PUT") == 0 ||
00588       strcmp(method, "POST") == 0)
00589       handlePut();
00590     else if (strcmp(method, "HEAD"))
00591       handleGet(true);
00592     else
00593       CSException::throwCoreError(CS_CONTEXT, CS_ERR_UNKNOWN_METHOD, method);
00594   }
00595 }
00596 
00597 bool MSConnectionHandler::initializeWork()
00598 {
00599   return true;
00600 }
00601 
00602 /*
00603  * Return false if no connection this thread should quit!
00604  */
00605 bool MSConnectionHandler::doWork()
00606 {
00607   enter_();
00608 
00609   /* Open a connection: */
00610   if (!openStream()) {
00611     myMustQuit = true;
00612     return_(false);
00613   }
00614 
00615   /* Do the work for the connection: */
00616   serviceConnection();
00617 
00618   /* Close the connection: */
00619   close();
00620 
00621   return_(false);
00622 }
00623 
00624 void *MSConnectionHandler::completeWork()
00625 {
00626   shuttingDown = true;
00627   /* Close the stream, if it was openned. */
00628   close();
00629 
00630   return NULL;
00631 }
00632 
00633 bool MSConnectionHandler::handleException()
00634 {
00635   if (!myMustQuit) {
00636     /* Start another handler if required: */
00637     if (!MSNetwork::gWaitingToListen)
00638       MSNetwork::startConnectionHandler();
00639   }
00640   close();
00641   if (!shuttingDown)
00642     CSDaemon::handleException();
00643   return false;
00644 }
00645 
00646