diff --git a/apps/api/plane/license/utils/encryption.py b/apps/api/plane/license/utils/encryption.py index 8f43167c15a..029d6e6000e 100644 --- a/apps/api/plane/license/utils/encryption.py +++ b/apps/api/plane/license/utils/encryption.py @@ -1,17 +1,28 @@ -# Copyright (c) 2023-present Plane Software, Inc. and contributors -# SPDX-License-Identifier: AGPL-3.0-only -# See the LICENSE file for details. - +# Python imports import base64 import hashlib +import hmac +import logging from django.conf import settings from cryptography.fernet import Fernet from plane.utils.exception_logger import log_exception +logger = logging.getLogger("plane") + def derive_key(secret_key): - # Use a key derivation function to get a suitable encryption key + # Derive a deployment-specific salt via HMAC with a fixed application label. + # Using HMAC(key=secret_key, msg=b'plane:kdf:v2') separates the salt from + # the stretched password, preventing trivial precomputation even when an + # attacker knows the label, and ensures per-deployment uniqueness. + salt = hmac.new(secret_key.encode(), b"plane:kdf:v2", hashlib.sha256).digest() + dk = hashlib.pbkdf2_hmac("sha256", secret_key.encode(), salt, 100000) + return base64.urlsafe_b64encode(dk) + + +def derive_key_legacy(secret_key): + # Legacy key derivation using hardcoded salt — kept for backward compatibility dk = hashlib.pbkdf2_hmac("sha256", secret_key.encode(), b"salt", 100000) return base64.urlsafe_b64encode(dk) @@ -21,8 +32,8 @@ def encrypt_data(data): try: if data: cipher_suite = Fernet(derive_key(settings.SECRET_KEY)) - encrypted_data = cipher_suite.encrypt(data.encode()) - return encrypted_data.decode() # Convert bytes to string + encrypted_data = cipher_suite.encrypt(data.encode()) # Convert string to bytes + return encrypted_data.decode() else: return "" except Exception as e: @@ -32,13 +43,42 @@ def encrypt_data(data): # Decrypt data def decrypt_data(encrypted_data): + if not encrypted_data: + return "" + # Try current key derivation try: - if encrypted_data: - cipher_suite = Fernet(derive_key(settings.SECRET_KEY)) - decrypted_data = cipher_suite.decrypt(encrypted_data.encode()) # Convert string back to bytes - return decrypted_data.decode() - else: - return "" + cipher_suite = Fernet(derive_key(settings.SECRET_KEY)) + return cipher_suite.decrypt(encrypted_data.encode()).decode() + except Exception: + pass + # Fallback to legacy key derivation + try: + cipher_suite = Fernet(derive_key_legacy(settings.SECRET_KEY)) + plaintext = cipher_suite.decrypt(encrypted_data.encode()).decode() + logger.warning( + "Decrypted value using legacy key derivation. " + "Value should be re-encrypted with current scheme." + ) + return plaintext except Exception as e: log_exception(e) return "" + + +def decrypt_data_with_status(encrypted_data): + """Returns (plaintext, used_legacy) tuple.""" + if not encrypted_data: + return "", False + try: + cipher_suite = Fernet(derive_key(settings.SECRET_KEY)) + return cipher_suite.decrypt(encrypted_data.encode()).decode(), False + except Exception: + pass + try: + cipher_suite = Fernet(derive_key_legacy(settings.SECRET_KEY)) + plaintext = cipher_suite.decrypt(encrypted_data.encode()).decode() + logger.warning("Decrypted value using legacy key derivation.") + return plaintext, True + except Exception as e: + log_exception(e) + return "", False diff --git a/apps/api/plane/license/utils/instance_value.py b/apps/api/plane/license/utils/instance_value.py index 279eb217777..22344cbcc37 100644 --- a/apps/api/plane/license/utils/instance_value.py +++ b/apps/api/plane/license/utils/instance_value.py @@ -1,7 +1,3 @@ -# Copyright (c) 2023-present Plane Software, Inc. and contributors -# SPDX-License-Identifier: AGPL-3.0-only -# See the LICENSE file for details. - # Python imports import os @@ -10,50 +6,45 @@ # Module imports from plane.license.models import InstanceConfiguration -from plane.license.utils.encryption import decrypt_data +from plane.license.utils.encryption import decrypt_data_with_status, encrypt_data # Helper function to return value from the passed key def get_configuration_value(keys): environment_list = [] + if settings.SKIP_ENV_VAR: - # Get the configurations - instance_configuration = InstanceConfiguration.objects.values("key", "value", "is_encrypted") + instance_configuration = InstanceConfiguration.objects.filter( + key__in=[key.get("key") for key in keys] + ).values("key", "value", "is_encrypted") + + # Build a lookup dict for quick access + config_map = {item.get("key"): item for item in instance_configuration} for key in keys: - for item in instance_configuration: - if key.get("key") == item.get("key"): - if item.get("is_encrypted", False): - environment_list.append(decrypt_data(item.get("value"))) - else: - environment_list.append(item.get("value")) - - break + item = config_map.get(key.get("key")) + if item is not None: + if item.get("is_encrypted", False): + plaintext, used_legacy = decrypt_data_with_status(item.get("value")) + if used_legacy and plaintext: + try: + new_encrypted = encrypt_data(plaintext) + if new_encrypted: + # Optimistic-concurrency guard: only update if the + # stored ciphertext still matches what we read. + InstanceConfiguration.objects.filter( + key=key.get("key"), + value=item.get("value"), + ).update(value=new_encrypted) + except Exception: + pass + environment_list.append(plaintext) + else: + environment_list.append(item.get("value")) else: environment_list.append(key.get("default")) else: - # Get the configuration from os for key in keys: environment_list.append(os.environ.get(key.get("key"), key.get("default"))) - return tuple(environment_list) - - -def get_email_configuration(): - return get_configuration_value( - [ - {"key": "EMAIL_HOST", "default": os.environ.get("EMAIL_HOST")}, - {"key": "EMAIL_HOST_USER", "default": os.environ.get("EMAIL_HOST_USER")}, - { - "key": "EMAIL_HOST_PASSWORD", - "default": os.environ.get("EMAIL_HOST_PASSWORD"), - }, - {"key": "EMAIL_PORT", "default": os.environ.get("EMAIL_PORT", 587)}, - {"key": "EMAIL_USE_TLS", "default": os.environ.get("EMAIL_USE_TLS", "1")}, - {"key": "EMAIL_USE_SSL", "default": os.environ.get("EMAIL_USE_SSL", "0")}, - { - "key": "EMAIL_FROM", - "default": os.environ.get("EMAIL_FROM", "Team Plane "), - }, - ] - ) + return environment_list diff --git a/apps/api/plane/tests/unit/utils/test_encryption.py b/apps/api/plane/tests/unit/utils/test_encryption.py new file mode 100644 index 00000000000..709856b00e1 --- /dev/null +++ b/apps/api/plane/tests/unit/utils/test_encryption.py @@ -0,0 +1,188 @@ +# Python imports +import os +import sys + +import django +import pytest + +# Ensure the apps/api directory is on the path so we can import plane modules +repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", "..")) +api_root = os.path.join(repo_root, "apps", "api") +if api_root not in sys.path: + sys.path.insert(0, api_root) + +# Configure Django settings before importing plane modules +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "plane.settings.test") + +# Minimal Django settings setup for tests that don't need a full Django stack +try: + django.setup() +except Exception: + # If Django setup fails (e.g., no settings module), configure minimal settings + from django.conf import settings as django_settings + if not django_settings.configured: + django_settings.configure( + SECRET_KEY="test-secret-key-for-unit-tests-only", + INSTALLED_APPS=[], + DATABASES={}, + ) + +from plane.license.utils.encryption import ( + decrypt_data, + decrypt_data_with_status, + derive_key, + derive_key_legacy, + encrypt_data, +) + + +@pytest.fixture(autouse=True) +def set_secret_key(settings): + """Override SECRET_KEY for each test.""" + settings.SECRET_KEY = "test-secret-key-for-unit-tests" + + +class TestDeriveKey: + def test_different_secret_keys_produce_different_derived_keys(self): + """Different SECRET_KEY values must produce different derived keys.""" + key1 = derive_key("secret_key_one") + key2 = derive_key("secret_key_two") + assert key1 != key2 + + def test_same_secret_key_produces_same_derived_key(self): + """Same SECRET_KEY must always produce the same derived key (deterministic KDF).""" + key1 = derive_key("my_secret_key") + key2 = derive_key("my_secret_key") + assert key1 == key2 + + def test_new_and_legacy_keys_differ_for_same_secret(self): + """New KDF and legacy KDF must produce different keys for the same secret.""" + new_key = derive_key("some_secret") + legacy_key = derive_key_legacy("some_secret") + assert new_key != legacy_key + + +class TestEncryptData: + def test_encrypt_returns_non_empty_string(self, settings): + settings.SECRET_KEY = "test-secret-key" + result = encrypt_data("hello world") + assert result + assert isinstance(result, str) + + def test_encrypt_empty_string_returns_empty(self, settings): + settings.SECRET_KEY = "test-secret-key" + result = encrypt_data("") + assert result == "" + + def test_encrypt_none_returns_empty(self, settings): + settings.SECRET_KEY = "test-secret-key" + result = encrypt_data(None) + assert result == "" + + def test_encrypt_produces_different_ciphertexts_each_call(self, settings): + """Fernet uses a random IV, so the same plaintext encrypted twice must differ.""" + settings.SECRET_KEY = "test-secret-key" + plaintext = "repeated_sensitive_value" + ciphertexts = {encrypt_data(plaintext) for _ in range(5)} + # If all 5 encryptions produce the same ciphertext, the scheme is deterministic + # which is a security weakness — this MUST fail, not just warn. + assert len(ciphertexts) > 1, ( + "Deterministic encryption detected: all 5 encryptions of the same plaintext " + "produced identical ciphertexts. The encryption scheme must use a random IV/nonce." + ) + + +class TestDecryptData: + def test_decrypt_empty_string_returns_empty(self, settings): + settings.SECRET_KEY = "test-secret-key" + result = decrypt_data("") + assert result == "" + + def test_decrypt_none_returns_empty(self, settings): + settings.SECRET_KEY = "test-secret-key" + result = decrypt_data(None) + assert result == "" + + def test_roundtrip_integrity(self, settings): + """Encryption/decryption roundtrip must preserve data integrity.""" + settings.SECRET_KEY = "test-secret-key-for-roundtrip" + payloads = [ + "normal_value", + "special!@#$%^&*()chars", + "unicode_value_\u00e9\u00e0\u00fc", + "a" * 1000, + '{"json": "value", "nested": {"key": "val"}}', + ] + for payload in payloads: + encrypted = encrypt_data(payload) + decrypted = decrypt_data(encrypted) + assert decrypted == payload, f"Roundtrip failed for payload: {payload!r}" + + def test_wrong_key_returns_empty(self, settings): + """Decrypting with a different key must return empty string, not raise.""" + settings.SECRET_KEY = "original-secret-key" + encrypted = encrypt_data("sensitive_data") + + settings.SECRET_KEY = "completely-different-key" + result = decrypt_data(encrypted) + assert result == "" + + +class TestDecryptDataWithStatus: + def test_returns_tuple(self, settings): + settings.SECRET_KEY = "test-secret-key" + result = decrypt_data_with_status("") + assert isinstance(result, tuple) + assert len(result) == 2 + + def test_empty_input_returns_false_legacy(self, settings): + settings.SECRET_KEY = "test-secret-key" + plaintext, used_legacy = decrypt_data_with_status("") + assert plaintext == "" + assert used_legacy is False + + def test_current_encryption_not_legacy(self, settings): + """Values encrypted with the current scheme must not be flagged as legacy.""" + settings.SECRET_KEY = "test-secret-key" + encrypted = encrypt_data("my_value") + plaintext, used_legacy = decrypt_data_with_status(encrypted) + assert plaintext == "my_value" + assert used_legacy is False + + def test_legacy_encryption_detected(self, settings): + """Values encrypted with the legacy scheme must be flagged as legacy.""" + from cryptography.fernet import Fernet + + settings.SECRET_KEY = "test-secret-key" + # Encrypt using the legacy key derivation directly + legacy_key = derive_key_legacy(settings.SECRET_KEY) + legacy_cipher = Fernet(legacy_key) + legacy_encrypted = legacy_cipher.encrypt(b"legacy_value").decode() + + plaintext, used_legacy = decrypt_data_with_status(legacy_encrypted) + assert plaintext == "legacy_value" + assert used_legacy is True + + +class TestKeyIsolation: + @pytest.mark.parametrize("secret_key", [ + "secret1", + "secret2", + "a-]different-key!@#$", + ]) + def test_different_keys_produce_different_ciphertexts(self, settings, secret_key): + """Different SECRET_KEY values must produce different encrypted outputs + for the same plaintext, ensuring the key derivation provides uniqueness.""" + plaintext = "sensitive_data_12345" + other_key = secret_key + "_different" + + settings.SECRET_KEY = secret_key + ct1 = encrypt_data(plaintext) + + settings.SECRET_KEY = other_key + ct2 = encrypt_data(plaintext) + + assert ct1 != ct2, ( + "Different secret keys produced identical ciphertexts — " + "key derivation is not properly differentiating keys" + )