Skip to content
45 changes: 27 additions & 18 deletions paimon-python/pypaimon/catalog/filesystem_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
from pypaimon.common.time_utils import duration_to_iso8601, local_datetime_to_system_zone_millis
from pypaimon.filesystem.caching_file_io import CachingFileIO
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.schema.schema_manager import SchemaManager
Expand Down Expand Up @@ -405,21 +406,18 @@ def create_tag(
time_retained: Optional[str] = None,
ignore_if_exists: bool = False,
) -> None:
if time_retained is not None:
# Python's Tag dataclass does not yet carry tag_create_time /
# tag_time_retained fields; supporting TTL on FileSystemCatalog
# requires extending Tag + TagManager and is tracked as a
# follow-up. Raise here instead of silently dropping the option,
# so callers cannot mistakenly believe the TTL took effect.
raise NotImplementedError(
"FileSystemCatalog does not yet support `time_retained` on "
"create_tag (requires extending the Python Tag dataclass + "
"TagManager).")
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
table = self.get_table(identifier)
try:
table.create_tag(tag_name, snapshot_id, ignore_if_exists)
# Keyword args: FileStoreTable.create_tag orders time_retained after
# ignore_if_exists (Catalog orders it before), so pass by name.
table.create_tag(
tag_name,
snapshot_id,
ignore_if_exists=ignore_if_exists,
time_retained=time_retained,
)
except ValueError as e:
# ``table.create_tag`` honors ``ignore_if_exists`` internally, so
# any "already exists" message that bubbles up here means the
Expand Down Expand Up @@ -453,16 +451,27 @@ def get_tag(
tag = table.tag_manager().get(tag_name)
if tag is None:
raise TagNotExistException(tag_name)
# tag_create_time / tag_time_retained are not tracked on the
# filesystem side yet — the Python Tag dataclass inherits only
# Snapshot fields. Returning ``None`` for both keeps the response
# shape compatible with the Java contract while making the gap
# visible to callers.
# Surface tag_create_time as epoch millis and tag_time_retained as an
# ISO-8601 duration string (types match the Java REST GetTagResponse
# Long / String). The create-time is converted with the host's system
# default time zone, mirroring the Java REST path
# (RESTFileSystemCatalog#getTag uses
# tagCreateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()).
# This intentionally differs from the ``$tags`` system table, which is a
# zone-less Timestamp; both follow their respective Java conversions.
# Both fields are None for tags created without a retention
# (plain-snapshot tag files).
return GetTagResponse(
tag_name=tag_name,
snapshot=tag.trim_to_snapshot(),
tag_create_time=None,
tag_time_retained=None,
tag_create_time=(
None if tag.tag_create_time is None
else local_datetime_to_system_zone_millis(tag.tag_create_time)
),
tag_time_retained=(
None if tag.tag_time_retained is None
else duration_to_iso8601(tag.tag_time_retained)
),
)

def list_tags_paged(
Expand Down
36 changes: 35 additions & 1 deletion paimon-python/pypaimon/common/json_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,26 @@ def optional_json_field(json_name: str, json_include: str):
return field(metadata={"json_name": json_name, "json_include": json_include}, default=None)


def json_field_with_codec(json_name: str, encoder, decoder, json_include: str = "non_null"):
"""Create an optional field with a custom JSON name and value codec.

``encoder`` maps the Python value to its JSON representation on serialization;
``decoder`` maps the JSON representation back to the Python value on
deserialization. Both are only invoked for non-``None`` values. This is used
for fields whose on-disk JSON shape must match a non-trivial Java/Jackson
encoding (e.g. ``LocalDateTime`` arrays, ``Duration`` decimal seconds).
"""
return field(
metadata={
"json_name": json_name,
"json_include": json_include,
"encoder": encoder,
"decoder": decoder,
},
default=None,
)


class JSON:

@staticmethod
Expand Down Expand Up @@ -65,6 +85,12 @@ def __to_dict(obj: Any) -> Dict[str, Any]:
if field_info.metadata.get("json_include", None) == "non_null":
continue

# Custom value codec (e.g. Java LocalDateTime / Duration encodings)
encoder = field_info.metadata.get("encoder")
if encoder is not None and field_value is not None:
result[json_name] = encoder(field_value)
continue

# Handle nested objects
if hasattr(field_value, "to_dict"):
result[json_name] = field_value.to_dict()
Expand Down Expand Up @@ -93,9 +119,13 @@ def __from_dict(data: Dict[str, Any], target_class: Type[T]) -> T:
# Create field name mapping (json_name -> field_name)
field_mapping = {}
type_mapping = {}
decoder_mapping = {}
for field_info in fields(target_class):
json_name = field_info.metadata.get("json_name", field_info.name)
field_mapping[json_name] = field_info.name
decoder = field_info.metadata.get("decoder")
if decoder is not None:
decoder_mapping[json_name] = decoder
origin_type = getattr(field_info.type, '__origin__', None)
args = getattr(field_info.type, '__args__', None)
field_type = field_info.type
Expand All @@ -113,7 +143,11 @@ def __from_dict(data: Dict[str, Any], target_class: Type[T]) -> T:
for json_name, value in data.items():
if json_name in field_mapping:
field_name = field_mapping[json_name]
if json_name in type_mapping:
if json_name in decoder_mapping:
kwargs[field_name] = (
None if value is None else decoder_mapping[json_name](value)
)
elif json_name in type_mapping:
tp = getattr(type_mapping[json_name], '__origin__', None)
if tp in (list, List):
item_type = getattr(type_mapping[json_name], '__args__', None)[0]
Expand Down
204 changes: 204 additions & 0 deletions paimon-python/pypaimon/common/time_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
# specific language governing permissions and limitations
# under the License.

import calendar
from datetime import datetime, timedelta
from typing import List


def parse_duration(text: str) -> int:
if text is None:
raise ValueError("text cannot be None")
Expand Down Expand Up @@ -77,3 +82,202 @@ def parse_duration(text: str) -> int:
raise ValueError(f"Duration cannot be negative: {text}")

return result_ms_int


# Unit label -> nanoseconds. Mirrors the aliases accepted by ``parse_duration``
# (and Java ``TimeUtils.parseDuration``); a bare number with no unit is treated
# as milliseconds, matching ``parse_duration``.
_UNIT_TO_NANOS = {
'ns': 1, 'nano': 1, 'nanosecond': 1, 'nanoseconds': 1,
'µs': 1_000, 'micro': 1_000, 'microsecond': 1_000, 'microseconds': 1_000,
'ms': 1_000_000, 'milli': 1_000_000, 'millisecond': 1_000_000, 'milliseconds': 1_000_000,
's': 1_000_000_000, 'sec': 1_000_000_000, 'second': 1_000_000_000, 'seconds': 1_000_000_000,
'm': 60_000_000_000, 'min': 60_000_000_000, 'minute': 60_000_000_000, 'minutes': 60_000_000_000,
'h': 3_600_000_000_000, 'hour': 3_600_000_000_000, 'hours': 3_600_000_000_000,
'd': 86_400_000_000_000, 'day': 86_400_000_000_000, 'days': 86_400_000_000_000,
}


def parse_duration_nanos(text: str) -> int:
"""Parse a duration string to an integer nanosecond count.

This is the full-precision integer counterpart of :func:`parse_duration`
(which returns rounded milliseconds): it accepts the same unit aliases and
the same "bare number means milliseconds" convention, but keeps sub-millisecond
units exactly (``"1ns"`` -> ``1``, ``"500micro"`` -> ``500_000``) instead of
rounding them to zero. Use it where sub-millisecond precision must not be
silently dropped. ``parse_duration`` is intentionally left untouched so its
millisecond contract (relied on by option parsing) does not change.
"""
if text is None:
raise ValueError("text cannot be None")

trimmed = text.strip().lower()
if not trimmed:
raise ValueError("argument is an empty- or whitespace-only string")

pos = 0
while pos < len(trimmed) and trimmed[pos].isdigit():
pos += 1

number_str = trimmed[:pos]
unit_str = trimmed[pos:].strip()

if not number_str:
raise ValueError("text does not start with a number")

try:
value = int(number_str)
except ValueError:
raise ValueError(
f"The value '{number_str}' cannot be re represented as 64bit number (numeric overflow)."
)

if not unit_str:
nanos_per_unit = 1_000_000 # bare number is milliseconds
elif unit_str in _UNIT_TO_NANOS:
nanos_per_unit = _UNIT_TO_NANOS[unit_str]
else:
supported_units = (
'DAYS: (d | day | days), '
'HOURS: (h | hour | hours), '
'MINUTES: (m | min | minute | minutes), '
'SECONDS: (s | sec | second | seconds), '
'MILLISECONDS: (ms | milli | millisecond | milliseconds), '
'MICROSECONDS: (µs | micro | microsecond | microseconds), '
'NANOSECONDS: (ns | nano | nanosecond | nanoseconds)'
)
raise ValueError(
f"Time interval unit label '{unit_str}' does not match any of the recognized units: "
f"{supported_units}"
)

return value * nanos_per_unit


# ---------------------------------------------------------------------------
# Codecs for Java temporal types as serialized by Jackson's JavaTimeModule.
#
# These mirror the on-disk JSON shapes that Paimon's Java side reads/writes so
# that a tag file written by pypaimon is interoperable with Java and vice versa
# (see ``org.apache.paimon.tag.Tag`` and ``TagTest``):
# - ``LocalDateTime`` -> JSON array ``[year, month, day, hour, minute,
# second, nanoOfSecond]``
# - ``Duration`` -> JSON number of (fractional) seconds, e.g. ``86400.0``
#
# Resolution note: Python's ``datetime`` / ``timedelta`` are microsecond-based,
# so a Java create-time or duration finer than a microsecond is truncated to
# microseconds on read. Tag granularity is coarse, so this is not a concern in
# practice.
# ---------------------------------------------------------------------------

_NANOS_PER_MICRO = 1000


def local_datetime_to_json_array(dt: datetime) -> List[int]:
"""Encode a naive ``datetime`` as Java ``LocalDateTime`` array form.

Matches Jackson's ``LocalDateTimeSerializer`` byte-for-byte: it always emits
``[year, month, day, hour, minute]``, appends ``second`` only when second or
nanoOfSecond is non-zero, and appends ``nanoOfSecond`` only when it is
non-zero -- i.e. trailing zero components are omitted. ``json_array_to_local_datetime``
pads them back, so a shorter array round-trips. Python ``datetime`` only has
microsecond resolution, so the emitted nanoOfSecond is ``microsecond * 1000``
(never finer than microseconds).
"""
nano = dt.microsecond * _NANOS_PER_MICRO
arr = [dt.year, dt.month, dt.day, dt.hour, dt.minute]
if dt.second != 0 or nano != 0:
arr.append(dt.second)
if nano != 0:
arr.append(nano)
return arr


def json_array_to_local_datetime(arr: List[int]) -> datetime:
"""Decode a Java ``LocalDateTime`` array form into a naive ``datetime``.

Jackson omits trailing zero components, so the array may be shorter than 7;
missing components default to 0. A Java nanoOfSecond finer than a
microsecond is truncated to microseconds (Python's resolution limit).
"""
padded = list(arr) + [0] * (7 - len(arr))
year, month, day, hour, minute, second, nano = padded[:7]
return datetime(
year, month, day, hour, minute, second, nano // _NANOS_PER_MICRO
)


def duration_to_json_seconds(td: timedelta):
"""Encode a ``timedelta`` as Java ``Duration`` decimal-seconds number."""
return td.total_seconds()


def json_seconds_to_duration(seconds) -> timedelta:
"""Decode a Java ``Duration`` decimal-seconds number into a ``timedelta``."""
return timedelta(seconds=seconds)


def duration_to_iso8601(td: timedelta) -> str:
"""Render a non-negative ``timedelta`` like ``java.time.Duration.toString()``.

Examples: 1 day -> ``PT24H`` (Duration has no day unit), 30 min ->
``PT30M``, 5 s -> ``PT5S``, zero -> ``PT0S``. Matches what Paimon's Java
``$tags`` system table surfaces for ``time_retained``. Retentions only ever
come from ``parse_duration``, which rejects negatives, so only the
non-negative form is supported.
"""
total_micros = (
td.days * 86_400_000_000 + td.seconds * 1_000_000 + td.microseconds
)
if total_micros <= 0:
return "PT0S"

total_seconds, micros = divmod(total_micros, 1_000_000)
hours, rem = divmod(total_seconds, 3600)
minutes, secs = divmod(rem, 60)

buf = "PT"
if hours != 0:
buf += "{}H".format(hours)
if minutes != 0:
buf += "{}M".format(minutes)
if secs != 0 or micros != 0 or buf == "PT":
frac = ""
if micros != 0:
frac = "." + ("%06d" % micros).rstrip("0")
buf += "{}{}S".format(secs, frac)
return buf


def local_datetime_to_millis(dt: datetime) -> int:
"""Convert a naive ``LocalDateTime`` to epoch millis, treating it as UTC.

This is the zone-less / wall-clock conversion: it mirrors Java
``Timestamp.fromLocalDateTime`` and feeds the ``$tags`` system table's
``create_time`` (a ``TIMESTAMP`` column, which is itself zone-less), so the
result does NOT depend on the host's time zone. For the REST
``GetTagResponse`` epoch-millis ``Long`` use
:func:`local_datetime_to_system_zone_millis` instead. Uses integer math with
floored sub-millisecond truncation (matching Java ``Instant.toEpochMilli``),
so it is exact and correct for pre-epoch instants too.
"""
return calendar.timegm(dt.timetuple()) * 1000 + dt.microsecond // 1000


def local_datetime_to_system_zone_millis(dt: datetime) -> int:
"""Convert a naive ``LocalDateTime`` to epoch millis in the host's default
time zone.

Mirrors Java ``LocalDateTime.atZone(ZoneId.systemDefault()).toInstant()
.toEpochMilli()`` -- the conversion the REST ``GetTagResponse`` applies to
``tagCreateTime`` (see ``RESTFileSystemCatalog#getTag``). Unlike
:func:`local_datetime_to_millis` (zone-less / UTC, used by the ``$tags``
Timestamp column), the result depends on the host's local time zone, so the
two intentionally differ on non-UTC hosts -- exactly as the two Java paths do.
"""
# A naive datetime.timestamp() interprets dt in the system local zone (via
# mktime). Take whole seconds first to avoid float rounding, then floor the
# sub-second part to millis to match Instant.toEpochMilli().
epoch_seconds = int(dt.replace(microsecond=0).timestamp())
return epoch_seconds * 1000 + dt.microsecond // 1000
Loading
Loading