author    Sage Weil <sage@newdream.net>  2012-05-07 15:57:31 -0700
committer Sage Weil <sage@newdream.net>  2012-05-07 15:57:31 -0700
commit    ac903210d0cd80e6a1205d178483a16a6ae4f014 (patch)
tree      acec2ad224cdc95ba969083e992f6df671f6b94e
parent    efc0701cf97f6a936c8f253b5449216f309fe4a3 (diff)
parent    6c2c883c175421d8c492c64154ed3e26bbdd6595 (diff)
download  ceph-ac903210d0cd80e6a1205d178483a16a6ae4f014.tar.gz
Merge branch 'wip-rgw-bench'
Conflicts: debian/rules
-rw-r--r--  .gitmodules                |   3
-rw-r--r--  configure.ac               |  18
-rw-r--r--  debian/control             |  16
-rw-r--r--  debian/rest-bench.install  |   1
-rwxr-xr-x  debian/rules               |   3
-rw-r--r--  src/Makefile.am            |  49
-rw-r--r--  src/common/obj_bencher.cc  | 555
-rw-r--r--  src/common/obj_bencher.h   |  72
m---------  src/libs3                  |   0
-rw-r--r--  src/osdc/rados_bencher.h   | 571
-rw-r--r--  src/rados.cc               |  64
-rw-r--r--  src/tools/rest_bench.cc    | 644
12 files changed, 1415 insertions, 581 deletions
diff --git a/.gitmodules b/.gitmodules
index 2bfe0fa6852..c5a0932b7af 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,3 +4,6 @@
[submodule "src/leveldb"]
path = src/leveldb
url = git://github.com/ceph/leveldb.git
+[submodule "src/libs3"]
+ path = src/libs3
+ url = git://github.com/ceph/libs3.git
diff --git a/configure.ac b/configure.ac
index b6ff4e3faa5..f7b0b9304d6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -308,6 +308,24 @@ AS_IF([test "x$with_system_leveldb" = xcheck],
[AC_CHECK_LIB([leveldb], [leveldb_open], [with_system_leveldb=yes], [], [-lsnappy -lpthread])])
AM_CONDITIONAL(WITH_SYSTEM_LEVELDB, [ test "$with_system_leveldb" = "yes" ])
+# use system libs3?
+AC_ARG_WITH([system-libs3],
+ [AS_HELP_STRING([--with-system-libs3], [use system libs3])],
+ ,
+ [with_system_libs3=no])
+AS_IF([test "x$with_system_libs3" = xyes],
+ [AC_CHECK_LIB([s3], [S3_initialize], [true], [AC_MSG_FAILURE([libs3 not found])], [-lpthread])])
+AS_IF([test "x$with_system_libs3" = xcheck],
+ [AC_SEARCH_LIBS([S3_initialize], [s3], [with_system_libs3=yes], [true], [-lpthread])])
+AM_CONDITIONAL(WITH_SYSTEM_LIBS3, [ test "$with_system_libs3" = "yes" ])
+
+# rest-bench?
+AC_ARG_WITH([rest-bench],
+ [AS_HELP_STRING([--with-rest-bench], [enables rest-bench])],
+ [],
+ [with_rest_bench=no])
+AM_CONDITIONAL(WITH_REST_BENCH, [ test "$with_rest_bench" = "yes" ])
+
# use libaio?
AC_ARG_WITH([libaio],
[AS_HELP_STRING([--without-libaio], [disable libaio use by journal])],
diff --git a/debian/control b/debian/control
index 0a251b01f06..b482caa8745 100644
--- a/debian/control
+++ b/debian/control
@@ -6,7 +6,7 @@ Vcs-Git: git://github.com/ceph/ceph.git
Vcs-Browser: https://github.com/ceph/ceph
Maintainer: Laszlo Boszormenyi (GCS) <gcs@debian.hu>
Uploaders: Sage Weil <sage@newdream.net>
-Build-Depends: debhelper (>= 6.0.7~), autotools-dev, autoconf, automake, libfuse-dev, libboost-dev (>= 1.34), libedit-dev, libcrypto++-dev, libtool, libexpat1-dev, libfcgi-dev, libatomic-ops-dev, libgoogle-perftools-dev [i386 amd64], pkg-config, libgtkmm-2.4-dev, python, libcurl4-gnutls-dev, libkeyutils-dev, uuid-dev, libaio-dev, python (>= 2.6.6-3~)
+Build-Depends: debhelper (>= 6.0.7~), autotools-dev, autoconf, automake, libfuse-dev, libboost-dev (>= 1.34), libedit-dev, libcrypto++-dev, libtool, libexpat1-dev, libfcgi-dev, libatomic-ops-dev, libgoogle-perftools-dev [i386 amd64], pkg-config, libgtkmm-2.4-dev, python, libcurl4-gnutls-dev, libkeyutils-dev, uuid-dev, libaio-dev, python (>= 2.6.6-3~), libxml2-dev
Standards-Version: 3.9.3
Package: ceph
@@ -271,6 +271,20 @@ Description: debugging symbols for radosgw
.
This package contains debugging symbols for radosgw.
+Package: rest-bench
+Architecture: linux-any
+Depends: ${shlibs:Depends}, ${misc:Depends}, ceph-common, xml2, curl
+Description: RESTful bencher that can be used to benchmark
+ radosgw performance.
+
+Package: rest-bench-dbg
+Architecture: linux-any
+Section: debug
+Priority: extra
+Depends: ${shlibs:Depends}, ${misc:Depends}, ceph-common, xml2, curl
+Description: RESTful bencher that can be used to benchmark
+ radosgw performance.
+
Package: obsync
Architecture: linux-any
Depends: ${misc:Depends}, python, python-boto, python-ceph, python-pyxattr, python-lxml
diff --git a/debian/rest-bench.install b/debian/rest-bench.install
new file mode 100644
index 00000000000..8535f20d5a4
--- /dev/null
+++ b/debian/rest-bench.install
@@ -0,0 +1 @@
+usr/bin/rest-bench
diff --git a/debian/rules b/debian/rules
index 5d02e96ac79..84f3daa2789 100755
--- a/debian/rules
+++ b/debian/rules
@@ -20,7 +20,7 @@ endif
export DEB_HOST_ARCH ?= $(shell dpkg-architecture -qDEB_HOST_ARCH)
-extraopts += --with-ocf
+extraopts += --with-ocf --with-rest-bench
ifeq ($(DEB_HOST_ARCH), armel)
# armel supports ARMv4t or above instructions sets.
@@ -116,6 +116,7 @@ binary-arch: build install
dh_strip -plibcephfs1 --dbg-package=libcephfs1-dbg
dh_strip -pradosgw --dbg-package=radosgw-dbg
dh_strip -pgceph --dbg-package=gceph-dbg
+ dh_strip -prest-bench --dbg-package=rest-bench-dbg
dh_compress
dh_fixperms
diff --git a/src/Makefile.am b/src/Makefile.am
index 4dd229dbb7e..00d22c90c2e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1,6 +1,26 @@
AUTOMAKE_OPTIONS = gnu
-SUBDIRS = ocf
-DIST_SUBDIRS = gtest ocf leveldb
+SUBDIRS = ocf libs3
+DIST_SUBDIRS = gtest ocf leveldb libs3
+
+EXTRA_DIST = \
+ libs3/COPYING \
+ libs3/ChangeLog \
+ libs3/GNUmakefile \
+ libs3/GNUmakefile.mingw \
+ libs3/GNUmakefile.osx \
+ libs3/INSTALL \
+ libs3/LICENSE \
+ libs3/README \
+ libs3/TODO \
+ libs3/archlinux \
+ libs3/debian \
+ libs3/doxyfile \
+ libs3/inc \
+ libs3/libs3.spec \
+ libs3/mswin \
+ libs3/src \
+ libs3/test
+
CLEANFILES =
bin_PROGRAMS =
# like bin_PROGRAMS, but these targets are only built for debug builds
@@ -367,10 +387,27 @@ librbd_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 \
-export-symbols-regex '^rbd_.*' $(PTHREAD_LIBS) $(EXTRALIBS)
lib_LTLIBRARIES += librbd.la
-rados_SOURCES = rados.cc rados_import.cc rados_export.cc rados_sync.cc
+rados_SOURCES = rados.cc rados_import.cc rados_export.cc rados_sync.cc common/obj_bencher.cc
rados_LDADD = libglobal.la librados.la $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS)
bin_PROGRAMS += rados
+if WITH_REST_BENCH
+
+rest_bench_SOURCES = tools/rest_bench.cc common/obj_bencher.cc
+rest_bench_LDADD = libglobal.la $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS)
+rest_bench_CXXFLAGS = ${AM_CXXFLAGS}
+bin_PROGRAMS += rest-bench
+
+if WITH_SYSTEM_LIBS3
+rest_bench_LDADD += -ls3
+else
+rest_bench_LDADD += libs3/build/lib/libs3.a -lcurl -lxml2
+rest_bench_CXXFLAGS += -I$(top_srcdir)/src/libs3/inc
+SUBDIRS += libs3
+endif
+
+endif
+
scratchtool_SOURCES = scratchtool.c
scratchtool_LDADD = librados.la $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS)
scratchtoolpp_SOURCES = scratchtoolpp.cc
@@ -867,7 +904,8 @@ EXTRALIBS += -lgcov
endif
# extra bits
-EXTRA_DIST = $(srcdir)/verify-mds-journal.sh $(srcdir)/vstart.sh $(srcdir)/stop.sh \
+EXTRA_DIST += \
+ $(srcdir)/verify-mds-journal.sh $(srcdir)/vstart.sh $(srcdir)/stop.sh \
ceph-run $(srcdir)/ceph_common.sh \
$(srcdir)/init-radosgw \
$(srcdir)/ceph-clsinfo $(srcdir)/make_version $(srcdir)/check_version \
@@ -1221,6 +1259,7 @@ noinst_HEADERS = \
common/environment.h\
common/likely.h\
common/lockdep.h\
+ common/obj_bencher.h\
common/snap_types.h\
common/Clock.h\
common/Cond.h\
@@ -1557,7 +1596,6 @@ noinst_HEADERS = \
osd/ReplicatedPG.h\
osd/Watch.h\
osd/osd_types.h\
- osdc/rados_bencher.h\
osdc/Blinker.h\
osdc/Filer.h\
osdc/Journaler.h\
@@ -1653,7 +1691,6 @@ if WITH_DEBUG
bin_PROGRAMS += $(bin_DEBUGPROGRAMS)
endif
-
project.tgz: clean
cov-configure -co /usr/bin/gcc
cov-configure -co /usr/bin/g++
diff --git a/src/common/obj_bencher.cc b/src/common/obj_bencher.cc
new file mode 100644
index 00000000000..05bfb72a457
--- /dev/null
+++ b/src/common/obj_bencher.cc
@@ -0,0 +1,555 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2009 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ * Series of functions to test your rados installation. Notice
+ * that this code is not terribly robust -- for instance, if you
+ * try and bench on a pool you don't have permission to access
+ * it will just loop forever.
+ */
+#include "common/Cond.h"
+#include "obj_bencher.h"
+
+#include <iostream>
+#include <fstream>
+
+#include <stdlib.h>
+#include <time.h>
+#include <sstream>
+
+
+const char *BENCH_DATA = "benchmark_write_data";
+
+static void generate_object_name(char *s, size_t size, int objnum, int pid = 0)
+{
+ char hostname[30];
+ gethostname(hostname, sizeof(hostname)-1);
+ hostname[sizeof(hostname)-1] = 0;
+ if (pid) {
+ snprintf(s, size, "%s_%d_object%d", hostname, pid, objnum);
+ } else {
+ snprintf(s, size, "%s_%d_object%d", hostname, getpid(), objnum);
+ }
+}
+
+static void sanitize_object_contents (bench_data *data, int length) {
+ memset(data->object_contents, 'z', length);
+}
+
+void *ObjBencher::status_printer(void *_bencher) {
+ ObjBencher *bencher = (ObjBencher *)_bencher;
+ bench_data& data = bencher->data;
+ Cond cond;
+ int i = 0;
+ int previous_writes = 0;
+ int cycleSinceChange = 0;
+ double avg_bandwidth;
+ double bandwidth;
+ utime_t ONE_SECOND;
+ ONE_SECOND.set_from_double(1.0);
+ bencher->lock.Lock();
+ while(!data.done) {
+ if (i % 20 == 0) {
+ if (i > 0)
+ cout << "min lat: " << data.min_latency
+ << " max lat: " << data.max_latency
+ << " avg lat: " << data.avg_latency << std::endl;
+ //I'm naughty and don't reset the fill
+ cout << setfill(' ')
+ << setw(5) << "sec"
+ << setw(8) << "Cur ops"
+ << setw(10) << "started"
+ << setw(10) << "finished"
+ << setw(10) << "avg MB/s"
+ << setw(10) << "cur MB/s"
+ << setw(10) << "last lat"
+ << setw(10) << "avg lat" << std::endl;
+ }
+ bandwidth = (double)(data.finished - previous_writes)
+ * (data.trans_size)
+ / (1024*1024)
+ / cycleSinceChange;
+ avg_bandwidth = (double) (data.trans_size) * (data.finished)
+ / (double)(ceph_clock_now(g_ceph_context) - data.start_time) / (1024*1024);
+ if (previous_writes != data.finished) {
+ previous_writes = data.finished;
+ cycleSinceChange = 0;
+ cout << setfill(' ')
+ << setw(5) << i
+ << setw(8) << data.in_flight
+ << setw(10) << data.started
+ << setw(10) << data.finished
+ << setw(10) << avg_bandwidth
+ << setw(10) << bandwidth
+ << setw(10) << (double)data.cur_latency
+ << setw(10) << data.avg_latency << std::endl;
+ }
+ else {
+ cout << setfill(' ')
+ << setw(5) << i
+ << setw(8) << data.in_flight
+ << setw(10) << data.started
+ << setw(10) << data.finished
+ << setw(10) << avg_bandwidth
+ << setw(10) << '0'
+ << setw(10) << '-'
+ << setw(10) << data.avg_latency << std::endl;
+ }
+ ++i;
+ ++cycleSinceChange;
+ cond.WaitInterval(g_ceph_context, bencher->lock, ONE_SECOND);
+ }
+ bencher->lock.Unlock();
+ return NULL;
+}
+
+int ObjBencher::aio_bench(int operation, int secondsToRun, int concurrentios, int op_size) {
+ int object_size = op_size;
+ int num_objects = 0;
+ char* contentsChars = new char[op_size];
+ int r = 0;
+ int prevPid = 0;
+
+ //get data from previous write run, if available
+ if (operation != OP_WRITE) {
+ bufferlist object_data;
+ r = sync_read(BENCH_DATA, object_data, sizeof(int)*3);
+ if (r <= 0) {
+ delete[] contentsChars;
+ if (r == -2)
+ cerr << "Must write data before running a read benchmark!" << std::endl;
+ return r;
+ }
+ bufferlist::iterator p = object_data.begin();
+ ::decode(object_size, p);
+ ::decode(num_objects, p);
+ ::decode(prevPid, p);
+ } else {
+ object_size = op_size;
+ }
+
+ lock.Lock();
+ data.done = false;
+ data.object_size = object_size;
+ data.trans_size = op_size;
+ data.in_flight = 0;
+ data.started = 0;
+ data.finished = num_objects;
+ data.min_latency = 9999.0; // this better be higher than initial latency!
+ data.max_latency = 0;
+ data.avg_latency = 0;
+ data.object_contents = contentsChars;
+ lock.Unlock();
+
+ //fill in contentsChars deterministically so we can check returns
+ sanitize_object_contents(&data, data.object_size);
+
+ if (OP_WRITE == operation) {
+ r = write_bench(secondsToRun, concurrentios);
+ if (r != 0) goto out;
+ }
+ else if (OP_SEQ_READ == operation) {
+ r = seq_read_bench(secondsToRun, num_objects, concurrentios, prevPid);
+ if (r != 0) goto out;
+ }
+ else if (OP_RAND_READ == operation) {
+ cerr << "Random test not implemented yet!" << std::endl;
+ r = -1;
+ }
+
+ out:
+ delete[] contentsChars;
+ return r;
+}
+
+struct lock_cond {
+ lock_cond(Mutex *_lock) : lock(_lock) {}
+ Mutex *lock;
+ Cond cond;
+};
+
+void _aio_cb(void *cb, void *arg) {
+ struct lock_cond *lc = (struct lock_cond *)arg;
+ lc->lock->Lock();
+ lc->cond.Signal();
+ lc->lock->Unlock();
+}
+
+int ObjBencher::write_bench(int secondsToRun, int concurrentios) {
+ cout << "Maintaining " << concurrentios << " concurrent writes of "
+ << data.object_size << " bytes for at least "
+ << secondsToRun << " seconds." << std::endl;
+
+ char* name[concurrentios];
+ bufferlist* contents[concurrentios];
+ double total_latency = 0;
+ utime_t start_times[concurrentios];
+ utime_t stopTime;
+ int r = 0;
+ bufferlist b_write;
+ lock_cond lc(&lock);
+ utime_t runtime;
+ utime_t timePassed;
+
+ r = completions_init(concurrentios);
+
+ //set up writes so I can start them together
+ for (int i = 0; i<concurrentios; ++i) {
+ name[i] = new char[128];
+ contents[i] = new bufferlist();
+ generate_object_name(name[i], 128, i);
+ snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", i);
+ contents[i]->append(data.object_contents, data.object_size);
+ }
+
+ pthread_t print_thread;
+
+ pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
+ lock.Lock();
+ data.start_time = ceph_clock_now(g_ceph_context);
+ lock.Unlock();
+ for (int i = 0; i<concurrentios; ++i) {
+ start_times[i] = ceph_clock_now(g_ceph_context);
+ r = create_completion(i, _aio_cb, (void *)&lc);
+ if (r < 0)
+ goto ERR;
+ r = aio_write(name[i], i, *contents[i], data.object_size);
+ if (r < 0) { //naughty, doesn't clean up heap
+ goto ERR;
+ }
+ lock.Lock();
+ ++data.started;
+ ++data.in_flight;
+ lock.Unlock();
+ }
+
+ //keep on adding new writes as old ones complete until we've passed minimum time
+ int slot;
+ bufferlist* newContents;
+ char* newName;
+
+ //don't need locking for reads because other thread doesn't write
+
+ runtime.set_from_double(secondsToRun);
+ stopTime = data.start_time + runtime;
+ slot = 0;
+ while( ceph_clock_now(g_ceph_context) < stopTime ) {
+ lock.Lock();
+ bool found = false;
+ while (1) {
+ int old_slot = slot;
+ do {
+ if (completion_is_done(slot)) {
+ found = true;
+ break;
+ }
+ slot++;
+ if (slot == concurrentios) {
+ slot = 0;
+ }
+ } while (slot != old_slot);
+ if (found)
+ break;
+ lc.cond.Wait(lock);
+ }
+ lock.Unlock();
+ //create new contents and name on the heap, and fill them
+ newContents = new bufferlist();
+ newName = new char[128];
+ generate_object_name(newName, 128, data.started);
+ snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", data.started);
+ newContents->append(data.object_contents, data.object_size);
+ completion_wait(slot);
+ lock.Lock();
+ r = completion_ret(slot);
+ if (r != 0) {
+ lock.Unlock();
+ goto ERR;
+ }
+ data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+ total_latency += data.cur_latency;
+ if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+ if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+ ++data.finished;
+ data.avg_latency = total_latency / data.finished;
+ --data.in_flight;
+ lock.Unlock();
+ release_completion(slot);
+ timePassed = ceph_clock_now(g_ceph_context) - data.start_time;
+
+ //write new stuff to backend, then delete old stuff
+ //and save locations of new stuff for later deletion
+ start_times[slot] = ceph_clock_now(g_ceph_context);
+ r = create_completion(slot, _aio_cb, &lc);
+ if (r < 0)
+ goto ERR;
+ r = aio_write(newName, slot, *newContents, data.object_size);
+ if (r < 0) {//naughty; doesn't clean up heap space.
+ goto ERR;
+ }
+ lock.Lock();
+ ++data.started;
+ ++data.in_flight;
+ lock.Unlock();
+ delete[] name[slot];
+ delete contents[slot];
+ name[slot] = newName;
+ contents[slot] = newContents;
+ }
+
+ while (data.finished < data.started) {
+ slot = data.finished % concurrentios;
+ completion_wait(slot);
+ lock.Lock();
+ r = completion_ret(slot);
+ if (r != 0) {
+ lock.Unlock();
+ goto ERR;
+ }
+ data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+ total_latency += data.cur_latency;
+ if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+ if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+ ++data.finished;
+ data.avg_latency = total_latency / data.finished;
+ --data.in_flight;
+ lock.Unlock();
+ release_completion(slot);
+ delete[] name[slot];
+ delete contents[slot];
+ }
+
+ timePassed = ceph_clock_now(g_ceph_context) - data.start_time;
+ lock.Lock();
+ data.done = true;
+ lock.Unlock();
+
+ pthread_join(print_thread, NULL);
+
+ double bandwidth;
+ bandwidth = ((double)data.finished)*((double)data.object_size)/(double)timePassed;
+ bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
+ char bw[20];
+ snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth);
+
+ cout << "Total time run: " << timePassed << std::endl
+ << "Total writes made: " << data.finished << std::endl
+ << "Write size: " << data.object_size << std::endl
+ << "Bandwidth (MB/sec): " << bw << std::endl
+ << "Average Latency: " << data.avg_latency << std::endl
+ << "Max latency: " << data.max_latency << std::endl
+ << "Min latency: " << data.min_latency << std::endl;
+
+ //write object size/number data for read benchmarks
+ ::encode(data.object_size, b_write);
+ ::encode(data.finished, b_write);
+ ::encode(getpid(), b_write);
+ sync_write(BENCH_DATA, b_write, sizeof(int)*3);
+
+ completions_done();
+
+ return 0;
+
+ ERR:
+ lock.Lock();
+ data.done = 1;
+ lock.Unlock();
+ pthread_join(print_thread, NULL);
+ return -5;
+}
+
+int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) {
+ data.finished = 0;
+
+ lock_cond lc(&lock);
+ char* name[concurrentios];
+ bufferlist* contents[concurrentios];
+ int index[concurrentios];
+ int errors = 0;
+ utime_t start_time;
+ utime_t start_times[concurrentios];
+ utime_t time_to_run;
+ time_to_run.set_from_double(seconds_to_run);
+ double total_latency = 0;
+ int r = 0;
+ utime_t runtime;
+ sanitize_object_contents(&data, data.object_size); //clean it up once; subsequent
+ //changes will be safe because string length should remain the same
+
+ r = completions_init(concurrentios);
+ if (r < 0)
+ return r;
+
+ //set up initial reads
+ for (int i = 0; i < concurrentios; ++i) {
+ name[i] = new char[128];
+ generate_object_name(name[i], 128, i, pid);
+ contents[i] = new bufferlist();
+ }
+
+ pthread_t print_thread;
+ pthread_create(&print_thread, NULL, status_printer, (void *)this);
+
+ lock.Lock();
+ data.start_time = ceph_clock_now(g_ceph_context);
+ lock.Unlock();
+ utime_t finish_time = data.start_time + time_to_run;
+ //start initial reads
+ for (int i = 0; i < concurrentios; ++i) {
+ index[i] = i;
+ start_times[i] = ceph_clock_now(g_ceph_context);
+ create_completion(i, _aio_cb, (void *)&lc);
+ r = aio_read(name[i], i, contents[i], data.object_size);
+ if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
+ cerr << "r = " << r << std::endl;
+ goto ERR;
+ }
+ lock.Lock();
+ ++data.started;
+ ++data.in_flight;
+ lock.Unlock();
+ }
+
+ //keep on adding new reads as old ones complete
+ int slot;
+ char* newName;
+ bufferlist *cur_contents;
+
+ slot = 0;
+ while (seconds_to_run && (ceph_clock_now(g_ceph_context) < finish_time) &&
+ num_objects > data.started) {
+ lock.Lock();
+ int old_slot = slot;
+ bool found = false;
+ while (1) {
+ do {
+ if (completion_is_done(slot)) {
+ found = true;
+ break;
+ }
+ slot++;
+ if (slot == concurrentios) {
+ slot = 0;
+ }
+ } while (slot != old_slot);
+ if (found) {
+ break;
+ }
+ lc.cond.Wait(lock);
+ }
+ lock.Unlock();
+ newName = new char[128];
+ generate_object_name(newName, 128, data.started, pid);
+ int current_index = index[slot];
+ index[slot] = data.started;
+ completion_wait(slot);
+ lock.Lock();
+ r = completion_ret(slot);
+ if (r != 0) {
+ cerr << "read got " << r << std::endl;
+ lock.Unlock();
+ goto ERR;
+ }
+ data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+ total_latency += data.cur_latency;
+ if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+ if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+ ++data.finished;
+ data.avg_latency = total_latency / data.finished;
+ --data.in_flight;
+ lock.Unlock();
+ release_completion(slot);
+ cur_contents = contents[slot];
+
+ //start new read and check data if requested
+ start_times[slot] = ceph_clock_now(g_ceph_context);
+ contents[slot] = new bufferlist();
+ create_completion(slot, _aio_cb, (void *)&lc);
+ r = aio_read(newName, slot, contents[slot], data.object_size);
+ if (r < 0) {
+ goto ERR;
+ }
+ lock.Lock();
+ ++data.started;
+ ++data.in_flight;
+ snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", current_index);
+ lock.Unlock();
+ if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) {
+ cerr << name[slot] << " is not correct!" << std::endl;
+ ++errors;
+ }
+ delete[] name[slot];
+ name[slot] = newName;
+ delete cur_contents;
+ }
+
+ //wait for final reads to complete
+ while (data.finished < data.started) {
+ slot = data.finished % concurrentios;
+ completion_wait(slot);
+ lock.Lock();
+ r = completion_ret(slot);
+ if (r != 0) {
+ cerr << "read got " << r << std::endl;
+ lock.Unlock();
+ goto ERR;
+ }
+ data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+ total_latency += data.cur_latency;
+ if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+ if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+ ++data.finished;
+ data.avg_latency = total_latency / data.finished;
+ --data.in_flight;
+ release_completion(slot);
+ snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", index[slot]);
+ lock.Unlock();
+ if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) {
+ cerr << name[slot] << " is not correct!" << std::endl;
+ ++errors;
+ }
+ delete[] name[slot];
+ delete contents[slot];
+ }
+
+ runtime = ceph_clock_now(g_ceph_context) - data.start_time;
+ lock.Lock();
+ data.done = true;
+ lock.Unlock();
+
+ pthread_join(print_thread, NULL);
+
+ double bandwidth;
+ bandwidth = ((double)data.finished)*((double)data.object_size)/(double)runtime;
+ bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
+ char bw[20];
+ snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth);
+
+ cout << "Total time run: " << runtime << std::endl
+ << "Total reads made: " << data.finished << std::endl
+ << "Read size: " << data.object_size << std::endl
+ << "Bandwidth (MB/sec): " << bw << std::endl
+ << "Average Latency: " << data.avg_latency << std::endl
+ << "Max latency: " << data.max_latency << std::endl
+ << "Min latency: " << data.min_latency << std::endl;
+
+ completions_done();
+
+ return 0;
+
+ ERR:
+ lock.Lock();
+ data.done = 1;
+ lock.Unlock();
+ pthread_join(print_thread, NULL);
+ return -5;
+}
+
+
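(Editorial note: the summary bandwidth printed at the end of both benchmarks above is finished * object_size / elapsed_seconds, scaled by 1024*1024 into MB/sec. For example, 100 completed 4 MB (4194304-byte) writes over 10 seconds work out to 100 * 4194304 / 10 / (1024*1024) = 40 MB/sec.)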
diff --git a/src/common/obj_bencher.h b/src/common/obj_bencher.h
new file mode 100644
index 00000000000..d4105274d0e
--- /dev/null
+++ b/src/common/obj_bencher.h
@@ -0,0 +1,72 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2009 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_OBJ_BENCHER_H
+#define CEPH_OBJ_BENCHER_H
+
+#include "common/config.h"
+#include "common/Cond.h"
+
+struct bench_data {
+ bool done; //is the benchmark done
+ int object_size; //the size of the objects
+ int trans_size; //size of the write/read to perform
+ // same as object_size for write tests
+ int in_flight; //number of reads/writes being waited on
+ int started;
+ int finished;
+ double min_latency;
+ double max_latency;
+ double avg_latency;
+ utime_t cur_latency; //latency of last completed transaction
+ utime_t start_time; //start time for benchmark
+ char *object_contents; //pointer to the contents written to each object
+};
+
+const int OP_WRITE = 1;
+const int OP_SEQ_READ = 2;
+const int OP_RAND_READ = 3;
+
+class ObjBencher {
+protected:
+ Mutex lock;
+
+ static void *status_printer(void *bencher);
+
+ struct bench_data data;
+
+ int write_bench(int secondsToRun, int concurrentios);
+ int seq_read_bench(int secondsToRun, int num_objects, int concurrentios, int writePid);
+
+ virtual int completions_init(int concurrentios) = 0;
+ virtual void completions_done() = 0;
+
+ virtual int create_completion(int i, void (*cb)(void *, void*), void *arg) = 0;
+ virtual void release_completion(int slot) = 0;
+
+ virtual bool completion_is_done(int slot) = 0;
+ virtual int completion_wait(int slot) = 0;
+ virtual int completion_ret(int slot) = 0;
+
+ virtual int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) = 0;
+ virtual int aio_write(const std::string& oid, int slot, bufferlist& bl, size_t len) = 0;
+ virtual int sync_read(const std::string& oid, bufferlist& bl, size_t len) = 0;
+ virtual int sync_write(const std::string& oid, bufferlist& bl, size_t len) = 0;
+public:
+ ObjBencher() : lock("ObjBencher::lock") {}
+ virtual ~ObjBencher() {}
+ int aio_bench(int operation, int secondsToRun, int concurrentios, int op_size);
+};
+
+
+#endif
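
(Editorial note: the header above is the entire contract a backend has to satisfy; the generic write_bench()/seq_read_bench() machinery in obj_bencher.cc drives everything through these completion and I/O hooks. Below is a minimal sketch, not part of this commit, of how a new backend would plug in. The class name MemBencher and its std::map-backed store are hypothetical, and it assumes the usual global_init()/common_init_finish() setup, since the bencher uses g_ceph_context internally. The real RadosBencher in src/rados.cc and RESTBencher in src/tools/rest_bench.cc later in this patch follow the same shape.)

    #include <map>
    #include <string>
    #include "common/obj_bencher.h"

    // Hypothetical in-memory backend: keeps each "object" in a std::map so
    // the generic benchmark loop can be exercised without a cluster.
    class MemBencher : public ObjBencher {
      std::map<std::string, bufferlist> store;
    protected:
      // All I/O below is synchronous, so the completion machinery is trivial.
      int completions_init(int concurrentios) { return 0; }
      void completions_done() {}
      int create_completion(int slot, void (*cb)(void *, void *), void *arg) {
        // An async backend would register cb/arg with its completion event;
        // here every op is already finished by the time the loop checks.
        return 0;
      }
      void release_completion(int slot) {}
      bool completion_is_done(int slot) { return true; }
      int completion_wait(int slot) { return 0; }
      int completion_ret(int slot) { return 0; }

      int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) {
        *pbl = store[oid];
        return 0;
      }
      int aio_write(const std::string& oid, int slot, bufferlist& bl, size_t len) {
        store[oid] = bl;
        return 0;
      }
      int sync_read(const std::string& oid, bufferlist& bl, size_t len) {
        bl = store[oid];
        return bl.length();  // aio_bench() treats <= 0 as "no previous write run"
      }
      int sync_write(const std::string& oid, bufferlist& bl, size_t len) {
        store[oid] = bl;
        return 0;
      }
    public:
      MemBencher() {}

      // Typical driver, mirroring rados.cc: write for 60s with 16 ops of 4 MB
      // in flight, then replay the same objects as a sequential read test.
      void run() {
        aio_bench(OP_WRITE, 60, 16, 1 << 22);
        aio_bench(OP_SEQ_READ, 60, 16, 1 << 22);
      }
    };

(Because every operation here completes synchronously, completion_is_done() can simply return true and the benchmark loop never blocks; an asynchronous backend instead wires the cb/arg pair from create_completion() to its own completion event, as RadosBencher does through rados.aio_create_completion().)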
diff --git a/src/libs3 b/src/libs3
new file mode 160000
+Subproject f049cb9840b38e96cb76f78242f067c154d1c66
diff --git a/src/osdc/rados_bencher.h b/src/osdc/rados_bencher.h
deleted file mode 100644
index 3f0e3802a84..00000000000
--- a/src/osdc/rados_bencher.h
+++ /dev/null
@@ -1,571 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2009 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- * Series of functions to test your rados installation. Notice
- * that this code is not terribly robust -- for instance, if you
- * try and bench on a pool you don't have permission to access
- * it will just loop forever.
- */
-#include "include/rados/librados.hpp"
-#include "common/config.h"
-#include "global/global_init.h"
-#include "common/Cond.h"
-#include <iostream>
-#include <fstream>
-
-#include <stdlib.h>
-#include <time.h>
-#include <sstream>
-
-Mutex dataLock("data mutex");
-const int OP_WRITE = 1;
-const int OP_SEQ_READ = 2;
-const int OP_RAND_READ = 3;
-const char *BENCH_DATA = "benchmark_write_data";
-
-struct bench_data {
- bool done; //is the benchmark is done
- int object_size; //the size of the objects
- int trans_size; //size of the write/read to perform
- // same as object_size for write tests
- int in_flight; //number of reads/writes being waited on
- int started;
- int finished;
- double min_latency;
- double max_latency;
- double avg_latency;
- utime_t cur_latency; //latency of last completed transaction
- utime_t start_time; //start time for benchmark
- char *object_contents; //pointer to the contents written to each object
-};
-
-void generate_object_name(char *s, size_t size, int objnum, int pid = 0)
-{
- char hostname[30];
- gethostname(hostname, sizeof(hostname)-1);
- hostname[sizeof(hostname)-1] = 0;
- if (pid) {
- snprintf(s, size, "%s_%d_object%d", hostname, pid, objnum);
- } else {
- snprintf(s, size, "%s_%d_object%d", hostname, getpid(), objnum);
- }
-}
-
-int write_bench(librados::Rados& rados, librados::IoCtx& io_ctx,
- int secondsToRun, int concurrentios, bench_data *data);
-int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx,
- int secondsToRun, int concurrentios, bench_data *data,
- int writePid);
-void *status_printer(void * data_store);
-void sanitize_object_contents(bench_data *data, int length);
-
-int aio_bench(librados::Rados& rados, librados::IoCtx &io_ctx, int operation,
- int secondsToRun, int concurrentios, int op_size) {
- int object_size = op_size;
- int num_objects = 0;
- char* contentsChars = new char[op_size];
- int r = 0;
- int prevPid = 0;
-
- //get data from previous write run, if available
- if (operation != OP_WRITE) {
- bufferlist object_data;
- r = io_ctx.read(BENCH_DATA, object_data, sizeof(int)*3, 0);
- if (r <= 0) {
- delete[] contentsChars;
- if (r == -2)
- cerr << "Must write data before running a read benchmark!" << std::endl;
- return r;
- }
- bufferlist::iterator p = object_data.begin();
- ::decode(object_size, p);
- ::decode(num_objects, p);
- ::decode(prevPid, p);
- } else {
- object_size = op_size;
- }
-
- dataLock.Lock();
- bench_data *data = new bench_data();
- data->done = false;
- data->object_size = object_size;
- data->trans_size = op_size;
- data->in_flight = 0;
- data->started = 0;
- data->finished = num_objects;
- data->min_latency = 9999.0; // this better be higher than initial latency!
- data->max_latency = 0;
- data->avg_latency = 0;
- data->object_contents = contentsChars;
- dataLock.Unlock();
-
- //fill in contentsChars deterministically so we can check returns
- sanitize_object_contents(data, data->object_size);
-
- if (OP_WRITE == operation) {
- r = write_bench(rados, io_ctx, secondsToRun, concurrentios, data);
- if (r != 0) goto out;
- }
- else if (OP_SEQ_READ == operation) {
- r = seq_read_bench(rados, io_ctx, secondsToRun, concurrentios, data, prevPid);
- if (r != 0) goto out;
- }
- else if (OP_RAND_READ == operation) {
- cerr << "Random test not implemented yet!" << std::endl;
- r = -1;
- }
-
- out:
- delete[] contentsChars;
- delete data;
- return r;
-}
-
-void _aio_cb(void *cb, void *arg) {
- dataLock.Lock();
- Cond *cond = (Cond *) arg;
- cond->Signal();
- dataLock.Unlock();
-}
-
-int write_bench(librados::Rados& rados, librados::IoCtx& io_ctx,
- int secondsToRun, int concurrentios, bench_data *data) {
- cout << "Maintaining " << concurrentios << " concurrent writes of "
- << data->object_size << " bytes for at least "
- << secondsToRun << " seconds." << std::endl;
-
- librados::AioCompletion* completions[concurrentios];
- char* name[concurrentios];
- bufferlist* contents[concurrentios];
- double total_latency = 0;
- utime_t start_times[concurrentios];
- utime_t stopTime;
- int r = 0;
- bufferlist b_write;
- Cond cond;
- utime_t runtime;
- utime_t timePassed;
-
- //set up writes so I can start them together
- for (int i = 0; i<concurrentios; ++i) {
- name[i] = new char[128];
- contents[i] = new bufferlist();
- generate_object_name(name[i], 128, i);
- snprintf(data->object_contents, data->object_size, "I'm the %dth object!", i);
- contents[i]->append(data->object_contents, data->object_size);
- }
-
- pthread_t print_thread;
-
- pthread_create(&print_thread, NULL, status_printer, (void *)data);
- dataLock.Lock();
- data->start_time = ceph_clock_now(g_ceph_context);
- dataLock.Unlock();
- for (int i = 0; i<concurrentios; ++i) {
- start_times[i] = ceph_clock_now(g_ceph_context);
- completions[i] = rados.aio_create_completion((void *) &cond, 0,
- &_aio_cb);
- r = io_ctx.aio_write(name[i], completions[i], *contents[i], data->object_size, 0);
- if (r < 0) { //naughty, doesn't clean up heap
- goto ERR;
- }
- dataLock.Lock();
- ++data->started;
- ++data->in_flight;
- dataLock.Unlock();
- }
-
- //keep on adding new writes as old ones complete until we've passed minimum time
- int slot;
- bufferlist* newContents;
- char* newName;
-
- //don't need locking for reads because other thread doesn't write
-
- runtime.set_from_double(secondsToRun);
- stopTime = data->start_time + runtime;
- while( ceph_clock_now(g_ceph_context) < stopTime ) {
- dataLock.Lock();
- while (1) {
- for (slot = 0; slot < concurrentios; ++slot) {
- if (completions[slot]->is_safe()) {
- break;
- }
- }
- if (slot < concurrentios) {
- break;
- }
- cond.Wait(dataLock);
- }
- dataLock.Unlock();
- //create new contents and name on the heap, and fill them
- newContents = new bufferlist();
- newName = new char[128];
- generate_object_name(newName, 128, data->started);
- snprintf(data->object_contents, data->object_size, "I'm the %dth object!", data->started);
- newContents->append(data->object_contents, data->object_size);
- completions[slot]->wait_for_safe();
- dataLock.Lock();
- r = completions[slot]->get_return_value();
- if (r != 0) {
- dataLock.Unlock();
- goto ERR;
- }
- data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
- total_latency += data->cur_latency;
- if( data->cur_latency > data->max_latency) data->max_latency = data->cur_latency;
- if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency;
- ++data->finished;
- data->avg_latency = total_latency / data->finished;
- --data->in_flight;
- dataLock.Unlock();
- completions[slot]->release();
- completions[slot] = 0;
- timePassed = ceph_clock_now(g_ceph_context) - data->start_time;
-
- //write new stuff to rados, then delete old stuff
- //and save locations of new stuff for later deletion
- start_times[slot] = ceph_clock_now(g_ceph_context);
- completions[slot] = rados.aio_create_completion((void *) &cond, 0, &_aio_cb);
- r = io_ctx.aio_write(newName, completions[slot], *newContents, data->object_size, 0);
- if (r < 0) {//naughty; doesn't clean up heap space.
- goto ERR;
- }
- dataLock.Lock();
- ++data->started;
- ++data->in_flight;
- dataLock.Unlock();
- delete[] name[slot];
- delete contents[slot];
- name[slot] = newName;
- contents[slot] = newContents;
- }
-
- while (data->finished < data->started) {
- slot = data->finished % concurrentios;
- completions[slot]->wait_for_safe();
- dataLock.Lock();
- r = completions[slot]->get_return_value();
- if (r != 0) {
- dataLock.Unlock();
- goto ERR;
- }
- data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
- total_latency += data->cur_latency;
- if (data->cur_latency > data->max_latency) data->max_latency = data->cur_latency;
- if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency;
- ++data->finished;
- data->avg_latency = total_latency / data->finished;
- --data->in_flight;
- dataLock.Unlock();
- completions[slot]->release();
- completions[slot] = 0;
- delete[] name[slot];
- delete contents[slot];
- }
-
- timePassed = ceph_clock_now(g_ceph_context) - data->start_time;
- dataLock.Lock();
- data->done = true;
- dataLock.Unlock();
-
- pthread_join(print_thread, NULL);
-
- double bandwidth;
- bandwidth = ((double)data->finished)*((double)data->object_size)/(double)timePassed;
- bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
- char bw[20];
- snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth);
-
- cout << "Total time run: " << timePassed << std::endl
- << "Total writes made: " << data->finished << std::endl
- << "Write size: " << data->object_size << std::endl
- << "Bandwidth (MB/sec): " << bw << std::endl
- << "Average Latency: " << data->avg_latency << std::endl
- << "Max latency: " << data->max_latency << std::endl
- << "Min latency: " << data->min_latency << std::endl;
-
- //write object size/number data for read benchmarks
- ::encode(data->object_size, b_write);
- ::encode(data->finished, b_write);
- ::encode(getpid(), b_write);
- io_ctx.write(BENCH_DATA, b_write, sizeof(int)*3, 0);
- return 0;
-
- ERR:
- dataLock.Lock();
- data->done = 1;
- dataLock.Unlock();
- pthread_join(print_thread, NULL);
- return -5;
-}
-
-int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, int seconds_to_run,
- int concurrentios, bench_data *write_data, int pid) {
- bench_data *data = new bench_data();
- data->done = false;
- data->object_size = write_data->object_size;
- data->trans_size = data->object_size;
- data->in_flight= 0;
- data->started = 0;
- data->finished = 0;
- data->min_latency = 9999.0;
- data->max_latency = 0;
- data->avg_latency = 0;
- data->object_contents = write_data->object_contents;
-
- Cond cond;
- librados::AioCompletion* completions[concurrentios];
- char* name[concurrentios];
- bufferlist* contents[concurrentios];
- int index[concurrentios];
- int errors = 0;
- utime_t start_time;
- utime_t start_times[concurrentios];
- utime_t time_to_run;
- time_to_run.set_from_double(seconds_to_run);
- double total_latency = 0;
- int r = 0;
- utime_t runtime;
- sanitize_object_contents(data, 128); //clean it up once; subsequent
- //changes will be safe because string length monotonically increases
-
- //set up initial reads
- for (int i = 0; i < concurrentios; ++i) {
- name[i] = new char[128];
- generate_object_name(name[i], 128, i, pid);
- contents[i] = new bufferlist();
- }
-
- pthread_t print_thread;
- pthread_create(&print_thread, NULL, status_printer, (void *)data);
-
- dataLock.Lock();
- data->start_time = ceph_clock_now(g_ceph_context);
- dataLock.Unlock();
- utime_t finish_time = data->start_time + time_to_run;
- //start initial reads
- for (int i = 0; i < concurrentios; ++i) {
- index[i] = i;
- start_times[i] = ceph_clock_now(g_ceph_context);
- completions[i] = rados.aio_create_completion((void *) &cond, &_aio_cb, 0);
- r = io_ctx.aio_read(name[i], completions[i], contents[i], data->object_size, 0);
- if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
- cerr << "r = " << r << std::endl;
- goto ERR;
- }
- dataLock.Lock();
- ++data->started;
- ++data->in_flight;
- dataLock.Unlock();
- }
-
- //keep on adding new reads as old ones complete
- int slot;
- char* newName;
- bufferlist *cur_contents;
-
- while (seconds_to_run && (ceph_clock_now(g_ceph_context) < finish_time) &&
- write_data->finished > data->started) {
- dataLock.Lock();
- while (1) {
- for (slot = 0; slot < concurrentios; ++slot) {
- if (completions[slot]->is_complete()) {
- break;
- }
- }
- if (slot < concurrentios) {
- break;
- }
- cond.Wait(dataLock);
- }
- dataLock.Unlock();
- newName = new char[128];
- generate_object_name(newName, 128, data->started, pid);
- int current_index = index[slot];
- index[slot] = data->started;
- completions[slot]->wait_for_complete();
- dataLock.Lock();
- r = completions[slot]->get_return_value();
- if (r != 0) {
- cerr << "read got " << r << std::endl;
- dataLock.Unlock();
- goto ERR;
- }
- data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
- total_latency += data->cur_latency;
- if( data->cur_latency > data->max_latency) data->max_latency = data->cur_latency;
- if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency;
- ++data->finished;
- data->avg_latency = total_latency / data->finished;
- --data->in_flight;
- dataLock.Unlock();
- completions[slot]->release();
- completions[slot] = 0;
- cur_contents = contents[slot];
-
- //start new read and check data if requested
- start_times[slot] = ceph_clock_now(g_ceph_context);
- contents[slot] = new bufferlist();
- completions[slot] = rados.aio_create_completion((void *) &cond, &_aio_cb, 0);
- r = io_ctx.aio_read(newName, completions[slot], contents[slot], data->object_size, 0);
- if (r < 0) {
- goto ERR;
- }
- dataLock.Lock();
- ++data->started;
- ++data->in_flight;
- snprintf(data->object_contents, data->object_size, "I'm the %dth object!", current_index);
- dataLock.Unlock();
- if (memcmp(data->object_contents, cur_contents->c_str(), data->object_size) != 0) {
- cerr << name[slot] << " is not correct!" << std::endl;
- ++errors;
- }
- delete name[slot];
- name[slot] = newName;
- delete cur_contents;
- }
-
- //wait for final reads to complete
- while (data->finished < data->started) {
- slot = data->finished % concurrentios;
- completions[slot]->wait_for_complete();
- dataLock.Lock();
- r = completions[slot]->get_return_value();
- if (r != 0) {
- cerr << "read got " << r << std::endl;
- dataLock.Unlock();
- goto ERR;
- }
- data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
- total_latency += data->cur_latency;
- if (data->cur_latency > data->max_latency) data->max_latency = data->cur_latency;
- if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency;
- ++data->finished;
- data->avg_latency = total_latency / data->finished;
- --data->in_flight;
- completions[slot]-> release();
- completions[slot] = 0;
- snprintf(data->object_contents, data->object_size, "I'm the %dth object!", index[slot]);
- dataLock.Unlock();
- if (memcmp(data->object_contents, contents[slot]->c_str(), data->object_size) != 0) {
- cerr << name[slot] << " is not correct!" << std::endl;
- ++errors;
- }
- delete name[slot];
- delete contents[slot];
- }
-
- runtime = ceph_clock_now(g_ceph_context) - data->start_time;
- dataLock.Lock();
- data->done = true;
- dataLock.Unlock();
-
- pthread_join(print_thread, NULL);
-
- double bandwidth;
- bandwidth = ((double)data->finished)*((double)data->object_size)/(double)runtime;
- bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
- char bw[20];
- snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth);
-
- cout << "Total time run: " << runtime << std::endl
- << "Total reads made: " << data->finished << std::endl
- << "Read size: " << data->object_size << std::endl
- << "Bandwidth (MB/sec): " << bw << std::endl
- << "Average Latency: " << data->avg_latency << std::endl
- << "Max latency: " << data->max_latency << std::endl
- << "Min latency: " << data->min_latency << std::endl;
-
- delete data;
- return 0;
-
- ERR:
- dataLock.Lock();
- data->done = 1;
- dataLock.Unlock();
- pthread_join(print_thread, NULL);
- return -5;
-}
-
-
-
-void *status_printer(void * data_store) {
- bench_data *data = (bench_data *) data_store;
- Cond cond;
- int i = 0;
- int previous_writes = 0;
- int cycleSinceChange = 0;
- double avg_bandwidth;
- double bandwidth;
- utime_t ONE_SECOND;
- ONE_SECOND.set_from_double(1.0);
- dataLock.Lock();
- while(!data->done) {
- if (i % 20 == 0) {
- if (i > 0)
- cout << "min lat: " << data->min_latency
- << " max lat: " << data->max_latency
- << " avg lat: " << data->avg_latency << std::endl;
- //I'm naughty and don't reset the fill
- cout << setfill(' ')
- << setw(5) << "sec"
- << setw(8) << "Cur ops"
- << setw(10) << "started"
- << setw(10) << "finished"
- << setw(10) << "avg MB/s"
- << setw(10) << "cur MB/s"
- << setw(10) << "last lat"
- << setw(10) << "avg lat" << std::endl;
- }
- bandwidth = (double)(data->finished - previous_writes)
- * (data->trans_size)
- / (1024*1024)
- / cycleSinceChange;
- avg_bandwidth = (double) (data->trans_size) * (data->finished)
- / (double)(ceph_clock_now(g_ceph_context) - data->start_time) / (1024*1024);
- if (previous_writes != data->finished) {
- previous_writes = data->finished;
- cycleSinceChange = 0;
- cout << setfill(' ')
- << setw(5) << i
- << setw(8) << data->in_flight
- << setw(10) << data->started
- << setw(10) << data->finished
- << setw(10) << avg_bandwidth
- << setw(10) << bandwidth
- << setw(10) << (double)data->cur_latency
- << setw(10) << data->avg_latency << std::endl;
- }
- else {
- cout << setfill(' ')
- << setw(5) << i
- << setw(8) << data->in_flight
- << setw(10) << data->started
- << setw(10) << data->finished
- << setw(10) << avg_bandwidth
- << setw(10) << '0'
- << setw(10) << '-'
- << setw(10) << data->avg_latency << std::endl;
- }
- ++i;
- ++cycleSinceChange;
- cond.WaitInterval(g_ceph_context, dataLock, ONE_SECOND);
- }
- dataLock.Unlock();
- return NULL;
-}
-
-inline void sanitize_object_contents (bench_data *data, int length) {
- for (int i = 0; i < length; ++i) {
- data->object_contents[i] = i % sizeof(char);
- }
-}
diff --git a/src/rados.cc b/src/rados.cc
index 9f6959204c9..526a9e4e314 100644
--- a/src/rados.cc
+++ b/src/rados.cc
@@ -17,7 +17,7 @@
#include "include/rados/librados.hpp"
using namespace librados;
-#include "osdc/rados_bencher.h"
+#include "common/obj_bencher.h"
#include "common/config.h"
#include "common/ceph_argparse.h"
@@ -25,6 +25,7 @@ using namespace librados;
#include "common/Cond.h"
#include "common/debug.h"
#include "common/Formatter.h"
+#include "common/obj_bencher.h"
#include "mds/inode_backtrace.h"
#include "auth/Crypto.h"
#include <iostream>
@@ -577,6 +578,64 @@ void LoadGen::cleanup()
}
}
+
+class RadosBencher : public ObjBencher {
+ librados::AioCompletion **completions;
+ librados::Rados& rados;
+ librados::IoCtx& io_ctx;
+protected:
+ int completions_init(int concurrentios) {
+ completions = new librados::AioCompletion *[concurrentios];
+ return 0;
+ }
+ void completions_done() {
+ delete[] completions;
+ completions = NULL;
+ }
+ int create_completion(int slot, void (*cb)(void *, void*), void *arg) {
+ completions[slot] = rados.aio_create_completion((void *) arg, 0, cb);
+
+ if (!completions[slot])
+ return -EINVAL;
+
+ return 0;
+ }
+ void release_completion(int slot) {
+ completions[slot]->release();
+ completions[slot] = 0;
+ }
+
+ int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) {
+ return io_ctx.aio_read(oid, completions[slot], pbl, len, 0);
+ }
+
+ int aio_write(const std::string& oid, int slot, bufferlist& bl, size_t len) {
+ return io_ctx.aio_write(oid, completions[slot], bl, len, 0);
+ }
+
+ int sync_read(const std::string& oid, bufferlist& bl, size_t len) {
+ return io_ctx.read(oid, bl, len, 0);
+ }
+ int sync_write(const std::string& oid, bufferlist& bl, size_t len) {
+ return io_ctx.write(oid, bl, len, 0);
+ }
+
+ bool completion_is_done(int slot) {
+ return completions[slot]->is_safe();
+ }
+
+ int completion_wait(int slot) {
+ return completions[slot]->wait_for_safe();
+ }
+ int completion_ret(int slot) {
+ return completions[slot]->get_return_value();
+ }
+
+public:
+ RadosBencher(librados::Rados& _r, librados::IoCtx& _i) : completions(NULL), rados(_r), io_ctx(_i) {}
+ ~RadosBencher() { }
+};
+
/**********************************************
**********************************************/
@@ -1218,7 +1277,8 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
operation = OP_RAND_READ;
else
usage_exit();
- ret = aio_bench(rados, io_ctx, operation, seconds, concurrent_ios, op_size);
+ RadosBencher bencher(rados, io_ctx);
+ ret = bencher.aio_bench(operation, seconds, concurrent_ios, op_size);
if (ret != 0)
cerr << "error during benchmark: " << ret << std::endl;
}
diff --git a/src/tools/rest_bench.cc b/src/tools/rest_bench.cc
new file mode 100644
index 00000000000..05cda32c8d6
--- /dev/null
+++ b/src/tools/rest_bench.cc
@@ -0,0 +1,644 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "include/types.h"
+#include "include/atomic.h"
+
+#include "common/obj_bencher.h"
+#include "common/config.h"
+#include "common/debug.h"
+#include "common/ceph_argparse.h"
+#include "common/WorkQueue.h"
+#include "msg/Message.h"
+#include "global/global_init.h"
+
+#include "libs3.h"
+
+#include <deque>
+
+#include <errno.h>
+
+#define DEFAULT_USER_AGENT "rest-bench"
+#define DEFAULT_BUCKET "rest-bench-bucket"
+
+void usage(ostream& out)
+{
+ out << \
+"usage: rest-bench [options] <write|seq>\n"
+"BENCHMARK OPTIONS\n"
+" --seconds\n"
+" benchmak length (default: 60)\n"
+" -t concurrent_operations\n"
+" --concurrent-ios=concurrent_operations\n"
+" select bucket by name\n"
+" -b op-size\n"
+" --block-size=op-size\n"
+" set the size of write ops for put or benchmarking\n"
+"REST CONFIG OPTIONS\n"
+" --api-host=bhost\n"
+" host name\n"
+" --bucket=bucket\n"
+" select bucket by name\n"
+" --access-key=access_key\n"
+" access key to RESTful storage provider\n"
+" --secret=secret_key\n"
+" secret key for the specified access key\n"
+" --protocol=<http|https>\n"
+" protocol to be used (default: http)\n"
+" --uri_style=<path|vhost>\n"
+" uri style in requests (default: path)\n";
+}
+
+static void usage_exit()
+{
+ usage(cerr);
+ exit(1);
+}
+
+enum OpType {
+ OP_NONE = 0,
+ OP_GET_OBJ = 1,
+ OP_PUT_OBJ = 2,
+};
+
+struct req_context : public RefCountedObject {
+ bool complete;
+ S3Status status;
+ S3RequestContext *ctx;
+ void (*cb)(void *, void *);
+ void *arg;
+ bufferlist *in_bl;
+ bufferlist out_bl;
+ uint64_t off;
+ uint64_t len;
+ string oid;
+ Mutex lock;
+ Cond cond;
+ S3BucketContext *bucket_ctx;
+
+ bool should_destroy_ctx;
+
+ OpType op;
+
+ req_context() : complete(false), status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0), len(0),
+ lock("req_context"), bucket_ctx(NULL), should_destroy_ctx(false), op(OP_NONE) {}
+ ~req_context() {
+ if (should_destroy_ctx) {
+ S3_destroy_request_context(ctx);
+ }
+ }
+
+ int init_ctx() {
+ S3Status status = S3_create_request_context(&ctx);
+ if (status != S3StatusOK) {
+ cerr << "failed to create context: " << S3_get_status_name(status) << std::endl;
+ return -EINVAL;
+ }
+ should_destroy_ctx = true;
+
+ return 0;
+ }
+
+ int ret() {
+ if (status != S3StatusOK) {
+ return -EINVAL;
+ }
+ return 0;
+ }
+};
+
+static S3Status properties_callback(const S3ResponseProperties *properties, void *cb_data)
+{
+ return S3StatusOK;
+}
+
+static void complete_callback(S3Status status, const S3ErrorDetails *details, void *cb_data)
+{
+ if (!cb_data)
+ return;
+
+ struct req_context *ctx = (struct req_context *)cb_data;
+
+ ctx->lock.Lock();
+ ctx->complete = true;
+ ctx->status = status;
+ ctx->lock.Unlock();
+
+ if (ctx->cb) {
+ ctx->cb((void *)ctx->cb, ctx->arg);
+ }
+
+ ctx->put();
+}
+
+static S3Status get_obj_callback(int size, const char *buf,
+ void *cb_data)
+{
+ if (!cb_data)
+ return S3StatusOK;
+
+ struct req_context *ctx = (struct req_context *)cb_data;
+
+ ctx->in_bl->append(buf, size);
+
+ return S3StatusOK;
+}
+
+static int put_obj_callback(int size, char *buf,
+ void *cb_data)
+{
+ if (!cb_data)
+ return 0;
+
+ struct req_context *ctx = (struct req_context *)cb_data;
+
+ int chunk = ctx->out_bl.length() - ctx->off;
+ if (!chunk)
+ return 0;
+
+ if (chunk > size)
+ chunk = size;
+
+ memcpy(buf, ctx->out_bl.c_str() + ctx->off, chunk);
+
+ ctx->off += chunk;
+
+ return chunk;
+}
+
+class RESTDispatcher {
+ deque<req_context *> m_req_queue;
+ ThreadPool m_tp;
+
+ S3ResponseHandler response_handler;
+ S3GetObjectHandler get_obj_handler;
+ S3PutObjectHandler put_obj_handler;
+
+ struct DispatcherWQ : public ThreadPool::WorkQueue<req_context> {
+ RESTDispatcher *dispatcher;
+ DispatcherWQ(RESTDispatcher *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ : ThreadPool::WorkQueue<req_context>("REST", timeout, suicide_timeout, tp), dispatcher(p) {}
+
+ bool _enqueue(req_context *req) {
+ dispatcher->m_req_queue.push_back(req);
+ _dump_queue();
+ return true;
+ }
+ void _dequeue(req_context *req) {
+ assert(0);
+ }
+ bool _empty() {
+ return dispatcher->m_req_queue.empty();
+ }
+ req_context *_dequeue() {
+ if (dispatcher->m_req_queue.empty())
+ return NULL;
+ req_context *req = dispatcher->m_req_queue.front();
+ dispatcher->m_req_queue.pop_front();
+ _dump_queue();
+ return req;
+ }
+ void _process(req_context *req) {
+ dispatcher->process_context(req);
+ }
+ void _dump_queue() {
+ deque<req_context *>::iterator iter;
+ if (dispatcher->m_req_queue.size() == 0) {
+ generic_dout(20) << "DispatcherWQ: empty" << dendl;
+ return;
+ }
+ generic_dout(20) << "DispatcherWQ:" << dendl;
+ for (iter = dispatcher->m_req_queue.begin(); iter != dispatcher->m_req_queue.end(); ++iter) {
+ generic_dout(20) << "req: " << hex << *iter << dec << dendl;
+ }
+ }
+ void _clear() {
+ assert(dispatcher->m_req_queue.empty());
+ }
+ } req_wq;
+
+public:
+ RESTDispatcher(CephContext *cct, int num_threads)
+ : m_tp(cct, "RESTDispatcher::m_tp", num_threads),
+ req_wq(this, g_conf->rgw_op_thread_timeout,
+ g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
+
+
+ response_handler.propertiesCallback = properties_callback;
+ response_handler.completeCallback = complete_callback;
+
+ get_obj_handler.responseHandler = response_handler;
+ get_obj_handler.getObjectDataCallback = get_obj_callback;
+
+ put_obj_handler.responseHandler = response_handler;
+ put_obj_handler.putObjectDataCallback = put_obj_callback;
+
+ }
+ void process_context(req_context *ctx);
+ void get_obj(req_context *ctx);
+ void put_obj(req_context *ctx);
+
+ void queue(req_context *ctx) {
+ req_wq.queue(ctx);
+ }
+
+ void start() {
+ m_tp.start();
+ }
+};
+
+void RESTDispatcher::process_context(req_context *ctx)
+{
+ ctx->get();
+
+ switch (ctx->op) {
+ case OP_GET_OBJ:
+ get_obj(ctx);
+ break;
+ case OP_PUT_OBJ:
+ put_obj(ctx);
+ break;
+ default:
+ assert(0);
+ }
+
+ S3Status status = S3_runall_request_context(ctx->ctx);
+
+ if (status != S3StatusOK) {
+ cerr << "ERROR: S3_runall_request_context() returned " << S3_get_status_name(status) << std::endl;
+ ctx->status = status;
+ } else if (ctx->status != S3StatusOK) {
+ cerr << "ERROR: " << ctx->oid << ": " << S3_get_status_name(ctx->status) << std::endl;
+ }
+
+ ctx->lock.Lock();
+ ctx->cond.SignalAll();
+ ctx->lock.Unlock();
+
+ ctx->put();
+}
+
+void RESTDispatcher::put_obj(req_context *ctx)
+{
+ S3_put_object(ctx->bucket_ctx, ctx->oid.c_str(),
+ ctx->out_bl.length(),
+ NULL,
+ ctx->ctx,
+ &put_obj_handler, ctx);
+}
+
+void RESTDispatcher::get_obj(req_context *ctx)
+{
+ S3_get_object(ctx->bucket_ctx, ctx->oid.c_str(), NULL, 0, ctx->len, ctx->ctx,
+ &get_obj_handler, ctx);
+}
+
+class RESTBencher : public ObjBencher {
+ RESTDispatcher *dispatcher;
+ struct req_context **completions;
+ struct S3RequestContext **handles;
+ S3BucketContext bucket_ctx;
+ string user_agent;
+ string host;
+ string bucket;
+ S3Protocol protocol;
+ string access_key;
+ string secret;
+ int concurrentios;
+
+protected:
+ int rest_init() {
+ S3Status status = S3_initialize(user_agent.c_str(), S3_INIT_ALL, host.c_str());
+ if (status != S3StatusOK) {
+ cerr << "failed to init: " << S3_get_status_name(status) << std::endl;
+ return -EINVAL;
+ }
+
+
+ return 0;
+ }
+
+
+ int completions_init(int _concurrentios) {
+ concurrentios = _concurrentios;
+ completions = new req_context *[concurrentios];
+ handles = new S3RequestContext *[concurrentios];
+ for (int i = 0; i < concurrentios; i++) {
+ completions[i] = NULL;
+ S3Status status = S3_create_request_context(&handles[i]);
+ if (status != S3StatusOK) {
+ cerr << "failed to create context: " << S3_get_status_name(status) << std::endl;
+ return -EINVAL;
+ }
+ }
+ return 0;
+ }
+ void completions_done() {
+ delete[] completions;
+ completions = NULL;
+ for (int i = 0; i < concurrentios; i++) {
+ S3_destroy_request_context(handles[i]);
+ }
+ delete[] handles;
+ handles = NULL;
+ }
+ int create_completion(int slot, void (*cb)(void *, void*), void *arg) {
+ struct req_context *ctx = new req_context;
+ ctx->ctx = handles[slot];
+ ctx->cb = cb;
+ ctx->arg = arg;
+
+ completions[slot] = ctx;
+
+ return 0;
+ }
+ void release_completion(int slot) {
+ struct req_context *ctx = completions[slot];
+
+ ctx->put();
+ completions[slot] = 0;
+ }
+
+ int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) {
+ struct req_context *ctx = completions[slot];
+
+ ctx->get();
+ ctx->in_bl = pbl;
+ ctx->oid = oid;
+ ctx->len = len;
+ ctx->bucket_ctx = &bucket_ctx;
+ ctx->op = OP_GET_OBJ;
+
+ dispatcher->queue(ctx);
+
+ return 0;
+ }
+
+ int aio_write(const std::string& oid, int slot, bufferlist& bl, size_t len) {
+ struct req_context *ctx = completions[slot];
+
+ ctx->get();
+ ctx->bucket_ctx = &bucket_ctx;
+ ctx->out_bl = bl;
+ ctx->oid = oid;
+ ctx->len = len;
+ ctx->op = OP_PUT_OBJ;
+
+ dispatcher->queue(ctx);
+ return 0;
+ }
+
+ int sync_read(const std::string& oid, bufferlist& bl, size_t len) {
+ struct req_context *ctx = new req_context;
+ int ret = ctx->init_ctx();
+ if (ret < 0) {
+ return ret;
+ }
+ ctx->in_bl = &bl;
+ ctx->get();
+ ctx->bucket_ctx = &bucket_ctx;
+ ctx->oid = oid;
+ ctx->len = len;
+ ctx->op = OP_GET_OBJ;
+
+ dispatcher->process_context(ctx);
+ ret = ctx->ret();
+ ctx->put();
+ return bl.length();
+ }
+ int sync_write(const std::string& oid, bufferlist& bl, size_t len) {
+ struct req_context *ctx = new req_context;
+ int ret = ctx->init_ctx();
+ if (ret < 0) {
+ return ret;
+ }
+ ctx->get();
+ ctx->out_bl = bl;
+ ctx->bucket_ctx = &bucket_ctx;
+ ctx->oid = oid;
+ ctx->op = OP_PUT_OBJ;
+
+ dispatcher->process_context(ctx);
+ ret = ctx->ret();
+ ctx->put();
+ return ret;
+ }
+
+ bool completion_is_done(int slot) {
+ return completions[slot]->complete;
+ }
+
+ int completion_wait(int slot) {
+ req_context *ctx = completions[slot];
+
+ Mutex::Locker l(ctx->lock);
+
+ while (!ctx->complete) {
+ ctx->cond.Wait(ctx->lock);
+ }
+
+ return 0;
+ }
+
+ int completion_ret(int slot) {
+ S3Status status = completions[slot]->status;
+ if (status != S3StatusOK)
+ return -EIO;
+ return 0;
+ }
+
+public:
+ RESTBencher(RESTDispatcher *_dispatcher) : dispatcher(_dispatcher), completions(NULL) {
+ dispatcher->start();
+ }
+ ~RESTBencher() { }
+
+ int init(string& _agent, string& _host, string& _bucket, S3Protocol _protocol,
+ S3UriStyle uri_style, string& _access_key, string& _secret) {
+ user_agent = _agent;
+ host = _host;
+ bucket = _bucket;
+ protocol = _protocol;
+ access_key = _access_key;
+ secret = _secret;
+
+ bucket_ctx.hostName = NULL; // host.c_str();
+ bucket_ctx.bucketName = bucket.c_str();
+ bucket_ctx.protocol = protocol;
+ bucket_ctx.accessKeyId = access_key.c_str();
+ bucket_ctx.secretAccessKey = secret.c_str();
+ bucket_ctx.uriStyle = uri_style;
+
+ struct req_context *ctx = new req_context;
+
+ int ret = rest_init();
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = ctx->init_ctx();
+ if (ret < 0) {
+ return ret;
+ }
+
+ ctx->get();
+
+ S3ResponseHandler response_handler;
+ response_handler.propertiesCallback = properties_callback;
+ response_handler.completeCallback = complete_callback;
+
+ S3_create_bucket(protocol, access_key.c_str(), secret.c_str(), NULL,
+ bucket.c_str(), S3CannedAclPrivate,
+ NULL, /* locationConstraint */
+ NULL, /* requestContext */
+ &response_handler, /* handler */
+ (void *)ctx /* callbackData */);
+
+ ret = ctx->ret();
+ if (ret < 0) {
+ cerr << "ERROR: failed to create bucket: " << S3_get_status_name(ctx->status) << std::endl;
+ return ret;
+ }
+
+ ctx->put();
+
+ return 0;
+ }
+};
+
+int main(int argc, const char **argv)
+{
+ vector<const char*> args;
+ argv_to_vec(argc, argv, args);
+ env_to_vec(args);
+
+ global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+ common_init_finish(g_ceph_context);
+
+ std::map < std::string, std::string > opts;
+ std::vector<const char*>::iterator i;
+ std::string host;
+ std::string val;
+ std::string user_agent;
+ std::string access_key;
+ std::string secret;
+ std::string bucket = DEFAULT_BUCKET;
+ S3Protocol protocol = S3ProtocolHTTP;
+ S3UriStyle uri_style = S3UriStylePath;
+ std::string proto_str;
+ int concurrent_ios = 16;
+ int op_size = 1 << 22;
+ int seconds = 60;
+
+
+ for (i = args.begin(); i != args.end(); ) {
+ if (ceph_argparse_double_dash(args, i)) {
+ break;
+ } else if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
+ usage(cout);
+ exit(0);
+ } else if (ceph_argparse_witharg(args, i, &user_agent, "--agent", (char*)NULL)) {
+ /* nothing */
+ } else if (ceph_argparse_witharg(args, i, &access_key, "--access-key", (char*)NULL)) {
+ /* nothing */
+ } else if (ceph_argparse_witharg(args, i, &secret, "--secret", (char*)NULL)) {
+ /* nothing */
+ } else if (ceph_argparse_witharg(args, i, &bucket, "--bucket", (char*)NULL)) {
+ /* nothing */
+ } else if (ceph_argparse_witharg(args, i, &host, "--api-host", (char*)NULL)) {
+ cerr << "host=" << host << std::endl;
+ /* nothing */
+ } else if (ceph_argparse_witharg(args, i, &proto_str, "--protocol", (char*)NULL)) {
+ if (strcasecmp(proto_str.c_str(), "http") == 0) {
+ protocol = S3ProtocolHTTP;
+ } else if (strcasecmp(proto_str.c_str(), "http") == 0) {
+ protocol = S3ProtocolHTTPS;
+ } else {
+ cerr << "bad protocol" << std::endl;
+ usage_exit();
+ }
+ /* nothing */
+ } else if (ceph_argparse_witharg(args, i, &proto_str, "--uri-style", (char*)NULL)) {
+ if (strcasecmp(proto_str.c_str(), "vhost") == 0) {
+ uri_style = S3UriStyleVirtualHost;
+ } else if (strcasecmp(proto_str.c_str(), "path") == 0) {
+ uri_style = S3UriStylePath;
+ } else {
+ cerr << "bad protocol" << std::endl;
+ usage_exit();
+ }
+ } else if (ceph_argparse_witharg(args, i, &val, "-t", "--concurrent-ios", (char*)NULL)) {
+ concurrent_ios = strtol(val.c_str(), NULL, 10);
+ } else if (ceph_argparse_witharg(args, i, &val, "--seconds", (char*)NULL)) {
+ seconds = strtol(val.c_str(), NULL, 10);
+ } else if (ceph_argparse_witharg(args, i, &val, "-b", "--block-size", (char*)NULL)) {
+ op_size = strtol(val.c_str(), NULL, 10);
+ } else {
+ if ((*i)[0] == '-')
+ usage_exit();
+ i++;
+ }
+ }
+
+ if (bucket.empty()) {
+ cerr << "rest-bench: bucket not specified" << std::endl;
+ usage_exit();
+ }
+ if (args.size() < 1)
+ usage_exit();
+ int operation = 0;
+ if (strcmp(args[0], "write") == 0)
+ operation = OP_WRITE;
+ else if (strcmp(args[0], "seq") == 0)
+ operation = OP_SEQ_READ;
+ else if (strcmp(args[0], "rand") == 0)
+ operation = OP_RAND_READ;
+ else
+ usage_exit();
+
+ if (host.empty()) {
+ cerr << "rest-bench: api host not provided." << std::endl;
+ usage_exit();
+ }
+
+ if (access_key.empty() || secret.empty()) {
+ cerr << "rest-bench: access key or secret was not provided" << std::endl;
+ usage_exit();
+ }
+
+ if (bucket.empty()) {
+ bucket = DEFAULT_BUCKET;
+ }
+
+ if (user_agent.empty())
+ user_agent = DEFAULT_USER_AGENT;
+
+ RESTDispatcher dispatcher(g_ceph_context, concurrent_ios);
+
+ RESTBencher bencher(&dispatcher);
+
+ int ret = bencher.init(user_agent, host, bucket, protocol, uri_style, access_key, secret);
+ if (ret < 0) {
+ cerr << "failed initializing benchmark" << std::endl;
+ exit(1);
+ }
+
+ ret = bencher.aio_bench(operation, seconds, concurrent_ios, op_size);
+ if (ret != 0) {
+ cerr << "error during benchmark: " << ret << std::endl;
+ }
+
+ return 0;
+}
+
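(Editorial note: a typical invocation of the new tool, using only the options parsed above, would be — with host name, keys and bucket as placeholders — rest-bench --api-host=rgw.example.com --access-key=AKEXAMPLE --secret=SKEXAMPLE --bucket=rest-bench-bucket --seconds=60 -t 16 -b 4194304 write, followed by the same command ending in seq to read the objects back. As with rados bench, seq depends on the benchmark_write_data object left behind by an earlier write run, and rand is accepted on the command line but aio_bench() still answers "Random test not implemented yet!".)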