Infinite retries : NotLeaderOrFollowerException #106

Open
ThomasDangleterre opened this issue Jun 23, 2022 · 2 comments

Comments

@ThomasDangleterre

Hello,

I have an issue while using kafka-proxy:

[Producer clientId=geco-1] Received invalid metadata error in produce request on partition private_dkt_out_listener_kafka_geco_v1-2 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now


[Producer clientId=geco-1] Got error produce response with correlation id 862812 on topic-partition private_dkt_out_listener_kafka_geco_v1-1, retrying (2147483398 attempts left). Error: NOT_LEADER_OR_FOLLOWER

This error repeats indefinitely, and no data is ever written to the topic.
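
For context, the "attempts left" counter in the log can be turned into a retry count. A minimal sketch, assuming the producer runs with the default `retries` value of 2147483647 (the Kafka default since 2.1; the producer config is not shown in this issue, so this is an assumption). The helper name `retries_used` is hypothetical:

```shell
#!/bin/sh
# Assumption: the producer uses the default `retries` (Integer.MAX_VALUE).
DEFAULT_RETRIES=2147483647

# Hypothetical helper: extract "N attempts left" from a producer log line
# and print how many retries that batch has already consumed.
retries_used() {
  left=$(printf '%s\n' "$1" | sed -n 's/.*retrying (\([0-9][0-9]*\) attempts left).*/\1/p')
  [ -n "$left" ] || return 1   # line did not contain a retry counter
  echo $(( DEFAULT_RETRIES - left ))
}

retries_used "Got error produce response with correlation id 862812 on topic-partition private_dkt_out_listener_kafka_geco_v1-1, retrying (2147483398 attempts left). Error: NOT_LEADER_OR_FOLLOWER"
# prints 249
```

Under that assumption the batch had already been retried 249 times. With default settings the total retry time per record is bounded by the producer's `delivery.timeout.ms` rather than by the retry count.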

Here is my configuration:

containers:
      - name: vcstream-uat-proxy
        image: <anonymized>/kafka-proxy:v0.3.1
        command:
        - '/bin/sh'
        - '-c'
        - |
          apk update && apk add bind-tools

          AIVEN_BROKER_IPS=$(dig +short  <anonymized> )
          AIVEN_BROKER_IP_1=$(echo "$AIVEN_BROKER_IPS" | sed -n '1p')
          AIVEN_BROKER_IP_2=$(echo "$AIVEN_BROKER_IPS" | sed -n '2p')
          AIVEN_BROKER_IP_3=$(echo "$AIVEN_BROKER_IPS" | sed -n '3p')

          SERVICE_IP=$(dig +short <anonymized> | sed -n '1p')

          EXTERNAL_IP=10.48.28.151

          echo " Ip Broker Aiven n° 1 : $AIVEN_BROKER_IP_1"
          echo " Ip Broker Aiven n° 2 : $AIVEN_BROKER_IP_2"
          echo " Ip Broker Aiven n° 3 : $AIVEN_BROKER_IP_3"

          echo " cluster Ip Service ingress-vcstream-uat-proxy : $SERVICE_IP"
          echo " External IP is $EXTERNAL_IP"

          tty=$(readlink /proc/$$/fd/2)

          echo "Launching kafka proxy ..."

          echo $ca_cert | base64 -d  > ca_cert.pem && \
          echo $access_cert | base64 -d  > access.cert && \
          echo $access_key | base64 -d  > access.key && \
          /opt/kafka-proxy/bin/kafka-proxy server \
          --log-format=json \
          --bootstrap-server-mapping="$AIVEN_BROKER_IP_1":12658,0.0.0.0:12658,"$EXTERNAL_IP":12658 \
          --bootstrap-server-mapping="$AIVEN_BROKER_IP_2":12658,0.0.0.0:12659,"$EXTERNAL_IP":12659 \
          --bootstrap-server-mapping="$AIVEN_BROKER_IP_3":12658,0.0.0.0:12660,"$EXTERNAL_IP":12660 \
          --dynamic-listeners-disable \
          --tls-enable \
          --tls-ca-chain-cert-file ca_cert.pem \
          --tls-client-cert-file access.cert \
          --tls-client-key-file access.key \
          --tls-client-key-password $keystore_password \
          --tls-insecure-skip-verify \
          --proxy-listener-tls-enable \
          --proxy-listener-key-file access.key \
          --proxy-listener-cert-file access.cert \
          --proxy-listener-key-password $keystore_password \
          --proxy-listener-ca-chain-cert-file ca_cert.pem \
          --tls-same-client-cert-enable  | tee $tty | grep -q 'i/o timeout' | exit 1
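
The startup script above takes the first three lines of the `dig` output as broker IPs. If the lookup returns fewer than three addresses, the corresponding `--bootstrap-server-mapping` would start with an empty host. A minimal sketch of failing fast in that case; `nth_ip` is a hypothetical helper and the addresses are placeholders from the documentation range, not the real brokers:

```shell
#!/bin/sh
# Hypothetical helper: print the Nth line of a newline-separated list,
# or return 1 if that line is missing or empty.
nth_ip() {
  ip=$(printf '%s\n' "$1" | sed -n "${2}p")
  [ -n "$ip" ] || return 1
  printf '%s\n' "$ip"
}

# Placeholder data standing in for: BROKER_IPS=$(dig +short <broker hostname>)
BROKER_IPS=$(printf '192.0.2.10\n192.0.2.11\n192.0.2.12\n')

# Refuse to start the proxy if any of the three expected IPs is missing.
for i in 1 2 3; do
  nth_ip "$BROKER_IPS" "$i" >/dev/null || {
    echo "broker IP $i could not be resolved" >&2
    exit 1
  }
done
```

This only guards against an incomplete lookup at startup. It does not detect broker IPs that change while the proxy is running.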

We resolve the IPs of our brokers dynamically at startup, and we make the container fail as soon as 'i/o timeout' appears in the logs, in order to improve resilience.

Does anyone know why our producer can't produce data to the topic?
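
The "fail on 'i/o timeout'" behaviour comes from the `tee | grep -q | exit 1` tail of the command. The same idea can be written as an explicit filter, which may be easier to read. A minimal sketch, assuming POSIX sh; `watch_for_timeout` is a hypothetical name, not something kafka-proxy provides:

```shell
#!/bin/sh
# Hypothetical filter: echo every log line read from stdin and
# return 1 as soon as a line contains 'i/o timeout'.
watch_for_timeout() {
  while IFS= read -r line; do
    printf '%s\n' "$line"
    case "$line" in
      *'i/o timeout'*) return 1 ;;
    esac
  done
  return 0   # input ended without any timeout line
}

# Demo with canned log lines instead of the real proxy output.
printf 'started\nread tcp: i/o timeout\nnever reached\n' \
  | watch_for_timeout || echo "timeout seen, exiting"
```

In the container command this would be used as `/opt/kafka-proxy/bin/kafka-proxy server ... | watch_for_timeout || exit 1`. As with `grep -q`, the filter stops reading after the first match, so the upstream process is typically terminated by SIGPIPE on its next write.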

@ljakob

ljakob commented Aug 22, 2022

Hi,
We've just run into the same problem. We are still analyzing whether it's proxy-related.

Did you resolve the issue? Do you have any hints for us?

Thanks Leif

@ThomasDangleterre
Author

Hello,

We observed that the issue was gone after a restart. Since the /health endpoint cannot tell us whether Kafka message production actually works, we created a custom image with kafkacat and a script that produces messages through the proxy (on a dedicated topic, always on partition 0) and then consumes them.

The script below is used in the livenessProbe of the kafka-proxy deployment, so any error triggers a restart.

#!/bin/sh

# Exit immediately if any command fails
set -e

timestamp=$(date '+%s')

echo "Sending message to $HOST:$PORT $LIVENESS_TOPIC"

# produce message in topic
echo "$HOSTNAME $timestamp" | kafkacat -P -b "$EXTERNAL_IP":"$PORT"  -t "$LIVENESS_TOPIC" -p 0 \
    -X security.protocol=SSL \
    -X ssl.key.location=service.key \
    -X ssl.certificate.location=service.cert \
    -X ssl.ca.location=ca.pem

while true; do
  # Consume the last message of the topic, then exit at end of partition
  payload=$( kafkacat -C -b "$HOST":"$PORT" -t "$LIVENESS_TOPIC" -o -1 -e \
    -X security.protocol=SSL \
    -X ssl.key.location=service.key \
    -X ssl.certificate.location=service.cert \
    -X ssl.ca.location=ca.pem \
  )

  # Stop once the message we just produced has been read back
  if [ "$payload" = "$HOSTNAME $timestamp" ]; then
    break
  fi
done

exit 0
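
The consume loop above only ends when the expected message is read back, so if the message never arrives it keeps running until something external stops it (presumably the probe's own timeout). A minimal sketch of bounding such a loop inside the script, assuming POSIX sh; `retry_until_deadline` is a hypothetical helper, not part of the original script:

```shell
#!/bin/sh
# Hypothetical helper: run a command repeatedly until it succeeds
# or until the given number of seconds has elapsed.
# Usage: retry_until_deadline <seconds> <command> [args...]
retry_until_deadline() {
  timeout_s=$1
  shift
  deadline=$(( $(date +%s) + timeout_s ))
  while ! "$@"; do
    if [ "$(date +%s)" -ge "$deadline" ]; then
      return 1   # deadline reached without a successful run
    fi
    sleep 1
  done
  return 0
}

# Demo: a command that succeeds immediately.
retry_until_deadline 5 true && echo "probe ok"
```

In the liveness script, the body of the `while true` loop could be moved into a function (for example a hypothetical `last_message_matches`) and called as `retry_until_deadline 10 last_message_matches`, so that the probe fails with a non-zero exit code instead of hanging.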

We still see some transient errors that trigger restarts (example below), but overall the connection is stable now.

% ERROR: Local: All broker connections are down: 3/3 brokers are down : terminating
%3|1661178021.406|FAIL|rdkafka#producer-1| [thrd:ssl://10.48.28.151:12660/32]: ssl://10.48.28.151:12658/32: No further error information available (after 0ms in state SSL_HANDSHAKE)
