diff options
author | Sage Weil <sage@newdream.net> | 2012-05-07 15:57:31 -0700 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2012-05-07 15:57:31 -0700 |
commit | ac903210d0cd80e6a1205d178483a16a6ae4f014 (patch) | |
tree | acec2ad224cdc95ba969083e992f6df671f6b94e | |
parent | efc0701cf97f6a936c8f253b5449216f309fe4a3 (diff) | |
parent | 6c2c883c175421d8c492c64154ed3e26bbdd6595 (diff) | |
download | ceph-ac903210d0cd80e6a1205d178483a16a6ae4f014.tar.gz |
Merge branch 'wip-rgw-bench'
Conflicts:
debian/rules
-rw-r--r-- | .gitmodules | 3 | ||||
-rw-r--r-- | configure.ac | 18 | ||||
-rw-r--r-- | debian/control | 16 | ||||
-rw-r--r-- | debian/rest-bench.install | 1 | ||||
-rwxr-xr-x | debian/rules | 3 | ||||
-rw-r--r-- | src/Makefile.am | 49 | ||||
-rw-r--r-- | src/common/obj_bencher.cc | 555 | ||||
-rw-r--r-- | src/common/obj_bencher.h | 72 | ||||
m--------- | src/libs3 | 0 | ||||
-rw-r--r-- | src/osdc/rados_bencher.h | 571 | ||||
-rw-r--r-- | src/rados.cc | 64 | ||||
-rw-r--r-- | src/tools/rest_bench.cc | 644 |
12 files changed, 1415 insertions, 581 deletions
diff --git a/.gitmodules b/.gitmodules index 2bfe0fa6852..c5a0932b7af 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,6 @@ [submodule "src/leveldb"] path = src/leveldb url = git://github.com/ceph/leveldb.git +[submodule "src/libs3"] + path = src/libs3 + url = git://github.com/ceph/libs3.git diff --git a/configure.ac b/configure.ac index b6ff4e3faa5..f7b0b9304d6 100644 --- a/configure.ac +++ b/configure.ac @@ -308,6 +308,24 @@ AS_IF([test "x$with_system_leveldb" = xcheck], [AC_CHECK_LIB([leveldb], [leveldb_open], [with_system_leveldb=yes], [], [-lsnappy -lpthread])]) AM_CONDITIONAL(WITH_SYSTEM_LEVELDB, [ test "$with_system_leveldb" = "yes" ]) +# use system libs3? +AC_ARG_WITH([system-libs3], + [AS_HELP_STRING([--with-system-libs3], [use system libs3])], + , + [with_system_libs3=no]) +AS_IF([test "x$with_system_libs3" = xyes], + [AC_CHECK_LIB([s3], [S3_initialize], [true], [AC_MSG_FAILURE([libs3 not found])], [-lpthread])]) +AS_IF([test "x$with_system_libs3" = xcheck], + [AC_SEARCH_LIBS([S3_initialize], [s3], [with_system_libs3=yes], [true], [-lpthread])]) +AM_CONDITIONAL(WITH_SYSTEM_LIBS3, [ test "$with_system_libs3" = "yes" ]) + +# rest-bench? +AC_ARG_WITH([rest-bench], + [AS_HELP_STRING([--with-rest-bench], [enables rest-bench])], + [], + [with_rest_bench=no]) +AM_CONDITIONAL(WITH_REST_BENCH, [ test "$with_rest_bench" = "yes" ]) + # use libaio? AC_ARG_WITH([libaio], [AS_HELP_STRING([--without-libaio], [disable libaio use by journal])], diff --git a/debian/control b/debian/control index 0a251b01f06..b482caa8745 100644 --- a/debian/control +++ b/debian/control @@ -6,7 +6,7 @@ Vcs-Git: git://github.com/ceph/ceph.git Vcs-Browser: https://github.com/ceph/ceph Maintainer: Laszlo Boszormenyi (GCS) <gcs@debian.hu> Uploaders: Sage Weil <sage@newdream.net> -Build-Depends: debhelper (>= 6.0.7~), autotools-dev, autoconf, automake, libfuse-dev, libboost-dev (>= 1.34), libedit-dev, libcrypto++-dev, libtool, libexpat1-dev, libfcgi-dev, libatomic-ops-dev, libgoogle-perftools-dev [i386 amd64], pkg-config, libgtkmm-2.4-dev, python, libcurl4-gnutls-dev, libkeyutils-dev, uuid-dev, libaio-dev, python (>= 2.6.6-3~) +Build-Depends: debhelper (>= 6.0.7~), autotools-dev, autoconf, automake, libfuse-dev, libboost-dev (>= 1.34), libedit-dev, libcrypto++-dev, libtool, libexpat1-dev, libfcgi-dev, libatomic-ops-dev, libgoogle-perftools-dev [i386 amd64], pkg-config, libgtkmm-2.4-dev, python, libcurl4-gnutls-dev, libkeyutils-dev, uuid-dev, libaio-dev, python (>= 2.6.6-3~), libxml2-dev Standards-Version: 3.9.3 Package: ceph @@ -271,6 +271,20 @@ Description: debugging symbols for radosgw . This package contains debugging symbols for radosgw. +Package: rest-bench +Architecture: linux-any +Depends: ${shlibs:Depends}, ${misc:Depends}, ceph-common, xml2, curl +Description: RESTful bencher that can be used to benchmark + radosgw performance. + +Package: rest-bench-dbg +Architecture: linux-any +Section: debug +Priority: extra +Depends: ${shlibs:Depends}, ${misc:Depends}, ceph-common, xml2, curl +Description: RESTful bencher that can be used to benchmark + radosgw performance. + Package: obsync Architecture: linux-any Depends: ${misc:Depends}, python, python-boto, python-ceph, python-pyxattr, python-lxml diff --git a/debian/rest-bench.install b/debian/rest-bench.install new file mode 100644 index 00000000000..8535f20d5a4 --- /dev/null +++ b/debian/rest-bench.install @@ -0,0 +1 @@ +usr/bin/rest-bench diff --git a/debian/rules b/debian/rules index 5d02e96ac79..84f3daa2789 100755 --- a/debian/rules +++ b/debian/rules @@ -20,7 +20,7 @@ endif export DEB_HOST_ARCH ?= $(shell dpkg-architecture -qDEB_HOST_ARCH) -extraopts += --with-ocf +extraopts += --with-ocf --with-rest-bench ifeq ($(DEB_HOST_ARCH), armel) # armel supports ARMv4t or above instructions sets. @@ -116,6 +116,7 @@ binary-arch: build install dh_strip -plibcephfs1 --dbg-package=libcephfs1-dbg dh_strip -pradosgw --dbg-package=radosgw-dbg dh_strip -pgceph --dbg-package=gceph-dbg + dh_strip -prest-bench --dbg-package=rest-bench-dbg dh_compress dh_fixperms diff --git a/src/Makefile.am b/src/Makefile.am index 4dd229dbb7e..00d22c90c2e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,6 +1,26 @@ AUTOMAKE_OPTIONS = gnu -SUBDIRS = ocf -DIST_SUBDIRS = gtest ocf leveldb +SUBDIRS = ocf libs3 +DIST_SUBDIRS = gtest ocf leveldb libs3 + +EXTRA_DIST = \ + libs3/COPYING \ + libs3/ChangeLog \ + libs3/GNUmakefile \ + libs3/GNUmakefile.mingw \ + libs3/GNUmakefile.osx \ + libs3/INSTALL \ + libs3/LICENSE \ + libs3/README \ + libs3/TODO \ + libs3/archlinux \ + libs3/debian \ + libs3/doxyfile \ + libs3/inc \ + libs3/libs3.spec \ + libs3/mswin \ + libs3/src \ + libs3/test + CLEANFILES = bin_PROGRAMS = # like bin_PROGRAMS, but these targets are only built for debug builds @@ -367,10 +387,27 @@ librbd_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 \ -export-symbols-regex '^rbd_.*' $(PTHREAD_LIBS) $(EXTRALIBS) lib_LTLIBRARIES += librbd.la -rados_SOURCES = rados.cc rados_import.cc rados_export.cc rados_sync.cc +rados_SOURCES = rados.cc rados_import.cc rados_export.cc rados_sync.cc common/obj_bencher.cc rados_LDADD = libglobal.la librados.la $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS) bin_PROGRAMS += rados +if WITH_REST_BENCH + +rest_bench_SOURCES = tools/rest_bench.cc common/obj_bencher.cc +rest_bench_LDADD = libglobal.la $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS) +rest_bench_CXXFLAGS = ${AM_CXXFLAGS} +bin_PROGRAMS += rest-bench + +if WITH_SYSTEM_LIBS3 +rest_bench_LDADD += -ls3 +else +rest_bench_LDADD += libs3/build/lib/libs3.a -lcurl -lxml2 +rest_bench_CXXFLAGS += -I$(top_srcdir)/src/libs3/inc +SUBDIRS += libs3 +endif + +endif + scratchtool_SOURCES = scratchtool.c scratchtool_LDADD = librados.la $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS) scratchtoolpp_SOURCES = scratchtoolpp.cc @@ -867,7 +904,8 @@ EXTRALIBS += -lgcov endif # extra bits -EXTRA_DIST = $(srcdir)/verify-mds-journal.sh $(srcdir)/vstart.sh $(srcdir)/stop.sh \ +EXTRA_DIST += \ + $(srcdir)/verify-mds-journal.sh $(srcdir)/vstart.sh $(srcdir)/stop.sh \ ceph-run $(srcdir)/ceph_common.sh \ $(srcdir)/init-radosgw \ $(srcdir)/ceph-clsinfo $(srcdir)/make_version $(srcdir)/check_version \ @@ -1221,6 +1259,7 @@ noinst_HEADERS = \ common/environment.h\ common/likely.h\ common/lockdep.h\ + common/obj_bencher.h\ common/snap_types.h\ common/Clock.h\ common/Cond.h\ @@ -1557,7 +1596,6 @@ noinst_HEADERS = \ osd/ReplicatedPG.h\ osd/Watch.h\ osd/osd_types.h\ - osdc/rados_bencher.h\ osdc/Blinker.h\ osdc/Filer.h\ osdc/Journaler.h\ @@ -1653,7 +1691,6 @@ if WITH_DEBUG bin_PROGRAMS += $(bin_DEBUGPROGRAMS) endif - project.tgz: clean cov-configure -co /usr/bin/gcc cov-configure -co /usr/bin/g++ diff --git a/src/common/obj_bencher.cc b/src/common/obj_bencher.cc new file mode 100644 index 00000000000..05bfb72a457 --- /dev/null +++ b/src/common/obj_bencher.cc @@ -0,0 +1,555 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2009 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + * Series of functions to test your rados installation. Notice + * that this code is not terribly robust -- for instance, if you + * try and bench on a pool you don't have permission to access + * it will just loop forever. + */ +#include "common/Cond.h" +#include "obj_bencher.h" + +#include <iostream> +#include <fstream> + +#include <stdlib.h> +#include <time.h> +#include <sstream> + + +const char *BENCH_DATA = "benchmark_write_data"; + +static void generate_object_name(char *s, size_t size, int objnum, int pid = 0) +{ + char hostname[30]; + gethostname(hostname, sizeof(hostname)-1); + hostname[sizeof(hostname)-1] = 0; + if (pid) { + snprintf(s, size, "%s_%d_object%d", hostname, pid, objnum); + } else { + snprintf(s, size, "%s_%d_object%d", hostname, getpid(), objnum); + } +} + +static void sanitize_object_contents (bench_data *data, int length) { + memset(data->object_contents, 'z', length); +} + +void *ObjBencher::status_printer(void *_bencher) { + ObjBencher *bencher = (ObjBencher *)_bencher; + bench_data& data = bencher->data; + Cond cond; + int i = 0; + int previous_writes = 0; + int cycleSinceChange = 0; + double avg_bandwidth; + double bandwidth; + utime_t ONE_SECOND; + ONE_SECOND.set_from_double(1.0); + bencher->lock.Lock(); + while(!data.done) { + if (i % 20 == 0) { + if (i > 0) + cout << "min lat: " << data.min_latency + << " max lat: " << data.max_latency + << " avg lat: " << data.avg_latency << std::endl; + //I'm naughty and don't reset the fill + cout << setfill(' ') + << setw(5) << "sec" + << setw(8) << "Cur ops" + << setw(10) << "started" + << setw(10) << "finished" + << setw(10) << "avg MB/s" + << setw(10) << "cur MB/s" + << setw(10) << "last lat" + << setw(10) << "avg lat" << std::endl; + } + bandwidth = (double)(data.finished - previous_writes) + * (data.trans_size) + / (1024*1024) + / cycleSinceChange; + avg_bandwidth = (double) (data.trans_size) * (data.finished) + / (double)(ceph_clock_now(g_ceph_context) - data.start_time) / (1024*1024); + if (previous_writes != data.finished) { + previous_writes = data.finished; + cycleSinceChange = 0; + cout << setfill(' ') + << setw(5) << i + << setw(8) << data.in_flight + << setw(10) << data.started + << setw(10) << data.finished + << setw(10) << avg_bandwidth + << setw(10) << bandwidth + << setw(10) << (double)data.cur_latency + << setw(10) << data.avg_latency << std::endl; + } + else { + cout << setfill(' ') + << setw(5) << i + << setw(8) << data.in_flight + << setw(10) << data.started + << setw(10) << data.finished + << setw(10) << avg_bandwidth + << setw(10) << '0' + << setw(10) << '-' + << setw(10) << data.avg_latency << std::endl; + } + ++i; + ++cycleSinceChange; + cond.WaitInterval(g_ceph_context, bencher->lock, ONE_SECOND); + } + bencher->lock.Unlock(); + return NULL; +} + +int ObjBencher::aio_bench(int operation, int secondsToRun, int concurrentios, int op_size) { + int object_size = op_size; + int num_objects = 0; + char* contentsChars = new char[op_size]; + int r = 0; + int prevPid = 0; + + //get data from previous write run, if available + if (operation != OP_WRITE) { + bufferlist object_data; + r = sync_read(BENCH_DATA, object_data, sizeof(int)*3); + if (r <= 0) { + delete[] contentsChars; + if (r == -2) + cerr << "Must write data before running a read benchmark!" << std::endl; + return r; + } + bufferlist::iterator p = object_data.begin(); + ::decode(object_size, p); + ::decode(num_objects, p); + ::decode(prevPid, p); + } else { + object_size = op_size; + } + + lock.Lock(); + data.done = false; + data.object_size = object_size; + data.trans_size = op_size; + data.in_flight = 0; + data.started = 0; + data.finished = num_objects; + data.min_latency = 9999.0; // this better be higher than initial latency! + data.max_latency = 0; + data.avg_latency = 0; + data.object_contents = contentsChars; + lock.Unlock(); + + //fill in contentsChars deterministically so we can check returns + sanitize_object_contents(&data, data.object_size); + + if (OP_WRITE == operation) { + r = write_bench(secondsToRun, concurrentios); + if (r != 0) goto out; + } + else if (OP_SEQ_READ == operation) { + r = seq_read_bench(secondsToRun, num_objects, concurrentios, prevPid); + if (r != 0) goto out; + } + else if (OP_RAND_READ == operation) { + cerr << "Random test not implemented yet!" << std::endl; + r = -1; + } + + out: + delete[] contentsChars; + return r; +} + +struct lock_cond { + lock_cond(Mutex *_lock) : lock(_lock) {} + Mutex *lock; + Cond cond; +}; + +void _aio_cb(void *cb, void *arg) { + struct lock_cond *lc = (struct lock_cond *)arg; + lc->lock->Lock(); + lc->cond.Signal(); + lc->lock->Unlock(); +} + +int ObjBencher::write_bench(int secondsToRun, int concurrentios) { + cout << "Maintaining " << concurrentios << " concurrent writes of " + << data.object_size << " bytes for at least " + << secondsToRun << " seconds." << std::endl; + + char* name[concurrentios]; + bufferlist* contents[concurrentios]; + double total_latency = 0; + utime_t start_times[concurrentios]; + utime_t stopTime; + int r = 0; + bufferlist b_write; + lock_cond lc(&lock); + utime_t runtime; + utime_t timePassed; + + r = completions_init(concurrentios); + + //set up writes so I can start them together + for (int i = 0; i<concurrentios; ++i) { + name[i] = new char[128]; + contents[i] = new bufferlist(); + generate_object_name(name[i], 128, i); + snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", i); + contents[i]->append(data.object_contents, data.object_size); + } + + pthread_t print_thread; + + pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this); + lock.Lock(); + data.start_time = ceph_clock_now(g_ceph_context); + lock.Unlock(); + for (int i = 0; i<concurrentios; ++i) { + start_times[i] = ceph_clock_now(g_ceph_context); + r = create_completion(i, _aio_cb, (void *)&lc); + if (r < 0) + goto ERR; + r = aio_write(name[i], i, *contents[i], data.object_size); + if (r < 0) { //naughty, doesn't clean up heap + goto ERR; + } + lock.Lock(); + ++data.started; + ++data.in_flight; + lock.Unlock(); + } + + //keep on adding new writes as old ones complete until we've passed minimum time + int slot; + bufferlist* newContents; + char* newName; + + //don't need locking for reads because other thread doesn't write + + runtime.set_from_double(secondsToRun); + stopTime = data.start_time + runtime; + slot = 0; + while( ceph_clock_now(g_ceph_context) < stopTime ) { + lock.Lock(); + bool found = false; + while (1) { + int old_slot = slot; + do { + if (completion_is_done(slot)) { + found = true; + break; + } + slot++; + if (slot == concurrentios) { + slot = 0; + } + } while (slot != old_slot); + if (found) + break; + lc.cond.Wait(lock); + } + lock.Unlock(); + //create new contents and name on the heap, and fill them + newContents = new bufferlist(); + newName = new char[128]; + generate_object_name(newName, 128, data.started); + snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", data.started); + newContents->append(data.object_contents, data.object_size); + completion_wait(slot); + lock.Lock(); + r = completion_ret(slot); + if (r != 0) { + lock.Unlock(); + goto ERR; + } + data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; + total_latency += data.cur_latency; + if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; + if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; + ++data.finished; + data.avg_latency = total_latency / data.finished; + --data.in_flight; + lock.Unlock(); + release_completion(slot); + timePassed = ceph_clock_now(g_ceph_context) - data.start_time; + + //write new stuff to backend, then delete old stuff + //and save locations of new stuff for later deletion + start_times[slot] = ceph_clock_now(g_ceph_context); + r = create_completion(slot, _aio_cb, &lc); + if (r < 0) + goto ERR; + r = aio_write(newName, slot, *newContents, data.object_size); + if (r < 0) {//naughty; doesn't clean up heap space. + goto ERR; + } + lock.Lock(); + ++data.started; + ++data.in_flight; + lock.Unlock(); + delete[] name[slot]; + delete contents[slot]; + name[slot] = newName; + contents[slot] = newContents; + } + + while (data.finished < data.started) { + slot = data.finished % concurrentios; + completion_wait(slot); + lock.Lock(); + r = completion_ret(slot); + if (r != 0) { + lock.Unlock(); + goto ERR; + } + data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; + total_latency += data.cur_latency; + if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; + if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; + ++data.finished; + data.avg_latency = total_latency / data.finished; + --data.in_flight; + lock.Unlock(); + release_completion(slot); + delete[] name[slot]; + delete contents[slot]; + } + + timePassed = ceph_clock_now(g_ceph_context) - data.start_time; + lock.Lock(); + data.done = true; + lock.Unlock(); + + pthread_join(print_thread, NULL); + + double bandwidth; + bandwidth = ((double)data.finished)*((double)data.object_size)/(double)timePassed; + bandwidth = bandwidth/(1024*1024); // we want it in MB/sec + char bw[20]; + snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth); + + cout << "Total time run: " << timePassed << std::endl + << "Total writes made: " << data.finished << std::endl + << "Write size: " << data.object_size << std::endl + << "Bandwidth (MB/sec): " << bw << std::endl + << "Average Latency: " << data.avg_latency << std::endl + << "Max latency: " << data.max_latency << std::endl + << "Min latency: " << data.min_latency << std::endl; + + //write object size/number data for read benchmarks + ::encode(data.object_size, b_write); + ::encode(data.finished, b_write); + ::encode(getpid(), b_write); + sync_write(BENCH_DATA, b_write, sizeof(int)*3); + + completions_done(); + + return 0; + + ERR: + lock.Lock(); + data.done = 1; + lock.Unlock(); + pthread_join(print_thread, NULL); + return -5; +} + +int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) { + data.finished = 0; + + lock_cond lc(&lock); + char* name[concurrentios]; + bufferlist* contents[concurrentios]; + int index[concurrentios]; + int errors = 0; + utime_t start_time; + utime_t start_times[concurrentios]; + utime_t time_to_run; + time_to_run.set_from_double(seconds_to_run); + double total_latency = 0; + int r = 0; + utime_t runtime; + sanitize_object_contents(&data, data.object_size); //clean it up once; subsequent + //changes will be safe because string length should remain the same + + r = completions_init(concurrentios); + if (r < 0) + return r; + + //set up initial reads + for (int i = 0; i < concurrentios; ++i) { + name[i] = new char[128]; + generate_object_name(name[i], 128, i, pid); + contents[i] = new bufferlist(); + } + + pthread_t print_thread; + pthread_create(&print_thread, NULL, status_printer, (void *)this); + + lock.Lock(); + data.start_time = ceph_clock_now(g_ceph_context); + lock.Unlock(); + utime_t finish_time = data.start_time + time_to_run; + //start initial reads + for (int i = 0; i < concurrentios; ++i) { + index[i] = i; + start_times[i] = ceph_clock_now(g_ceph_context); + create_completion(i, _aio_cb, (void *)&lc); + r = aio_read(name[i], i, contents[i], data.object_size); + if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread! + cerr << "r = " << r << std::endl; + goto ERR; + } + lock.Lock(); + ++data.started; + ++data.in_flight; + lock.Unlock(); + } + + //keep on adding new reads as old ones complete + int slot; + char* newName; + bufferlist *cur_contents; + + slot = 0; + while (seconds_to_run && (ceph_clock_now(g_ceph_context) < finish_time) && + num_objects > data.started) { + lock.Lock(); + int old_slot = slot; + bool found = false; + while (1) { + do { + if (completion_is_done(slot)) { + found = true; + break; + } + slot++; + if (slot == concurrentios) { + slot = 0; + } + } while (slot != old_slot); + if (found) { + break; + } + lc.cond.Wait(lock); + } + lock.Unlock(); + newName = new char[128]; + generate_object_name(newName, 128, data.started, pid); + int current_index = index[slot]; + index[slot] = data.started; + completion_wait(slot); + lock.Lock(); + r = completion_ret(slot); + if (r != 0) { + cerr << "read got " << r << std::endl; + lock.Unlock(); + goto ERR; + } + data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; + total_latency += data.cur_latency; + if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; + if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; + ++data.finished; + data.avg_latency = total_latency / data.finished; + --data.in_flight; + lock.Unlock(); + release_completion(slot); + cur_contents = contents[slot]; + + //start new read and check data if requested + start_times[slot] = ceph_clock_now(g_ceph_context); + contents[slot] = new bufferlist(); + create_completion(slot, _aio_cb, (void *)&lc); + r = aio_read(newName, slot, contents[slot], data.object_size); + if (r < 0) { + goto ERR; + } + lock.Lock(); + ++data.started; + ++data.in_flight; + snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", current_index); + lock.Unlock(); + if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) { + cerr << name[slot] << " is not correct!" << std::endl; + ++errors; + } + delete name[slot]; + name[slot] = newName; + delete cur_contents; + } + + //wait for final reads to complete + while (data.finished < data.started) { + slot = data.finished % concurrentios; + completion_wait(slot); + lock.Lock(); + r = completion_ret(slot); + if (r != 0) { + cerr << "read got " << r << std::endl; + lock.Unlock(); + goto ERR; + } + data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; + total_latency += data.cur_latency; + if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; + if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; + ++data.finished; + data.avg_latency = total_latency / data.finished; + --data.in_flight; + release_completion(slot); + snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", index[slot]); + lock.Unlock(); + if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) { + cerr << name[slot] << " is not correct!" << std::endl; + ++errors; + } + delete[] name[slot]; + delete contents[slot]; + } + + runtime = ceph_clock_now(g_ceph_context) - data.start_time; + lock.Lock(); + data.done = true; + lock.Unlock(); + + pthread_join(print_thread, NULL); + + double bandwidth; + bandwidth = ((double)data.finished)*((double)data.object_size)/(double)runtime; + bandwidth = bandwidth/(1024*1024); // we want it in MB/sec + char bw[20]; + snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth); + + cout << "Total time run: " << runtime << std::endl + << "Total reads made: " << data.finished << std::endl + << "Read size: " << data.object_size << std::endl + << "Bandwidth (MB/sec): " << bw << std::endl + << "Average Latency: " << data.avg_latency << std::endl + << "Max latency: " << data.max_latency << std::endl + << "Min latency: " << data.min_latency << std::endl; + + completions_done(); + + return 0; + + ERR: + lock.Lock(); + data.done = 1; + lock.Unlock(); + pthread_join(print_thread, NULL); + return -5; +} + + diff --git a/src/common/obj_bencher.h b/src/common/obj_bencher.h new file mode 100644 index 00000000000..d4105274d0e --- /dev/null +++ b/src/common/obj_bencher.h @@ -0,0 +1,72 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2009 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_OBJ_BENCHER_H +#define CEPH_OBJ_BENCHER_H + +#include "common/config.h" +#include "common/Cond.h" + +struct bench_data { + bool done; //is the benchmark is done + int object_size; //the size of the objects + int trans_size; //size of the write/read to perform + // same as object_size for write tests + int in_flight; //number of reads/writes being waited on + int started; + int finished; + double min_latency; + double max_latency; + double avg_latency; + utime_t cur_latency; //latency of last completed transaction + utime_t start_time; //start time for benchmark + char *object_contents; //pointer to the contents written to each object +}; + +const int OP_WRITE = 1; +const int OP_SEQ_READ = 2; +const int OP_RAND_READ = 3; + +class ObjBencher { +protected: + Mutex lock; + + static void *status_printer(void *bencher); + + struct bench_data data; + + int write_bench(int secondsToRun, int concurrentios); + int seq_read_bench(int secondsToRun, int concurrentios, int num_objects, int writePid); + + virtual int completions_init(int concurrentios) = 0; + virtual void completions_done() = 0; + + virtual int create_completion(int i, void (*cb)(void *, void*), void *arg) = 0; + virtual void release_completion(int slot) = 0; + + virtual bool completion_is_done(int slot) = 0; + virtual int completion_wait(int slot) = 0; + virtual int completion_ret(int slot) = 0; + + virtual int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) = 0; + virtual int aio_write(const std::string& oid, int slot, bufferlist& bl, size_t len) = 0; + virtual int sync_read(const std::string& oid, bufferlist& bl, size_t len) = 0; + virtual int sync_write(const std::string& oid, bufferlist& bl, size_t len) = 0; +public: + ObjBencher() : lock("ObjBencher::lock") {} + virtual ~ObjBencher() {} + int aio_bench(int operation, int secondsToRun, int concurrentios, int op_size); +}; + + +#endif diff --git a/src/libs3 b/src/libs3 new file mode 160000 +Subproject f049cb9840b38e96cb76f78242f067c154d1c66 diff --git a/src/osdc/rados_bencher.h b/src/osdc/rados_bencher.h deleted file mode 100644 index 3f0e3802a84..00000000000 --- a/src/osdc/rados_bencher.h +++ /dev/null @@ -1,571 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2009 Sage Weil <sage@newdream.net> - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - * Series of functions to test your rados installation. Notice - * that this code is not terribly robust -- for instance, if you - * try and bench on a pool you don't have permission to access - * it will just loop forever. - */ -#include "include/rados/librados.hpp" -#include "common/config.h" -#include "global/global_init.h" -#include "common/Cond.h" -#include <iostream> -#include <fstream> - -#include <stdlib.h> -#include <time.h> -#include <sstream> - -Mutex dataLock("data mutex"); -const int OP_WRITE = 1; -const int OP_SEQ_READ = 2; -const int OP_RAND_READ = 3; -const char *BENCH_DATA = "benchmark_write_data"; - -struct bench_data { - bool done; //is the benchmark is done - int object_size; //the size of the objects - int trans_size; //size of the write/read to perform - // same as object_size for write tests - int in_flight; //number of reads/writes being waited on - int started; - int finished; - double min_latency; - double max_latency; - double avg_latency; - utime_t cur_latency; //latency of last completed transaction - utime_t start_time; //start time for benchmark - char *object_contents; //pointer to the contents written to each object -}; - -void generate_object_name(char *s, size_t size, int objnum, int pid = 0) -{ - char hostname[30]; - gethostname(hostname, sizeof(hostname)-1); - hostname[sizeof(hostname)-1] = 0; - if (pid) { - snprintf(s, size, "%s_%d_object%d", hostname, pid, objnum); - } else { - snprintf(s, size, "%s_%d_object%d", hostname, getpid(), objnum); - } -} - -int write_bench(librados::Rados& rados, librados::IoCtx& io_ctx, - int secondsToRun, int concurrentios, bench_data *data); -int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, - int secondsToRun, int concurrentios, bench_data *data, - int writePid); -void *status_printer(void * data_store); -void sanitize_object_contents(bench_data *data, int length); - -int aio_bench(librados::Rados& rados, librados::IoCtx &io_ctx, int operation, - int secondsToRun, int concurrentios, int op_size) { - int object_size = op_size; - int num_objects = 0; - char* contentsChars = new char[op_size]; - int r = 0; - int prevPid = 0; - - //get data from previous write run, if available - if (operation != OP_WRITE) { - bufferlist object_data; - r = io_ctx.read(BENCH_DATA, object_data, sizeof(int)*3, 0); - if (r <= 0) { - delete[] contentsChars; - if (r == -2) - cerr << "Must write data before running a read benchmark!" << std::endl; - return r; - } - bufferlist::iterator p = object_data.begin(); - ::decode(object_size, p); - ::decode(num_objects, p); - ::decode(prevPid, p); - } else { - object_size = op_size; - } - - dataLock.Lock(); - bench_data *data = new bench_data(); - data->done = false; - data->object_size = object_size; - data->trans_size = op_size; - data->in_flight = 0; - data->started = 0; - data->finished = num_objects; - data->min_latency = 9999.0; // this better be higher than initial latency! - data->max_latency = 0; - data->avg_latency = 0; - data->object_contents = contentsChars; - dataLock.Unlock(); - - //fill in contentsChars deterministically so we can check returns - sanitize_object_contents(data, data->object_size); - - if (OP_WRITE == operation) { - r = write_bench(rados, io_ctx, secondsToRun, concurrentios, data); - if (r != 0) goto out; - } - else if (OP_SEQ_READ == operation) { - r = seq_read_bench(rados, io_ctx, secondsToRun, concurrentios, data, prevPid); - if (r != 0) goto out; - } - else if (OP_RAND_READ == operation) { - cerr << "Random test not implemented yet!" << std::endl; - r = -1; - } - - out: - delete[] contentsChars; - delete data; - return r; -} - -void _aio_cb(void *cb, void *arg) { - dataLock.Lock(); - Cond *cond = (Cond *) arg; - cond->Signal(); - dataLock.Unlock(); -} - -int write_bench(librados::Rados& rados, librados::IoCtx& io_ctx, - int secondsToRun, int concurrentios, bench_data *data) { - cout << "Maintaining " << concurrentios << " concurrent writes of " - << data->object_size << " bytes for at least " - << secondsToRun << " seconds." << std::endl; - - librados::AioCompletion* completions[concurrentios]; - char* name[concurrentios]; - bufferlist* contents[concurrentios]; - double total_latency = 0; - utime_t start_times[concurrentios]; - utime_t stopTime; - int r = 0; - bufferlist b_write; - Cond cond; - utime_t runtime; - utime_t timePassed; - - //set up writes so I can start them together - for (int i = 0; i<concurrentios; ++i) { - name[i] = new char[128]; - contents[i] = new bufferlist(); - generate_object_name(name[i], 128, i); - snprintf(data->object_contents, data->object_size, "I'm the %dth object!", i); - contents[i]->append(data->object_contents, data->object_size); - } - - pthread_t print_thread; - - pthread_create(&print_thread, NULL, status_printer, (void *)data); - dataLock.Lock(); - data->start_time = ceph_clock_now(g_ceph_context); - dataLock.Unlock(); - for (int i = 0; i<concurrentios; ++i) { - start_times[i] = ceph_clock_now(g_ceph_context); - completions[i] = rados.aio_create_completion((void *) &cond, 0, - &_aio_cb); - r = io_ctx.aio_write(name[i], completions[i], *contents[i], data->object_size, 0); - if (r < 0) { //naughty, doesn't clean up heap - goto ERR; - } - dataLock.Lock(); - ++data->started; - ++data->in_flight; - dataLock.Unlock(); - } - - //keep on adding new writes as old ones complete until we've passed minimum time - int slot; - bufferlist* newContents; - char* newName; - - //don't need locking for reads because other thread doesn't write - - runtime.set_from_double(secondsToRun); - stopTime = data->start_time + runtime; - while( ceph_clock_now(g_ceph_context) < stopTime ) { - dataLock.Lock(); - while (1) { - for (slot = 0; slot < concurrentios; ++slot) { - if (completions[slot]->is_safe()) { - break; - } - } - if (slot < concurrentios) { - break; - } - cond.Wait(dataLock); - } - dataLock.Unlock(); - //create new contents and name on the heap, and fill them - newContents = new bufferlist(); - newName = new char[128]; - generate_object_name(newName, 128, data->started); - snprintf(data->object_contents, data->object_size, "I'm the %dth object!", data->started); - newContents->append(data->object_contents, data->object_size); - completions[slot]->wait_for_safe(); - dataLock.Lock(); - r = completions[slot]->get_return_value(); - if (r != 0) { - dataLock.Unlock(); - goto ERR; - } - data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; - total_latency += data->cur_latency; - if( data->cur_latency > data->max_latency) data->max_latency = data->cur_latency; - if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency; - ++data->finished; - data->avg_latency = total_latency / data->finished; - --data->in_flight; - dataLock.Unlock(); - completions[slot]->release(); - completions[slot] = 0; - timePassed = ceph_clock_now(g_ceph_context) - data->start_time; - - //write new stuff to rados, then delete old stuff - //and save locations of new stuff for later deletion - start_times[slot] = ceph_clock_now(g_ceph_context); - completions[slot] = rados.aio_create_completion((void *) &cond, 0, &_aio_cb); - r = io_ctx.aio_write(newName, completions[slot], *newContents, data->object_size, 0); - if (r < 0) {//naughty; doesn't clean up heap space. - goto ERR; - } - dataLock.Lock(); - ++data->started; - ++data->in_flight; - dataLock.Unlock(); - delete[] name[slot]; - delete contents[slot]; - name[slot] = newName; - contents[slot] = newContents; - } - - while (data->finished < data->started) { - slot = data->finished % concurrentios; - completions[slot]->wait_for_safe(); - dataLock.Lock(); - r = completions[slot]->get_return_value(); - if (r != 0) { - dataLock.Unlock(); - goto ERR; - } - data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; - total_latency += data->cur_latency; - if (data->cur_latency > data->max_latency) data->max_latency = data->cur_latency; - if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency; - ++data->finished; - data->avg_latency = total_latency / data->finished; - --data->in_flight; - dataLock.Unlock(); - completions[slot]->release(); - completions[slot] = 0; - delete[] name[slot]; - delete contents[slot]; - } - - timePassed = ceph_clock_now(g_ceph_context) - data->start_time; - dataLock.Lock(); - data->done = true; - dataLock.Unlock(); - - pthread_join(print_thread, NULL); - - double bandwidth; - bandwidth = ((double)data->finished)*((double)data->object_size)/(double)timePassed; - bandwidth = bandwidth/(1024*1024); // we want it in MB/sec - char bw[20]; - snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth); - - cout << "Total time run: " << timePassed << std::endl - << "Total writes made: " << data->finished << std::endl - << "Write size: " << data->object_size << std::endl - << "Bandwidth (MB/sec): " << bw << std::endl - << "Average Latency: " << data->avg_latency << std::endl - << "Max latency: " << data->max_latency << std::endl - << "Min latency: " << data->min_latency << std::endl; - - //write object size/number data for read benchmarks - ::encode(data->object_size, b_write); - ::encode(data->finished, b_write); - ::encode(getpid(), b_write); - io_ctx.write(BENCH_DATA, b_write, sizeof(int)*3, 0); - return 0; - - ERR: - dataLock.Lock(); - data->done = 1; - dataLock.Unlock(); - pthread_join(print_thread, NULL); - return -5; -} - -int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, int seconds_to_run, - int concurrentios, bench_data *write_data, int pid) { - bench_data *data = new bench_data(); - data->done = false; - data->object_size = write_data->object_size; - data->trans_size = data->object_size; - data->in_flight= 0; - data->started = 0; - data->finished = 0; - data->min_latency = 9999.0; - data->max_latency = 0; - data->avg_latency = 0; - data->object_contents = write_data->object_contents; - - Cond cond; - librados::AioCompletion* completions[concurrentios]; - char* name[concurrentios]; - bufferlist* contents[concurrentios]; - int index[concurrentios]; - int errors = 0; - utime_t start_time; - utime_t start_times[concurrentios]; - utime_t time_to_run; - time_to_run.set_from_double(seconds_to_run); - double total_latency = 0; - int r = 0; - utime_t runtime; - sanitize_object_contents(data, 128); //clean it up once; subsequent - //changes will be safe because string length monotonically increases - - //set up initial reads - for (int i = 0; i < concurrentios; ++i) { - name[i] = new char[128]; - generate_object_name(name[i], 128, i, pid); - contents[i] = new bufferlist(); - } - - pthread_t print_thread; - pthread_create(&print_thread, NULL, status_printer, (void *)data); - - dataLock.Lock(); - data->start_time = ceph_clock_now(g_ceph_context); - dataLock.Unlock(); - utime_t finish_time = data->start_time + time_to_run; - //start initial reads - for (int i = 0; i < concurrentios; ++i) { - index[i] = i; - start_times[i] = ceph_clock_now(g_ceph_context); - completions[i] = rados.aio_create_completion((void *) &cond, &_aio_cb, 0); - r = io_ctx.aio_read(name[i], completions[i], contents[i], data->object_size, 0); - if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread! - cerr << "r = " << r << std::endl; - goto ERR; - } - dataLock.Lock(); - ++data->started; - ++data->in_flight; - dataLock.Unlock(); - } - - //keep on adding new reads as old ones complete - int slot; - char* newName; - bufferlist *cur_contents; - - while (seconds_to_run && (ceph_clock_now(g_ceph_context) < finish_time) && - write_data->finished > data->started) { - dataLock.Lock(); - while (1) { - for (slot = 0; slot < concurrentios; ++slot) { - if (completions[slot]->is_complete()) { - break; - } - } - if (slot < concurrentios) { - break; - } - cond.Wait(dataLock); - } - dataLock.Unlock(); - newName = new char[128]; - generate_object_name(newName, 128, data->started, pid); - int current_index = index[slot]; - index[slot] = data->started; - completions[slot]->wait_for_complete(); - dataLock.Lock(); - r = completions[slot]->get_return_value(); - if (r != 0) { - cerr << "read got " << r << std::endl; - dataLock.Unlock(); - goto ERR; - } - data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; - total_latency += data->cur_latency; - if( data->cur_latency > data->max_latency) data->max_latency = data->cur_latency; - if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency; - ++data->finished; - data->avg_latency = total_latency / data->finished; - --data->in_flight; - dataLock.Unlock(); - completions[slot]->release(); - completions[slot] = 0; - cur_contents = contents[slot]; - - //start new read and check data if requested - start_times[slot] = ceph_clock_now(g_ceph_context); - contents[slot] = new bufferlist(); - completions[slot] = rados.aio_create_completion((void *) &cond, &_aio_cb, 0); - r = io_ctx.aio_read(newName, completions[slot], contents[slot], data->object_size, 0); - if (r < 0) { - goto ERR; - } - dataLock.Lock(); - ++data->started; - ++data->in_flight; - snprintf(data->object_contents, data->object_size, "I'm the %dth object!", current_index); - dataLock.Unlock(); - if (memcmp(data->object_contents, cur_contents->c_str(), data->object_size) != 0) { - cerr << name[slot] << " is not correct!" << std::endl; - ++errors; - } - delete name[slot]; - name[slot] = newName; - delete cur_contents; - } - - //wait for final reads to complete - while (data->finished < data->started) { - slot = data->finished % concurrentios; - completions[slot]->wait_for_complete(); - dataLock.Lock(); - r = completions[slot]->get_return_value(); - if (r != 0) { - cerr << "read got " << r << std::endl; - dataLock.Unlock(); - goto ERR; - } - data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; - total_latency += data->cur_latency; - if (data->cur_latency > data->max_latency) data->max_latency = data->cur_latency; - if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency; - ++data->finished; - data->avg_latency = total_latency / data->finished; - --data->in_flight; - completions[slot]-> release(); - completions[slot] = 0; - snprintf(data->object_contents, data->object_size, "I'm the %dth object!", index[slot]); - dataLock.Unlock(); - if (memcmp(data->object_contents, contents[slot]->c_str(), data->object_size) != 0) { - cerr << name[slot] << " is not correct!" << std::endl; - ++errors; - } - delete name[slot]; - delete contents[slot]; - } - - runtime = ceph_clock_now(g_ceph_context) - data->start_time; - dataLock.Lock(); - data->done = true; - dataLock.Unlock(); - - pthread_join(print_thread, NULL); - - double bandwidth; - bandwidth = ((double)data->finished)*((double)data->object_size)/(double)runtime; - bandwidth = bandwidth/(1024*1024); // we want it in MB/sec - char bw[20]; - snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth); - - cout << "Total time run: " << runtime << std::endl - << "Total reads made: " << data->finished << std::endl - << "Read size: " << data->object_size << std::endl - << "Bandwidth (MB/sec): " << bw << std::endl - << "Average Latency: " << data->avg_latency << std::endl - << "Max latency: " << data->max_latency << std::endl - << "Min latency: " << data->min_latency << std::endl; - - delete data; - return 0; - - ERR: - dataLock.Lock(); - data->done = 1; - dataLock.Unlock(); - pthread_join(print_thread, NULL); - return -5; -} - - - -void *status_printer(void * data_store) { - bench_data *data = (bench_data *) data_store; - Cond cond; - int i = 0; - int previous_writes = 0; - int cycleSinceChange = 0; - double avg_bandwidth; - double bandwidth; - utime_t ONE_SECOND; - ONE_SECOND.set_from_double(1.0); - dataLock.Lock(); - while(!data->done) { - if (i % 20 == 0) { - if (i > 0) - cout << "min lat: " << data->min_latency - << " max lat: " << data->max_latency - << " avg lat: " << data->avg_latency << std::endl; - //I'm naughty and don't reset the fill - cout << setfill(' ') - << setw(5) << "sec" - << setw(8) << "Cur ops" - << setw(10) << "started" - << setw(10) << "finished" - << setw(10) << "avg MB/s" - << setw(10) << "cur MB/s" - << setw(10) << "last lat" - << setw(10) << "avg lat" << std::endl; - } - bandwidth = (double)(data->finished - previous_writes) - * (data->trans_size) - / (1024*1024) - / cycleSinceChange; - avg_bandwidth = (double) (data->trans_size) * (data->finished) - / (double)(ceph_clock_now(g_ceph_context) - data->start_time) / (1024*1024); - if (previous_writes != data->finished) { - previous_writes = data->finished; - cycleSinceChange = 0; - cout << setfill(' ') - << setw(5) << i - << setw(8) << data->in_flight - << setw(10) << data->started - << setw(10) << data->finished - << setw(10) << avg_bandwidth - << setw(10) << bandwidth - << setw(10) << (double)data->cur_latency - << setw(10) << data->avg_latency << std::endl; - } - else { - cout << setfill(' ') - << setw(5) << i - << setw(8) << data->in_flight - << setw(10) << data->started - << setw(10) << data->finished - << setw(10) << avg_bandwidth - << setw(10) << '0' - << setw(10) << '-' - << setw(10) << data->avg_latency << std::endl; - } - ++i; - ++cycleSinceChange; - cond.WaitInterval(g_ceph_context, dataLock, ONE_SECOND); - } - dataLock.Unlock(); - return NULL; -} - -inline void sanitize_object_contents (bench_data *data, int length) { - for (int i = 0; i < length; ++i) { - data->object_contents[i] = i % sizeof(char); - } -} diff --git a/src/rados.cc b/src/rados.cc index 9f6959204c9..526a9e4e314 100644 --- a/src/rados.cc +++ b/src/rados.cc @@ -17,7 +17,7 @@ #include "include/rados/librados.hpp" using namespace librados; -#include "osdc/rados_bencher.h" +#include "common/obj_bencher.h" #include "common/config.h" #include "common/ceph_argparse.h" @@ -25,6 +25,7 @@ using namespace librados; #include "common/Cond.h" #include "common/debug.h" #include "common/Formatter.h" +#include "common/obj_bencher.h" #include "mds/inode_backtrace.h" #include "auth/Crypto.h" #include <iostream> @@ -577,6 +578,64 @@ void LoadGen::cleanup() } } + +class RadosBencher : public ObjBencher { + librados::AioCompletion **completions; + librados::Rados& rados; + librados::IoCtx& io_ctx; +protected: + int completions_init(int concurrentios) { + completions = new librados::AioCompletion *[concurrentios]; + return 0; + } + void completions_done() { + delete[] completions; + completions = NULL; + } + int create_completion(int slot, void (*cb)(void *, void*), void *arg) { + completions[slot] = rados.aio_create_completion((void *) arg, 0, cb); + + if (!completions[slot]) + return -EINVAL; + + return 0; + } + void release_completion(int slot) { + completions[slot]->release(); + completions[slot] = 0; + } + + int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) { + return io_ctx.aio_read(oid, completions[slot], pbl, len, 0); + } + + int aio_write(const std::string& oid, int slot, bufferlist& bl, size_t len) { + return io_ctx.aio_write(oid, completions[slot], bl, len, 0); + } + + int sync_read(const std::string& oid, bufferlist& bl, size_t len) { + return io_ctx.read(oid, bl, len, 0); + } + int sync_write(const std::string& oid, bufferlist& bl, size_t len) { + return io_ctx.write(oid, bl, len, 0); + } + + bool completion_is_done(int slot) { + return completions[slot]->is_safe(); + } + + int completion_wait(int slot) { + return completions[slot]->wait_for_safe(); + } + int completion_ret(int slot) { + return completions[slot]->get_return_value(); + } + +public: + RadosBencher(librados::Rados& _r, librados::IoCtx& _i) : completions(NULL), rados(_r), io_ctx(_i) {} + ~RadosBencher() { } +}; + /********************************************** **********************************************/ @@ -1218,7 +1277,8 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts, operation = OP_RAND_READ; else usage_exit(); - ret = aio_bench(rados, io_ctx, operation, seconds, concurrent_ios, op_size); + RadosBencher bencher(rados, io_ctx); + ret = bencher.aio_bench(operation, seconds, concurrent_ios, op_size); if (ret != 0) cerr << "error during benchmark: " << ret << std::endl; } diff --git a/src/tools/rest_bench.cc b/src/tools/rest_bench.cc new file mode 100644 index 00000000000..05cda32c8d6 --- /dev/null +++ b/src/tools/rest_bench.cc @@ -0,0 +1,644 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "include/types.h" +#include "include/atomic.h" + +#include "common/obj_bencher.h" +#include "common/config.h" +#include "common/debug.h" +#include "common/ceph_argparse.h" +#include "common/WorkQueue.h" +#include "msg/Message.h" +#include "global/global_init.h" + +#include "libs3.h" + +#include <deque> + +#include <errno.h> + +#define DEFAULT_USER_AGENT "rest-bench" +#define DEFAULT_BUCKET "rest-bench-bucket" + +void usage(ostream& out) +{ + out << \ +"usage: rest-bench [options] <write|seq>\n" +"BENCHMARK OPTIONS\n" +" --seconds\n" +" benchmak length (default: 60)\n" +" -t concurrent_operations\n" +" --concurrent-ios=concurrent_operations\n" +" select bucket by name\n" +" -b op-size\n" +" --block-size=op-size\n" +" set the size of write ops for put or benchmarking\n" +"REST CONFIG OPTIONS\n" +" --api-host=bhost\n" +" host name\n" +" --bucket=bucket\n" +" select bucket by name\n" +" --access-key=access_key\n" +" access key to RESTful storage provider\n" +" --secret=secret_key\n" +" secret key for the specified access key\n" +" --protocol=<http|https>\n" +" protocol to be used (default: http)\n" +" --uri_style=<path|vhost>\n" +" uri style in requests (default: path)\n"; +} + +static void usage_exit() +{ + usage(cerr); + exit(1); +} + +enum OpType { + OP_NONE = 0, + OP_GET_OBJ = 1, + OP_PUT_OBJ = 2, +}; + +struct req_context : public RefCountedObject { + bool complete; + S3Status status; + S3RequestContext *ctx; + void (*cb)(void *, void *); + void *arg; + bufferlist *in_bl; + bufferlist out_bl; + uint64_t off; + uint64_t len; + string oid; + Mutex lock; + Cond cond; + S3BucketContext *bucket_ctx; + + bool should_destroy_ctx; + + OpType op; + + req_context() : complete(false), status(S3StatusOK), ctx(NULL), cb(NULL), arg(NULL), in_bl(NULL), off(0), len(0), + lock("req_context"), bucket_ctx(NULL), should_destroy_ctx(false), op(OP_NONE) {} + ~req_context() { + if (should_destroy_ctx) { + S3_destroy_request_context(ctx); + } + } + + int init_ctx() { + S3Status status = S3_create_request_context(&ctx); + if (status != S3StatusOK) { + cerr << "failed to create context: " << S3_get_status_name(status) << std::endl; + return -EINVAL; + } + should_destroy_ctx = true; + + return 0; + } + + int ret() { + if (status != S3StatusOK) { + return -EINVAL; + } + return 0; + } +}; + +static S3Status properties_callback(const S3ResponseProperties *properties, void *cb_data) +{ + return S3StatusOK; +} + +static void complete_callback(S3Status status, const S3ErrorDetails *details, void *cb_data) +{ + if (!cb_data) + return; + + struct req_context *ctx = (struct req_context *)cb_data; + + ctx->lock.Lock(); + ctx->complete = true; + ctx->status = status; + ctx->lock.Unlock(); + + if (ctx->cb) { + ctx->cb((void *)ctx->cb, ctx->arg); + } + + ctx->put(); +} + +static S3Status get_obj_callback(int size, const char *buf, + void *cb_data) +{ + if (!cb_data) + return S3StatusOK; + + struct req_context *ctx = (struct req_context *)cb_data; + + ctx->in_bl->append(buf, size); + + return S3StatusOK; +} + +static int put_obj_callback(int size, char *buf, + void *cb_data) +{ + if (!cb_data) + return 0; + + struct req_context *ctx = (struct req_context *)cb_data; + + int chunk = ctx->out_bl.length() - ctx->off; + if (!chunk) + return 0; + + if (chunk > size) + chunk = size; + + memcpy(buf, ctx->out_bl.c_str() + ctx->off, chunk); + + ctx->off += chunk; + + return chunk; +} + +class RESTDispatcher { + deque<req_context *> m_req_queue; + ThreadPool m_tp; + + S3ResponseHandler response_handler; + S3GetObjectHandler get_obj_handler; + S3PutObjectHandler put_obj_handler; + + struct DispatcherWQ : public ThreadPool::WorkQueue<req_context> { + RESTDispatcher *dispatcher; + DispatcherWQ(RESTDispatcher *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp) + : ThreadPool::WorkQueue<req_context>("REST", timeout, suicide_timeout, tp), dispatcher(p) {} + + bool _enqueue(req_context *req) { + dispatcher->m_req_queue.push_back(req); + _dump_queue(); + return true; + } + void _dequeue(req_context *req) { + assert(0); + } + bool _empty() { + return dispatcher->m_req_queue.empty(); + } + req_context *_dequeue() { + if (dispatcher->m_req_queue.empty()) + return NULL; + req_context *req = dispatcher->m_req_queue.front(); + dispatcher->m_req_queue.pop_front(); + _dump_queue(); + return req; + } + void _process(req_context *req) { + dispatcher->process_context(req); + } + void _dump_queue() { + deque<req_context *>::iterator iter; + if (dispatcher->m_req_queue.size() == 0) { + generic_dout(20) << "DispatcherWQ: empty" << dendl; + return; + } + generic_dout(20) << "DispatcherWQ:" << dendl; + for (iter = dispatcher->m_req_queue.begin(); iter != dispatcher->m_req_queue.end(); ++iter) { + generic_dout(20) << "req: " << hex << *iter << dec << dendl; + } + } + void _clear() { + assert(dispatcher->m_req_queue.empty()); + } + } req_wq; + +public: + RESTDispatcher(CephContext *cct, int num_threads) + : m_tp(cct, "RESTDispatcher::m_tp", num_threads), + req_wq(this, g_conf->rgw_op_thread_timeout, + g_conf->rgw_op_thread_suicide_timeout, &m_tp) { + + + response_handler.propertiesCallback = properties_callback; + response_handler.completeCallback = complete_callback; + + get_obj_handler.responseHandler = response_handler; + get_obj_handler.getObjectDataCallback = get_obj_callback; + + put_obj_handler.responseHandler = response_handler; + put_obj_handler.putObjectDataCallback = put_obj_callback; + + } + void process_context(req_context *ctx); + void get_obj(req_context *ctx); + void put_obj(req_context *ctx); + + void queue(req_context *ctx) { + req_wq.queue(ctx); + } + + void start() { + m_tp.start(); + } +}; + +void RESTDispatcher::process_context(req_context *ctx) +{ + ctx->get(); + + switch (ctx->op) { + case OP_GET_OBJ: + get_obj(ctx); + break; + case OP_PUT_OBJ: + put_obj(ctx); + break; + default: + assert(0); + } + + S3Status status = S3_runall_request_context(ctx->ctx); + + if (status != S3StatusOK) { + cerr << "ERROR: S3_runall_request_context() returned " << S3_get_status_name(status) << std::endl; + ctx->status = status; + } else if (ctx->status != S3StatusOK) { + cerr << "ERROR: " << ctx->oid << ": " << S3_get_status_name(ctx->status) << std::endl; + } + + ctx->lock.Lock(); + ctx->cond.SignalAll(); + ctx->lock.Unlock(); + + ctx->put(); +} + +void RESTDispatcher::put_obj(req_context *ctx) +{ + S3_put_object(ctx->bucket_ctx, ctx->oid.c_str(), + ctx->out_bl.length(), + NULL, + ctx->ctx, + &put_obj_handler, ctx); +} + +void RESTDispatcher::get_obj(req_context *ctx) +{ + S3_get_object(ctx->bucket_ctx, ctx->oid.c_str(), NULL, 0, ctx->len, ctx->ctx, + &get_obj_handler, ctx); +} + +class RESTBencher : public ObjBencher { + RESTDispatcher *dispatcher; + struct req_context **completions; + struct S3RequestContext **handles; + S3BucketContext bucket_ctx; + string user_agent; + string host; + string bucket; + S3Protocol protocol; + string access_key; + string secret; + int concurrentios; + +protected: + int rest_init() { + S3Status status = S3_initialize(user_agent.c_str(), S3_INIT_ALL, host.c_str()); + if (status != S3StatusOK) { + cerr << "failed to init: " << S3_get_status_name(status) << std::endl; + return -EINVAL; + } + + + return 0; + } + + + int completions_init(int _concurrentios) { + concurrentios = _concurrentios; + completions = new req_context *[concurrentios]; + handles = new S3RequestContext *[concurrentios]; + for (int i = 0; i < concurrentios; i++) { + completions[i] = NULL; + S3Status status = S3_create_request_context(&handles[i]); + if (status != S3StatusOK) { + cerr << "failed to create context: " << S3_get_status_name(status) << std::endl; + return -EINVAL; + } + } + return 0; + } + void completions_done() { + delete[] completions; + completions = NULL; + for (int i = 0; i < concurrentios; i++) { + S3_destroy_request_context(handles[i]); + } + delete[] handles; + handles = NULL; + } + int create_completion(int slot, void (*cb)(void *, void*), void *arg) { + struct req_context *ctx = new req_context; + ctx->ctx = handles[slot]; + ctx->cb = cb; + ctx->arg = arg; + + completions[slot] = ctx; + + return 0; + } + void release_completion(int slot) { + struct req_context *ctx = completions[slot]; + + ctx->put(); + completions[slot] = 0; + } + + int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) { + struct req_context *ctx = completions[slot]; + + ctx->get(); + ctx->in_bl = pbl; + ctx->oid = oid; + ctx->len = len; + ctx->bucket_ctx = &bucket_ctx; + ctx->op = OP_GET_OBJ; + + dispatcher->queue(ctx); + + return 0; + } + + int aio_write(const std::string& oid, int slot, bufferlist& bl, size_t len) { + struct req_context *ctx = completions[slot]; + + ctx->get(); + ctx->bucket_ctx = &bucket_ctx; + ctx->out_bl = bl; + ctx->oid = oid; + ctx->len = len; + ctx->op = OP_PUT_OBJ; + + dispatcher->queue(ctx); + return 0; + } + + int sync_read(const std::string& oid, bufferlist& bl, size_t len) { + struct req_context *ctx = new req_context; + int ret = ctx->init_ctx(); + if (ret < 0) { + return ret; + } + ctx->in_bl = &bl; + ctx->get(); + ctx->bucket_ctx = &bucket_ctx; + ctx->oid = oid; + ctx->len = len; + ctx->op = OP_GET_OBJ; + + dispatcher->process_context(ctx); + ret = ctx->ret(); + ctx->put(); + return bl.length(); + } + int sync_write(const std::string& oid, bufferlist& bl, size_t len) { + struct req_context *ctx = new req_context; + int ret = ctx->init_ctx(); + if (ret < 0) { + return ret; + } + ctx->get(); + ctx->out_bl = bl; + ctx->bucket_ctx = &bucket_ctx; + ctx->oid = oid; + ctx->op = OP_PUT_OBJ; + + dispatcher->process_context(ctx); + ret = ctx->ret(); + ctx->put(); + return ret; + } + + bool completion_is_done(int slot) { + return completions[slot]->complete; + } + + int completion_wait(int slot) { + req_context *ctx = completions[slot]; + + Mutex::Locker l(ctx->lock); + + while (!ctx->complete) { + ctx->cond.Wait(ctx->lock); + } + + return 0; + } + + int completion_ret(int slot) { + S3Status status = completions[slot]->status; + if (status != S3StatusOK) + return -EIO; + return 0; + } + +public: + RESTBencher(RESTDispatcher *_dispatcher) : dispatcher(_dispatcher), completions(NULL) { + dispatcher->start(); + } + ~RESTBencher() { } + + int init(string& _agent, string& _host, string& _bucket, S3Protocol _protocol, + S3UriStyle uri_style, string& _access_key, string& _secret) { + user_agent = _agent; + host = _host; + bucket = _bucket; + protocol = _protocol; + access_key = _access_key; + secret = _secret; + + bucket_ctx.hostName = NULL; // host.c_str(); + bucket_ctx.bucketName = bucket.c_str(); + bucket_ctx.protocol = protocol; + bucket_ctx.accessKeyId = access_key.c_str(); + bucket_ctx.secretAccessKey = secret.c_str(); + bucket_ctx.uriStyle = uri_style; + + struct req_context *ctx = new req_context; + + int ret = rest_init(); + if (ret < 0) { + return ret; + } + + ret = ctx->init_ctx(); + if (ret < 0) { + return ret; + } + + ctx->get(); + + S3ResponseHandler response_handler; + response_handler.propertiesCallback = properties_callback; + response_handler.completeCallback = complete_callback; + + S3_create_bucket(protocol, access_key.c_str(), secret.c_str(), NULL, + bucket.c_str(), S3CannedAclPrivate, + NULL, /* locationConstraint */ + NULL, /* requestContext */ + &response_handler, /* handler */ + (void *)ctx /* callbackData */); + + ret = ctx->ret(); + if (ret < 0) { + cerr << "ERROR: failed to create bucket: " << S3_get_status_name(ctx->status) << std::endl; + return ret; + } + + ctx->put(); + + return 0; + } +}; + +int main(int argc, const char **argv) +{ + vector<const char*> args; + argv_to_vec(argc, argv, args); + env_to_vec(args); + + global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); + common_init_finish(g_ceph_context); + + std::map < std::string, std::string > opts; + std::vector<const char*>::iterator i; + std::string host; + std::string val; + std::string user_agent; + std::string access_key; + std::string secret; + std::string bucket = DEFAULT_BUCKET; + S3Protocol protocol = S3ProtocolHTTP; + S3UriStyle uri_style = S3UriStylePath; + std::string proto_str; + int concurrent_ios = 16; + int op_size = 1 << 22; + int seconds = 60; + + + for (i = args.begin(); i != args.end(); ) { + if (ceph_argparse_double_dash(args, i)) { + break; + } else if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { + usage(cout); + exit(0); + } else if (ceph_argparse_witharg(args, i, &user_agent, "--agent", (char*)NULL)) { + /* nothing */ + } else if (ceph_argparse_witharg(args, i, &access_key, "--access-key", (char*)NULL)) { + /* nothing */ + } else if (ceph_argparse_witharg(args, i, &secret, "--secret", (char*)NULL)) { + /* nothing */ + } else if (ceph_argparse_witharg(args, i, &bucket, "--bucket", (char*)NULL)) { + /* nothing */ + } else if (ceph_argparse_witharg(args, i, &host, "--api-host", (char*)NULL)) { + cerr << "host=" << host << std::endl; + /* nothing */ + } else if (ceph_argparse_witharg(args, i, &proto_str, "--protocol", (char*)NULL)) { + if (strcasecmp(proto_str.c_str(), "http") == 0) { + protocol = S3ProtocolHTTP; + } else if (strcasecmp(proto_str.c_str(), "http") == 0) { + protocol = S3ProtocolHTTPS; + } else { + cerr << "bad protocol" << std::endl; + usage_exit(); + } + /* nothing */ + } else if (ceph_argparse_witharg(args, i, &proto_str, "--uri-style", (char*)NULL)) { + if (strcasecmp(proto_str.c_str(), "vhost") == 0) { + uri_style = S3UriStyleVirtualHost; + } else if (strcasecmp(proto_str.c_str(), "path") == 0) { + uri_style = S3UriStylePath; + } else { + cerr << "bad protocol" << std::endl; + usage_exit(); + } + } else if (ceph_argparse_witharg(args, i, &val, "-t", "--concurrent-ios", (char*)NULL)) { + concurrent_ios = strtol(val.c_str(), NULL, 10); + } else if (ceph_argparse_witharg(args, i, &val, "--seconds", (char*)NULL)) { + seconds = strtol(val.c_str(), NULL, 10); + } else if (ceph_argparse_witharg(args, i, &val, "-b", "--block-size", (char*)NULL)) { + op_size = strtol(val.c_str(), NULL, 10); + } else { + if (val[0] == '-') + usage_exit(); + i++; + } + } + + if (bucket.empty()) { + cerr << "rest-bench: bucket not specified" << std::endl; + usage_exit(); + } + if (args.size() < 1) + usage_exit(); + int operation = 0; + if (strcmp(args[0], "write") == 0) + operation = OP_WRITE; + else if (strcmp(args[0], "seq") == 0) + operation = OP_SEQ_READ; + else if (strcmp(args[0], "rand") == 0) + operation = OP_RAND_READ; + else + usage_exit(); + + if (host.empty()) { + cerr << "rest-bench: api host not provided." << std::endl; + usage_exit(); + } + + if (access_key.empty() || secret.empty()) { + cerr << "rest-bench: access key or secret was not provided" << std::endl; + usage_exit(); + } + + if (bucket.empty()) { + bucket = DEFAULT_BUCKET; + } + + if (user_agent.empty()) + user_agent = DEFAULT_USER_AGENT; + + RESTDispatcher dispatcher(g_ceph_context, concurrent_ios); + + RESTBencher bencher(&dispatcher); + + int ret = bencher.init(user_agent, host, bucket, protocol, uri_style, access_key, secret); + if (ret < 0) { + cerr << "failed initializing benchmark" << std::endl; + exit(1); + } + + ret = bencher.aio_bench(operation, seconds, concurrent_ios, op_size); + if (ret != 0) { + cerr << "error during benchmark: " << ret << std::endl; + } + + return 0; +} + |