Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 53 additions & 13 deletions apps/api/plane/license/utils/encryption.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.

# Python imports
import base64
import hashlib
import hmac
import logging
from django.conf import settings
from cryptography.fernet import Fernet

from plane.utils.exception_logger import log_exception

logger = logging.getLogger("plane")


def derive_key(secret_key):
# Use a key derivation function to get a suitable encryption key
# Derive a deployment-specific salt via HMAC with a fixed application label.
# Using HMAC(key=secret_key, msg=b'plane:kdf:v2') separates the salt from
# the stretched password, preventing trivial precomputation even when an
# attacker knows the label, and ensures per-deployment uniqueness.
salt = hmac.new(secret_key.encode(), b"plane:kdf:v2", hashlib.sha256).digest()
dk = hashlib.pbkdf2_hmac("sha256", secret_key.encode(), salt, 100000)
return base64.urlsafe_b64encode(dk)


def derive_key_legacy(secret_key):
# Legacy key derivation using hardcoded salt — kept for backward compatibility
dk = hashlib.pbkdf2_hmac("sha256", secret_key.encode(), b"salt", 100000)
return base64.urlsafe_b64encode(dk)

Expand All @@ -21,8 +32,8 @@ def encrypt_data(data):
try:
if data:
cipher_suite = Fernet(derive_key(settings.SECRET_KEY))
encrypted_data = cipher_suite.encrypt(data.encode())
return encrypted_data.decode() # Convert bytes to string
encrypted_data = cipher_suite.encrypt(data.encode()) # Convert string to bytes
return encrypted_data.decode()
else:
return ""
except Exception as e:
Expand All @@ -32,13 +43,42 @@ def encrypt_data(data):

# Decrypt data
def decrypt_data(encrypted_data):
if not encrypted_data:
return ""
# Try current key derivation
try:
if encrypted_data:
cipher_suite = Fernet(derive_key(settings.SECRET_KEY))
decrypted_data = cipher_suite.decrypt(encrypted_data.encode()) # Convert string back to bytes
return decrypted_data.decode()
else:
return ""
cipher_suite = Fernet(derive_key(settings.SECRET_KEY))
return cipher_suite.decrypt(encrypted_data.encode()).decode()
except Exception:
pass
# Fallback to legacy key derivation
try:
cipher_suite = Fernet(derive_key_legacy(settings.SECRET_KEY))
plaintext = cipher_suite.decrypt(encrypted_data.encode()).decode()
logger.warning(
"Decrypted value using legacy key derivation. "
"Value should be re-encrypted with current scheme."
)
return plaintext
except Exception as e:
log_exception(e)
return ""


def decrypt_data_with_status(encrypted_data):
"""Returns (plaintext, used_legacy) tuple."""
if not encrypted_data:
return "", False
try:
cipher_suite = Fernet(derive_key(settings.SECRET_KEY))
return cipher_suite.decrypt(encrypted_data.encode()).decode(), False
except Exception:
pass
try:
cipher_suite = Fernet(derive_key_legacy(settings.SECRET_KEY))
plaintext = cipher_suite.decrypt(encrypted_data.encode()).decode()
logger.warning("Decrypted value using legacy key derivation.")
return plaintext, True
except Exception as e:
log_exception(e)
return "", False
65 changes: 28 additions & 37 deletions apps/api/plane/license/utils/instance_value.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.

# Python imports
import os

Expand All @@ -10,50 +6,45 @@

# Module imports
from plane.license.models import InstanceConfiguration
from plane.license.utils.encryption import decrypt_data
from plane.license.utils.encryption import decrypt_data_with_status, encrypt_data


# Helper function to return value from the passed key
def get_configuration_value(keys):
environment_list = []

if settings.SKIP_ENV_VAR:
# Get the configurations
instance_configuration = InstanceConfiguration.objects.values("key", "value", "is_encrypted")
instance_configuration = InstanceConfiguration.objects.filter(
key__in=[key.get("key") for key in keys]
).values("key", "value", "is_encrypted")

# Build a lookup dict for quick access
config_map = {item.get("key"): item for item in instance_configuration}

for key in keys:
for item in instance_configuration:
if key.get("key") == item.get("key"):
if item.get("is_encrypted", False):
environment_list.append(decrypt_data(item.get("value")))
else:
environment_list.append(item.get("value"))

break
item = config_map.get(key.get("key"))
if item is not None:
if item.get("is_encrypted", False):
plaintext, used_legacy = decrypt_data_with_status(item.get("value"))
if used_legacy and plaintext:
try:
new_encrypted = encrypt_data(plaintext)
if new_encrypted:
# Optimistic-concurrency guard: only update if the
# stored ciphertext still matches what we read.
InstanceConfiguration.objects.filter(
key=key.get("key"),
value=item.get("value"),
).update(value=new_encrypted)
except Exception:
pass
environment_list.append(plaintext)
else:
environment_list.append(item.get("value"))
else:
environment_list.append(key.get("default"))
else:
# Get the configuration from os
for key in keys:
environment_list.append(os.environ.get(key.get("key"), key.get("default")))

return tuple(environment_list)


def get_email_configuration():
return get_configuration_value(
[
{"key": "EMAIL_HOST", "default": os.environ.get("EMAIL_HOST")},
{"key": "EMAIL_HOST_USER", "default": os.environ.get("EMAIL_HOST_USER")},
{
"key": "EMAIL_HOST_PASSWORD",
"default": os.environ.get("EMAIL_HOST_PASSWORD"),
},
{"key": "EMAIL_PORT", "default": os.environ.get("EMAIL_PORT", 587)},
{"key": "EMAIL_USE_TLS", "default": os.environ.get("EMAIL_USE_TLS", "1")},
{"key": "EMAIL_USE_SSL", "default": os.environ.get("EMAIL_USE_SSL", "0")},
{
"key": "EMAIL_FROM",
"default": os.environ.get("EMAIL_FROM", "Team Plane <team@mailer.plane.so>"),
},
]
)
return environment_list
188 changes: 188 additions & 0 deletions apps/api/plane/tests/unit/utils/test_encryption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
# Python imports
import os
import sys

import django
import pytest

# Ensure the apps/api directory is on the path so we can import plane modules
repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
api_root = os.path.join(repo_root, "apps", "api")
if api_root not in sys.path:
sys.path.insert(0, api_root)

# Configure Django settings before importing plane modules
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "plane.settings.test")

# Minimal Django settings setup for tests that don't need a full Django stack
try:
django.setup()
except Exception:
# If Django setup fails (e.g., no settings module), configure minimal settings
from django.conf import settings as django_settings
if not django_settings.configured:
django_settings.configure(
SECRET_KEY="test-secret-key-for-unit-tests-only",
INSTALLED_APPS=[],
DATABASES={},
)

from plane.license.utils.encryption import (
decrypt_data,
decrypt_data_with_status,
derive_key,
derive_key_legacy,
encrypt_data,
)


@pytest.fixture(autouse=True)
def set_secret_key(settings):
"""Override SECRET_KEY for each test."""
settings.SECRET_KEY = "test-secret-key-for-unit-tests"


class TestDeriveKey:
def test_different_secret_keys_produce_different_derived_keys(self):
"""Different SECRET_KEY values must produce different derived keys."""
key1 = derive_key("secret_key_one")
key2 = derive_key("secret_key_two")
assert key1 != key2

def test_same_secret_key_produces_same_derived_key(self):
"""Same SECRET_KEY must always produce the same derived key (deterministic KDF)."""
key1 = derive_key("my_secret_key")
key2 = derive_key("my_secret_key")
assert key1 == key2

def test_new_and_legacy_keys_differ_for_same_secret(self):
"""New KDF and legacy KDF must produce different keys for the same secret."""
new_key = derive_key("some_secret")
legacy_key = derive_key_legacy("some_secret")
assert new_key != legacy_key


class TestEncryptData:
def test_encrypt_returns_non_empty_string(self, settings):
settings.SECRET_KEY = "test-secret-key"
result = encrypt_data("hello world")
assert result
assert isinstance(result, str)

def test_encrypt_empty_string_returns_empty(self, settings):
settings.SECRET_KEY = "test-secret-key"
result = encrypt_data("")
assert result == ""

def test_encrypt_none_returns_empty(self, settings):
settings.SECRET_KEY = "test-secret-key"
result = encrypt_data(None)
assert result == ""

def test_encrypt_produces_different_ciphertexts_each_call(self, settings):
"""Fernet uses a random IV, so the same plaintext encrypted twice must differ."""
settings.SECRET_KEY = "test-secret-key"
plaintext = "repeated_sensitive_value"
ciphertexts = {encrypt_data(plaintext) for _ in range(5)}
# If all 5 encryptions produce the same ciphertext, the scheme is deterministic
# which is a security weakness — this MUST fail, not just warn.
assert len(ciphertexts) > 1, (
"Deterministic encryption detected: all 5 encryptions of the same plaintext "
"produced identical ciphertexts. The encryption scheme must use a random IV/nonce."
)


class TestDecryptData:
def test_decrypt_empty_string_returns_empty(self, settings):
settings.SECRET_KEY = "test-secret-key"
result = decrypt_data("")
assert result == ""

def test_decrypt_none_returns_empty(self, settings):
settings.SECRET_KEY = "test-secret-key"
result = decrypt_data(None)
assert result == ""

def test_roundtrip_integrity(self, settings):
"""Encryption/decryption roundtrip must preserve data integrity."""
settings.SECRET_KEY = "test-secret-key-for-roundtrip"
payloads = [
"normal_value",
"special!@#$%^&*()chars",
"unicode_value_\u00e9\u00e0\u00fc",
"a" * 1000,
'{"json": "value", "nested": {"key": "val"}}',
]
for payload in payloads:
encrypted = encrypt_data(payload)
decrypted = decrypt_data(encrypted)
assert decrypted == payload, f"Roundtrip failed for payload: {payload!r}"

def test_wrong_key_returns_empty(self, settings):
"""Decrypting with a different key must return empty string, not raise."""
settings.SECRET_KEY = "original-secret-key"
encrypted = encrypt_data("sensitive_data")

settings.SECRET_KEY = "completely-different-key"
result = decrypt_data(encrypted)
assert result == ""


class TestDecryptDataWithStatus:
def test_returns_tuple(self, settings):
settings.SECRET_KEY = "test-secret-key"
result = decrypt_data_with_status("")
assert isinstance(result, tuple)
assert len(result) == 2

def test_empty_input_returns_false_legacy(self, settings):
settings.SECRET_KEY = "test-secret-key"
plaintext, used_legacy = decrypt_data_with_status("")
assert plaintext == ""
assert used_legacy is False

def test_current_encryption_not_legacy(self, settings):
"""Values encrypted with the current scheme must not be flagged as legacy."""
settings.SECRET_KEY = "test-secret-key"
encrypted = encrypt_data("my_value")
plaintext, used_legacy = decrypt_data_with_status(encrypted)
assert plaintext == "my_value"
assert used_legacy is False

def test_legacy_encryption_detected(self, settings):
"""Values encrypted with the legacy scheme must be flagged as legacy."""
from cryptography.fernet import Fernet

settings.SECRET_KEY = "test-secret-key"
# Encrypt using the legacy key derivation directly
legacy_key = derive_key_legacy(settings.SECRET_KEY)
legacy_cipher = Fernet(legacy_key)
legacy_encrypted = legacy_cipher.encrypt(b"legacy_value").decode()

plaintext, used_legacy = decrypt_data_with_status(legacy_encrypted)
assert plaintext == "legacy_value"
assert used_legacy is True


class TestKeyIsolation:
@pytest.mark.parametrize("secret_key", [
"secret1",
"secret2",
"a-]different-key!@#$",
])
def test_different_keys_produce_different_ciphertexts(self, settings, secret_key):
"""Different SECRET_KEY values must produce different encrypted outputs
for the same plaintext, ensuring the key derivation provides uniqueness."""
plaintext = "sensitive_data_12345"
other_key = secret_key + "_different"

settings.SECRET_KEY = secret_key
ct1 = encrypt_data(plaintext)

settings.SECRET_KEY = other_key
ct2 = encrypt_data(plaintext)

assert ct1 != ct2, (
"Different secret keys produced identical ciphertexts — "
"key derivation is not properly differentiating keys"
)