Drizzled Public API Documentation

session.cc
1 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2008 Sun Microsystems, Inc.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; version 2 of the License.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18  */
19 
24 #include <config.h>
25 
26 #include <boost/checked_delete.hpp>
27 #include <boost/filesystem.hpp>
28 #include <boost/ptr_container/ptr_container.hpp>
29 #include <drizzled/copy_field.h>
30 #include <drizzled/catalog/local.h>
31 #include <drizzled/diagnostics_area.h>
32 #include <drizzled/display.h>
33 #include <drizzled/drizzled.h>
34 #include <drizzled/error.h>
35 #include <drizzled/gettext.h>
36 #include <drizzled/ha_data.h>
37 #include <drizzled/identifier.h>
38 #include <drizzled/internal/iocache.h>
39 #include <drizzled/internal/thread_var.h>
40 #include <drizzled/item/cache.h>
41 #include <drizzled/item/empty_string.h>
42 #include <drizzled/item/float.h>
43 #include <drizzled/item/return_int.h>
44 #include <drizzled/item/subselect.h>
45 #include <drizzled/lock.h>
46 #include <drizzled/open_tables_state.h>
47 #include <drizzled/plugin/authentication.h>
48 #include <drizzled/plugin/authorization.h>
49 #include <drizzled/plugin/client.h>
50 #include <drizzled/plugin/event_observer.h>
51 #include <drizzled/plugin/logging.h>
52 #include <drizzled/plugin/query_rewrite.h>
53 #include <drizzled/plugin/scheduler.h>
54 #include <drizzled/plugin/transactional_storage_engine.h>
55 #include <drizzled/probes.h>
56 #include <drizzled/pthread_globals.h>
57 #include <drizzled/schema.h>
58 #include <drizzled/select_dump.h>
59 #include <drizzled/select_exists_subselect.h>
60 #include <drizzled/select_export.h>
61 #include <drizzled/select_max_min_finder_subselect.h>
62 #include <drizzled/select_singlerow_subselect.h>
63 #include <drizzled/select_subselect.h>
64 #include <drizzled/select_to_file.h>
65 #include <drizzled/session.h>
66 #include <drizzled/session/cache.h>
67 #include <drizzled/session/state.h>
68 #include <drizzled/session/table_messages.h>
69 #include <drizzled/session/times.h>
70 #include <drizzled/session/transactions.h>
71 #include <drizzled/show.h>
72 #include <drizzled/sql_base.h>
73 #include <drizzled/sql_lex.h>
74 #include <drizzled/system_variables.h>
75 #include <drizzled/statement.h>
76 #include <drizzled/statistics_variables.h>
77 #include <drizzled/table/singular.h>
78 #include <drizzled/table_proto.h>
79 #include <drizzled/tmp_table_param.h>
80 #include <drizzled/transaction_services.h>
81 #include <drizzled/user_var_entry.h>
82 #include <drizzled/util/backtrace.h>
83 #include <drizzled/util/find_ptr.h>
84 #include <drizzled/util/functors.h>
85 #include <drizzled/util/storable.h>
86 #include <plugin/myisam/myisam.h>
87 
88 #include <algorithm>
89 #include <climits>
90 #include <fcntl.h>
91 #include <sys/stat.h>
92 
93 using namespace std;
94 
95 namespace fs= boost::filesystem;
96 
97 namespace drizzled {
98 
99 const char* const Session::DEFAULT_WHERE= "field list";
100 
101 uint64_t g_refresh_version = 1;
102 
103 bool Key_part_spec::operator==(const Key_part_spec& other) const
104 {
105  return length == other.length
106  && field_name.size() == other.field_name.size()
107  && not system_charset_info->strcasecmp(field_name.data(), other.field_name.data());
108 }
109 
110 Open_tables_state::Open_tables_state(Session& session, uint64_t version_arg) :
111  version(version_arg),
112  session_(session)
113 {
114  open_tables_= temporary_tables= derived_tables= NULL;
115  extra_lock= lock= NULL;
116 }
117 
118 /*
119  The following functions form part of the C plugin API
120 */
121 int tmpfile(const char *prefix)
122 {
123  char filename[FN_REFLEN];
124  int fd = internal::create_temp_file(filename, drizzle_tmpdir.c_str(), prefix, MYF(MY_WME));
125  if (fd >= 0)
126  unlink(filename);
127  return fd;
128 }
129 
130 void **Session::getEngineData(const plugin::MonitoredInTransaction *monitored)
131 {
132  return static_cast<void **>(&ha_data[monitored->getId()].ha_ptr);
133 }
134 
135 ResourceContext& Session::getResourceContext(const plugin::MonitoredInTransaction& monitored, size_t index)
136 {
137  return ha_data[monitored.getId()].resource_context[index];
138 }
139 
140 int64_t session_test_options(const Session *session, int64_t test_options)
141 {
142  return session->options & test_options;
143 }
144 
146 {
147 public:
148  typedef boost::unordered_map<std::string, util::Storable*, util::insensitive_hash, util::insensitive_equal_to> properties_t;
149  typedef std::map<std::string, plugin::EventObserverList*> schema_event_observers_t;
150 
151  impl_c(Session& session) :
152  open_tables(session, g_refresh_version),
153  schema(boost::make_shared<std::string>())
154  {
155  }
156 
157  ~impl_c()
158  {
159  BOOST_FOREACH(properties_t::reference it, properties)
160  delete it.second;
161  }
162 
163  Diagnostics_area diagnostics;
164  memory::Root mem_root;
165 
172  LEX lex;
173  Open_tables_state open_tables;
174  properties_t properties;
175  schema_event_observers_t schema_event_observers;
176  system_status_var status_var;
177  session::TableMessages table_message_cache;
178  util::string::mptr schema;
179  boost::shared_ptr<session::State> state;
180  boost::ptr_vector<table::Singular> temporary_shares;
181  session::Times times;
182  session::Transactions transaction;
183  drizzle_system_variables variables;
184 };
185 
186 Session::Session(plugin::Client *client_arg, catalog::Instance::shared_ptr catalog_arg) :
187  impl_(new impl_c(*this)),
188  mem(impl_->mem_root),
189  mem_root(&impl_->mem_root),
190  query(new std::string),
191  scheduler(NULL),
192  variables(impl_->variables),
193  status_var(impl_->status_var),
194  lock_id(&main_lock_id),
195  thread_stack(NULL),
196  _where(Session::DEFAULT_WHERE),
197  mysys_var(0),
198  command(COM_CONNECT),
199  ha_data(plugin::num_trx_monitored_objects),
200  query_id(0),
201  warn_query_id(0),
202  transaction(impl_->transaction),
203  open_tables(impl_->open_tables),
204  times(impl_->times),
205  first_successful_insert_id_in_prev_stmt(0),
206  first_successful_insert_id_in_cur_stmt(0),
207  limit_found_rows(0),
208  options(session_startup_options),
209  row_count_func(-1),
210  sent_row_count(0),
211  examined_row_count(0),
212  used_tables(0),
213  total_warn_count(0),
214  row_count(0),
215  thread_id(0),
216  tmp_table(0),
217  _global_read_lock(NONE),
218  count_cuted_fields(CHECK_FIELD_ERROR_FOR_NULL),
219  _killed(NOT_KILLED),
220  no_errors(false),
221  is_fatal_error(false),
222  transaction_rollback_request(false),
223  is_fatal_sub_stmt_error(0),
224  derived_tables_processing(false),
225  m_lip(NULL),
226  arg_of_last_insert_id_function(false),
227  _catalog(catalog_arg),
228  transaction_message(NULL),
229  statement_message(NULL),
230  session_event_observers(NULL),
231  xa_id(0),
232  concurrent_execute_allowed(true),
233  tablespace_op(false),
234  use_usage(false),
235  security_ctx(identifier::User::make_shared()),
236  originating_server_uuid_set(false),
237  client(client_arg)
238 {
239  client->setSession(this);
240 
241  /*
242  Pass nominal parameters to init only to ensure that
243  the destructor works OK in case of an error. The main_mem_root
244  will be re-initialized in init_for_queries().
245  */
246  mem.init(memory::ROOT_MIN_BLOCK_SIZE);
248  // Must be reset to handle error with Session's created for init of mysqld
249  lex().current_select= 0;
250  memset(&variables, 0, sizeof(variables));
251  scoreboard_index= -1;
252  originating_server_uuid= "";
253  originating_commit_id= 0;
254  cleanup_done= abort_on_warning= no_warnings_for_error= false;
255 
256  resultset= NULL;
257 
258  /* Variables with default values */
259  proc_info="login";
260 
261  plugin_sessionvar_init(this);
262  /*
263  variables= global_system_variables above has reset
264  variables.pseudo_thread_id to 0. We need to correct it here to
265  avoid temporary tables replication failure.
266  */
267  variables.pseudo_thread_id= thread_id;
268  server_status= SERVER_STATUS_AUTOCOMMIT;
269 
270  if (variables.max_join_size == HA_POS_ERROR)
271  options |= OPTION_BIG_SELECTS;
272  else
273  options &= ~OPTION_BIG_SELECTS;
274 
275  open_options=ha_open_options;
276  update_lock_default= TL_WRITE;
277  session_tx_isolation= (enum_tx_isolation) variables.tx_isolation;
278  memset(warn_count, 0, sizeof(warn_count));
279  memset(&status_var, 0, sizeof(status_var));
280 
281  /* Initialize sub structures */
282  warn_root.init(WARN_ALLOC_BLOCK_SIZE);
283 
285  lock_info.init(); /* safety: will be reset after start */
286  main_lock_id.info= &lock_info;
287 
288  plugin::EventObserver::registerSessionEvents(*this);
289 }
290 
291 Diagnostics_area& Session::main_da()
292 {
293  return impl_->diagnostics;
294 }
295 
296 const LEX& Session::lex() const
297 {
298  return impl_->lex;
299 }
300 
301 LEX& Session::lex()
302 {
303  return impl_->lex;
304 }
305 
306 enum_sql_command Session::getSqlCommand() const
307 {
308  return lex().sql_command;
309 }
310 
311 session::TableMessages& Session::getMessageCache()
312 {
313  return impl_->table_message_cache;
314 }
315 
316 void statement::Statement::set_command(enum_sql_command v)
317 {
318  session().lex().sql_command= v;
319 }
320 
321 LEX& statement::Statement::lex()
322 {
323  return session().lex();
324 }
325 
326 session::Transactions& statement::Statement::transaction()
327 {
328  return session().transaction;
329 }
330 
331 void Session::add_item_to_list(Item *item)
332 {
333  lex().current_select->add_item_to_list(this, item);
334 }
335 
336 void Session::add_value_to_list(Item *value)
337 {
338  lex().value_list.push_back(value);
339 }
340 
341 void Session::add_order_to_list(Item *item, bool asc)
342 {
343  lex().current_select->add_order_to_list(this, item, asc);
344 }
345 
346 void Session::add_group_to_list(Item *item, bool asc)
347 {
348  lex().current_select->add_group_to_list(this, item, asc);
349 }
350 
352 {
353  /* This works because items are allocated with memory::sql_alloc() */
354  for (Item* next; free_list; free_list= next)
355  {
356  next= free_list->next;
357  free_list->delete_self();
358  }
359 }
360 
361 void Session::setAbort(bool arg)
362 {
363  mysys_var->abort= arg;
364 }
365 
366 void Session::lockOnSys()
367 {
368  if (not mysys_var)
369  return;
370 
371  setAbort(true);
372  boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
373  if (mysys_var->current_cond)
374  {
375  mysys_var->current_mutex->lock();
376  mysys_var->current_cond->notify_all();
377  mysys_var->current_mutex->unlock();
378  }
379 }
380 
381 void Session::get_xid(DrizzleXid *xid) const
382 {
383  *xid = *(DrizzleXid *) &transaction.xid_state.xid;
384 }
385 
386 /* Do operations that may take a long time */
387 
388 void Session::cleanup()
389 {
390  assert(not cleanup_done);
391 
392  setKilled(KILL_CONNECTION);
393 
394  /* In the future, you may want to do something about XA_PREPARED here.
395  In the dim distant past there was some #ifdefed out #error here about it.
396  */
397  TransactionServices::rollbackTransaction(*this, true);
398 
399  BOOST_FOREACH(UserVars::reference iter, user_vars)
400  boost::checked_delete(iter.second);
401  user_vars.clear();
402 
403  open_tables.close_temporary_tables();
404 
405  if (global_read_lock)
406  unlockGlobalReadLock();
407 
408  cleanup_done= true;
409 }
410 
411 Session::~Session()
412 {
413  if (client and client->isConnected())
414  {
415  assert(security_ctx);
416  if (global_system_variables.log_warnings)
417  {
418  errmsg_printf(error::WARN, ER(ER_FORCING_CLOSE), internal::my_progname, thread_id, security_ctx->username().c_str());
419  }
420 
421  disconnect();
422  }
423 
424  /* Close connection */
425  if (client)
426  {
427  client->close();
428  boost::checked_delete(client);
429  client= NULL;
430  }
431 
432  if (not cleanup_done)
433  cleanup();
434 
436  plugin_sessionvar_cleanup(this);
437 
438  warn_root.free_root(MYF(0));
439  mysys_var=0; // Safety (shouldn't be needed)
440 
441  impl_->mem_root.free_root(MYF(0));
442  setCurrentMemRoot(NULL);
443  setCurrentSession(NULL);
444 
445  plugin::Logging::postEndDo(this);
446  plugin::EventObserver::deregisterSessionEvents(session_event_observers);
447 
448  BOOST_FOREACH(impl_c::schema_event_observers_t::reference it, impl_->schema_event_observers)
449  plugin::EventObserver::deregisterSchemaEvents(it.second);
450 }
451 
452 void Session::setClient(plugin::Client *client_arg)
453 {
454  client= client_arg;
455  client->setSession(this);
456 }
457 
458 void Session::awake(Session::killed_state_t state_to_set)
459 {
460  if (state_to_set == Session::KILL_QUERY && command == COM_SLEEP)
461  return;
462 
463  setKilled(state_to_set);
464  scheduler->killSession(this);
465 
466  if (state_to_set != Session::KILL_QUERY)
467  {
468  DRIZZLE_CONNECTION_DONE(thread_id);
469  }
470 
471  if (mysys_var)
472  {
473  boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
474  /*
475  "
476  This broadcast could be up in the air if the victim thread
477  exits the cond in the time between read and broadcast, but that is
478  ok since all we want to do is to make the victim thread get out
479  of waiting on current_cond.
480  If we see a non-zero current_cond: it cannot be an old value (because
481  then exit_cond() should have run and it can't because we have mutex); so
482  it is the true value but maybe current_mutex is not yet non-zero (we're
483  in the middle of enter_cond() and there is a "memory order
484  inversion"). So we test the mutex too to not lock 0.
485 
486  Note that there is a small chance we fail to kill. If victim has locked
487  current_mutex, but hasn't yet entered enter_cond() (which means that
488  current_cond and current_mutex are 0), then the victim will not get
489  a signal and it may wait "forever" on the cond (until
490  we issue a second KILL or the status it's waiting for happens).
491  It's true that we have set its session->killed but it may not
492  see it immediately and so may have time to reach the cond_wait().
493  */
494  if (mysys_var->current_cond && mysys_var->current_mutex)
495  {
496  mysys_var->current_mutex->lock();
497  mysys_var->current_cond->notify_all();
498  mysys_var->current_mutex->unlock();
499  }
500  }
501 }
502 
503 /*
504  Remember the location of thread info, the structure needed for
505  memory::sql_alloc() and the structure for the net buffer
506 */
507 void Session::storeGlobals()
508 {
509  /*
510  Assert that thread_stack is initialized: it's necessary to be able
511  to track stack overrun.
512  */
513  assert(thread_stack);
514  setCurrentSession(this);
515  setCurrentMemRoot(&mem);
516 
517  mysys_var= internal::my_thread_var2().get();
518 
519  /*
520  Let mysqld define the thread id (not mysys)
521  This allows us to move Session to different threads if needed.
522  */
523  mysys_var->id= thread_id;
524 
525  /*
526  We have to call thr_lock_info_init() again here as Session may have been
527  created in another thread
528  */
529  lock_info.init();
530 }
531 
532 /*
533  Init Session for query processing.
534  This has to be called once before we call mysql_parse.
535  See also comments in session.h.
536 */
537 
539 {
540  if (variables.max_join_size == HA_POS_ERROR)
541  options |= OPTION_BIG_SELECTS;
542 
543  open_tables.version= g_refresh_version;
544  set_proc_info(NULL);
545  command= COM_SLEEP;
546  times.set_time();
547 
548  mem.reset_defaults(variables.query_alloc_block_size, variables.query_prealloc_size);
549  transaction.xid_state.xid.set_null();
550  transaction.xid_state.in_session=1;
551  if (use_usage)
552  resetUsage();
553 }
554 
555 void Session::run()
556 {
557  storeGlobals();
558  if (authenticate())
559  {
560  disconnect();
561  return;
562  }
564  while (not client->haveError() && getKilled() != KILL_CONNECTION)
565  {
566  if (not executeStatement())
567  break;
568  }
569  disconnect();
570 }
571 
572 bool Session::schedule(const shared_ptr& arg)
573 {
574  arg->scheduler= plugin::Scheduler::getScheduler();
575  assert(arg->scheduler);
576 
577  ++connection_count;
578 
579  long current_connections= connection_count;
580 
581  if (current_connections > 0 and static_cast<uint64_t>(current_connections) > current_global_counters.max_used_connections)
582  {
583  current_global_counters.max_used_connections= static_cast<uint64_t>(connection_count);
584  }
585 
586  current_global_counters.connections++;
587  arg->thread_id= arg->variables.pseudo_thread_id= global_thread_id++;
588 
589  session::Cache::insert(arg);
590 
591  if (unlikely(plugin::EventObserver::connectSession(*arg)))
592  {
593  // We should do something about an error...
594  }
595 
596  if (plugin::Scheduler::getScheduler()->addSession(arg))
597  {
598  DRIZZLE_CONNECTION_START(arg->getSessionId());
599  char error_message_buff[DRIZZLE_ERRMSG_SIZE];
600 
601  arg->setKilled(Session::KILL_CONNECTION);
602 
603  arg->status_var.aborted_connects++;
604 
605  /* Can't use my_error() since store_globals has not been called. */
606  /* TODO replace will better error message */
607  snprintf(error_message_buff, sizeof(error_message_buff), ER(ER_CANT_CREATE_THREAD), 1);
608  arg->client->sendError(ER_CANT_CREATE_THREAD, error_message_buff);
609  return true;
610  }
611  return false;
612 }
613 
614 
615 /*
616  Is this session viewable by the current user?
617 */
618 bool Session::isViewable(const identifier::User& user_arg) const
619 {
620  return plugin::Authorization::isAuthorized(user_arg, *this, false);
621 }
622 
623 
624 const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
625 {
626  const char* old_msg = get_proc_info();
627  safe_mutex_assert_owner(mutex);
628  mysys_var->current_mutex = &mutex;
629  mysys_var->current_cond = &cond;
630  this->set_proc_info(msg);
631  return old_msg;
632 }
633 
634 void Session::exit_cond(const char* old_msg)
635 {
636  /*
637  Putting the mutex unlock in exit_cond() ensures that
638  mysys_var->current_mutex is always unlocked _before_ mysys_var->mutex is
639  locked (if that would not be the case, you'll get a deadlock if someone
640  does a Session::awake() on you).
641  */
642  mysys_var->current_mutex->unlock();
643  boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
644  mysys_var->current_mutex = 0;
645  mysys_var->current_cond = 0;
646  this->set_proc_info(old_msg);
647 }
648 
650 {
651  if (client->authenticate())
652  return false;
653 
654  status_var.aborted_connects++;
655 
656  return true;
657 }
658 
659 bool Session::checkUser(const std::string &passwd_str, const std::string &in_db)
660 {
661  if (not plugin::Authentication::isAuthenticated(*user(), passwd_str))
662  {
663  status_var.access_denied++;
664  /* isAuthenticated has pushed the error message */
665  return false;
666  }
667 
668  /* Change database if necessary */
669  if (not in_db.empty() && schema::change(*this, identifier::Schema(in_db)))
670  return false; // change() has pushed the error message
671  my_ok();
672 
673  /* Ready to handle queries */
674  return true;
675 }
676 
678 {
679  /*
680  indicator of uninitialized lex => normal flow of errors handling
681  (see my_message_sql)
682  */
683  lex().current_select= 0;
684  clear_error();
685  main_da().reset_diagnostics_area();
686  char *l_packet= 0;
687  uint32_t packet_length;
688  if (not client->readCommand(&l_packet, packet_length))
689  return false;
690 
691  if (getKilled() == KILL_CONNECTION)
692  return false;
693 
694  if (packet_length == 0)
695  return true;
696 
697  enum_server_command l_command= static_cast<enum_server_command>(l_packet[0]);
698 
699  if (command >= COM_END)
700  command= COM_END; // Wrong command
701 
702  assert(packet_length);
703  return not dispatch_command(l_command, this, l_packet+1, (uint32_t) (packet_length-1));
704 }
705 
706 void Session::readAndStoreQuery(const char *in_packet, uint32_t in_packet_length)
707 {
708  /* Remove garbage at start and end of query */
709  while (in_packet_length > 0 && charset()->isspace(in_packet[0]))
710  {
711  in_packet++;
712  in_packet_length--;
713  }
714  const char *pos= in_packet + in_packet_length; /* Point at end null */
715  while (in_packet_length > 0 && (pos[-1] == ';' || charset()->isspace(pos[-1])))
716  {
717  pos--;
718  in_packet_length--;
719  }
720 
721  util::string::mptr new_query= boost::make_shared<std::string>(in_packet, in_packet_length);
722  plugin::QueryRewriter::rewriteQuery(*impl_->schema, *new_query);
723  query= new_query;
724  impl_->state= boost::make_shared<session::State>(in_packet, in_packet_length);
725 }
726 
727 bool Session::endTransaction(enum_mysql_completiontype completion)
728 {
729  bool do_release= 0;
730  bool result= true;
731 
732  if (transaction.xid_state.xa_state != XA_NOTR)
733  {
734  my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
735  return false;
736  }
737  switch (completion)
738  {
739  case COMMIT:
740  /*
741  * We don't use endActiveTransaction() here to ensure that this works
742  * even if there is a problem with the OPTION_AUTO_COMMIT flag
743  * (Which of course should never happen...)
744  */
745  server_status&= ~SERVER_STATUS_IN_TRANS;
747  result= false;
748  options&= ~(OPTION_BEGIN);
749  break;
750  case COMMIT_RELEASE:
751  do_release= 1; /* fall through */
752  case COMMIT_AND_CHAIN:
753  result= endActiveTransaction();
754  if (result == true && completion == COMMIT_AND_CHAIN)
755  result= startTransaction();
756  break;
757  case ROLLBACK_RELEASE:
758  do_release= 1; /* fall through */
759  case ROLLBACK:
760  case ROLLBACK_AND_CHAIN:
761  {
762  server_status&= ~SERVER_STATUS_IN_TRANS;
763  if (TransactionServices::rollbackTransaction(*this, true))
764  result= false;
765  options&= ~(OPTION_BEGIN);
766  if (result == true && (completion == ROLLBACK_AND_CHAIN))
767  result= startTransaction();
768  break;
769  }
770  default:
771  my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
772  return false;
773  }
774 
775  if (not result)
776  {
777  my_error(static_cast<drizzled::error_t>(killed_errno()), MYF(0));
778  }
779  else if (result && do_release)
780  {
781  setKilled(Session::KILL_CONNECTION);
782  }
783 
784  return result;
785 }
786 
787 bool Session::endActiveTransaction()
788 {
789  bool result= true;
790 
791  if (transaction.xid_state.xa_state != XA_NOTR)
792  {
793  my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[transaction.xid_state.xa_state]);
794  return false;
795  }
796  if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
797  {
798  server_status&= ~SERVER_STATUS_IN_TRANS;
800  result= false;
801  }
802  options&= ~(OPTION_BEGIN);
803  return result;
804 }
805 
806 bool Session::startTransaction(start_transaction_option_t opt)
807 {
808  assert(not inTransaction());
809 
810  options|= OPTION_BEGIN;
811  server_status|= SERVER_STATUS_IN_TRANS;
812 
814  return false;
815  return true;
816 }
817 
819 {
820  /*
821  Reset rand_used so that detection of calls to rand() will save random
822  seeds if needed by the slave.
823  */
825  {
826  /* set what LAST_INSERT_ID() will return */
830  }
831 
832  arg_of_last_insert_id_function= false;
833 
834  /* Free Items that were created during this execution */
835  free_items();
836 
837  /* Reset _where. */
839 
840  /* Reset the temporary shares we built */
841  impl_->temporary_shares.clear();
842 }
843 
855 {
856  if (not lex_str)
857  lex_str= new (mem) lex_string_t;
858  lex_str->assign(mem_root->strdup(str), str.size());
859  return lex_str;
860 }
861 
862 void Session::send_explain_fields(select_result *result)
863 {
864  List<Item> field_list;
865  Item *item;
866  const charset_info_st* cs= system_charset_info;
867  field_list.push_back(new Item_return_int("id",3, DRIZZLE_TYPE_LONGLONG));
868  field_list.push_back(new Item_empty_string("select_type", 19, cs));
869  field_list.push_back(item= new Item_empty_string("table", NAME_CHAR_LEN, cs));
870  item->maybe_null= 1;
871  field_list.push_back(item= new Item_empty_string("type", 10, cs));
872  item->maybe_null= 1;
873  field_list.push_back(item= new Item_empty_string("possible_keys", NAME_CHAR_LEN*MAX_KEY, cs));
874  item->maybe_null=1;
875  field_list.push_back(item= new Item_empty_string("key", NAME_CHAR_LEN, cs));
876  item->maybe_null=1;
877  field_list.push_back(item= new Item_empty_string("key_len", MAX_KEY * (MAX_KEY_LENGTH_DECIMAL_WIDTH + 1 /* for comma */), cs));
878  item->maybe_null=1;
879  field_list.push_back(item= new Item_empty_string("ref", NAME_CHAR_LEN*MAX_REF_PARTS, cs));
880  item->maybe_null=1;
881  field_list.push_back(item= new Item_return_int("rows", 10, DRIZZLE_TYPE_LONGLONG));
882  if (lex().describe & DESCRIBE_EXTENDED)
883  {
884  field_list.push_back(item= new Item_float("filtered", 0.1234, 2, 4));
885  item->maybe_null=1;
886  }
887  item->maybe_null= 1;
888  field_list.push_back(new Item_empty_string("Extra", 255, cs));
889  result->send_fields(field_list);
890 }
891 
892 void select_result::send_error(drizzled::error_t errcode, const char *err)
893 {
894  my_message(errcode, err, MYF(0));
895 }
896 
897 /************************************************************************
898  Handling writing to file
899 ************************************************************************/
900 
901 void select_to_file::send_error(drizzled::error_t errcode,const char *err)
902 {
903  my_message(errcode, err, MYF(0));
904  if (file > 0)
905  {
906  (void) cache->end_io_cache();
907  (void) internal::my_close(file, MYF(0));
908  (void) internal::my_delete(path.file_string().c_str(), MYF(0)); // Delete file on error
909  file= -1;
910  }
911 }
912 
913 
914 bool select_to_file::send_eof()
915 {
916  int error= test(cache->end_io_cache());
917  if (internal::my_close(file, MYF(MY_WME)))
918  error= 1;
919  if (!error)
920  {
921  /*
922  In order to remember the value of affected rows for ROW_COUNT()
923  function, SELECT INTO has to have an own SQLCOM.
924  TODO: split from SQLCOM_SELECT
925  */
926  session->my_ok(row_count);
927  }
928  file= -1;
929  return error;
930 }
931 
932 
933 void select_to_file::cleanup()
934 {
935  /* In case of error send_eof() may be not called: close the file here. */
936  if (file >= 0)
937  {
938  (void) cache->end_io_cache();
939  (void) internal::my_close(file, MYF(0));
940  file= -1;
941  }
942  path= "";
943  row_count= 0;
944 }
945 
946 select_to_file::select_to_file(file_exchange *ex)
947  : exchange(ex),
948  file(-1),
949  cache(static_cast<internal::io_cache_st *>(memory::sql_calloc(sizeof(internal::io_cache_st)))),
950  row_count(0L)
951 {
952  path= "";
953 }
954 
955 select_to_file::~select_to_file()
956 {
957  cleanup();
958 }
959 
960 /***************************************************************************
961 ** Export of select to textfile
962 ***************************************************************************/
963 
964 select_export::~select_export()
965 {
966  session->sent_row_count=row_count;
967 }
968 
969 
970 /*
971  Create file with IO cache
972 
973  SYNOPSIS
974  create_file()
975  session Thread handle
976  path File name
977  exchange Excange class
978  cache IO cache
979 
980  RETURN
981  >= 0 File handle
982  -1 Error
983 */
984 
985 
986 static int create_file(Session& session,
987  fs::path &target_path,
988  file_exchange *exchange,
989  internal::io_cache_st *cache)
990 {
991  fs::path to_file(exchange->file_name);
992 
993  if (not to_file.has_root_directory())
994  {
995  target_path= fs::system_complete(catalog::local_identifier().getPath());
996  util::string::ptr schema(session.schema());
997  if (not schema->empty())
998  {
999  int count_elements= 0;
1000  for (fs::path::iterator it= to_file.begin(); it != to_file.end(); it++)
1001  count_elements++;
1002  if (count_elements == 1)
1003  target_path /= *schema;
1004  }
1005  target_path /= to_file;
1006  }
1007  else
1008  {
1009  target_path = exchange->file_name;
1010  }
1011 
1012  if (not secure_file_priv.string().empty())
1013  {
1014  if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1015  {
1016  /* Write only allowed to dir or subdir specified by secure_file_priv */
1017  my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1018  return -1;
1019  }
1020  }
1021 
1022  if (!access(target_path.file_string().c_str(), F_OK))
1023  {
1024  my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1025  return -1;
1026  }
1027  /* Create the file world readable */
1028  int file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME));
1029  if (file < 0)
1030  return file;
1031  (void) fchmod(file, 0666); // Because of umask()
1032  if (cache->init_io_cache(file, 0, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1033  {
1034  internal::my_close(file, MYF(0));
1035  internal::my_delete(target_path.file_string().c_str(), MYF(0)); // Delete file on error, it was just created
1036  return -1;
1037  }
1038  return file;
1039 }
1040 
1041 
1042 int
1043 select_export::prepare(List<Item> &list, Select_Lex_Unit *u)
1044 {
1045  bool blob_flag=0;
1046  bool string_results= false, non_string_results= false;
1047  unit= u;
1048  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1049  {
1050  path= exchange->file_name;
1051  }
1052 
1053  /* Check if there is any blobs in data */
1054  {
1055  List<Item>::iterator li(list.begin());
1056  while (Item* item= li++)
1057  {
1058  if (item->max_length >= MAX_BLOB_WIDTH)
1059  {
1060  blob_flag=1;
1061  break;
1062  }
1063 
1064  if (item->result_type() == STRING_RESULT)
1065  string_results= true;
1066  else
1067  non_string_results= true;
1068  }
1069  }
1070  field_term_length=exchange->field_term->length();
1071  field_term_char= field_term_length ?
1072  (int) (unsigned char) (*exchange->field_term)[0] : INT_MAX;
1073  if (!exchange->line_term->length())
1074  exchange->line_term=exchange->field_term; // Use this if it exists
1075  field_sep_char= exchange->enclosed->length() ? (int) (unsigned char) (*exchange->enclosed)[0] : field_term_char;
1076  escape_char= exchange->escaped->length() ? (int) (unsigned char) (*exchange->escaped)[0] : -1;
1077  is_ambiguous_field_sep= test(strchr(ESCAPE_CHARS, field_sep_char));
1078  is_unsafe_field_sep= test(strchr(NUMERIC_CHARS, field_sep_char));
1079  line_sep_char= exchange->line_term->length() ? (int) (unsigned char) (*exchange->line_term)[0] : INT_MAX;
1080  if (!field_term_length)
1081  exchange->opt_enclosed=0;
1082  if (!exchange->enclosed->length())
1083  exchange->opt_enclosed=1; // A little quicker loop
1084  fixed_row_size= (!field_term_length && !exchange->enclosed->length() &&
1085  !blob_flag);
1086  if ((is_ambiguous_field_sep && exchange->enclosed->empty() && (string_results || is_unsafe_field_sep)) ||
1087  (exchange->opt_enclosed && non_string_results && field_term_length && strchr(NUMERIC_CHARS, field_term_char)))
1088  {
1089  my_error(ER_AMBIGUOUS_FIELD_TERM, MYF(0));
1090  return 1;
1091  }
1092 
1093  if ((file= create_file(*session, path, exchange, cache)) < 0)
1094  return 1;
1095 
1096  return 0;
1097 }
1098 
1099 bool select_export::send_data(List<Item> &items)
1100 {
1101  char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
1102  bool space_inited=0;
1103  String tmp(buff,sizeof(buff),&my_charset_bin),*res;
1104  tmp.length(0);
1105 
1106  if (unit->offset_limit_cnt)
1107  { // using limit offset,count
1108  unit->offset_limit_cnt--;
1109  return false;
1110  }
1111  row_count++;
1112  uint32_t used_length=0,items_left=items.size();
1113  List<Item>::iterator li(items.begin());
1114 
1115  if (cache->write(exchange->line_start->ptr(), exchange->line_start->length()))
1116  return true;
1117 
1118  while (Item* item=li++)
1119  {
1120  Item_result result_type=item->result_type();
1121  bool enclosed = (exchange->enclosed->length() &&
1122  (!exchange->opt_enclosed || result_type == STRING_RESULT));
1123  res=item->str_result(&tmp);
1124  if (res && enclosed)
1125  {
1126  if (cache->write(exchange->enclosed->ptr(), exchange->enclosed->length()))
1127  return true;
1128  }
1129  if (!res)
1130  { // NULL
1131  if (!fixed_row_size)
1132  {
1133  if (escape_char != -1) // Use \N syntax
1134  {
1135  null_buff[0]=escape_char;
1136  null_buff[1]='N';
1137  if (cache->write(null_buff, 2))
1138  return true;
1139  }
1140  else if (cache->write("NULL", 4))
1141  return true;
1142  }
1143  else
1144  {
1145  used_length=0; // Fill with space
1146  }
1147  }
1148  else
1149  {
1150  if (fixed_row_size)
1151  used_length= min(res->length(), static_cast<size_t>(item->max_length));
1152  else
1153  used_length= res->length();
1154 
1155  if ((result_type == STRING_RESULT || is_unsafe_field_sep) &&
1156  escape_char != -1)
1157  {
1158  char *pos, *start, *end;
1159  const charset_info_st* const res_charset= res->charset();
1160 
1161  for (start= pos= (char*) res->ptr(),end=pos+used_length; pos != end; pos++)
1162  {
1163  if (use_mb(res_charset))
1164  {
1165  if (int l= my_ismbchar(res_charset, pos, end))
1166  {
1167  pos += l - 1;
1168  continue;
1169  }
1170  }
1171 
1172  /*
1173  Special case when dumping BINARY/VARBINARY/BLOB values
1174  for the clients with character sets big5, cp932, gbk and sjis,
1175  which can have the escape character (0x5C "\" by default)
1176  as the second byte of a multi-byte sequence.
1177 
1178  If
1179  - pos[0] is a valid multi-byte head (e.g 0xEE) and
1180  - pos[1] is 0x00, which will be escaped as "\0",
1181 
1182  then we'll get "0xEE + 0x5C + 0x30" in the output file.
1183 
1184  If this file is later loaded using this sequence of commands:
1185 
1186  mysql> create table t1 (a varchar(128)) character set big5;
1187  mysql> LOAD DATA INFILE 'dump.txt' INTO Table t1;
1188 
1189  then 0x5C will be misinterpreted as the second byte
1190  of a multi-byte character "0xEE + 0x5C", instead of
1191  escape character for 0x00.
1192 
1193  To avoid this confusion, we'll escape the multi-byte
1194  head character too, so the sequence "0xEE + 0x00" will be
1195  dumped as "0x5C + 0xEE + 0x5C + 0x30".
1196 
1197  Note, in the condition below we only check if
1198  mbcharlen is equal to 2, because there are no
1199  character sets with mbmaxlen longer than 2
1200  and with escape_with_backslash_is_dangerous set.
1201  assert before the loop makes that sure.
1202  */
1203 
1204  if (needs_escaping(*pos, enclosed) &&
1205  /*
1206  Don't escape field_term_char by doubling - doubling is only
1207  valid for ENCLOSED BY characters:
1208  */
1209  (enclosed || !is_ambiguous_field_term ||
1210  (int) (unsigned char) *pos != field_term_char))
1211  {
1212  char tmp_buff[2];
1213  tmp_buff[0]= ((int) (unsigned char) *pos == field_sep_char &&
1214  is_ambiguous_field_sep) ? field_sep_char : escape_char;
1215  tmp_buff[1]= *pos ? *pos : '0';
1216  if (cache->write(start, pos - start) || cache->write(tmp_buff, 2))
1217  return true;
1218  start=pos+1;
1219  }
1220  }
1221  if (cache->write(start, pos - start))
1222  return true;
1223  }
1224  else if (cache->write(res->ptr(), used_length))
1225  return true;
1226  }
1227  if (fixed_row_size)
1228  { // Fill with space
1229  if (item->max_length > used_length)
1230  {
1231  /* QQ: Fix by adding a my_b_fill() function */
1232  if (!space_inited)
1233  {
1234  space_inited=1;
1235  memset(space, ' ', sizeof(space));
1236  }
1237  uint32_t length=item->max_length-used_length;
1238  for (; length > sizeof(space) ; length-=sizeof(space))
1239  {
1240  if (cache->write(space, sizeof(space)))
1241  return true;
1242  }
1243  if (cache->write(space, length))
1244  return true;
1245  }
1246  }
1247  if (res && enclosed)
1248  {
1249  if (cache->write(exchange->enclosed->ptr(), exchange->enclosed->length()))
1250  return true;
1251  }
1252  if (--items_left)
1253  {
1254  if (cache->write(exchange->field_term->ptr(), field_term_length))
1255  return true;
1256  }
1257  }
1258  if (cache->write(exchange->line_term->ptr(), exchange->line_term->length()))
1259  {
1260  return true;
1261  }
1262 
1263  return false;
1264 }
1265 
1266 
1267 /***************************************************************************
1268 ** Dump of select to a binary file
1269 ***************************************************************************/
1270 
1271 
1272 int
1273 select_dump::prepare(List<Item> &, Select_Lex_Unit *u)
1274 {
1275  unit= u;
1276  return (file= create_file(*session, path, exchange, cache)) < 0;
1277 }
1278 
1279 
1280 bool select_dump::send_data(List<Item> &items)
1281 {
1282  List<Item>::iterator li(items.begin());
1283  char buff[MAX_FIELD_WIDTH];
1284  String tmp(buff,sizeof(buff),&my_charset_bin),*res;
1285  tmp.length(0);
1286 
1287  if (unit->offset_limit_cnt)
1288  { // using limit offset,count
1289  unit->offset_limit_cnt--;
1290  return 0;
1291  }
1292  if (row_count++ > 1)
1293  {
1294  my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1295  return 1;
1296  }
1297  while (Item* item=li++)
1298  {
1299  res=item->str_result(&tmp);
1300  if (!res) // If NULL
1301  {
1302  if (cache->write("", 1))
1303  return 1;
1304  }
1305  else if (cache->write(res->ptr(), res->length()))
1306  {
1307  my_error(ER_ERROR_ON_WRITE, MYF(0), path.file_string().c_str(), errno);
1308  return 1;
1309  }
1310  }
1311  return 0;
1312 }
1313 
1314 
1315 select_subselect::select_subselect(Item_subselect *item_arg)
1316 {
1317  item= item_arg;
1318 }
1319 
1320 
1321 bool select_singlerow_subselect::send_data(List<Item> &items)
1322 {
1323  Item_singlerow_subselect *it= (Item_singlerow_subselect *)item;
1324  if (it->assigned())
1325  {
1326  my_message(ER_SUBQUERY_NO_1_ROW, ER(ER_SUBQUERY_NO_1_ROW), MYF(0));
1327  return 1;
1328  }
1329  if (unit->offset_limit_cnt)
1330  { // Using limit offset,count
1331  unit->offset_limit_cnt--;
1332  return 0;
1333  }
1334  List<Item>::iterator li(items.begin());
1335  Item *val_item;
1336  for (uint32_t i= 0; (val_item= li++); i++)
1337  it->store(i, val_item);
1338  it->assigned(1);
1339  return 0;
1340 }
1341 
1342 
1343 void select_max_min_finder_subselect::cleanup()
1344 {
1345  cache= 0;
1346 }
1347 
1348 
1349 bool select_max_min_finder_subselect::send_data(List<Item> &items)
1350 {
1351  Item_maxmin_subselect *it= (Item_maxmin_subselect *)item;
1352  List<Item>::iterator li(items.begin());
1353  Item *val_item= li++;
1354  it->register_value();
1355  if (it->assigned())
1356  {
1357  cache->store(val_item);
1358  if ((this->*op)())
1359  it->store(0, cache);
1360  }
1361  else
1362  {
1363  if (!cache)
1364  {
1365  cache= Item_cache::get_cache(val_item);
1366  switch (val_item->result_type())
1367  {
1368  case REAL_RESULT:
1369  op= &select_max_min_finder_subselect::cmp_real;
1370  break;
1371  case INT_RESULT:
1372  op= &select_max_min_finder_subselect::cmp_int;
1373  break;
1374  case STRING_RESULT:
1375  op= &select_max_min_finder_subselect::cmp_str;
1376  break;
1377  case DECIMAL_RESULT:
1378  op= &select_max_min_finder_subselect::cmp_decimal;
1379  break;
1380  case ROW_RESULT:
1381  // This case should never be choosen
1382  assert(0);
1383  op= 0;
1384  }
1385  }
1386  cache->store(val_item);
1387  it->store(0, cache);
1388  }
1389  it->assigned(1);
1390  return 0;
1391 }
1392 
1393 bool select_max_min_finder_subselect::cmp_real()
1394 {
1395  Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
1396  double val1= cache->val_real(), val2= maxmin->val_real();
1397  if (fmax)
1398  return (cache->null_value && !maxmin->null_value) ||
1399  (!cache->null_value && !maxmin->null_value &&
1400  val1 > val2);
1401  return (maxmin->null_value && !cache->null_value) ||
1402  (!cache->null_value && !maxmin->null_value &&
1403  val1 < val2);
1404 }
1405 
1406 bool select_max_min_finder_subselect::cmp_int()
1407 {
1408  Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
1409  int64_t val1= cache->val_int(), val2= maxmin->val_int();
1410  if (fmax)
1411  return (cache->null_value && !maxmin->null_value) ||
1412  (!cache->null_value && !maxmin->null_value &&
1413  val1 > val2);
1414  return (maxmin->null_value && !cache->null_value) ||
1415  (!cache->null_value && !maxmin->null_value &&
1416  val1 < val2);
1417 }
1418 
1419 bool select_max_min_finder_subselect::cmp_decimal()
1420 {
1421  Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
1422  type::Decimal cval, *cvalue= cache->val_decimal(&cval);
1423  type::Decimal mval, *mvalue= maxmin->val_decimal(&mval);
1424  if (fmax)
1425  return (cache->null_value && !maxmin->null_value) ||
1426  (!cache->null_value && !maxmin->null_value &&
1427  class_decimal_cmp(cvalue, mvalue) > 0) ;
1428  return (maxmin->null_value && !cache->null_value) ||
1429  (!cache->null_value && !maxmin->null_value &&
1430  class_decimal_cmp(cvalue,mvalue) < 0);
1431 }
1432 
1433 bool select_max_min_finder_subselect::cmp_str()
1434 {
1435  String *val1, *val2, buf1, buf2;
1436  Item *maxmin= ((Item_singlerow_subselect *)item)->element_index(0);
1437  /*
1438  as far as both operand is Item_cache buf1 & buf2 will not be used,
1439  but added for safety
1440  */
1441  val1= cache->val_str(&buf1);
1442  val2= maxmin->val_str(&buf1);
1443  if (fmax)
1444  return (cache->null_value && !maxmin->null_value) ||
1445  (!cache->null_value && !maxmin->null_value &&
1446  sortcmp(val1, val2, cache->collation.collation) > 0) ;
1447  return (maxmin->null_value && !cache->null_value) ||
1448  (!cache->null_value && !maxmin->null_value &&
1449  sortcmp(val1, val2, cache->collation.collation) < 0);
1450 }
1451 
1452 bool select_exists_subselect::send_data(List<Item> &)
1453 {
1454  Item_exists_subselect *it= (Item_exists_subselect *)item;
1455  if (unit->offset_limit_cnt)
1456  { // Using limit offset,count
1457  unit->offset_limit_cnt--;
1458  return 0;
1459  }
1460  it->value= 1;
1461  it->assigned(1);
1462  return 0;
1463 }
1464 
1465 /*
1466  Don't free mem_root, as mem_root is freed in the end of dispatch_command
1467  (once for any command).
1468 */
1470 {
1471  /* Cleanup SQL processing state to reuse this statement in next query. */
1472  lex().end();
1474 }
1475 
1476 str_ref Session::copy_db_to() const
1477 {
1478  if (not impl_->schema->empty())
1479  return str_ref(mem.strdup(*impl_->schema), impl_->schema->size());
1480  my_message(ER_NO_DB_ERROR, ER(ER_NO_DB_ERROR), MYF(0));
1481  return str_ref();
1482 }
1483 
1484 /****************************************************************************
1485  Tmp_Table_Param
1486 ****************************************************************************/
1487 
1488 void Tmp_Table_Param::init()
1489 {
1490  field_count= sum_func_count= func_count= hidden_field_count= 0;
1491  group_parts= group_length= group_null_parts= 0;
1492  quick_group= 1;
1493  table_charset= 0;
1494  precomputed_group_by= 0;
1495 }
1496 
1497 void Tmp_Table_Param::cleanup()
1498 {
1499  /* Fix for Intel compiler */
1500  if (copy_field)
1501  {
1502  boost::checked_array_delete(copy_field);
1503  save_copy_field= save_copy_field_end= copy_field= copy_field_end= 0;
1504  }
1505 }
1506 
1507 void Session::send_kill_message() const
1508 {
1509  drizzled::error_t err= static_cast<drizzled::error_t>(killed_errno());
1510  if (err != EE_OK)
1511  my_message(err, ER(err), MYF(0));
1512 }
1513 
1514 void Session::set_schema(const std::string& new_db)
1515 {
1516  impl_->schema = boost::make_shared<std::string>(new_db);
1517 }
1518 
1519 
1527 {
1530 }
1531 
1532 void Session::disconnect(error_t errcode)
1533 {
1534  /* Allow any plugins to cleanup their session variables */
1535  plugin_sessionvar_cleanup(this);
1536 
1537  /* If necessary, log any aborted or unauthorized connections */
1538  if (getKilled() || client->wasAborted())
1539  {
1540  status_var.aborted_threads++;
1541  }
1542 
1543  if (client->wasAborted())
1544  {
1545  if (not getKilled() && variables.log_warnings > 1)
1546  {
1547  errmsg_printf(error::WARN, ER(ER_NEW_ABORTING_CONNECTION)
1548  , thread_id
1549  , (impl_->schema->empty() ? "unconnected" : impl_->schema->c_str())
1550  , security_ctx->username().empty() ? "unauthenticated" : security_ctx->username().c_str()
1551  , security_ctx->address().c_str()
1552  , (main_da().is_error() ? main_da().message() : ER(ER_UNKNOWN_ERROR)));
1553  }
1554  }
1555 
1556  setKilled(Session::KILL_CONNECTION);
1557 
1558  if (client->isConnected())
1559  {
1560  if (errcode != EE_OK)
1561  {
1562  /*my_error(errcode, ER(errcode));*/
1563  client->sendError(errcode, ER(errcode));
1564  }
1565  client->close();
1566  }
1567 }
1568 
1570 {
1571  free_list= 0;
1572  select_number= 1;
1573 
1574  is_fatal_error= false;
1575  server_status&= ~ (SERVER_MORE_RESULTS_EXISTS |
1576  SERVER_QUERY_NO_INDEX_USED |
1577  SERVER_QUERY_NO_GOOD_INDEX_USED);
1578 
1579  clear_error();
1580  main_da().reset_diagnostics_area();
1581  total_warn_count=0; // Warnings for this query
1583 }
1584 
1585 /*
1586  Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
1587 */
1588 
1589 void Open_tables_state::close_temporary_tables()
1590 {
1591  Table *table;
1592  Table *tmp_next;
1593 
1594  if (not temporary_tables)
1595  return;
1596 
1597  for (table= temporary_tables; table; table= tmp_next)
1598  {
1599  tmp_next= table->getNext();
1600  nukeTable(table);
1601  }
1602  temporary_tables= NULL;
1603 }
1604 
1605 /*
1606  unlink from session->temporary tables and close temporary table
1607 */
1608 
1609 void Open_tables_state::close_temporary_table(Table *table)
1610 {
1611  if (table->getPrev())
1612  {
1613  table->getPrev()->setNext(table->getNext());
1614  if (table->getPrev()->getNext())
1615  {
1616  table->getNext()->setPrev(table->getPrev());
1617  }
1618  }
1619  else
1620  {
1621  /* removing the item from the list */
1622  assert(table == temporary_tables);
1623  /*
1624  slave must reset its temporary list pointer to zero to exclude
1625  passing non-zero value to end_slave via rli->save_temporary_tables
1626  when no temp tables opened, see an invariant below.
1627  */
1628  temporary_tables= table->getNext();
1629  if (temporary_tables)
1630  {
1631  table->getNext()->setPrev(NULL);
1632  }
1633  }
1634  nukeTable(table);
1635 }
1636 
1637 /*
1638  Close and drop a temporary table
1639 
1640  NOTE
1641  This dosn't unlink table from session->temporary
1642  If this is needed, use close_temporary_table()
1643 */
1644 
1645 void Open_tables_state::nukeTable(Table *table)
1646 {
1647  plugin::StorageEngine& table_type= *table->getShare()->db_type();
1648  table->free_io_cache();
1649  table->delete_table();
1650  rm_temporary_table(table_type, identifier::Table(table->getShare()->getSchemaName(), table->getShare()->getTableName(), table->getShare()->getPath()));
1651  boost::checked_delete(table->getMutableShare());
1652  boost::checked_delete(table);
1653 }
1654 
1656 extern time_t flush_status_time;
1657 
1658 void Session::refresh_status()
1659 {
1660  /* Reset thread's status variables */
1661  memset(&status_var, 0, sizeof(status_var));
1662 
1663  flush_status_time= time((time_t*) 0);
1664  current_global_counters.max_used_connections= 1; /* We set it to one, because we know we exist */
1665  current_global_counters.connections= 0;
1666 }
1667 
1668 user_var_entry* Session::getVariable(str_ref name0, bool create_if_not_exists)
1669 {
1670  if (cleanup_done)
1671  return NULL;
1672 
1673  string name(name0.data(), name0.size());
1674  if (UserVars::mapped_type* iter= find_ptr(user_vars, name))
1675  return *iter;
1676 
1677  if (not create_if_not_exists)
1678  return NULL;
1679 
1680  user_var_entry *entry= new user_var_entry(name.c_str(), query_id);
1681 
1682  std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
1683 
1684  if (not returnable.second)
1685  {
1686  boost::checked_delete(entry);
1687  }
1688 
1689  return entry;
1690 }
1691 
1692 void Session::setVariable(const std::string &name, const std::string &value)
1693 {
1694  if (user_var_entry* var= getVariable(name, true))
1695  {
1696  var->update_hash(false, value, STRING_RESULT, &my_charset_bin, DERIVATION_IMPLICIT, false);
1697  }
1698 }
1699 
1701 {
1702  for (Table *table= temporary_tables ; table ; table= table->getNext())
1703  {
1704  if (table->query_id == session_.getQueryId())
1705  {
1706  table->query_id= 0;
1707  table->cursor->ha_reset();
1708  }
1709  }
1710 }
1711 
1712 /*
1713  Unlocks tables and frees derived tables.
1714  Put all normal tables used by thread in free list.
1715 
1716  It will only close/mark as free for reuse tables opened by this
1717  substatement, it will also check if we are closing tables after
1718  execution of complete query (i.e. we are on upper level) and will
1719  leave prelocked mode if needed.
1720 */
1722 {
1723  open_tables.clearDerivedTables();
1724 
1725  /*
1726  Mark all temporary tables used by this statement as free for reuse.
1727  */
1728  open_tables.mark_temp_tables_as_free_for_reuse();
1729  /*
1730  Let us commit transaction for statement. Since in 5.0 we only have
1731  one statement transaction and don't allow several nested statement
1732  transactions this call will do nothing if we are inside of stored
1733  function or trigger (i.e. statement transaction is already active and
1734  does not belong to statement for which we do close_thread_tables()).
1735  TODO: This should be fixed in later releases.
1736  */
1737  {
1738  main_da().can_overwrite_status= true;
1740  main_da().can_overwrite_status= false;
1741  transaction.stmt.reset();
1742  }
1743 
1744  if (open_tables.lock)
1745  {
1746  unlockTables(open_tables.lock);
1747  open_tables.lock= 0;
1748  }
1749  /*
1750  Note that we need to hold table::Cache::mutex() while changing the
1751  open_tables list. Another thread may work on it.
1752  (See: table::Cache::removeTable(), wait_completed_table())
1753  Closing a MERGE child before the parent would be fatal if the
1754  other thread tries to abort the MERGE lock in between.
1755  */
1756  if (open_tables.open_tables_)
1757  open_tables.close_open_tables();
1758 }
1759 
1761 {
1762  /*
1763  If table list consists only from tables from prelocking set, table list
1764  for new attempt should be empty, so we have to update list's root pointer.
1765  */
1766  if (lex().first_not_own_table() == *tables)
1767  *tables= 0;
1768  lex().chop_off_not_own_tables();
1769  for (TableList *tmp= *tables; tmp; tmp= tmp->next_global)
1770  tmp->table= 0;
1772 }
1773 
1775 {
1776  uint32_t counter;
1777  bool need_reopen;
1778 
1779  for ( ; ; )
1780  {
1781  if (open_tables_from_list(&tables, &counter))
1782  return true;
1783 
1784  if (not lock_tables(tables, counter, &need_reopen))
1785  break;
1786 
1787  if (not need_reopen)
1788  return true;
1789 
1790  close_tables_for_reopen(&tables);
1791  }
1792 
1793  return handle_derived(&lex(), &derived_prepare) || handle_derived(&lex(), &derived_filling);
1794 }
1795 
1796 /*
1797  @note "best_effort" is used in cases were if a failure occurred on this
1798  operation it would not be surprising because we are only removing because there
1799  might be an issue (lame engines).
1800 */
1801 
1802 bool Open_tables_state::rm_temporary_table(const identifier::Table &identifier, bool best_effort)
1803 {
1804  if (plugin::StorageEngine::dropTable(session_, identifier))
1805  return false;
1806  if (not best_effort)
1807  errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"), identifier.getSQLPath().c_str(), errno);
1808  return true;
1809 }
1810 
1811 bool Open_tables_state::rm_temporary_table(plugin::StorageEngine& base, const identifier::Table &identifier)
1812 {
1813  drizzled::error_t error;
1814  if (plugin::StorageEngine::dropTable(session_, base, identifier, error))
1815  return false;
1816  errmsg_printf(error::WARN, _("Could not remove temporary table: '%s', error: %d"), identifier.getSQLPath().c_str(), error);
1817  return true;
1818 }
1819 
1820 table::Singular& Session::getInstanceTable()
1821 {
1822  impl_->temporary_shares.push_back(new table::Singular); // This will not go into the tableshare cache, so no key is used.
1823  return impl_->temporary_shares.back();
1824 }
1825 
1826 
1845 table::Singular& Session::getInstanceTable(std::list<CreateField>& field_list)
1846 {
1847  impl_->temporary_shares.push_back(new table::Singular(this, field_list)); // This will not go into the tableshare cache, so no key is used.
1848  return impl_->temporary_shares.back();
1849 }
1850 
1851 void Session::clear_error(bool full)
1852 {
1853  if (main_da().is_error())
1854  main_da().reset_diagnostics_area();
1855 
1856  if (full)
1857  drizzle_reset_errors(*this, true);
1858 }
1859 
1860 void Session::clearDiagnostics()
1861 {
1862  main_da().reset_diagnostics_area();
1863 }
1864 
1878 bool Session::is_error() const
1879 {
1880  return impl_->diagnostics.is_error();
1881 }
1882 
1884 void Session::my_ok(ha_rows affected_rows, ha_rows found_rows_arg, uint64_t passed_id, const char *message)
1885 {
1886  main_da().set_ok_status(this, affected_rows, found_rows_arg, passed_id, message);
1887 }
1888 
1892 {
1893  main_da().set_eof_status(this);
1894 }
1895 
1897 {
1898  return variables.storage_engine ? variables.storage_engine : global_system_variables.storage_engine;
1899 }
1900 
1901 enum_tx_isolation Session::getTxIsolation() const
1902 {
1903  return (enum_tx_isolation)variables.tx_isolation;
1904 }
1905 
1906 drizzled::util::Storable* Session::getProperty0(const std::string& arg)
1907 {
1908  return impl_->properties[arg];
1909 }
1910 
1911 void Session::setProperty0(const std::string& arg, drizzled::util::Storable* value)
1912 {
1913  // assert(not _properties.count(arg));
1914  impl_->properties[arg]= value;
1915 }
1916 
1917 plugin::EventObserverList* Session::getSchemaObservers(const std::string &db_name)
1918 {
1919  if (impl_c::schema_event_observers_t::mapped_type* i= find_ptr(impl_->schema_event_observers, db_name))
1920  return *i;
1921  return NULL;
1922 }
1923 
1924 plugin::EventObserverList* Session::setSchemaObservers(const std::string &db_name, plugin::EventObserverList* observers)
1925 {
1926  impl_->schema_event_observers.erase(db_name);
1927  if (observers)
1928  impl_->schema_event_observers[db_name] = observers;
1929  return observers;
1930 }
1931 
1932 util::string::ptr Session::schema() const
1933 {
1934  return impl_->schema;
1935 }
1936 
1937 void Session::resetQueryString()
1938 {
1939  query.reset();
1940  impl_->state.reset();
1941 }
1942 
1943 const boost::shared_ptr<session::State>& Session::state()
1944 {
1945  return impl_->state;
1946 }
1947 
1948 const std::string& display::type(drizzled::Session::global_read_lock_t type)
1949 {
1950  static const std::string NONE= "NONE";
1951  static const std::string GOT_GLOBAL_READ_LOCK= "HAS GLOBAL READ LOCK";
1952  static const std::string MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT= "HAS GLOBAL READ LOCK WITH BLOCKING COMMIT";
1953 
1954  switch (type)
1955  {
1956  default:
1957  case Session::NONE:
1958  return NONE;
1959  case Session::GOT_GLOBAL_READ_LOCK:
1960  return GOT_GLOBAL_READ_LOCK;
1961  case Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT:
1962  return MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT;
1963  }
1964 }
1965 
1966 size_t display::max_string_length(drizzled::Session::global_read_lock_t)
1967 {
1968  return display::type(Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT).size();
1969 }
1970 
1971 } /* namespace drizzled */