summaryrefslogtreecommitdiff
path: root/tests/test_sftp_big.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_sftp_big.py')
-rw-r--r--tests/test_sftp_big.py99
1 files changed, 49 insertions, 50 deletions
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index 4643bcaa..d2d30f24 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -42,24 +42,23 @@ class TestBigSFTP(object):
numfiles = 100
try:
for i in range(numfiles):
- with sftp.open(
- "%s/file%d.txt" % (sftp.FOLDER, i), "w", 1
- ) as f:
- f.write("this is file #%d.\n" % i)
- sftp.chmod("%s/file%d.txt" % (sftp.FOLDER, i), o660)
+ target = f"{sftp.FOLDER}/file{i}.txt"
+ with sftp.open(target, "w", 1) as f:
+ f.write(f"this is file #{i}.\n")
+ sftp.chmod(target, o660)
# now make sure every file is there, by creating a list of filenames
# and reading them in random order.
numlist = list(range(numfiles))
while len(numlist) > 0:
r = numlist[random.randint(0, len(numlist) - 1)]
- with sftp.open("%s/file%d.txt" % (sftp.FOLDER, r)) as f:
- assert f.readline() == "this is file #%d.\n" % r
+ with sftp.open(f"{sftp.FOLDER}/file{r}.txt") as f:
+ assert f.readline() == f"this is file #{r}.\n"
numlist.remove(r)
finally:
for i in range(numfiles):
try:
- sftp.remove("%s/file%d.txt" % (sftp.FOLDER, i))
+ sftp.remove(f"{sftp.FOLDER}/file{i}.txt")
except:
pass
@@ -70,7 +69,7 @@ class TestBigSFTP(object):
kblob = 1024 * b"x"
start = time.time()
try:
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "w") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "w") as f:
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
@@ -78,21 +77,21 @@ class TestBigSFTP(object):
sys.stderr.write(" ")
assert (
- sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024
)
end = time.time()
- sys.stderr.write("%ds " % round(end - start))
+ sys.stderr.write(f"{round(end - start)}s ")
start = time.time()
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "r") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "r") as f:
for n in range(1024):
data = f.read(1024)
assert data == kblob
end = time.time()
- sys.stderr.write("%ds " % round(end - start))
+ sys.stderr.write(f"{round(end - start)}s ")
finally:
- sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+ sftp.remove(f"{sftp.FOLDER}/hongry.txt")
def test_big_file_pipelined(self, sftp):
"""
@@ -101,7 +100,7 @@ class TestBigSFTP(object):
kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
start = time.time()
try:
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "wb") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -110,13 +109,13 @@ class TestBigSFTP(object):
sys.stderr.write(" ")
assert (
- sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024
)
end = time.time()
- sys.stderr.write("%ds " % round(end - start))
+ sys.stderr.write(f"{round(end - start)}s ")
start = time.time()
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "rb") as f:
file_size = f.stat().st_size
f.prefetch(file_size)
@@ -135,14 +134,14 @@ class TestBigSFTP(object):
n += chunk
end = time.time()
- sys.stderr.write("%ds " % round(end - start))
+ sys.stderr.write(f"{round(end - start)}s ")
finally:
- sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+ sftp.remove(f"{sftp.FOLDER}/hongry.txt")
def test_prefetch_seek(self, sftp):
kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "wb") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -151,14 +150,14 @@ class TestBigSFTP(object):
sys.stderr.write(" ")
assert (
- sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024
)
start = time.time()
k2blob = kblob + kblob
chunk = 793
for i in range(10):
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "rb") as f:
file_size = f.stat().st_size
f.prefetch(file_size)
base_offset = (512 * 1024) + 17 * random.randint(
@@ -175,14 +174,14 @@ class TestBigSFTP(object):
assert data == k2blob[n_offset : n_offset + chunk]
offset += chunk
end = time.time()
- sys.stderr.write("%ds " % round(end - start))
+ sys.stderr.write(f"{round(end - start)}s ")
finally:
- sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+ sftp.remove(f"{sftp.FOLDER}/hongry.txt")
def test_readv_seek(self, sftp):
kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "wb") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -191,14 +190,14 @@ class TestBigSFTP(object):
sys.stderr.write(" ")
assert (
- sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024
)
start = time.time()
k2blob = kblob + kblob
chunk = 793
for i in range(10):
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "rb") as f:
base_offset = (512 * 1024) + 17 * random.randint(
1000, 2000
)
@@ -215,9 +214,9 @@ class TestBigSFTP(object):
n_offset = offset % 1024
assert next(ret) == k2blob[n_offset : n_offset + chunk]
end = time.time()
- sys.stderr.write("%ds " % round(end - start))
+ sys.stderr.write(f"{round(end - start)}s ")
finally:
- sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+ sftp.remove(f"{sftp.FOLDER}/hongry.txt")
def test_lots_of_prefetching(self, sftp):
"""
@@ -226,7 +225,7 @@ class TestBigSFTP(object):
"""
kblob = 1024 * b"x"
try:
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "w") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "w") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -235,14 +234,14 @@ class TestBigSFTP(object):
sys.stderr.write(" ")
assert (
- sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024
)
for i in range(10):
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "r") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "r") as f:
file_size = f.stat().st_size
f.prefetch(file_size)
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "r") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "r") as f:
file_size = f.stat().st_size
f.prefetch(file_size)
for n in range(1024):
@@ -252,7 +251,7 @@ class TestBigSFTP(object):
sys.stderr.write(".")
sys.stderr.write(" ")
finally:
- sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+ sftp.remove(f"{sftp.FOLDER}/hongry.txt")
def test_prefetch_readv(self, sftp):
"""
@@ -260,7 +259,7 @@ class TestBigSFTP(object):
"""
kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "wb") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -269,10 +268,10 @@ class TestBigSFTP(object):
sys.stderr.write(" ")
assert (
- sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024
)
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "rb") as f:
file_size = f.stat().st_size
f.prefetch(file_size)
data = f.read(1024)
@@ -293,7 +292,7 @@ class TestBigSFTP(object):
sys.stderr.write(" ")
finally:
- sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+ sftp.remove(f"{sftp.FOLDER}/hongry.txt")
def test_large_readv(self, sftp):
"""
@@ -302,7 +301,7 @@ class TestBigSFTP(object):
"""
kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "wb") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -311,10 +310,10 @@ class TestBigSFTP(object):
sys.stderr.write(" ")
assert (
- sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024
)
- with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f:
+ with sftp.open(f"{sftp.FOLDER}/hongry.txt", "rb") as f:
data = list(f.readv([(23 * 1024, 128 * 1024)]))
assert len(data) == 1
data = data[0]
@@ -322,7 +321,7 @@ class TestBigSFTP(object):
sys.stderr.write(" ")
finally:
- sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+ sftp.remove(f"{sftp.FOLDER}/hongry.txt")
def test_big_file_big_buffer(self, sftp):
"""
@@ -331,15 +330,15 @@ class TestBigSFTP(object):
mblob = 1024 * 1024 * "x"
try:
with sftp.open(
- "%s/hongry.txt" % sftp.FOLDER, "w", 128 * 1024
+ f"{sftp.FOLDER}/hongry.txt", "w", 128 * 1024
) as f:
f.write(mblob)
assert (
- sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024
)
finally:
- sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+ sftp.remove(f"{sftp.FOLDER}/hongry.txt")
def test_big_file_renegotiate(self, sftp):
"""
@@ -350,19 +349,19 @@ class TestBigSFTP(object):
k32blob = 32 * 1024 * "x"
try:
with sftp.open(
- "%s/hongry.txt" % sftp.FOLDER, "w", 128 * 1024
+ f"{sftp.FOLDER}/hongry.txt", "w", 128 * 1024
) as f:
for i in range(32):
f.write(k32blob)
assert (
- sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ sftp.stat(f"{sftp.FOLDER}/hongry.txt").st_size == 1024 * 1024
)
assert t.H != t.session_id
# try to read it too.
with sftp.open(
- "%s/hongry.txt" % sftp.FOLDER, "r", 128 * 1024
+ f"{sftp.FOLDER}/hongry.txt", "r", 128 * 1024
) as f:
file_size = f.stat().st_size
f.prefetch(file_size)
@@ -370,5 +369,5 @@ class TestBigSFTP(object):
while total < 1024 * 1024:
total += len(f.read(32 * 1024))
finally:
- sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+ sftp.remove(f"{sftp.FOLDER}/hongry.txt")
t.packetizer.REKEY_BYTES = pow(2, 30)