summaryrefslogtreecommitdiff
path: root/storage/maria/ha_s3.cc
blob: 5d37a681e9b1fcdd5a22b821cfdcc082ac42f8bb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
/* Copyright (C) 2019 MariaDB Corporation AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the
   Free Software Foundation, Inc.
   51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
*/

/*
  Implementation of S3 storage engine.

  Storage format:

  The S3 engine is a read-only storage engine. The data is stored in the
  same format as a non-transactional Aria table in BLOCK_RECORD format.
  This makes it easy to cache both index and rows in the page cache.
  Data and index files are split into blocks of 's3_block_size', default
  4M.

  The table and its associated files are stored in S3 in the following
  locations:

  frm file (for discovery):
  aws_bucket/database/table/frm

  First index block (contains the description of the Aria file):
  aws_bucket/database/table/aria

  Rest of the index file:
  aws_bucket/database/table/index/block_number

  Data file:
  aws_bucket/database/table/data/block_number

  block_number is a decimal number, left-padded with zeros to 6 digits
  (it can be longer than 6 digits; the padding is just for nice output)

  frm and base blocks are small (just the needed data).
  Index and data blocks are of size 's3_block_size'.

  If compression is used, then original block size is s3_block_size
  but the stored block will be the size of the compressed block.

  Implementation:
  The s3 engine inherits from the ha_maria handler

  s3 will use its own page cache to not interfere with normal Aria
  usage but also to ensure that the S3 page cache is large enough
  (with a 4M s3_block_size the engine will need a large cache to work,
  at least s3_block_size * 32). The default cache is 128M.
*/

#define MYSQL_SERVER 1
#include "maria_def.h"
#include "sql_class.h"
#include <mysys_err.h>
#include <libmarias3/marias3.h>
#include <discover.h>
#include "ha_s3.h"
#include "s3_func.h"
#include "aria_backup.h"

#define DEFAULT_AWS_HOST_NAME "s3.amazonaws.com"

/* Page cache used only by S3 tables, separate from the normal Aria one */
static PAGECACHE s3_pagecache;
/* Storage for the block_size and protocol_version system variables */
static ulong s3_block_size, s3_protocol_version;
/* Page cache tuning values; passed to init_pagecache() in ha_s3_init() */
static ulong s3_pagecache_division_limit, s3_pagecache_age_threshold;
static ulong s3_pagecache_file_hash_size;
static ulonglong s3_pagecache_buffer_size;
/*
  Connection settings copied into S3_INFO by s3_info_init().
  s3_access_key and s3_secret_key hold the real keys; they are filled in by
  update_access_key() / update_secret_key() and freed in s3_hton_panic().
*/
static char *s3_bucket, *s3_access_key=0, *s3_secret_key=0, *s3_region;
static char *s3_host_name;
/*
  Storage for the access_key and secret_key system variables. Once a key
  has been moved to the variables above, these hold "*****" instead.
*/
static char *s3_tmp_access_key=0, *s3_tmp_secret_key=0;
/* If set, ha_s3_init() calls ms3_debug() */
static my_bool s3_debug= 0;
/* The S3 handlerton. Set in ha_s3_init(), reset to 0 in s3_hton_panic() */
handlerton *s3_hton= 0;

/* Don't show access or secret keys to users if they exists */

static void update_access_key(MYSQL_THD thd,
                              struct st_mysql_sys_var *var,
                              void *var_ptr, const void *save)
{
  my_free(s3_access_key);
  s3_access_key= 0;
  /* Don't show real key to user in SHOW VARIABLES */
  if (s3_tmp_access_key[0])
  {
    s3_access_key= s3_tmp_access_key;
    s3_tmp_access_key= my_strdup("*****", MYF(MY_WME));
  }
}

static void update_secret_key(MYSQL_THD thd,
                              struct st_mysql_sys_var *var,
                              void *var_ptr, const void *save)
{
  my_free(s3_secret_key);
  s3_secret_key= 0;
  /* Don't show real key to user in SHOW VARIABLES */
  if (s3_tmp_secret_key[0])
  {
    s3_secret_key= s3_tmp_secret_key;
    s3_tmp_secret_key= my_strdup("*****", MYF(MY_WME));
  }
}

/*
  Define system variables for S3

  NOTE(review): the trailing numeric arguments of the ULONG / ULONGLONG
  variables are read here as default, minimum, maximum, block size, per the
  plugin API - confirm against plugin.h.
*/

/* Size of the blocks that data and index files are split into */
static MYSQL_SYSVAR_ULONG(block_size, s3_block_size,
       PLUGIN_VAR_RQCMDARG,
       "Block size for S3", 0, 0,
       4*1024*1024, 65536, 16*1024*1024, 8192);

/* Read only; checked once in ha_s3_init() to decide if ms3_debug() is called */
static MYSQL_SYSVAR_BOOL(debug, s3_debug,
       PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
      "Generates trace file from libmarias3 on stderr for debugging",
       0, 0, 0);

/* Copied into S3_INFO::protocol_version by s3_info_init() */
static MYSQL_SYSVAR_ENUM(protocol_version, s3_protocol_version,
                         PLUGIN_VAR_RQCMDARG,
                         "Protocol used to communication with S3. One of "
                         "\"Auto\", \"Amazon\" or \"Original\".",
                         NULL, NULL, 0, &s3_protocol_typelib);

/* The pagecache_* variables below are passed to init_pagecache() */
static MYSQL_SYSVAR_ULONG(pagecache_age_threshold,
       s3_pagecache_age_threshold, PLUGIN_VAR_RQCMDARG,
       "This characterizes the number of hits a hot block has to be untouched "
       "until it is considered aged enough to be downgraded to a warm block. "
       "This specifies the percentage ratio of that number of hits to the "
       "total number of blocks in the page cache.", 0, 0,
       300, 100, ~ (ulong) 0L, 100);

static MYSQL_SYSVAR_ULONGLONG(pagecache_buffer_size, s3_pagecache_buffer_size,
       PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
       "The size of the buffer used for index blocks for S3 tables. "
       "Increase this to get better index handling (for all reads and "
       "multiple writes) to as much as you can afford.", 0, 0,
        128*1024*1024, 1024*1024*32, ~(ulonglong) 0, 8192);

static MYSQL_SYSVAR_ULONG(pagecache_division_limit,
                          s3_pagecache_division_limit,
       PLUGIN_VAR_RQCMDARG,
       "The minimum percentage of warm blocks in key cache", 0, 0,
       100,  1, 100, 1);

static MYSQL_SYSVAR_ULONG(pagecache_file_hash_size,
                          s3_pagecache_file_hash_size,
       PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
       "Number of hash buckets for open files.  If you have a lot "
       "of S3 files open you should increase this for faster flush of "
       "changes. A good value is probably 1/10 of number of possible open "
       "S3 files.", 0,0, 512, 32, 16384, 1);

/* Bucket that all tables are stored in; copied into S3_INFO::bucket */
static MYSQL_SYSVAR_STR(bucket, s3_bucket,
       PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
      "AWS bucket",
       0, 0, "MariaDB");
/* Host to connect to; copied into S3_INFO::host_name by s3_info_init() */
static MYSQL_SYSVAR_STR(host_name, s3_host_name,
       PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
      "AWS host name",
       0, 0, DEFAULT_AWS_HOST_NAME);
/*
  The two keys are stored in the s3_tmp_* variables. update_access_key() and
  update_secret_key() move a given key to s3_access_key / s3_secret_key so
  that the real value is not shown to users.
*/
static MYSQL_SYSVAR_STR(access_key, s3_tmp_access_key,
       PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC,
      "AWS access key",
       0, update_access_key, "");
static MYSQL_SYSVAR_STR(secret_key, s3_tmp_secret_key,
       PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC,
      "AWS secret key",
       0, update_secret_key, "");
/* Copied into S3_INFO::region by s3_info_init() */
static MYSQL_SYSVAR_STR(region, s3_region,
       PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
      "AWS region",
       0, 0, "");

/* Table options for S3 tables; installed as s3_hton->table_options */
ha_create_table_option s3_table_option_list[]=
{
  /*
    s3_block_size is tied to the block_size system variable.
    compression_algorithm is an enum option that accepts "none" or "zlib".
    NOTE(review): the final 0 is presumably the index of the default value
    ("none") - confirm against the HA_TOPTION_ENUM definition.
  */
  HA_TOPTION_SYSVAR("s3_block_size", s3_block_size, block_size),
  HA_TOPTION_ENUM("compression_algorithm", compression_algorithm, "none,zlib",
                  0),
  HA_TOPTION_END
};


/*****************************************************************************
 S3 handler code
******************************************************************************/

/**
   Construct an S3 handler

   The handler is an Aria handler with the features that S3 can't provide
   turned off.
*/


ha_s3::ha_s3(handlerton *hton, TABLE_SHARE *table_arg)
  :ha_maria(hton, table_arg), in_alter_table(0)
{
  can_enable_indexes= 0;
  /* No binary logging and no export of S3 tables */
  int_table_flags= int_table_flags & ~(HA_BINLOG_ROW_CAPABLE |
                                       HA_BINLOG_STMT_CAPABLE |
                                       HA_CAN_EXPORT);
}


/**
   Store the handler that s3_block_read() should use in the current thread

   @param file  Aria handler to remember in st_my_thread_var::keycache_file

   @note
   The ms3_st objects could in the future be kept in a list in the share,
   but taking the next free one would then require a mutex. Going through
   st_my_thread_var avoids the mutex; the small cost is that every handler
   function that accesses the page cache has to call register_handler().
*/

void ha_s3::register_handler(MARIA_HA *file)
{
  my_thread_var->keycache_file= (void*) file;
}


/**
   Write a row

   Rows can only be written while ALTER TABLE is generating the table.
   When the table has been moved to S3 it is read only.

   @return HA_ERR_WRONG_COMMAND  Table is not being created by ALTER TABLE
   @return other                 Result of ha_maria::write_row()
*/

int ha_s3::write_row(const uchar *buf)
{
  if (!in_alter_table)
    return HA_ERR_WRONG_COMMAND;
  return ha_maria::write_row(buf);
}

/*
  Return true if S3 can be used

  This requires that access key, secret key, region and bucket are all set
*/

static my_bool s3_usable()
{
  if (!s3_access_key || !s3_secret_key)
    return 0;
  if (!s3_region || !s3_bucket)
    return 0;
  return 1;
}


/**
   Copy the S3 system variables into an S3_INFO

   @param info  Structure to fill. Not touched if S3 isn't usable.

   @return 0  ok
   @return 1  S3 is not usable (see s3_usable())
*/

static my_bool s3_info_init(S3_INFO *info)
{
  const my_bool usable= s3_usable();

  if (usable)
  {
    lex_string_set(&info->bucket,     s3_bucket);
    lex_string_set(&info->region,     s3_region);
    lex_string_set(&info->secret_key, s3_secret_key);
    lex_string_set(&info->access_key, s3_access_key);
    lex_string_set(&info->host_name,  s3_host_name);
    info->protocol_version= (uint8_t) s3_protocol_version;
  }
  return !usable;
}

/**
   Fill an S3_INFO from a table path and the S3 system variables

   @param s3_info          Structure to fill
   @param path             Path to the table
   @param database_buff    Buffer that receives a \0 terminated copy of the
                           database name
   @param database_length  Max number of characters to copy to database_buff

   @return 0  ok
   @return 1  S3 is not usable. Database and table name are still set in
              this case, which ha_s3::delete_table() depends on.
*/

static my_bool s3_info_init(S3_INFO *s3_info, const char *path,
                            char *database_buff, size_t database_length)
{
  size_t copy_length;

  set_database_and_table_from_path(s3_info, path);

  /* The database name is not \0 terminated at this point; use a copy */
  copy_length= MY_MIN(database_length, s3_info->database.length);
  strmake(database_buff, s3_info->database.str, copy_length);
  s3_info->database.str= database_buff;

  return s3_info_init(s3_info);
}


/**
  Drop S3 table
*/

int ha_s3::delete_table(const char *name)
{
  ms3_st *s3_client;
  S3_INFO s3_info;
  int error;
  char database[NAME_LEN+1];
  DBUG_ENTER("ha_s3::delete_table");

  error= s3_info_init(&s3_info, name, database, sizeof(database)-1);

  /* If internal on disk temporary table, let Aria take care of it */
  if (!strncmp(s3_info.table.str, "#sql-", 5))
    DBUG_RETURN(ha_maria::delete_table(name));

  if (error)
    DBUG_RETURN(HA_ERR_UNSUPPORTED);

  if (!(s3_client= s3_open_connection(&s3_info)))
    DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
  error= aria_delete_from_s3(s3_client, s3_info.bucket.str,
                             s3_info.database.str,
                             s3_info.table.str,0);
  ms3_deinit(s3_client);
  DBUG_RETURN(error);
}

/**
   Copy an Aria table to S3 or rename a table in S3

   The copy happens as part of the rename in ALTER TABLE when all data
   is in an Aria table and we now have to copy it to S3.

   If the table is an old table already in S3, we should just rename it.
*/

int ha_s3::rename_table(const char *from, const char *to)
{
  S3_INFO to_s3_info, from_s3_info;
  char to_name[FN_REFLEN], from_name[FN_REFLEN], frm_name[FN_REFLEN];
  ms3_st *s3_client;
  MY_STAT stat_info;
  int error;
  bool is_partition= (strstr(from, "#P#") != NULL) ||
                     (strstr(to, "#P#") != NULL);
  DBUG_ENTER("ha_s3::rename_table");

  if (s3_info_init(&to_s3_info, to, to_name, NAME_LEN))
    DBUG_RETURN(HA_ERR_UNSUPPORTED);
  if (!(s3_client= s3_open_connection(&to_s3_info)))
    DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);

  /*
    Check if this is a on disk table created by ALTER TABLE that should be
    copied to S3. We know this is the case if the table is a temporary table
    and the .MAI file for the table is on disk
  */
  fn_format(frm_name, from, "", reg_ext, MYF(0));
  if (!strncmp(from + dirname_length(from), "#sql-", 5) &&
      (is_partition || my_stat(frm_name, &stat_info, MYF(0))))
  {
    /*
      The table is a temporary table as part of ALTER TABLE.
      Copy the on disk temporary Aria table to S3.
    */
    error= aria_copy_to_s3(s3_client, to_s3_info.bucket.str, from,
                           to_s3_info.database.str,
                           to_s3_info.table.str,
                           0, 0, 0, 0, !is_partition);
    if (!error)
    {
      /* Remove original files table files, keep .frm */
      fn_format(from_name, from, "", MARIA_NAME_DEXT,
                MY_APPEND_EXT|MY_UNPACK_FILENAME);
      my_delete(from_name, MYF(MY_WME | ME_WARNING));
      fn_format(from_name, from, "", MARIA_NAME_IEXT,
                MY_APPEND_EXT|MY_UNPACK_FILENAME);
      my_delete(from_name, MYF(MY_WME | ME_WARNING));
    }
  }
  else
  {
    /* The table is an internal S3 table. Do the renames */
    s3_info_init(&from_s3_info, from, from_name, NAME_LEN);

    error= aria_rename_s3(s3_client, to_s3_info.bucket.str,
                          from_s3_info.database.str,
                          from_s3_info.table.str,
                          to_s3_info.database.str,
                          to_s3_info.table.str,
                          !is_partition &&
                          !current_thd->lex->alter_info.partition_flags);
  }
  ms3_deinit(s3_client);
  DBUG_RETURN(error);
}


/**
   Create a s3 table.

   @param name            Path to the table
   @param table_arg       Table to create
   @param ha_create_info  Create options. row_type and transactional are
                          overwritten to force a format suitable for S3.

   @return 0                     ok
   @return HA_ERR_WRONG_COMMAND  Not the temporary table of an ALTER TABLE,
                                 or a user created temporary table
   @return HA_ERR_UNSUPPORTED    Table is a sequence or S3 is not usable
   @return other                 Error from ha_maria::create()

   @notes
   One can only create an s3 table as part of ALTER TABLE
   The table is created as a non transactional Aria table with
   BLOCK_RECORD format
*/

int ha_s3::create(const char *name, TABLE *table_arg,
                  HA_CREATE_INFO *ha_create_info)
{
  uchar *frm_ptr;
  size_t frm_len;
  int error;
  TABLE_SHARE *share= table_arg->s;
  DBUG_ENTER("ha_s3::create");

  /* This test also rejects all temporary tables */
  if (!(ha_create_info->options & HA_CREATE_TMP_ALTER) ||
      ha_create_info->tmp_table())
    DBUG_RETURN(HA_ERR_WRONG_COMMAND);

  if (share->table_type == TABLE_TYPE_SEQUENCE)
    DBUG_RETURN(HA_ERR_UNSUPPORTED);

  if (!s3_usable())
    DBUG_RETURN(HA_ERR_UNSUPPORTED);

  /* Force the table to a format suitable for S3 */
  ha_create_info->row_type= ROW_TYPE_PAGE;
  ha_create_info->transactional= HA_CHOICE_NO;
  error= ha_maria::create(name, table_arg, ha_create_info);
  if (error)
    DBUG_RETURN(error);

  /*
    Create the .frm file. Needed for ha_s3::rename_table() later, which
    checks for it to find out if the table should be copied to S3.
    A failure here is not reported to the caller.
  */
  if (!share->read_frm_image((const uchar**) &frm_ptr, &frm_len))
  {
    share->write_frm_image(frm_ptr, frm_len);
    share->free_frm_image(frm_ptr);
  }

  DBUG_RETURN(0);
}

/**
   Open table

   @param name        Path to the table
   @param mode        Must be O_RDONLY if HA_OPEN_FOR_CREATE isn't given
   @param open_flags  HA_OPEN_FOR_CREATE is set when the table is opened
                      to be filled by ALTER TABLE

   @return 0                   ok
   @return HA_ERR_UNSUPPORTED  S3 is not usable
   @return EACCES              Tried to open a table in S3 for writing
   @return other               Error from ha_maria::open()

   @notes
   Table is read only, except if opened by ALTER as in this case we
   are creating the S3 table.
*/

int ha_s3::open(const char *name, int mode, uint open_flags)
{
  int res;
  S3_INFO s3_info;
  DBUG_ENTER("ha_s3::open");

  if (!s3_usable())
    DBUG_RETURN(HA_ERR_UNSUPPORTED);

  if (mode != O_RDONLY && !(open_flags & HA_OPEN_FOR_CREATE))
    DBUG_RETURN(EACCES);

  open_args= 0;
  if (!(open_flags & HA_OPEN_FOR_CREATE))
  {
    /* Can't fail as s3_usable() was checked above */
    (void) s3_info_init(&s3_info);
    s3_info.tabledef_version= table->s->tabledef_version;

    /* Pass the above arguments to maria_open() */
    open_args= &s3_info;
  }

  if (!(res= ha_maria::open(name, mode, open_flags)))
  {
    if ((open_flags & HA_OPEN_FOR_CREATE))
      in_alter_table= 1;
    else
    {
      /*
        We have to modify the pagecache callbacks for the data file,
        index file and for bitmap handling
      */
      file->s->pagecache= &s3_pagecache;
      file->dfile.big_block_size= file->s->kfile.big_block_size=
        file->s->bitmap.file.big_block_size= file->s->base.s3_block_size;
      file->s->kfile.head_blocks= file->s->base.keystart / file->s->block_size;
    }
  }
  /* s3_info is on the stack; don't leave a pointer to it behind */
  open_args= 0;
  DBUG_RETURN(res);
}


/******************************************************************************
 Storage engine handler definitions
******************************************************************************/

/**
   Create a new S3 handler

   Installed as s3_hton->create by ha_s3_init(). The handler is allocated
   on the given mem_root.
*/

static handler *s3_create_handler(handlerton *hton,
                                  TABLE_SHARE * table,
                                  MEM_ROOT *mem_root)
{
  handler *new_handler= new (mem_root) ha_s3(hton, table);
  return new_handler;
}


/**
   Free all resources for s3

   Only acts on HA_PANIC_CLOSE and only if the engine is initialized.
   Ends the S3 page cache, deinitializes the S3 library and frees the
   access and secret keys.

   @return 0  always
*/

static int s3_hton_panic(handlerton *hton, ha_panic_function flag)
{
  if (flag != HA_PANIC_CLOSE || !s3_hton)
    return 0;

  end_pagecache(&s3_pagecache, TRUE);
  s3_deinit_library();

  my_free(s3_access_key);
  s3_access_key= 0;
  my_free(s3_secret_key);
  s3_secret_key= 0;

  s3_hton= 0;
  return 0;
}


/**
  Check if a table is in S3 as part of discovery

  The frm for share->db / share->table_name is fetched from S3 and used to
  initialize the share.

  @return HA_ERR_NO_SUCH_TABLE  S3 is not usable, the connection failed or
                                the frm could not be read
  @return other                 Result of init_from_binary_frm_image().
                                This is also stored in my_errno.
*/

static int s3_discover_table(handlerton *hton, THD* thd, TABLE_SHARE *share)
{
  S3_INFO s3_info;
  S3_BLOCK block;
  ms3_st *s3_client;
  int error= 0;
  bool frm_missing;
  DBUG_ENTER("s3_discover_table");

  if (s3_info_init(&s3_info) ||
      !(s3_client= s3_open_connection(&s3_info)))
    DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);

  s3_info.database=   share->db;
  s3_info.table=      share->table_name;

  frm_missing= s3_get_frm(s3_client, &s3_info, &block) != 0;
  if (!frm_missing)
    error= share->init_from_binary_frm_image(thd, 1,
                                             block.str, block.length);

  /* The block and the connection are released on both paths */
  s3_free(&block);
  ms3_deinit(s3_client);

  if (frm_missing)
    DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
  DBUG_RETURN((my_errno= error));
}


/**
  Check if a table exists

  Tables in the "mysql" database are never looked up in S3.

   @return 0 frm doesn't exists (or S3 could not be checked)
   @return 1 frm exists
*/

static int s3_discover_table_existance(handlerton *hton, const char *db,
                                       const char *table_name)
{
  S3_INFO s3_info;
  ms3_st *s3_client;
  int frm_status;
  DBUG_ENTER("s3_discover_table_existance");

  /* Ignore names in "mysql" database to speed up boot */
  if (strcmp(db, MYSQL_SCHEMA_NAME.str) == 0)
    DBUG_RETURN(0);

  if (s3_info_init(&s3_info) ||
      !(s3_client= s3_open_connection(&s3_info)))
    DBUG_RETURN(0);

  s3_info.database.str=    db;
  s3_info.database.length= strlen(db);
  s3_info.table.str=       table_name;
  s3_info.table.length=    strlen(table_name);

  frm_status= s3_frm_exists(s3_client, &s3_info);
  ms3_deinit(s3_client);

  /* s3_frm_exists() returns 0 when the frm was found */
  DBUG_RETURN(frm_status == 0);
}


/**
  Return a list of all S3 tables in a database

  @param db      Database to list
  @param result  Each found table is added with result->add_table()

  Tables in the "mysql" database are never looked up in S3.

  @return 0  always. If S3 is not usable or can't be listed, no tables
             are added.
*/

static int s3_discover_table_names(handlerton *hton __attribute__((unused)),
                                   LEX_CSTRING *db,
                                   MY_DIR *dir __attribute__((unused)),
                                   handlerton::discovered_list *result)
{
  char aws_path[AWS_PATH_LENGTH];
  S3_INFO s3_info;
  ms3_st *s3_client;
  ms3_list_st *list, *org_list= 0;
  DBUG_ENTER("s3_discover_table_names");

  /* Ignore names in "mysql" database to speed up boot */
  if (!strcmp(db->str, MYSQL_SCHEMA_NAME.str))
    DBUG_RETURN(0);

  if (s3_info_init(&s3_info))
    DBUG_RETURN(0);
  if (!(s3_client= s3_open_connection(&s3_info)))
    DBUG_RETURN(0);

  strxnmov(aws_path, sizeof(aws_path)-1, db->str, "/", NullS);

  if (!ms3_list_dir(s3_client, s3_info.bucket.str, aws_path, &org_list))
  {
    for (list= org_list ; list ; list= list->next)
    {
      const char *name= list->key + db->length + 1;   // Skip database and /
      size_t name_length= strlen(name);

      /*
        A table is stored under database/table/ and is listed with an
        end /. Skip everything else (like an entry for the database
        itself, which would give an empty name) instead of computing a
        wrong length.
      */
      if (name_length < 2 || name[name_length-1] != '/')
        continue;
      result->add_table(name, name_length-1);         // Remove end /
    }
    if (org_list)
      ms3_list_free(org_list);
  }
  ms3_deinit(s3_client);
  DBUG_RETURN(0);
}

/**
  Update the .frm file in S3

  @param db                    Database of the table
  @param table                 Name of the table
  @param frm                   New frm image to store as db/table/frm
  @param org_tabledef_version  Version that the frm in S3 is checked
                               against before it is replaced

  @return 0  ok, or S3 is not usable / could not be connected to
  @return 1  s3_check_frm_version() failed
  @return 2  The new frm could not be stored
*/

static int s3_notify_tabledef_changed(handlerton *hton __attribute__((unused)),
                                      LEX_CSTRING *db, LEX_CSTRING *table,
                                      LEX_CUSTRING *frm,
                                      LEX_CUSTRING *org_tabledef_version)
{
  char aws_path[AWS_PATH_LENGTH];
  S3_INFO s3_info;
  ms3_st *s3_client;
  int error= 0;
  DBUG_ENTER("s3_notify_tabledef_changed");

  if (s3_info_init(&s3_info) ||
      !(s3_client= s3_open_connection(&s3_info)))
    DBUG_RETURN(0);

  s3_info.database=    *db;
  s3_info.table=       *table;
  s3_info.tabledef_version= *org_tabledef_version;

  if (s3_check_frm_version(s3_client, &s3_info))
    error= 1;
  else
  {
    strxnmov(aws_path, sizeof(aws_path)-1, db->str, "/", table->str, "/frm",
             NullS);
    if (s3_put_object(s3_client, s3_info.bucket.str, aws_path,
                      (uchar*) frm->str, frm->length, 0))
      error= 2;
  }

  ms3_deinit(s3_client);
  DBUG_RETURN(error);
}
  

static int ha_s3_init(void *p)
{
  bool res;
  static const char *no_exts[]= { 0 };

  /* This can happen if Aria fails to start */
  if (!maria_hton)
    return HA_ERR_INITIALIZATION;

  s3_hton= (handlerton *)p;

  /* Use Aria engine as a base */
  memcpy(s3_hton, maria_hton, sizeof(*s3_hton));
  s3_hton->db_type= DB_TYPE_S3;
  s3_hton->create= s3_create_handler;
  s3_hton->panic=  s3_hton_panic;
  s3_hton->table_options= s3_table_option_list;
  s3_hton->discover_table= s3_discover_table;
  s3_hton->discover_table_names= s3_discover_table_names;
  s3_hton->discover_table_existence= s3_discover_table_existance;
  s3_hton->notify_tabledef_changed= s3_notify_tabledef_changed;
  s3_hton->tablefile_extensions= no_exts;
  s3_hton->commit= 0;
  s3_hton->rollback= 0;
  s3_hton->checkpoint_state= 0;
  s3_hton->flush_logs= 0;
  s3_hton->show_status= 0;
  s3_hton->prepare_for_backup= 0;
  s3_hton->end_backup= 0;
  s3_hton->flags= 0;
  /* Copy global arguments to s3_access_key and s3_secret_key */
  update_access_key(0,0,0,0);
  update_secret_key(0,0,0,0);

  if ((res= !init_pagecache(&s3_pagecache,
                            (size_t) s3_pagecache_buffer_size,
                            s3_pagecache_division_limit,
                            s3_pagecache_age_threshold, maria_block_size,
                            s3_pagecache_file_hash_size, 0)))
    s3_hton= 0;
  s3_pagecache.big_block_read= s3_block_read;
  s3_pagecache.big_block_free= s3_free;
  s3_init_library();
  if (s3_debug)
    ms3_debug();
  return res ? HA_ERR_INITIALIZATION : 0;
}

/*
  Status variables for S3. All of them show counters of the S3 page cache.
  The list is terminated by the entry with a NullS name.
*/

static SHOW_VAR status_variables[]=
{
  { "pagecache_blocks_not_flushed",
    (char*) &s3_pagecache.global_blocks_changed, SHOW_LONG },
  { "pagecache_blocks_unused",
    (char*) &s3_pagecache.blocks_unused, SHOW_LONG },
  { "pagecache_blocks_used",
    (char*) &s3_pagecache.blocks_used, SHOW_LONG },
  { "pagecache_read_requests",
    (char*) &s3_pagecache.global_cache_r_requests, SHOW_LONGLONG },
  { "pagecache_reads",
    (char*) &s3_pagecache.global_cache_read, SHOW_LONGLONG },
  { NullS, NullS, SHOW_LONG }
};


/* System variables of the S3 plugin. The list is NULL terminated */

static struct st_mysql_sys_var *system_variables[]=
{
  /* General */
  MYSQL_SYSVAR(block_size),
  MYSQL_SYSVAR(debug),
  MYSQL_SYSVAR(protocol_version),
  /* Page cache */
  MYSQL_SYSVAR(pagecache_age_threshold),
  MYSQL_SYSVAR(pagecache_buffer_size),
  MYSQL_SYSVAR(pagecache_division_limit),
  MYSQL_SYSVAR(pagecache_file_hash_size),
  /* Connection to S3 */
  MYSQL_SYSVAR(host_name),
  MYSQL_SYSVAR(bucket),
  MYSQL_SYSVAR(access_key),
  MYSQL_SYSVAR(secret_key),
  MYSQL_SYSVAR(region),
  NULL
};

/* Storage engine descriptor referenced by the plugin declaration below */
struct st_mysql_storage_engine s3_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

/*
  Plugin declaration for S3

  There is no deinit function. The resources of the engine are freed by
  s3_hton_panic() when it's called with HA_PANIC_CLOSE.
*/
maria_declare_plugin(s3)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &s3_storage_engine,
  "S3",
  "MariaDB Corporation Ab",
  "Read only table stored in S3. Created by running "
  "ALTER TABLE table_name ENGINE=s3",
  PLUGIN_LICENSE_GPL,
  ha_s3_init,                   /* Plugin Init      */
  NULL,                         /* Plugin Deinit    */
  0x0100,                       /* 1.0              */
  status_variables,             /* status variables */
  system_variables,             /* system variables */
  "1.0",                        /* string version   */
  MariaDB_PLUGIN_MATURITY_ALPHA /* maturity         */
}
maria_declare_plugin_end;