# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage,
# (C) 2018 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
minio.sse
~~~~~~~~~~~~~~~~~~~
This module contains server-side encryption (SSE) types that build the S3 request headers for each encryption mode.
:copyright: (c) 2018 by MinIO, Inc.
:license: Apache 2.0, see LICENSE for more details.
"""
from __future__ import absolute_import, annotations
import base64
import json
from abc import ABCMeta, abstractmethod
from typing import Any, cast
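class Sse(metaclass=ABCMeta):
    """Server-side encryption base class.

    Each subclass describes one SSE mode by supplying the request headers
    that select it. This is an enforced abstract base: it cannot be
    instantiated directly, and a concrete subclass must implement
    :meth:`headers`.
    """

    # ``__metaclass__ = ABCMeta`` is the Python 2 spelling and is ignored by
    # Python 3, which left ``@abstractmethod`` unenforced. With it ignored, a
    # subclass that omitted ``headers()`` inherited the empty body below and
    # returned ``None`` instead of any encryption headers. The ``metaclass=``
    # keyword turns that mistake into a ``TypeError`` at construction time.

    @abstractmethod
    def headers(self) -> dict[str, str]:
        """Return the request headers that select this encryption mode."""

    def tls_required(self) -> bool:  # pylint: disable=no-self-use
        """Return whether TLS is required to use this encryption mode.

        The default is ``True``; a subclass has to opt out explicitly.
        """
        # NOTE(review): presumably the client checks this before sending the
        # headers over plain HTTP -- confirm against the calling code.
        return True

    def copy_headers(self) -> dict[str, str]:  # pylint: disable=no-self-use
        """Return the copy-source headers; empty unless a subclass adds any."""
        return {}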
class SseCustomerKey(Sse):
    """Server-side encryption with a customer-provided key (SSE-C).

    The headers built here carry the key itself, base64-encoded. Base64 is
    an encoding, not encryption, so anything that logs or stores these
    dictionaries exposes the key. ``tls_required()`` is not overridden, so
    the base-class value (``True``) applies to this mode.
    """

    def __init__(self, key: bytes):
        """Build the SSE-C request and copy-source headers for *key*.

        Args:
            key: The raw 32-byte (256-bit) key. It is base64-encoded here,
                so callers pass the unencoded bytes.

        Raises:
            ValueError: If *key* is not exactly 32 items long.
        """
        # NOTE(review): the message says "base64 encoded", but the check is
        # on the raw key length; the encoding is done on the next line.
        if len(key) != 32:
            raise ValueError("SSE-C keys need to be 256 bit base64 encoded")

        encoded_key = base64.b64encode(key).decode()

        # Function-scope import, kept as in the original; presumably it
        # avoids an import cycle with ``.helpers`` -- confirm.
        from .helpers import (  # pylint: disable=import-outside-toplevel
            md5sum_hash,
        )

        # Presumably the base64 MD5 digest of the key, as the header name
        # suggests; ``md5sum_hash`` is defined outside this file.
        key_digest = cast(str, md5sum_hash(key))

        request_headers: dict[str, str] = {
            "X-Amz-Server-Side-Encryption-Customer-Algorithm": "AES256",
            "X-Amz-Server-Side-Encryption-Customer-Key": encoded_key,
            "X-Amz-Server-Side-Encryption-Customer-Key-MD5": key_digest,
        }
        source_headers: dict[str, str] = {
            "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Algorithm":
                "AES256",
            "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key":
                encoded_key,
            "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key-MD5":
                key_digest,
        }
        self._headers: dict[str, str] = request_headers
        self._copy_headers: dict[str, str] = source_headers

    def headers(self) -> dict[str, str]:
        """Return a fresh copy of the SSE-C request headers."""
        # A copy keeps callers from mutating the stored headers.
        return dict(self._headers)

    def copy_headers(self) -> dict[str, str]:
        """Return a fresh copy of the SSE-C copy-source headers."""
        return dict(self._copy_headers)
class SseKMS(Sse):
    """Server-side encryption using a KMS-managed key (SSE-KMS).

    Only the key identifier and an optional encryption context are sent;
    no key material is placed in the headers. The context is serialised
    as JSON and base64-encoded, which is an encoding and not encryption,
    so it should not hold secrets. ``tls_required()`` is inherited.
    """

    def __init__(self, key: str, context: dict[str, Any]):
        """Build the SSE-KMS request headers.

        Args:
            key: Value sent as the KMS key id header.
            context: Encryption context; a falsy value (``None`` or an
                empty dict) omits the context header entirely.
        """
        kms_headers = {
            "X-Amz-Server-Side-Encryption-Aws-Kms-Key-Id": key,
            "X-Amz-Server-Side-Encryption": "aws:kms",
        }
        if context:
            serialized = json.dumps(context).encode("utf-8")
            encoded_context = base64.b64encode(serialized).decode()
            kms_headers["X-Amz-Server-Side-Encryption-Context"] = (
                encoded_context
            )
        self._headers = kms_headers

    def headers(self) -> dict[str, str]:
        """Return a fresh copy of the SSE-KMS request headers."""
        return dict(self._headers)
class SseS3(Sse):
    """Server-side encryption with keys managed by the server (SSE-S3)."""

    def headers(self) -> dict[str, str]:
        """Return the single header that requests AES256 encryption."""
        sse_s3_headers = {"X-Amz-Server-Side-Encryption": "AES256"}
        return sse_s3_headers

    def tls_required(self) -> bool:
        """Return ``False``: this mode opts out of the base-class default.

        The header built by this class names only the algorithm and
        carries no key material.
        """
        return False