forked from minio/minio-py
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathminioadmin.py
More file actions
297 lines (248 loc) · 10 KB
/
Copy pathminioadmin.py
File metadata and controls
297 lines (248 loc) · 10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage,
# (C) 2021 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-public-methods
"""MinIO Admin wrapper using MinIO Client (mc) tool."""
from __future__ import absolute_import
import json
import subprocess
class MinioAdmin:
"""MinIO Admin wrapper using MinIO Client (mc) tool."""
def __init__(
self, target,
binary_path=None, config_dir=None, ignore_cert_check=False,
timeout=None, env=None,
):
self._target = target
self._timeout = timeout
self._env = env
self._base_args = [binary_path or "mc", "--json"]
if config_dir:
self._base_args += ["--config-dir", config_dir]
if ignore_cert_check:
self._base_args.append("--insecure")
self._base_args.append("admin")
def _run(self, args, multiline=False):
"""Execute mc command and return JSON output."""
proc = subprocess.run(
self._base_args + args,
capture_output=True,
timeout=self._timeout,
env=self._env,
check=True,
text=True,
)
if not proc.stdout:
return [] if multiline else {}
if multiline:
return [json.loads(line) for line in proc.stdout.splitlines()]
return json.loads(proc.stdout)
def service_restart(self):
"""Restart MinIO service."""
return self._run(["service", "restart", self._target])
def service_stop(self):
"""Stop MinIO service."""
return self._run(["service", "stop", self._target])
def update(self):
"""Update MinIO."""
return self._run(["update", self._target])
def info(self):
"""Get MinIO server information."""
return self._run(["info", self._target])
def user_add(self, access_key, secret_key):
"""Add a new user."""
return self._run(["user", "add", self._target, access_key, secret_key])
def user_disable(self, access_key):
"""Disable user."""
return self._run(["user", "disable", self._target, access_key])
def user_enable(self, access_key):
"""Enable user."""
return self._run(["user", "enable", self._target, access_key])
def user_remove(self, access_key):
"""Remove user."""
return self._run(["user", "remove", self._target, access_key])
def user_info(self, access_key):
"""Get user information."""
return self._run(["user", "info", self._target, access_key])
def user_list(self):
"""List users."""
return self._run(["user", "list", self._target], multiline=True)
def group_add(self, group_name, members):
"""Add users a new or existing group."""
return self._run(["group", "add", self._target, group_name] + members)
def group_disable(self, group_name):
"""Disable group."""
return self._run(["group", "disable", self._target, group_name])
def group_enable(self, group_name):
"""Enable group."""
return self._run(["group", "enable", self._target, group_name])
def group_remove(self, group_name, members=None):
"""Remove group or members from a group."""
return self._run(
["group", "remove", self._target, group_name] + (members or []),
)
def group_info(self, group_name):
"""Get group information."""
return self._run(["group", "info", self._target, group_name])
def group_list(self):
"""List groups."""
return self._run(["group", "list", self._target], multiline=True)
def policy_add(self, policy_name, policy_file):
"""Add new policy."""
return self._run(
["policy", "add", self._target, policy_name, policy_file],
)
def policy_remove(self, policy_name):
"""Remove policy."""
return self._run(["policy", "remove", self._target, policy_name])
def policy_info(self, policy_name):
"""Get policy information."""
return self._run(["policy", "info", self._target, policy_name])
def policy_list(self):
"""List policies."""
return self._run(["policy", "list", self._target], multiline=True)
def policy_set(self, policy_name, user=None, group=None):
"""Set IAM policy on a user or group."""
if (user is not None) ^ (group is not None):
return self._run(
[
"policy", "set", self._target, policy_name,
("user=" if user else "group=") + (user or group),
],
)
raise ValueError("either user or group must be set")
def policy_unset(self, policy_name, user=None, group=None):
"""Unset an IAM policy for a user or group."""
if (user is not None) ^ (group is not None):
return self._run(
[
"policy", "unset", self._target, policy_name,
("user=" if user else "group=") + (user or group),
],
)
raise ValueError("either user or group must be set")
def config_get(self, key=None):
"""Get configuration parameters."""
return self._run(
["config", "get", self._target] + [key] if key else [],
key,
)
def config_set(self, key, config):
"""Set configuration parameters."""
args = [name + "=" + value for name, value in config.items()]
return self._run(["config", "set", self._target, key] + args)
def config_reset(self, key, name=None):
"""Reset configuration parameters."""
if name:
key += ":" + name
return self._run(["config", "reset", self._target, key])
def config_remove(self, access_key):
"""Remove config."""
return self._run(["config", "remove", self._target, access_key])
def config_history(self):
"""Get historic configuration changes."""
return self._run(["config", "history", self._target], multiline=True)
def config_restore(self, restore_id):
"""Restore to a specific configuration history."""
return self._run(["config", "restore", self._target, restore_id])
def profile_start(self, profilers=()):
"""Start recording profile data."""
args = ["profile", "start"]
if profilers:
args += ["--type", ",".join(profilers)]
args.append(self._target)
return self._run(args)
def profile_stop(self):
"""Stop and download profile data."""
return self._run(["profile", "stop", self._target])
def top_locks(self):
"""Get a list of the 10 oldest locks on a MinIO cluster."""
return self._run(["top", "locks", self._target], multiline=True)
def prometheus_generate(self):
"""Generate prometheus configuration."""
return self._run(["prometheus", "generate", self._target])
def kms_key_create(self, key=None):
"""Create a new KMS master key."""
return self._run(
[
"kms", "key", "create", self._target, key
] + ([key] if key else []),
)
def kms_key_status(self, key=None):
"""Get status information of a KMS master key."""
return self._run(
[
"kms", "key", "status", self._target, key
] + ([key] if key else []),
)
def bucket_remote_add(
self, src_bucket, dest_url,
path=None, region=None, bandwidth=None, service=None,
):
"""Add a new remote target."""
args = [
"bucket", "remote", "add", self._target + "/" + src_bucket,
dest_url, "--service", service or "replication",
]
if path:
args += ["--path", path]
if region:
args += ["--region", region]
if bandwidth:
args += ["--bandwidth", bandwidth]
return self._run(args)
def bucket_remote_edit(self, src_bucket, dest_url, arn):
"""Edit credentials of remote target."""
return self._run(
[
"bucket", "remote", "edit", self._target + "/" + src_bucket,
dest_url, "--arn", arn,
],
)
def bucket_remote_list(self, src_bucket=None, service=None):
"""List remote targets."""
return self._run(
[
"bucket", "remote", "ls",
self._target + ("/" + src_bucket if src_bucket else ""),
"--service", service or "replication",
],
)
def bucket_remote_remove(self, src_bucket, arn):
"""Remove configured remote target."""
return self._run(
[
"bucket", "remote", "rm", self._target + "/" + src_bucket,
"--arn", arn,
],
)
def bucket_quota_set(self, bucket, fifo=None, hard=None):
"""Set bucket quota configuration."""
if fifo is None and hard is None:
raise ValueError("fifo or hard must be set")
args = ["bucket", "quota", self._target + "/" + bucket]
if fifo:
args += ["--fifo", fifo]
if hard:
args += ["--hard", hard]
return self._run(args)
def bucket_quota_clear(self, bucket):
"""Clear bucket quota configuration."""
return self._run(
["bucket", "quota", self._target + "/" + bucket, "--clear"],
)
def bucket_quota_get(self, bucket):
"""Get bucket quota configuration."""
return self._run(["bucket", "quota", self._target + "/" + bucket])