1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vi:ts=4:et
# Note: this test is meant to be run from pycurl project root.
import pycurl
import unittest
import os.path
from . import procmgr, localhost, util
# Module-level setup/teardown hooks, both produced by procmgr.vsftpd_setup().
# NOTE(review): presumably these start and stop the vsftpd FTP server that the
# test below uploads to -- confirm against procmgr.
setup_module, teardown_module = procmgr.vsftpd_setup()
class PartialFileSource:
    """In-memory upload source exposing ``read`` and ``seek`` callbacks.

    The payload is the fixed string ``'1234567890.1234567890'``.  An optional
    cap installed with ``set_maxread`` is an absolute end offset into that
    payload: ``read`` never returns data located at or beyond the cap.
    """

    def __init__(self):
        self.__buf = '1234567890.1234567890'
        # Absolute end offset that reads may not cross; None means "no cap".
        self.__maxread = None
        # Offset of the next character to hand out.
        self.__bufptr = 0

    def read(self, size):
        """Return up to ``size`` characters starting at the current position.

        The position advances by the length of the returned string.  An empty
        string is returned once the payload (or the cap) is exhausted.
        """
        start = self.__bufptr
        end = start + size
        # Compare against None explicitly so that a cap of 0 means "nothing
        # is readable" instead of being mistaken for "no cap".
        if self.__maxread is not None:
            end = min(self.__maxread, end)
        ret = self.__buf[start:end]
        self.__bufptr += len(ret)
        return ret

    def seek(self, offset, origin):
        """Move the read position to ``offset``.

        ``origin`` is accepted to match the seek-callback signature but is
        ignored: ``offset`` is always treated as an absolute position.
        """
        self.__bufptr = offset

    def set_maxread(self, maxread):
        """Set the absolute end offset for reads (``None`` removes the cap)."""
        self.__maxread = maxread
class SeekCbTest(unittest.TestCase):
    """Exercise pycurl's SEEKFUNCTION option through a resumed FTP upload."""

    def _read_uploaded(self):
        # Return the text of tmp/upload.txt located next to this module.
        path = os.path.join(os.path.dirname(__file__), 'tmp', 'upload.txt')
        with open(path) as f:
            return f.read()

    def test_seek_function(self):
        # Phase 1: upload only the first 10 characters of the payload.
        c = util.DefaultCurl()
        try:
            c.setopt(pycurl.UPLOAD, 1)
            c.setopt(pycurl.URL, "ftp://%s:8321/tests/tmp/upload.txt" % localhost)
            c.setopt(pycurl.RESUME_FROM, 0)
            #c.setopt(pycurl.VERBOSE, 1)
            upload_file = PartialFileSource()
            c.setopt(pycurl.READFUNCTION, upload_file.read)
            upload_file.set_maxread(10)
            c.perform()
        finally:
            # Release the handle even when perform() raises, so a failure
            # does not leak the connection into the next phase.
            c.close()
        self.assertEqual('1234567890', self._read_uploaded())
        del c
        del upload_file

        # Phase 2: resume the upload with a fresh, uncapped source and a seek
        # callback registered.
        # NOTE(review): RESUME_FROM -1 is expected to make libcurl continue
        # from the size of the remote file -- see the libcurl option docs.
        c = util.DefaultCurl()
        try:
            c.setopt(pycurl.URL, "ftp://%s:8321/tests/tmp/upload.txt" % localhost)
            c.setopt(pycurl.RESUME_FROM, -1)
            c.setopt(pycurl.UPLOAD, 1)
            #c.setopt(pycurl.VERBOSE, 1)
            upload_file = PartialFileSource()
            c.setopt(pycurl.READFUNCTION, upload_file.read)
            c.setopt(pycurl.SEEKFUNCTION, upload_file.seek)
            c.perform()
        finally:
            c.close()
        self.assertEqual('1234567890.1234567890', self._read_uploaded())
|