1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
import json
from ..utils import check_resource, minimum_version
class NetworkApiMixin(object):
    """Mixin adding network-related API calls to a client class.

    The methods rely on helpers that are not defined here and must be
    supplied by the class this mixin is combined with: ``_url``, ``_get``,
    ``_post_json``, ``_delete``, ``_result`` and ``_raise_for_status``.

    Every method is wrapped with ``minimum_version('1.21')``
    (NOTE(review): presumably this rejects calls when the negotiated API
    version is older than 1.21 -- confirm against ``..utils``).
    """

    @minimum_version('1.21')
    def networks(self, names=None, ids=None):
        """List networks, optionally filtered by name and/or id.

        Args:
            names: Value stored under the ``name`` filter key; skipped when
                falsy.
            ids: Value stored under the ``id`` filter key; skipped when
                falsy.

        Returns:
            Whatever ``self._result(res, json=True)`` yields for the
            ``GET /networks`` response.
        """
        filters = {}
        if names:
            filters['name'] = names
        if ids:
            filters['id'] = ids

        # The filters are always sent (as "{}" when empty), JSON-encoded in
        # the ``filters`` query parameter.
        params = {'filters': json.dumps(filters)}

        url = self._url("/networks")
        res = self._get(url, params=params)
        return self._result(res, json=True)

    @minimum_version('1.21')
    def create_network(self, name, driver=None, options=None, ipam=None):
        """Create a network by POSTing to ``/networks/create``.

        Args:
            name: Sent as the ``name`` field of the request body.
            driver: Sent as the ``driver`` field (``None`` is sent as-is).
            options: Sent as the ``options`` field; must be a ``dict`` or
                ``None``.
            ipam: Sent as the ``ipam`` field (``None`` is sent as-is).

        Returns:
            Whatever ``self._result(res, json=True)`` yields for the
            response.

        Raises:
            TypeError: If ``options`` is neither ``None`` nor a ``dict``.
        """
        if options is not None and not isinstance(options, dict):
            raise TypeError('options must be a dictionary')

        data = {
            'name': name,
            'driver': driver,
            'options': options,
            'ipam': ipam,
        }
        url = self._url("/networks/create")
        res = self._post_json(url, data=data)
        return self._result(res, json=True)

    @minimum_version('1.21')
    def remove_network(self, net_id):
        """Delete the network identified by ``net_id``.

        Issues ``DELETE /networks/<net_id>`` and passes the response to
        ``self._raise_for_status``; nothing is returned.
        """
        url = self._url("/networks/{0}", net_id)
        res = self._delete(url)
        self._raise_for_status(res)

    @minimum_version('1.21')
    def inspect_network(self, net_id):
        """Fetch details of the network identified by ``net_id``.

        Returns:
            Whatever ``self._result(res, json=True)`` yields for the
            ``GET /networks/<net_id>`` response.
        """
        url = self._url("/networks/{0}", net_id)
        res = self._get(url)
        return self._result(res, json=True)

    @check_resource
    @minimum_version('1.21')
    def connect_container_to_network(self, container, net_id, aliases=None):
        """Connect a container to a network.

        POSTs to ``/networks/<net_id>/connect`` and passes the response to
        ``self._raise_for_status`` so a failed request is reported instead
        of being silently ignored; nothing is returned.

        Args:
            container: Sent as the ``Container`` field of the request body.
                NOTE(review): ``check_resource`` presumably validates or
                normalizes this argument -- confirm against ``..utils``.
            net_id: Network identifier interpolated into the URL.
            aliases: Sent as ``EndpointConfig.Aliases`` (``None`` is sent
                as-is).
        """
        data = {
            "Container": container,
            "EndpointConfig": {
                "Aliases": aliases,
            },
        }
        url = self._url("/networks/{0}/connect", net_id)
        res = self._post_json(url, data=data)
        # Check the status like remove_network does; otherwise an error
        # response would look like a successful connect to the caller.
        self._raise_for_status(res)

    @check_resource
    @minimum_version('1.21')
    def disconnect_container_from_network(self, container, net_id):
        """Disconnect a container from a network.

        POSTs to ``/networks/<net_id>/disconnect`` and passes the response
        to ``self._raise_for_status`` so a failed request is reported
        instead of being silently ignored; nothing is returned.

        Args:
            container: Sent as the ``container`` field of the request body.
                NOTE(review): ``check_resource`` presumably validates or
                normalizes this argument -- confirm against ``..utils``.
            net_id: Network identifier interpolated into the URL.
        """
        data = {"container": container}
        url = self._url("/networks/{0}/disconnect", net_id)
        res = self._post_json(url, data=data)
        # Check the status like remove_network does; otherwise an error
        # response would look like a successful disconnect to the caller.
        self._raise_for_status(res)
|