# Copyright 2023 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
The configuration module provides various functions to set and get the supported
configuration parameters.
Common imported modules in corresponding API examples are as follows:
.. code-block::
from mindspore.mindrecord import set_enc_key, set_enc_mode, set_dec_mode
"""
import os
import shutil
import stat
import time
from mindspore import log as logger
from mindspore._c_expression import _encrypt, _decrypt_data
from .shardutils import MIN_FILE_SIZE
# Public API of this module; everything else is an internal helper.
__all__ = ['set_enc_key',
           'set_enc_mode',
           'set_dec_mode']

# Encryption key. ``None`` (the default) means encryption is not enabled.
ENC_KEY = None
# Encryption mode: the built-in "AES-GCM" string or a user-supplied callable.
ENC_MODE = "AES-GCM"
# Decryption mode. While it is ``None`` the getters fall back to ENC_MODE.
DEC_MODE = None
# NOTE(review): not referenced by any code visible in this module - confirm it is still needed.
HASH_MODE = None

# Layout transitions of an encrypted mindrecord file:
# 1. when creating a new mindrecord file
#    mindrecord -> enc_mindrecord + 'ENCRYPT'
# 2. when reading a mindrecord file
#    enc_mindrecord + 'ENCRYPT' -> mindrecord

# End-of-file marker: the bytes b'ENCRYPT' are appended to every encrypted file.
ENCRYPT_END_FLAG = str('ENCRYPT').encode('utf-8')

# Name of the sub-directory (created next to the source file) that stores decrypted mindrecord files.
DECRYPT_DIRECTORY = ".decrypt_mindrecord"
# Decrypt directories already announced to the user, so the warning is logged once per directory.
DECRYPT_DIRECTORY_LIST = []

# Accumulated seconds spent encrypting / decrypting; a warning is logged each time
# an accumulator exceeds WARNING_INTERVAL, after which the interval is subtracted again.
ENCRYPT_TIME = 0
DECRYPT_TIME = 0
WARNING_INTERVAL = 30   # seconds
def set_enc_key(enc_key):
    """
    Set the encode key.

    Args:
        enc_key (str): Str-type key used for encryption. The valid length is 16, 24, or 32.
            ``None`` indicates that encryption is not enabled.

    Raises:
        ValueError: The input is not str or length error.

    Examples:
        >>> from mindspore.mindrecord import set_enc_key
        >>>
        >>> set_enc_key("0123456789012345")
    """
    global ENC_KEY

    # ``None`` switches encryption off and skips all validation.
    if enc_key is None:
        ENC_KEY = None
        return

    if not isinstance(enc_key, str):
        raise ValueError("The input enc_key is not str.")

    # The length is counted in characters of the str, exactly as it is later passed on as key length.
    if len(enc_key) not in (16, 24, 32):
        raise ValueError("The length of input enc_key is not 16, 24, 32.")

    ENC_KEY = enc_key
def _get_enc_key():
    """Return the configured encode key, or ``None`` when no key has been set."""
    # Read-only access to the module-level setting, so no ``global`` statement is required.
    return ENC_KEY
def set_enc_mode(enc_mode="AES-GCM"):
    """
    Set the encode mode.

    Args:
        enc_mode (Union[str, function], optional): This parameter is valid only when enc_key is not set
            to ``None`` . Specifies the encryption mode or customized encryption function, currently
            supports ``"AES-GCM"`` . Default: ``"AES-GCM"`` . If it is customized encryption, users need
            to ensure its correctness, the security of the encryption algorithm and raise exceptions
            when errors occur.

    Raises:
        ValueError: The input is not valid encode mode or callable function.

    Examples:
        >>> from mindspore.mindrecord import set_enc_mode
        >>>
        >>> set_enc_mode("AES-GCM")
    """
    global ENC_MODE

    # A user-defined encryption function is stored as-is, without further checks.
    if callable(enc_mode):
        ENC_MODE = enc_mode
        return

    if not isinstance(enc_mode, str):
        raise ValueError("The input enc_mode is not str.")

    # "AES-GCM" is the only built-in mode accepted here.
    if enc_mode not in ("AES-GCM",):
        raise ValueError("The input enc_mode is invalid.")

    ENC_MODE = enc_mode
def _get_enc_mode():
    """Return the configured encode mode, which is ``"AES-GCM"`` unless it has been changed."""
    # Read-only access to the module-level setting, so no ``global`` statement is required.
    return ENC_MODE
def set_dec_mode(dec_mode="AES-GCM"):
    """
    Set the decode mode.

    If the built-in `enc_mode` is used and `dec_mode` is not specified, the encryption algorithm specified
    by `enc_mode` is used for decryption. If you are using customized encryption function, you must specify
    customized decryption function at read time.

    Args:
        dec_mode (Union[str, function], optional): This parameter is valid only when enc_key is not set
            to ``None`` . Specifies the decryption mode or customized decryption function, currently
            supports ``"AES-GCM"`` . Default: ``"AES-GCM"`` . ``None`` indicates that decryption
            mode is not defined. If it is customized decryption, users need to ensure its correctness
            and raise exceptions when errors occur.

    Raises:
        ValueError: The input is not valid decode mode or callable function.

    Examples:
        >>> from mindspore.mindrecord import set_dec_mode
        >>>
        >>> set_dec_mode("AES-GCM")
    """
    global DEC_MODE

    # ``None`` resets the setting, so the getters fall back to the encode mode.
    if dec_mode is None:
        DEC_MODE = None
        return

    # A user-defined decryption function is stored as-is, without further checks.
    if callable(dec_mode):
        DEC_MODE = dec_mode
        return

    if not isinstance(dec_mode, str):
        raise ValueError("The input dec_mode is not str.")

    # "AES-GCM" is the only built-in mode accepted here.
    if dec_mode not in ("AES-GCM",):
        raise ValueError("The input dec_mode is invalid.")

    DEC_MODE = dec_mode
def _get_dec_mode():
    """
    Return the effective decode mode.

    An explicitly configured decode mode always wins. Otherwise the encode mode is reused,
    which is only possible when the encode mode is not a custom function.

    Raises:
        RuntimeError: No decode mode is set while the encode mode is a custom function.
    """
    if DEC_MODE is not None:
        return DEC_MODE

    # A custom encryption function has no implied inverse, so the user must supply one.
    if callable(ENC_MODE):
        raise RuntimeError("You use custom encryption, so you must also define custom decryption.")

    return ENC_MODE
def _get_enc_mode_as_str():
    """
    Return the 7-character tag of the encode mode as UTF-8 bytes.

    A custom encryption function is represented by the fixed tag ``"UDF-ENC"``; a built-in
    mode is represented by its own name.

    Raises:
        RuntimeError: The tag is not exactly 7 characters long.
    """
    mode_tag = "UDF-ENC" if callable(ENC_MODE) else ENC_MODE

    # The tag is written to / read from a fixed 7-byte slot, so its length must match exactly.
    if len(mode_tag) != 7:
        raise RuntimeError("The length of enc_mode string is not 7.")

    return str(mode_tag).encode('utf-8')
def _get_dec_mode_as_str():
    """
    Return the 7-character tag of the effective decode mode as UTF-8 bytes.

    The tag is derived as follows:
      * no decode mode set: the (built-in) encode mode name is used;
      * custom decode function: the fixed tag ``"UDF-ENC"``, i.e. the same tag that
        `_get_enc_mode_as_str` produces for a custom encode function;
      * built-in decode mode: its own name.

    Raises:
        RuntimeError: No decode mode is set while the encode mode is a custom function,
            or the tag is not exactly 7 characters long.
    """
    if DEC_MODE is not None:
        mode_tag = "UDF-ENC" if callable(DEC_MODE) else DEC_MODE
    elif callable(ENC_MODE):
        # A custom encryption function has no implied inverse, so the user must supply one.
        raise RuntimeError("You use custom encryption, so you must also define custom decryption.")
    else:
        mode_tag = ENC_MODE

    # The tag is compared against a fixed 7-byte slot, so its length must match exactly.
    if len(mode_tag) != 7:
        raise RuntimeError("The length of enc_mode string is not 7.")

    return str(mode_tag).encode('utf-8')
def encrypt(filename, enc_key, enc_mode):
    """
    Encrypt a file in place: the encrypted content replaces the original file.

    The data is first written to ``filename + ".encrypt"`` and then moved over `filename`.
    With a built-in mode the encrypted file is laid out as::

        len+encrypt_data|len+encrypt_data|...|0|enc_mode|ENCRYPT_END_FLAG

    where every ``len`` (and the terminating ``0``) is a 4-byte big-endian signed integer and the
    plain data is processed in chunks of at most 64M. With a callable `enc_mode` the data part is
    produced by ``enc_mode(f, file_size, f_encrypt, enc_key)`` and the same ``0|enc_mode|ENCRYPT_END_FLAG``
    trailer is appended afterwards.

    Args:
        filename (str): Path of the regular file to encrypt.
        enc_key (str): Key handed to the encryption routine.
        enc_mode (Union[str, function]): Built-in mode name or customized encryption function.

    Returns:
        bool, always ``True`` on success.

    Raises:
        RuntimeError: `filename` does not exist, is not a regular file, or seeking in it failed.
        Exception: Anything raised while reading, encrypting or writing is re-raised after both
            files are closed; the partially written ``.encrypt`` file is kept, restricted to
            owner read/write.
    """
    global ENCRYPT_TIME

    if not os.path.exists(filename):
        raise RuntimeError("The input: {} is not exists.".format(filename))

    if not os.path.isfile(filename):
        raise RuntimeError("The input: {} should be a regular file.".format(filename))

    logger.info("Begin to encrypt file: {}.".format(filename))
    start = time.time()

    chunk_size = 64 * 1024 * 1024    # plain data is encrypted in chunks of at most 64M
    file_size = os.path.getsize(filename)
    encrypt_filename = filename + ".encrypt"

    try:
        # Context managers guarantee that both handles are closed on every exit path.
        with open(filename, 'rb') as f, open(encrypt_filename, 'wb') as f_encrypt:
            if callable(enc_mode):
                enc_mode(f, file_size, f_encrypt, enc_key)
            else:
                current_offset = 0
                while current_offset < file_size:
                    read_size = min(chunk_size, file_size - current_offset)
                    try:
                        f.seek(current_offset)
                    except Exception as e:  # pylint: disable=W0703
                        raise RuntimeError("Seek the file: {} to position: {} failed. Error: {}"
                                           .format(filename, current_offset, str(e))) from e

                    data = f.read(read_size)
                    encode_data = _encrypt(data, len(data), enc_key, len(enc_key), enc_mode)

                    # length prefix first, then the encrypted chunk itself
                    f_encrypt.write(len(encode_data).to_bytes(length=4, byteorder='big', signed=True))
                    f_encrypt.write(encode_data)

                    current_offset += read_size

            # a zero length marks the end of the encrypted data
            f_encrypt.write((0).to_bytes(length=4, byteorder='big', signed=True))
            # NOTE: the mode tag comes from the module-level encode mode, not from the `enc_mode` argument
            f_encrypt.write(_get_enc_mode_as_str())
            f_encrypt.write(ENCRYPT_END_FLAG)
    except Exception:
        # keep the partial output private before propagating the original error
        if os.path.exists(encrypt_filename):
            os.chmod(encrypt_filename, stat.S_IRUSR | stat.S_IWUSR)
        raise

    end = time.time()
    ENCRYPT_TIME += end - start
    if ENCRYPT_TIME > WARNING_INTERVAL:
        logger.warning("It takes another " + str(WARNING_INTERVAL) + "s to encrypt the mindrecord file.")
        ENCRYPT_TIME = ENCRYPT_TIME - WARNING_INTERVAL

    # restrict the encrypted file to owner read/write
    os.chmod(encrypt_filename, stat.S_IRUSR | stat.S_IWUSR)

    # replace the original file with the encrypted one
    shutil.move(encrypt_filename, filename)

    return True
def _get_encrypt_end_flag(filename):
    """
    Read the trailing bytes of a file where the encrypt end flag is expected.

    Args:
        filename (str): Path of the regular file to inspect.

    Returns:
        bytes, the last ``len(ENCRYPT_END_FLAG)`` bytes of the file. They equal ``ENCRYPT_END_FLAG``
        only for files written by `encrypt`; the caller does the comparison.

    Raises:
        RuntimeError: `filename` does not exist, is not a regular file, or seeking to the
            flag position failed.
    """
    if not os.path.exists(filename):
        raise RuntimeError("The input: {} is not exists.".format(filename))

    if not os.path.isfile(filename):
        raise RuntimeError("The input: {} should be a regular file.".format(filename))

    flag_size = len(ENCRYPT_END_FLAG)
    # the flag occupies the very last bytes of the file
    offset = os.path.getsize(filename) - flag_size

    # the context manager closes the handle even if the read itself fails
    with open(filename, 'rb') as f:
        try:
            f.seek(offset)
        except Exception as e:  # pylint: disable=W0703
            raise RuntimeError("Seek the file: {} to position: {} failed. Error: {}"
                               .format(filename, offset, str(e))) from e
        return f.read(flag_size)
def _get_enc_mode_from_file(filename):
    """
    Read the 7-byte encode mode tag stored in front of the encrypt end flag.

    Args:
        filename (str): Path of the regular file to inspect.

    Returns:
        bytes, the 7 bytes located directly before the trailing ``ENCRYPT_END_FLAG`` position.

    Raises:
        RuntimeError: `filename` does not exist, is not a regular file, or seeking to the
            tag position failed.
    """
    if not os.path.exists(filename):
        raise RuntimeError("The input: {} is not exists.".format(filename))

    if not os.path.isfile(filename):
        raise RuntimeError("The input: {} should be a regular file.".format(filename))

    mode_size = 7   # every mode tag is exactly 7 bytes long
    # file tail layout: ...|enc_mode (7 bytes)|ENCRYPT_END_FLAG
    offset = os.path.getsize(filename) - len(ENCRYPT_END_FLAG) - mode_size

    # the context manager closes the handle even if the read itself fails
    with open(filename, 'rb') as f:
        try:
            f.seek(offset)
        except Exception as e:  # pylint: disable=W0703
            raise RuntimeError("Seek the file: {} to position: {} failed. Error: {}"
                               .format(filename, offset, str(e))) from e
        return f.read(mode_size)
def _prepare_decrypt_target(filename):
    """Return the path the decrypted copy of `filename` is written to, creating its directory if needed."""
    real_path_filename = os.path.realpath(filename)
    parent_dir, only_filename = os.path.split(real_path_filename)
    current_decrypt_dir = os.path.join(parent_dir, DECRYPT_DIRECTORY)

    # Create-then-handle instead of check-then-create, so an already existing path is simply reused.
    try:
        os.mkdir(current_decrypt_dir)
    except FileExistsError:
        pass
    else:
        os.chmod(current_decrypt_dir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        logger.info("Create directory: {} to store decrypt mindrecord files."
                    .format(current_decrypt_dir))

    # announce every decrypt directory only once
    if current_decrypt_dir not in DECRYPT_DIRECTORY_LIST:
        DECRYPT_DIRECTORY_LIST.append(current_decrypt_dir)
        logger.warning("The decrypt mindrecord file will be stored in [" + current_decrypt_dir + "] directory. "
                       "If you don't use it anymore after train / eval, you need to delete it manually.")

    decrypt_filename = os.path.join(current_decrypt_dir, only_filename)
    if os.path.isfile(decrypt_filename):
        # the source may have been updated since an earlier decryption, so drop the old decrypted copy
        os.remove(decrypt_filename)
    return decrypt_filename


def _decrypt_blocks(f, f_decrypt, filename, enc_key, dec_mode):
    """Decrypt the ``len+encrypt_data`` blocks read from `f` into `f_decrypt` until a zero length is read."""
    current_offset = 0
    length = int.from_bytes(f.read(4), byteorder='big', signed=True)
    while length != 0:
        # skip the 4-byte length prefix: current_offset now points at the encrypted data
        current_offset += 4
        try:
            f.seek(current_offset)
        except Exception as e:  # pylint: disable=W0703
            raise RuntimeError("Seek the file: {} to position: {} failed. Error: {}"
                               .format(filename, current_offset, str(e))) from e

        data = f.read(length)
        decode_data = _decrypt_data(data, len(data), enc_key, len(enc_key), dec_mode)
        if decode_data is None:
            raise RuntimeError("Failed to decrypt data, " +
                               "please check if enc_key and enc_mode / dec_mode is valid.")
        f_decrypt.write(decode_data)

        # skip the data: current_offset now points at the length prefix of the next block
        current_offset += length
        try:
            f.seek(current_offset)
        except Exception as e:  # pylint: disable=W0703
            raise RuntimeError("Seek the file: {} to position: {} failed. Error: {}"
                               .format(filename, current_offset, str(e))) from e

        length = int.from_bytes(f.read(4), byteorder='big', signed=True)


def decrypt(filename, enc_key, dec_mode):
    """
    Decrypt a mindrecord file by `enc_key` and `dec_mode`.

    When no encode key is configured and the file carries no encrypt end flag, nothing is done and
    `filename` itself is returned. Otherwise the decrypted content is written to a file of the
    same name inside the ``DECRYPT_DIRECTORY`` sub-directory next to the (resolved) source file.

    With a built-in mode the source is expected to be laid out as::

        len+encrypt_data|len+encrypt_data|...|0|enc_mode|ENCRYPT_END_FLAG

    With a callable `dec_mode` the work is delegated to ``dec_mode(f, file_size, f_decrypt, enc_key)``,
    where ``file_size`` is the size of the source file without the trailing end flag.

    Args:
        filename (str): Path of the regular file to decrypt.
        enc_key (str): Key handed to the decryption routine.
        dec_mode (Union[str, function]): Built-in mode name or customized decryption function.

    Returns:
        str, `filename` if the file is not encrypted, else the path of the decrypted file.

    Raises:
        RuntimeError: `filename` does not exist, is not a regular file, is smaller than
            ``MIN_FILE_SIZE``, its encryption state does not match the configured key, its stored
            mode tag does not match the decode mode, or seeking / decrypting failed.
        Exception: Anything else raised while reading, decrypting or writing is re-raised after
            both files are closed; the partially written decrypted file is kept, restricted to
            owner read/write.
    """
    global DECRYPT_TIME

    if not os.path.exists(filename):
        raise RuntimeError("The input: {} is not exists.".format(filename))

    if not os.path.isfile(filename):
        raise RuntimeError("The input: {} should be a regular file.".format(filename))

    whole_file_size = os.path.getsize(filename)
    if whole_file_size < MIN_FILE_SIZE:
        raise RuntimeError("Invalid file, the size of mindrecord file: " + str(whole_file_size) +
                           " is smaller than the lower limit: " + str(MIN_FILE_SIZE) +
                           ".\n Please check file path: " + filename +
                           " and use 'FileWriter' to generate valid mindrecord files.")

    # The file's encryption state must agree with the configured key.
    # NOTE: this consults the module-level key, not the `enc_key` argument.
    stored_encrypt_end_flag = _get_encrypt_end_flag(filename)
    if _get_enc_key() is not None:
        if stored_encrypt_end_flag != ENCRYPT_END_FLAG:
            raise RuntimeError("The mindrecord file is not encrypted. You can set " +
                               "'mindspore.mindrecord.config.set_enc_key(None)' to disable the decryption.")
    else:
        if stored_encrypt_end_flag == ENCRYPT_END_FLAG:
            raise RuntimeError("The mindrecord file is encrypted. You need to configure " +
                               "'mindspore.mindrecord.config.set_enc_key(...)' and " +
                               "'mindspore.mindrecord.config.set_enc_mode(...)' for decryption.")
        # plain file and no key configured: nothing to decrypt
        return filename

    # the mode tag stored in the file must match the effective decode mode
    enc_mode_from_file = _get_enc_mode_from_file(filename)
    if enc_mode_from_file != _get_dec_mode_as_str():
        raise RuntimeError("Failed to decrypt data, please check if enc_key and enc_mode / dec_mode is valid.")

    logger.info("Begin to decrypt file: {}.".format(filename))
    start = time.time()

    file_size = os.path.getsize(filename) - len(ENCRYPT_END_FLAG)
    decrypt_filename = _prepare_decrypt_target(filename)

    try:
        # Context managers guarantee that both handles are closed on every exit path.
        with open(filename, 'rb') as f, open(decrypt_filename, 'wb+') as f_decrypt:
            if callable(dec_mode):
                dec_mode(f, file_size, f_decrypt, enc_key)
            else:
                _decrypt_blocks(f, f_decrypt, filename, enc_key, dec_mode)
    except Exception:
        # keep the partial output private before propagating the original error
        if os.path.exists(decrypt_filename):
            os.chmod(decrypt_filename, stat.S_IRUSR | stat.S_IWUSR)
        raise

    end = time.time()
    DECRYPT_TIME += end - start
    if DECRYPT_TIME > WARNING_INTERVAL:
        logger.warning("It takes another " + str(WARNING_INTERVAL) + "s to decrypt the mindrecord file.")
        DECRYPT_TIME = DECRYPT_TIME - WARNING_INTERVAL

    # restrict the decrypted file to owner read/write
    os.chmod(decrypt_filename, stat.S_IRUSR | stat.S_IWUSR)

    return decrypt_filename