Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import abc
from abc import ABC, abstractmethod
from collections import OrderedDict
from collections.abc import Sequence
from enum import IntEnum
Expand Down Expand Up @@ -731,7 +731,7 @@ def complete_validation(self, validated_position=None):
self._position = validated_position


class ConsumerRebalanceListener(metaclass=abc.ABCMeta):
class ConsumerRebalanceListener(ABC):
"""
A callback interface that the user can implement to trigger custom actions
when the set of partitions assigned to the consumer changes.
Expand Down Expand Up @@ -779,7 +779,7 @@ class ConsumerRebalanceListener(metaclass=abc.ABCMeta):
taking over that partition has their on_partitions_assigned() callback
called to load the state.
"""
@abc.abstractmethod
@abstractmethod
def on_partitions_revoked(self, revoked):
"""
A callback method the user can implement to provide handling of offset
Expand All @@ -801,7 +801,7 @@ def on_partitions_revoked(self, revoked):
"""
pass

@abc.abstractmethod
@abstractmethod
def on_partitions_assigned(self, assigned):
"""
A callback method the user can implement to provide handling of
Expand Down Expand Up @@ -840,7 +840,7 @@ def on_partitions_lost(self, lost):
return self.on_partitions_revoked(lost)


class AsyncConsumerRebalanceListener(metaclass=abc.ABCMeta):
class AsyncConsumerRebalanceListener(ABC):
"""
Async variant of :class:`ConsumerRebalanceListener`.

Expand All @@ -856,7 +856,7 @@ class AsyncConsumerRebalanceListener(metaclass=abc.ABCMeta):
invokes ``on_partitions_assigned``. Both methods must be defined as
``async def``; otherwise use :class:`ConsumerRebalanceListener`.
"""
@abc.abstractmethod
@abstractmethod
async def on_partitions_revoked(self, revoked):
"""Async-callback for the start of a rebalance operation.

Expand All @@ -870,7 +870,7 @@ async def on_partitions_revoked(self, revoked):
"""
pass

@abc.abstractmethod
@abstractmethod
async def on_partitions_assigned(self, assigned):
"""Async-callback for the completion of a partition re-assignment.

Expand Down
12 changes: 6 additions & 6 deletions kafka/coordinator/assignors/abstract.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import abc
from abc import ABC, abstractmethod, abstractproperty
from enum import IntEnum

from kafka.protocol.consumer.metadata import (
Expand All @@ -25,13 +25,13 @@ class RebalanceProtocol(IntEnum):
COOPERATIVE = 1


class AbstractPartitionAssignor(metaclass=abc.ABCMeta):
class AbstractPartitionAssignor(ABC):
"""
Abstract assignor implementation which does some common grunt work (in particular collecting
partition counts which are always needed in assignors).
"""

@abc.abstractproperty
@abstractproperty
def name(self):
""".name should be a string identifying the assignor"""
pass
Expand All @@ -48,7 +48,7 @@ def supported_protocols(self):
"""
return [RebalanceProtocol.EAGER]

@abc.abstractmethod
@abstractmethod
def assign(self, cluster, members):
"""Perform group assignment given cluster metadata and member subscriptions

Expand All @@ -64,7 +64,7 @@ def assign(self, cluster, members):
"""
pass

@abc.abstractmethod
@abstractmethod
def metadata(self, topics):
"""Generate ProtocolMetadata to be submitted via JoinGroupRequest.

Expand All @@ -76,7 +76,7 @@ def metadata(self, topics):
"""
pass

@abc.abstractmethod
@abstractmethod
def on_assignment(self, assignment, generation):
"""Callback that runs on each assignment.

Expand Down
10 changes: 5 additions & 5 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import abc
from abc import ABC, abstractmethod
import copy
import logging
import threading
Expand Down Expand Up @@ -70,7 +70,7 @@ class UnjoinedGroupException(Errors.RetriableError):
pass


class BaseCoordinator(metaclass=abc.ABCMeta):
class BaseCoordinator(ABC):
"""
BaseCoordinator implements group management for a single group member
by interacting with a designated Kafka broker (the coordinator). Group
Expand Down Expand Up @@ -225,7 +225,7 @@ def group_id(self):
def group_instance_id(self):
return self.config['group_instance_id']

@abc.abstractmethod
@abstractmethod
def protocol_type(self):
"""
Unique identifier for the class of supported protocols
Expand All @@ -236,7 +236,7 @@ def protocol_type(self):
"""
pass

@abc.abstractmethod
@abstractmethod
def group_protocols(self):
"""Return the list of supported group protocols and metadata.

Expand Down Expand Up @@ -270,7 +270,7 @@ async def _on_join_prepare_async(self, generation, member_id, timeout_ms=None):
"""
pass

@abc.abstractmethod
@abstractmethod
def _perform_assignment(self, leader_id, protocol, members):
"""Perform assignment for the group.

Expand Down
7 changes: 4 additions & 3 deletions kafka/metrics/compound_stat.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import abc
from abc import ABC, abstractmethod

from kafka.metrics.stat import AbstractStat


class AbstractCompoundStat(AbstractStat, metaclass=abc.ABCMeta):
class AbstractCompoundStat(AbstractStat, ABC):
"""
A compound stat is a stat where a single measurement and associated
data structure feeds many metrics. This is the example for a
histogram which has many associated percentiles.
"""
@abstractmethod
def stats(self):
"""
Return list of NamedMeasurable
"""
raise NotImplementedError
pass


class NamedMeasurable:
Expand Down
8 changes: 4 additions & 4 deletions kafka/metrics/measurable.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import abc
from abc import ABC, abstractmethod


class AbstractMeasurable(metaclass=abc.ABCMeta):
class AbstractMeasurable(ABC):
"""A measurable quantity that can be registered as a metric"""
@abc.abstractmethod
@abstractmethod
def measure(self, config, now):
"""
Measure this quantity and return the result
Expand All @@ -16,7 +16,7 @@ def measure(self, config, now):
Returns:
The measured value
"""
raise NotImplementedError
pass


class AnonMeasurable(AbstractMeasurable):
Expand Down
4 changes: 2 additions & 2 deletions kafka/metrics/measurable_stat.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import abc
from abc import ABC

from kafka.metrics.measurable import AbstractMeasurable
from kafka.metrics.stat import AbstractStat


class AbstractMeasurableStat(AbstractStat, AbstractMeasurable, metaclass=abc.ABCMeta):
class AbstractMeasurableStat(AbstractStat, AbstractMeasurable, ABC):
"""
An AbstractMeasurableStat is an AbstractStat that is also
an AbstractMeasurable (i.e. can produce a single floating point value).
Expand Down
24 changes: 12 additions & 12 deletions kafka/metrics/metrics_reporter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import abc
from abc import ABC, abstractmethod


class AbstractMetricsReporter(metaclass=abc.ABCMeta):
class AbstractMetricsReporter(ABC):
"""
An abstract class to allow things to listen as new metrics
are created so they can be reported.
"""
@abc.abstractmethod
@abstractmethod
def init(self, metrics):
"""
This is called when the reporter is first registered
Expand All @@ -15,39 +15,39 @@ def init(self, metrics):
Arguments:
metrics (list of KafkaMetric): All currently existing metrics
"""
raise NotImplementedError
pass

@abc.abstractmethod
@abstractmethod
def metric_change(self, metric):
"""
This is called whenever a metric is updated or added

Arguments:
metric (KafkaMetric)
"""
raise NotImplementedError
pass

@abc.abstractmethod
@abstractmethod
def metric_removal(self, metric):
"""
This is called whenever a metric is removed

Arguments:
metric (KafkaMetric)
"""
raise NotImplementedError
pass

@abc.abstractmethod
@abstractmethod
def configure(self, configs):
"""
Configure this class with the given key-value pairs

Arguments:
configs (dict of {str, ?})
"""
raise NotImplementedError
pass

@abc.abstractmethod
@abstractmethod
def close(self):
"""Called when the metrics repository is closed."""
raise NotImplementedError
pass
8 changes: 4 additions & 4 deletions kafka/metrics/stat.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import abc
from abc import ABC, abstractmethod


class AbstractStat(metaclass=abc.ABCMeta):
class AbstractStat(ABC):
"""
An AbstractStat is a quantity such as average, max, etc that is computed
off the stream of updates to a sensor
"""
@abc.abstractmethod
@abstractmethod
def record(self, config, value, time_ms):
"""
Record the given value
Expand All @@ -16,4 +16,4 @@ def record(self, config, value, time_ms):
value (float): The value to record
timeMs (int): The POSIX time in milliseconds this value occurred
"""
raise NotImplementedError
pass
12 changes: 6 additions & 6 deletions kafka/metrics/stats/sampled_stat.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import abc
from abc import ABC, abstractmethod

from kafka.metrics.measurable_stat import AbstractMeasurableStat


class AbstractSampledStat(AbstractMeasurableStat, metaclass=abc.ABCMeta):
class AbstractSampledStat(AbstractMeasurableStat, ABC):
"""
An AbstractSampledStat records a single scalar value measured over
one or more samples. Each sample is recorded over a configurable
Expand All @@ -25,13 +25,13 @@ def __init__(self, initial_value):
self._samples = []
self._current = 0

@abc.abstractmethod
@abstractmethod
def update(self, sample, config, value, time_ms):
raise NotImplementedError
pass

@abc.abstractmethod
@abstractmethod
def combine(self, samples, config, now):
raise NotImplementedError
pass

def record(self, config, value, time_ms):
sample = self.current(time_ms)
Expand Down
6 changes: 3 additions & 3 deletions kafka/partitioner/abc.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import abc
from abc import ABC, abstractmethod


class Partitioner(metaclass=abc.ABCMeta):
@abc.abstractmethod
class Partitioner(ABC):
@abstractmethod
def partition(self, topic, key, serialized_key, value, serialized_value, cluster):
pass

1 change: 1 addition & 0 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ class KafkaProducer:
same key are assigned to the same partition.
When a key is None, the message is delivered to a random partition
(filtered to partitions with available leaders only, if possible).
Default: DefaultPartitioner().
connections_max_idle_ms: Close idle connections after the number of
milliseconds specified by this config. The broker closes idle
connections after connections.max.idle.ms, so this avoids hitting
Expand Down
8 changes: 4 additions & 4 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import abc
from abc import ABC, abstractmethod, abstractproperty
import collections
from enum import IntEnum
import heapq
Expand Down Expand Up @@ -812,7 +812,7 @@ def exception(self):
return self._error


class TxnRequestHandler(metaclass=abc.ABCMeta):
class TxnRequestHandler(ABC):
def __init__(self, transaction_manager, result=None):
self.transaction_manager = transaction_manager
self.retry_backoff_ms = transaction_manager.retry_backoff_ms
Expand Down Expand Up @@ -890,11 +890,11 @@ def set_retry(self):
def is_retry(self):
return self._is_retry

@abc.abstractmethod
@abstractmethod
def handle_response(self, response):
pass

@abc.abstractproperty
@abstractproperty
def priority(self):
pass

Expand Down
8 changes: 4 additions & 4 deletions kafka/protocol/old/abstract.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import abc
from abc import ABC, abstractmethod


class AbstractType(metaclass=abc.ABCMeta):
class AbstractType(ABC):
@classmethod
@abc.abstractmethod
@abstractmethod
def encode(cls, value): # pylint: disable=no-self-argument
pass

@classmethod
@abc.abstractmethod
@abstractmethod
def decode(cls, data): # pylint: disable=no-self-argument
pass

Expand Down
Loading
Loading