From 49f449e46d358e67d9e8cd109cb956f7b45d3af2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jun 2026 14:37:36 -0700 Subject: [PATCH 1/2] Prefer ABC subclass to ABCmeta metaclass --- kafka/consumer/subscription_state.py | 14 +++---- kafka/coordinator/assignors/abstract.py | 12 +++--- kafka/coordinator/base.py | 10 ++--- kafka/metrics/compound_stat.py | 7 ++-- kafka/metrics/measurable.py | 8 ++-- kafka/metrics/measurable_stat.py | 4 +- kafka/metrics/metrics_reporter.py | 24 ++++++------ kafka/metrics/stat.py | 8 ++-- kafka/metrics/stats/sampled_stat.py | 12 +++--- kafka/partitioner/abc.py | 6 +-- kafka/producer/kafka.py | 1 + kafka/producer/transaction_manager.py | 8 ++-- kafka/protocol/old/abstract.py | 8 ++-- kafka/protocol/old/api.py | 12 +++--- kafka/protocol/old/struct.py | 6 +-- kafka/record/abc.py | 50 ++++++++++++------------- kafka/sasl/abc.py | 14 +++---- 17 files changed, 103 insertions(+), 101 deletions(-) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index a88850844..03863f769 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -1,4 +1,4 @@ -import abc +from abc import ABC, abstractmethod from collections import OrderedDict from collections.abc import Sequence from enum import IntEnum @@ -731,7 +731,7 @@ def complete_validation(self, validated_position=None): self._position = validated_position -class ConsumerRebalanceListener(metaclass=abc.ABCMeta): +class ConsumerRebalanceListener(ABC): """ A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes. @@ -779,7 +779,7 @@ class ConsumerRebalanceListener(metaclass=abc.ABCMeta): taking over that partition has their on_partitions_assigned() callback called to load the state. """ - @abc.abstractmethod + @abstractmethod def on_partitions_revoked(self, revoked): """ A callback method the user can implement to provide handling of offset @@ -801,7 +801,7 @@ def on_partitions_revoked(self, revoked): """ pass - @abc.abstractmethod + @abstractmethod def on_partitions_assigned(self, assigned): """ A callback method the user can implement to provide handling of @@ -840,7 +840,7 @@ def on_partitions_lost(self, lost): return self.on_partitions_revoked(lost) -class AsyncConsumerRebalanceListener(metaclass=abc.ABCMeta): +class AsyncConsumerRebalanceListener(ABC): """ Async variant of :class:`ConsumerRebalanceListener`. @@ -856,7 +856,7 @@ class AsyncConsumerRebalanceListener(metaclass=abc.ABCMeta): invokes ``on_partitions_assigned``. Both methods must be defined as ``async def``; otherwise use :class:`ConsumerRebalanceListener`. """ - @abc.abstractmethod + @abstractmethod async def on_partitions_revoked(self, revoked): """Async-callback for the start of a rebalance operation. @@ -870,7 +870,7 @@ async def on_partitions_revoked(self, revoked): """ pass - @abc.abstractmethod + @abstractmethod async def on_partitions_assigned(self, assigned): """Async-callback for the completion of a partition re-assignment. diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py index 8b9aa762b..02126c5dc 100644 --- a/kafka/coordinator/assignors/abstract.py +++ b/kafka/coordinator/assignors/abstract.py @@ -1,4 +1,4 @@ -import abc +from abc import ABC, abstractmethod, abstractproperty from enum import IntEnum from kafka.protocol.consumer.metadata import ( @@ -25,13 +25,13 @@ class RebalanceProtocol(IntEnum): COOPERATIVE = 1 -class AbstractPartitionAssignor(metaclass=abc.ABCMeta): +class AbstractPartitionAssignor(ABC): """ Abstract assignor implementation which does some common grunt work (in particular collecting partition counts which are always needed in assignors). """ - @abc.abstractproperty + @abstractproperty def name(self): """.name should be a string identifying the assignor""" pass @@ -48,7 +48,7 @@ def supported_protocols(self): """ return [RebalanceProtocol.EAGER] - @abc.abstractmethod + @abstractmethod def assign(self, cluster, members): """Perform group assignment given cluster metadata and member subscriptions @@ -64,7 +64,7 @@ def assign(self, cluster, members): """ pass - @abc.abstractmethod + @abstractmethod def metadata(self, topics): """Generate ProtocolMetadata to be submitted via JoinGroupRequest. @@ -76,7 +76,7 @@ def metadata(self, topics): """ pass - @abc.abstractmethod + @abstractmethod def on_assignment(self, assignment, generation): """Callback that runs on each assignment. diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 1e44f81cc..074e29a7f 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -1,4 +1,4 @@ -import abc +from abc import ABC, abstractmethod import copy import logging import threading @@ -70,7 +70,7 @@ class UnjoinedGroupException(Errors.RetriableError): pass -class BaseCoordinator(metaclass=abc.ABCMeta): +class BaseCoordinator(ABC): """ BaseCoordinator implements group management for a single group member by interacting with a designated Kafka broker (the coordinator). Group @@ -225,7 +225,7 @@ def group_id(self): def group_instance_id(self): return self.config['group_instance_id'] - @abc.abstractmethod + @abstractmethod def protocol_type(self): """ Unique identifier for the class of supported protocols @@ -236,7 +236,7 @@ def protocol_type(self): """ pass - @abc.abstractmethod + @abstractmethod def group_protocols(self): """Return the list of supported group protocols and metadata. @@ -270,7 +270,7 @@ async def _on_join_prepare_async(self, generation, member_id, timeout_ms=None): """ pass - @abc.abstractmethod + @abstractmethod def _perform_assignment(self, leader_id, protocol, members): """Perform assignment for the group. diff --git a/kafka/metrics/compound_stat.py b/kafka/metrics/compound_stat.py index 31a02181a..5ce30a773 100644 --- a/kafka/metrics/compound_stat.py +++ b/kafka/metrics/compound_stat.py @@ -1,19 +1,20 @@ -import abc +from abc import ABC, abstractmethod from kafka.metrics.stat import AbstractStat -class AbstractCompoundStat(AbstractStat, metaclass=abc.ABCMeta): +class AbstractCompoundStat(AbstractStat, ABC): """ A compound stat is a stat where a single measurement and associated data structure feeds many metrics. This is the example for a histogram which has many associated percentiles. """ + @abstractmethod def stats(self): """ Return list of NamedMeasurable """ - raise NotImplementedError + pass class NamedMeasurable: diff --git a/kafka/metrics/measurable.py b/kafka/metrics/measurable.py index 1abd222c3..d9deb9fe3 100644 --- a/kafka/metrics/measurable.py +++ b/kafka/metrics/measurable.py @@ -1,9 +1,9 @@ -import abc +from abc import ABC, abstractmethod -class AbstractMeasurable(metaclass=abc.ABCMeta): +class AbstractMeasurable(ABC): """A measurable quantity that can be registered as a metric""" - @abc.abstractmethod + @abstractmethod def measure(self, config, now): """ Measure this quantity and return the result @@ -16,7 +16,7 @@ def measure(self, config, now): Returns: The measured value """ - raise NotImplementedError + pass class AnonMeasurable(AbstractMeasurable): diff --git a/kafka/metrics/measurable_stat.py b/kafka/metrics/measurable_stat.py index f16798c5a..a4e633020 100644 --- a/kafka/metrics/measurable_stat.py +++ b/kafka/metrics/measurable_stat.py @@ -1,10 +1,10 @@ -import abc +from abc import ABC from kafka.metrics.measurable import AbstractMeasurable from kafka.metrics.stat import AbstractStat -class AbstractMeasurableStat(AbstractStat, AbstractMeasurable, metaclass=abc.ABCMeta): +class AbstractMeasurableStat(AbstractStat, AbstractMeasurable, ABC): """ An AbstractMeasurableStat is an AbstractStat that is also an AbstractMeasurable (i.e. can produce a single floating point value). diff --git a/kafka/metrics/metrics_reporter.py b/kafka/metrics/metrics_reporter.py index 8423553bb..1c4992185 100644 --- a/kafka/metrics/metrics_reporter.py +++ b/kafka/metrics/metrics_reporter.py @@ -1,12 +1,12 @@ -import abc +from abc import ABC, abstractmethod -class AbstractMetricsReporter(metaclass=abc.ABCMeta): +class AbstractMetricsReporter(ABC): """ An abstract class to allow things to listen as new metrics are created so they can be reported. """ - @abc.abstractmethod + @abstractmethod def init(self, metrics): """ This is called when the reporter is first registered @@ -15,9 +15,9 @@ def init(self, metrics): Arguments: metrics (list of KafkaMetric): All currently existing metrics """ - raise NotImplementedError + pass - @abc.abstractmethod + @abstractmethod def metric_change(self, metric): """ This is called whenever a metric is updated or added @@ -25,9 +25,9 @@ def metric_change(self, metric): Arguments: metric (KafkaMetric) """ - raise NotImplementedError + pass - @abc.abstractmethod + @abstractmethod def metric_removal(self, metric): """ This is called whenever a metric is removed @@ -35,9 +35,9 @@ def metric_removal(self, metric): Arguments: metric (KafkaMetric) """ - raise NotImplementedError + pass - @abc.abstractmethod + @abstractmethod def configure(self, configs): """ Configure this class with the given key-value pairs @@ -45,9 +45,9 @@ def configure(self, configs): Arguments: configs (dict of {str, ?}) """ - raise NotImplementedError + pass - @abc.abstractmethod + @abstractmethod def close(self): """Called when the metrics repository is closed.""" - raise NotImplementedError + pass diff --git a/kafka/metrics/stat.py b/kafka/metrics/stat.py index bd70eb232..8ab40933b 100644 --- a/kafka/metrics/stat.py +++ b/kafka/metrics/stat.py @@ -1,12 +1,12 @@ -import abc +from abc import ABC, abstractmethod -class AbstractStat(metaclass=abc.ABCMeta): +class AbstractStat(ABC): """ An AbstractStat is a quantity such as average, max, etc that is computed off the stream of updates to a sensor """ - @abc.abstractmethod + @abstractmethod def record(self, config, value, time_ms): """ Record the given value @@ -16,4 +16,4 @@ def record(self, config, value, time_ms): value (float): The value to record timeMs (int): The POSIX time in milliseconds this value occurred """ - raise NotImplementedError + pass diff --git a/kafka/metrics/stats/sampled_stat.py b/kafka/metrics/stats/sampled_stat.py index fdda6c79c..0710ebf7b 100644 --- a/kafka/metrics/stats/sampled_stat.py +++ b/kafka/metrics/stats/sampled_stat.py @@ -1,9 +1,9 @@ -import abc +from abc import ABC, abstractmethod from kafka.metrics.measurable_stat import AbstractMeasurableStat -class AbstractSampledStat(AbstractMeasurableStat, metaclass=abc.ABCMeta): +class AbstractSampledStat(AbstractMeasurableStat, ABC): """ An AbstractSampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a configurable @@ -25,13 +25,13 @@ def __init__(self, initial_value): self._samples = [] self._current = 0 - @abc.abstractmethod + @abstractmethod def update(self, sample, config, value, time_ms): - raise NotImplementedError + pass - @abc.abstractmethod + @abstractmethod def combine(self, samples, config, now): - raise NotImplementedError + pass def record(self, config, value, time_ms): sample = self.current(time_ms) diff --git a/kafka/partitioner/abc.py b/kafka/partitioner/abc.py index 57108fbac..efd070737 100644 --- a/kafka/partitioner/abc.py +++ b/kafka/partitioner/abc.py @@ -1,8 +1,8 @@ -import abc +from abc import ABC, abstractmethod -class Partitioner(metaclass=abc.ABCMeta): - @abc.abstractmethod +class Partitioner(ABC): + @abstractmethod def partition(self, topic, key, serialized_key, value, serialized_value, cluster): pass diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0fdf72662..1766204ae 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -251,6 +251,7 @@ class KafkaProducer: same key are assigned to the same partition. When a key is None, the message is delivered to a random partition (filtered to partitions with available leaders only, if possible). + Default: DefaultPartitioner(). connections_max_idle_ms: Close idle connections after the number of milliseconds specified by this config. The broker closes idle connections after connections.max.idle.ms, so this avoids hitting diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 36e8a823f..f25d0528d 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -1,4 +1,4 @@ -import abc +from abc import ABC, abstractmethod, abstractproperty import collections from enum import IntEnum import heapq @@ -812,7 +812,7 @@ def exception(self): return self._error -class TxnRequestHandler(metaclass=abc.ABCMeta): +class TxnRequestHandler(ABC): def __init__(self, transaction_manager, result=None): self.transaction_manager = transaction_manager self.retry_backoff_ms = transaction_manager.retry_backoff_ms @@ -890,11 +890,11 @@ def set_retry(self): def is_retry(self): return self._is_retry - @abc.abstractmethod + @abstractmethod def handle_response(self, response): pass - @abc.abstractproperty + @abstractproperty def priority(self): pass diff --git a/kafka/protocol/old/abstract.py b/kafka/protocol/old/abstract.py index 529a73d35..58c690890 100644 --- a/kafka/protocol/old/abstract.py +++ b/kafka/protocol/old/abstract.py @@ -1,14 +1,14 @@ -import abc +from abc import ABC, abstractmethod -class AbstractType(metaclass=abc.ABCMeta): +class AbstractType(ABC): @classmethod - @abc.abstractmethod + @abstractmethod def encode(cls, value): # pylint: disable=no-self-argument pass @classmethod - @abc.abstractmethod + @abstractmethod def decode(cls, data): # pylint: disable=no-self-argument pass diff --git a/kafka/protocol/old/api.py b/kafka/protocol/old/api.py index bf8b06c51..0757fc109 100644 --- a/kafka/protocol/old/api.py +++ b/kafka/protocol/old/api.py @@ -1,4 +1,4 @@ -import abc +from abc import ABC, abstractmethod, abstractproperty from io import BytesIO import weakref @@ -61,17 +61,17 @@ class ResponseHeaderV2(Struct): ) -class RequestResponse(Struct, metaclass=abc.ABCMeta): +class RequestResponse(Struct, ABC): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._header = None - @abc.abstractproperty + @abstractproperty def API_KEY(self): """Integer identifier for api request""" pass - @abc.abstractproperty + @abstractproperty def API_VERSION(self): """Integer of api request version""" pass @@ -80,7 +80,7 @@ def to_object(self): return _to_object(self.SCHEMA, self) @classmethod - @abc.abstractmethod + @abstractmethod def is_request(cls): pass @@ -114,7 +114,7 @@ def encode(self, header=False, framed=False): return b''.join(bits) @classmethod - @abc.abstractmethod + @abstractmethod def header_class(cls): pass diff --git a/kafka/protocol/old/struct.py b/kafka/protocol/old/struct.py index 0970a889e..1fb21e4de 100644 --- a/kafka/protocol/old/struct.py +++ b/kafka/protocol/old/struct.py @@ -1,4 +1,4 @@ -import abc +from abc import ABC, abstractproperty from io import BytesIO from .abstract import AbstractType @@ -7,9 +7,9 @@ from kafka.util import WeakMethod -class Struct(metaclass=abc.ABCMeta): +class Struct(ABC): - @abc.abstractproperty + @abstractproperty def SCHEMA(self): """An instance of Schema() representing the structure""" pass diff --git a/kafka/record/abc.py b/kafka/record/abc.py index 21da8e928..3d65dc3a3 100644 --- a/kafka/record/abc.py +++ b/kafka/record/abc.py @@ -1,61 +1,61 @@ -import abc +from abc import ABC, abstractmethod, abstractproperty -class ABCRecord(metaclass=abc.ABCMeta): +class ABCRecord(ABC): __slots__ = () - @abc.abstractproperty + @abstractproperty def size_in_bytes(self): """ Number of total bytes in record """ - @abc.abstractproperty + @abstractproperty def offset(self): """ Absolute offset of record """ - @abc.abstractproperty + @abstractproperty def timestamp(self): """ Epoch milliseconds """ - @abc.abstractproperty + @abstractproperty def timestamp_type(self): """ CREATE_TIME(0) or APPEND_TIME(1) """ - @abc.abstractproperty + @abstractproperty def key(self): """ Bytes key or None """ - @abc.abstractproperty + @abstractproperty def value(self): """ Bytes value or None """ - @abc.abstractproperty + @abstractproperty def checksum(self): """ Prior to v2 format CRC was contained in every message. This will be the checksum for v0 and v1 and None for v2 and above. """ - @abc.abstractmethod + @abstractmethod def validate_crc(self): """ Return True if v0/v1 record matches checksum. noop/True for v2 records """ - @abc.abstractproperty + @abstractproperty def headers(self): """ If supported by version list of key-value tuples, or empty list if not supported by format. """ -class ABCRecordBatchBuilder(metaclass=abc.ABCMeta): +class ABCRecordBatchBuilder(ABC): __slots__ = () - @abc.abstractmethod + @abstractmethod def append(self, offset, timestamp, key, value, headers=None): """ Writes record to internal buffer. @@ -74,14 +74,14 @@ def append(self, offset, timestamp, key, value, headers=None): above) and size of the written record. """ - @abc.abstractmethod + @abstractmethod def size_in_bytes(self, offset, timestamp, key, value, headers): """ Return the expected size change on buffer (uncompressed) if we add this message. This will account for varint size changes and give a reliable size. """ - @abc.abstractmethod + @abstractmethod def build(self): """ Close for append, compress if needed, write size and header and return a ready to send buffer object. @@ -91,54 +91,54 @@ def build(self): """ -class ABCRecordBatch(metaclass=abc.ABCMeta): +class ABCRecordBatch(ABC): """ For v2 encapsulates a RecordBatch, for v0/v1 a single (maybe compressed) message. """ __slots__ = () - @abc.abstractmethod + @abstractmethod def __iter__(self): """ Return iterator over records (ABCRecord instances). Will decompress if needed. """ - @abc.abstractproperty + @abstractproperty def base_offset(self): """ Return base offset for batch """ - @abc.abstractproperty + @abstractproperty def size_in_bytes(self): """ Return size of batch in bytes (includes header overhead) """ - @abc.abstractproperty + @abstractproperty def magic(self): """ Return magic value (0, 1, 2) for batch. """ -class ABCRecords(metaclass=abc.ABCMeta): +class ABCRecords(ABC): __slots__ = () - @abc.abstractmethod + @abstractmethod def __init__(self, buffer): """ Initialize with bytes-like object conforming to the buffer interface (ie. bytes, bytearray, memoryview etc.). """ - @abc.abstractmethod + @abstractmethod def size_in_bytes(self): """ Returns the size of inner buffer. """ - @abc.abstractmethod + @abstractmethod def next_batch(self): """ Return next batch of records (ABCRecordBatch instances). """ - @abc.abstractmethod + @abstractmethod def has_next(self): """ True if there are more batches to read, False otherwise. """ diff --git a/kafka/sasl/abc.py b/kafka/sasl/abc.py index 7d47bcfcc..51e835502 100644 --- a/kafka/sasl/abc.py +++ b/kafka/sasl/abc.py @@ -1,24 +1,24 @@ -import abc +from abc import ABC, abstractmethod -class SaslMechanism(metaclass=abc.ABCMeta): - @abc.abstractmethod +class SaslMechanism(ABC): + @abstractmethod def __init__(self, **config): pass - @abc.abstractmethod + @abstractmethod def auth_bytes(self): pass - @abc.abstractmethod + @abstractmethod def receive(self, auth_bytes): pass - @abc.abstractmethod + @abstractmethod def is_done(self): pass - @abc.abstractmethod + @abstractmethod def is_authenticated(self): pass From a42598a877fa0c773e18ad24263e6b56353c40b2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jun 2026 14:48:57 -0700 Subject: [PATCH 2/2] another --- kafka/sasl/oauth.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/kafka/sasl/oauth.py b/kafka/sasl/oauth.py index 070c267df..4b8ac6ccb 100644 --- a/kafka/sasl/oauth.py +++ b/kafka/sasl/oauth.py @@ -1,4 +1,4 @@ -import abc +from abc import ABC, abstractmethod import logging from kafka.errors import KafkaConfigurationError @@ -58,8 +58,6 @@ def auth_details(self): raise RuntimeError('Not authenticated yet!') return 'Authenticated via SASL / OAuth' -# This statement is compatible with both Python 2.7 & 3+ -ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) class AbstractTokenProvider(ABC): """ @@ -78,7 +76,7 @@ class AbstractTokenProvider(ABC): def __init__(self, **config): pass - @abc.abstractmethod + @abstractmethod def token(self): """ Returns a (str) ID/Access Token to be sent to the Kafka