From 34187e047d7ee7852a266c79595a561d514ee147 Mon Sep 17 00:00:00 2001
From: jiuker <2818723467@qq.com>
Date: Thu, 30 Nov 2023 16:25:03 +0800
Subject: [PATCH] feat: support kafka notification endpoint compression codec (#18562)

---
 docs/bucket/notifications/README.md | 34 ++++++------
 internal/config/notify/help.go      | 12 ++++
 internal/config/notify/legacy.go    |  8 +++
 internal/config/notify/parse.go     | 21 +++++++
 internal/event/target/kafka.go      | 86 ++++++++++++++++++-----------
 5 files changed, 114 insertions(+), 47 deletions(-)

diff --git a/docs/bucket/notifications/README.md b/docs/bucket/notifications/README.md
index d4a7140bf..3eaf1c169 100644
--- a/docs/bucket/notifications/README.md
+++ b/docs/bucket/notifications/README.md
@@ -1134,22 +1134,24 @@ KEY:
 notify_kafka[:name] publish bucket notifications to Kafka endpoints

 ARGS:
-MINIO_NOTIFY_KAFKA_ENABLE* (on|off) enable notify_kafka target, default is 'off'
-MINIO_NOTIFY_KAFKA_BROKERS* (csv) comma separated list of Kafka broker addresses
-MINIO_NOTIFY_KAFKA_TOPIC (string) Kafka topic used for bucket notifications
-MINIO_NOTIFY_KAFKA_SASL_USERNAME (string) username for SASL/PLAIN or SASL/SCRAM authentication
-MINIO_NOTIFY_KAFKA_SASL_PASSWORD (string) password for SASL/PLAIN or SASL/SCRAM authentication
-MINIO_NOTIFY_KAFKA_SASL_MECHANISM (plain*|sha256|sha512) sasl authentication mechanism, default 'plain'
-MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH (string) clientAuth determines the Kafka server's policy for TLS client auth
-MINIO_NOTIFY_KAFKA_SASL (on|off) set to 'on' to enable SASL authentication
-MINIO_NOTIFY_KAFKA_TLS (on|off) set to 'on' to enable TLS
-MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY (on|off) trust server TLS without verification, defaults to "on" (verify)
-MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT (path) path to client certificate for mTLS auth
-MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY (path) path to client key for mTLS auth
-MINIO_NOTIFY_KAFKA_QUEUE_DIR (path) staging dir for undelivered messages e.g. '/home/events'
-MINIO_NOTIFY_KAFKA_QUEUE_LIMIT (number) maximum limit for undelivered messages, defaults to '100000'
-MINIO_NOTIFY_KAFKA_COMMENT (sentence) optionally add a comment to this setting
-MINIO_NOTIFY_KAFKA_VERSION (string) specify the version of the Kafka cluster e.g. '2.2.0'
+MINIO_NOTIFY_KAFKA_ENABLE* (on|off) enable notify_kafka target, default is 'off'
+MINIO_NOTIFY_KAFKA_BROKERS* (csv) comma separated list of Kafka broker addresses
+MINIO_NOTIFY_KAFKA_TOPIC (string) Kafka topic used for bucket notifications
+MINIO_NOTIFY_KAFKA_SASL_USERNAME (string) username for SASL/PLAIN or SASL/SCRAM authentication
+MINIO_NOTIFY_KAFKA_SASL_PASSWORD (string) password for SASL/PLAIN or SASL/SCRAM authentication
+MINIO_NOTIFY_KAFKA_SASL_MECHANISM (plain*|sha256|sha512) sasl authentication mechanism, default 'plain'
+MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH (string) clientAuth determines the Kafka server's policy for TLS client auth
+MINIO_NOTIFY_KAFKA_SASL (on|off) set to 'on' to enable SASL authentication
+MINIO_NOTIFY_KAFKA_TLS (on|off) set to 'on' to enable TLS
+MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY (on|off) trust server TLS without verification, defaults to "on" (verify)
+MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT (path) path to client certificate for mTLS auth
+MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY (path) path to client key for mTLS auth
+MINIO_NOTIFY_KAFKA_QUEUE_DIR (path) staging dir for undelivered messages e.g. '/home/events'
+MINIO_NOTIFY_KAFKA_QUEUE_LIMIT (number) maximum limit for undelivered messages, defaults to '100000'
+MINIO_NOTIFY_KAFKA_COMMENT (sentence) optionally add a comment to this setting
+MINIO_NOTIFY_KAFKA_VERSION (string) specify the version of the Kafka cluster e.g. '2.2.0'
+MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_CODEC (none|snappy|gzip|lz4|zstd) compression codec for producer messages
+MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_LEVEL (number) compression level for producer messages, defaults to '0'
 ```

 To update the configuration, use `mc admin config get` command to get the current configuration.
diff --git a/internal/config/notify/help.go b/internal/config/notify/help.go
index d327d6cb1..00186f8a7 100644
--- a/internal/config/notify/help.go
+++ b/internal/config/notify/help.go
@@ -262,6 +262,18 @@ var (
             Optional: true,
             Type: "sentence",
         },
+        config.HelpKV{
+            Key: target.KafkaCompressionCodec,
+            Description: "specify compression_codec of the Kafka cluster",
+            Optional: true,
+            Type: "none|snappy|gzip|lz4|zstd",
+        },
+        config.HelpKV{
+            Key: target.KafkaCompressionLevel,
+            Description: "specify compression level of the Kafka cluster",
+            Optional: true,
+            Type: "number",
+        },
     }

     HelpMQTT = config.HelpKVS{
diff --git a/internal/config/notify/legacy.go b/internal/config/notify/legacy.go
index c92d9d801..39ce33f99 100644
--- a/internal/config/notify/legacy.go
+++ b/internal/config/notify/legacy.go
@@ -95,6 +95,14 @@ func SetNotifyKafka(s config.Config, name string, cfg target.KafkaArgs) error {
             Key: target.KafkaSASLPassword,
             Value: cfg.SASL.Password,
         },
+        config.KV{
+            Key: target.KafkaCompressionCodec,
+            Value: cfg.Producer.Compression,
+        },
+        config.KV{
+            Key: target.KafkaCompressionLevel,
+            Value: strconv.Itoa(cfg.Producer.CompressionLevel),
+        },
     }
     return nil
 }
diff --git a/internal/config/notify/parse.go b/internal/config/notify/parse.go
index b8faa2264..34a8c6b7a 100644
--- a/internal/config/notify/parse.go
+++ b/internal/config/notify/parse.go
@@ -366,6 +366,14 @@ var (
             Key: target.KafkaBatchSize,
             Value: "0",
         },
+        config.KV{
+            Key: target.KafkaCompressionCodec,
+            Value: "",
+        },
+        config.KV{
+            Key: target.KafkaCompressionLevel,
+            Value: "",
+        },
     }
 )

@@ -483,6 +491,19 @@ func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs
         kafkaArgs.TLS.ClientTLSCert = env.Get(tlsClientTLSCertEnv, kv.Get(target.KafkaClientTLSCert))
         kafkaArgs.TLS.ClientTLSKey = env.Get(tlsClientTLSKeyEnv, kv.Get(target.KafkaClientTLSKey))

+        compressionCodecEnv := target.EnvKafkaProducerCompressionCodec
+        if k != config.Default {
+            compressionCodecEnv = compressionCodecEnv + config.Default + k
+        }
+        kafkaArgs.Producer.Compression = env.Get(compressionCodecEnv, kv.Get(target.KafkaCompressionCodec))
+
+        compressionLevelEnv := target.EnvKafkaProducerCompressionLevel
+        if k != config.Default {
+            compressionLevelEnv = compressionLevelEnv + config.Default + k
+        }
+        compressionLevel, _ := strconv.Atoi(env.Get(compressionLevelEnv, kv.Get(target.KafkaCompressionLevel)))
+        kafkaArgs.Producer.CompressionLevel = compressionLevel
+
         saslEnableEnv := target.EnvKafkaSASLEnable
         if k != config.Default {
             saslEnableEnv = saslEnableEnv + config.Default + k
diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go
index 0e15113e6..77ef8c335 100644
--- a/internal/event/target/kafka.go
+++ b/internal/event/target/kafka.go
@@ -28,6 +28,7 @@ import (
     "net/url"
     "os"
     "path/filepath"
+    "strings"
     "sync"
     "time"

@@ -43,40 +44,52 @@ import (

 // Kafka input constants
 const (
-    KafkaBrokers = "brokers"
-    KafkaTopic = "topic"
-    KafkaQueueDir = "queue_dir"
-    KafkaQueueLimit = "queue_limit"
-    KafkaTLS = "tls"
-    KafkaTLSSkipVerify = "tls_skip_verify"
-    KafkaTLSClientAuth = "tls_client_auth"
-    KafkaSASL = "sasl"
-    KafkaSASLUsername = "sasl_username"
-    KafkaSASLPassword = "sasl_password"
-    KafkaSASLMechanism = "sasl_mechanism"
-    KafkaClientTLSCert = "client_tls_cert"
-    KafkaClientTLSKey = "client_tls_key"
-    KafkaVersion = "version"
-    KafkaBatchSize = "batch_size"
+    KafkaBrokers = "brokers"
+    KafkaTopic = "topic"
+    KafkaQueueDir = "queue_dir"
+    KafkaQueueLimit = "queue_limit"
+    KafkaTLS = "tls"
+    KafkaTLSSkipVerify = "tls_skip_verify"
+    KafkaTLSClientAuth = "tls_client_auth"
+    KafkaSASL = "sasl"
+    KafkaSASLUsername = "sasl_username"
+    KafkaSASLPassword = "sasl_password"
+    KafkaSASLMechanism = "sasl_mechanism"
+    KafkaClientTLSCert = "client_tls_cert"
+    KafkaClientTLSKey = "client_tls_key"
+    KafkaVersion = "version"
+    KafkaBatchSize = "batch_size"
+    KafkaCompressionCodec = "compression_codec"
+    KafkaCompressionLevel = "compression_level"

-    EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE"
-    EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS"
-    EnvKafkaTopic = "MINIO_NOTIFY_KAFKA_TOPIC"
-    EnvKafkaQueueDir = "MINIO_NOTIFY_KAFKA_QUEUE_DIR"
-    EnvKafkaQueueLimit = "MINIO_NOTIFY_KAFKA_QUEUE_LIMIT"
-    EnvKafkaTLS = "MINIO_NOTIFY_KAFKA_TLS"
-    EnvKafkaTLSSkipVerify = "MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY"
-    EnvKafkaTLSClientAuth = "MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH"
-    EnvKafkaSASLEnable = "MINIO_NOTIFY_KAFKA_SASL"
-    EnvKafkaSASLUsername = "MINIO_NOTIFY_KAFKA_SASL_USERNAME"
-    EnvKafkaSASLPassword = "MINIO_NOTIFY_KAFKA_SASL_PASSWORD"
-    EnvKafkaSASLMechanism = "MINIO_NOTIFY_KAFKA_SASL_MECHANISM"
-    EnvKafkaClientTLSCert = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT"
-    EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY"
-    EnvKafkaVersion = "MINIO_NOTIFY_KAFKA_VERSION"
-    EnvKafkaBatchSize = "MINIO_NOTIFY_KAFKA_BATCH_SIZE"
+    EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE"
+    EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS"
+    EnvKafkaTopic = "MINIO_NOTIFY_KAFKA_TOPIC"
+    EnvKafkaQueueDir = "MINIO_NOTIFY_KAFKA_QUEUE_DIR"
+    EnvKafkaQueueLimit = "MINIO_NOTIFY_KAFKA_QUEUE_LIMIT"
+    EnvKafkaTLS = "MINIO_NOTIFY_KAFKA_TLS"
+    EnvKafkaTLSSkipVerify = "MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY"
+    EnvKafkaTLSClientAuth = "MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH"
+    EnvKafkaSASLEnable = "MINIO_NOTIFY_KAFKA_SASL"
+    EnvKafkaSASLUsername = "MINIO_NOTIFY_KAFKA_SASL_USERNAME"
+    EnvKafkaSASLPassword = "MINIO_NOTIFY_KAFKA_SASL_PASSWORD"
+    EnvKafkaSASLMechanism = "MINIO_NOTIFY_KAFKA_SASL_MECHANISM"
+    EnvKafkaClientTLSCert = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT"
+    EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY"
+    EnvKafkaVersion = "MINIO_NOTIFY_KAFKA_VERSION"
+    EnvKafkaBatchSize = "MINIO_NOTIFY_KAFKA_BATCH_SIZE"
+    EnvKafkaProducerCompressionCodec = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_CODEC"
+    EnvKafkaProducerCompressionLevel = "MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_LEVEL"
 )

+var codecs = map[string]sarama.CompressionCodec{
+    "none": sarama.CompressionNone,
+    "gzip": sarama.CompressionGZIP,
+    "snappy": sarama.CompressionSnappy,
+    "lz4": sarama.CompressionLZ4,
+    "zstd": sarama.CompressionZSTD,
+}
+
 // KafkaArgs - Kafka target arguments.
 type KafkaArgs struct {
     Enable bool `json:"enable"`
@@ -100,6 +113,10 @@ type KafkaArgs struct {
         Password string `json:"password"`
         Mechanism string `json:"mechanism"`
     } `json:"sasl"`
+    Producer struct {
+        Compression string `json:"compression"`
+        CompressionLevel int `json:"compressionLevel"`
+    } `json:"producer"`
 }

 // Validate KafkaArgs fields
@@ -391,6 +408,13 @@ func (target *KafkaTarget) initKafka() error {
     config.Producer.Return.Errors = true
     config.Producer.RequiredAcks = 1
     config.Producer.Timeout = (5 * time.Second)
+    // Set Producer Compression
+    cc, ok := codecs[strings.ToLower(args.Producer.Compression)]
+    if ok {
+        config.Producer.Compression = cc
+        config.Producer.CompressionLevel = args.Producer.CompressionLevel
+    }
+
     config.Net.ReadTimeout = (5 * time.Second)
     config.Net.DialTimeout = (5 * time.Second)
     config.Net.WriteTimeout = (5 * time.Second)
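
For reference, the sketch below condenses the producer-side logic this patch adds to `internal/event/target/kafka.go` into a standalone program: the codec name configured via `MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_CODEC` is lower-cased, looked up in a small table, and only applied to the sarama producer config when it is recognized. This is an illustrative sketch, not MinIO code: the `applyCompression` helper is hypothetical, and the `github.com/IBM/sarama` import path is an assumption about which sarama fork the tree vendors.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/IBM/sarama" // assumption: adjust if a different sarama fork is vendored
)

// Mirror of the lookup table added to kafka.go: config strings map to sarama codecs.
var codecs = map[string]sarama.CompressionCodec{
	"none":   sarama.CompressionNone,
	"gzip":   sarama.CompressionGZIP,
	"snappy": sarama.CompressionSnappy,
	"lz4":    sarama.CompressionLZ4,
	"zstd":   sarama.CompressionZSTD,
}

// applyCompression is a hypothetical helper showing the same decision initKafka makes:
// lower-case the configured name and only touch the sarama defaults when it is known.
func applyCompression(cfg *sarama.Config, codec string, level int) {
	if cc, ok := codecs[strings.ToLower(codec)]; ok {
		cfg.Producer.Compression = cc
		cfg.Producer.CompressionLevel = level
	}
}

func main() {
	cfg := sarama.NewConfig()
	// e.g. MINIO_NOTIFY_KAFKA_PRODUCER_COMPRESSION_CODEC=zstd, ..._COMPRESSION_LEVEL=3
	applyCompression(cfg, "ZSTD", 3)
	fmt.Println(cfg.Producer.Compression, cfg.Producer.CompressionLevel)
}
```

As in the patch, an empty or unrecognized codec name leaves sarama's defaults untouched (no compression), and the configured level is handed to the selected codec as-is.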