From 4f542af29a76b9ce4c97e8b77be1a4d4577313b0 Mon Sep 17 00:00:00 2001 From: Benoit Debaenst Date: Fri, 2 Sep 2022 11:32:01 +0200 Subject: [PATCH 1/2] Add latest kafka versions --- .gitignore | 2 ++ app/controllers/Logkafka.scala | 8 ++++++++ app/controllers/Topic.scala | 6 ++++++ .../manager/actor/cluster/KafkaStateActor.scala | 2 +- app/kafka/manager/model/model.scala | 12 +++++++++++- app/kafka/manager/utils/LogkafkaNewConfigs.scala | 4 +++- app/kafka/manager/utils/TopicConfigs.scala | 4 +++- build.sbt | 2 +- sbt | 3 +++ test/kafka/manager/TestKafkaManagerActor.scala | 14 +++++++------- test/kafka/manager/model/KafkaVersionTest.scala | 8 ++++++-- test/kafka/manager/utils/TestClusterConfig.scala | 16 ++++++++++++++++ 12 files changed, 67 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index bf996f213..4980384dc 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,5 @@ RUNNING_PID .DS_Store *.log *.swp +.metals +.vscode \ No newline at end of file diff --git a/app/controllers/Logkafka.scala b/app/controllers/Logkafka.scala index cd755d17e..5eb73c8bc 100644 --- a/app/controllers/Logkafka.scala +++ b/app/controllers/Logkafka.scala @@ -125,6 +125,10 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana LogkafkaNewConfigs.configMaps(Kafka_3_0_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) val kafka_3_1_0_Default = CreateLogkafka("","", LogkafkaNewConfigs.configMaps(Kafka_3_1_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_3_2_0_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_3_2_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_3_2_1_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_3_2_1).map{case(k,v) => LKConfig(k,Some(v))}.toList) val defaultCreateForm = Form( mapping( @@ -199,6 +203,8 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana case Kafka_2_8_1 => (defaultCreateForm.fill(kafka_2_8_1_Default), clusterContext) case Kafka_3_0_0 => (defaultCreateForm.fill(kafka_3_0_0_Default), clusterContext) case Kafka_3_1_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) + case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) + case Kafka_3_2_1 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) } } } @@ -317,6 +323,8 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana case Kafka_2_8_1 => LogkafkaNewConfigs.configNames(Kafka_2_8_1).map(n => (n,LKConfig(n,None))).toMap case Kafka_3_0_0 => LogkafkaNewConfigs.configNames(Kafka_3_0_0).map(n => (n,LKConfig(n,None))).toMap case Kafka_3_1_0 => LogkafkaNewConfigs.configNames(Kafka_3_1_0).map(n => (n,LKConfig(n,None))).toMap + case Kafka_3_2_0 => LogkafkaNewConfigs.configNames(Kafka_3_1_0).map(n => (n,LKConfig(n,None))).toMap + case Kafka_3_2_1 => LogkafkaNewConfigs.configNames(Kafka_3_1_0).map(n => (n,LKConfig(n,None))).toMap } val identityOption = li.identityMap.get(log_path) if (identityOption.isDefined) { diff --git a/app/controllers/Topic.scala b/app/controllers/Topic.scala index de5e4f62b..80bd88cf1 100644 --- a/app/controllers/Topic.scala +++ b/app/controllers/Topic.scala @@ -81,6 +81,8 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager val kafka_2_8_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_8_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_3_0_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_0_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_3_1_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) + val kafka_3_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_2_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) + val kafka_3_2_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_2_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val defaultCreateForm = Form( mapping( @@ -197,6 +199,8 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager case Kafka_2_8_1 => (defaultCreateForm.fill(kafka_2_8_1_Default), clusterContext) case Kafka_3_0_0 => (defaultCreateForm.fill(kafka_3_0_0_Default), clusterContext) case Kafka_3_1_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) + case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) + case Kafka_3_2_1 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) } } } @@ -461,6 +465,8 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager case Kafka_2_8_1 => TopicConfigs.configNamesAndDoc(Kafka_2_8_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) } case Kafka_3_0_0 => TopicConfigs.configNamesAndDoc(Kafka_3_0_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } case Kafka_3_1_0 => TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } + case Kafka_3_2_0 => TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } + case Kafka_3_2_1 => TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } } val updatedConfigMap = ti.config.toMap val updatedConfigList = defaultConfigs.map { diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 95f614a05..ae828f66a 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -186,7 +186,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath object KafkaManagedOffsetCache { - val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_2_2, Kafka_2_3_0, Kafka_2_2_1, Kafka_2_4_0, Kafka_2_4_1, Kafka_2_5_0, Kafka_2_5_1, Kafka_2_6_0, Kafka_2_7_0, Kafka_2_8_0, Kafka_2_8_1, Kafka_3_0_0, Kafka_3_1_0) + val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_2_2, Kafka_2_3_0, Kafka_2_2_1, Kafka_2_4_0, Kafka_2_4_1, Kafka_2_5_0, Kafka_2_5_1, Kafka_2_6_0, Kafka_2_7_0, Kafka_2_8_0, Kafka_2_8_1, Kafka_3_0_0, Kafka_3_1_0, Kafka_3_2_0, Kafka_3_2_1) val ConsumerOffsetTopic = "__consumer_offsets" def isSupported(version: KafkaVersion) : Boolean = { diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index 538da39fb..7405484ca 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -156,6 +156,14 @@ case object Kafka_3_1_0 extends KafkaVersion { override def toString = "3.1.0" } +case object Kafka_3_2_0 extends KafkaVersion { + override def toString = "3.2.0" +} + +case object Kafka_3_2_1 extends KafkaVersion { + override def toString = "3.2.1" +} + object KafkaVersion { val supportedVersions: Map[String,KafkaVersion] = Map( "0.8.1.1" -> Kafka_0_8_1_1, @@ -194,7 +202,9 @@ object KafkaVersion { "2.8.0" -> Kafka_2_8_0, "2.8.1" -> Kafka_2_8_1, "3.0.0" -> Kafka_3_0_0, - "3.1.0" -> Kafka_3_1_0 + "3.1.0" -> Kafka_3_1_0, + "3.1.0" -> Kafka_3_2_0, + "3.1.0" -> Kafka_3_2_1 ) val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)).sortWith((a, b) => sortVersion(a._1, b._1)) diff --git a/app/kafka/manager/utils/LogkafkaNewConfigs.scala b/app/kafka/manager/utils/LogkafkaNewConfigs.scala index 29b57bd57..271449a6b 100644 --- a/app/kafka/manager/utils/LogkafkaNewConfigs.scala +++ b/app/kafka/manager/utils/LogkafkaNewConfigs.scala @@ -53,7 +53,9 @@ object LogkafkaNewConfigs { Kafka_2_8_0 -> logkafka82.LogConfig, Kafka_2_8_1 -> logkafka82.LogConfig, Kafka_3_0_0 -> logkafka82.LogConfig, - Kafka_3_1_0 -> logkafka82.LogConfig + Kafka_3_1_0 -> logkafka82.LogConfig, + Kafka_3_2_0 -> logkafka82.LogConfig, + Kafka_3_2_1 -> logkafka82.LogConfig ) def configNames(version: KafkaVersion) : Set[String] = { diff --git a/app/kafka/manager/utils/TopicConfigs.scala b/app/kafka/manager/utils/TopicConfigs.scala index 183c4e8e9..53614bfa9 100644 --- a/app/kafka/manager/utils/TopicConfigs.scala +++ b/app/kafka/manager/utils/TopicConfigs.scala @@ -56,7 +56,9 @@ object TopicConfigs { Kafka_2_8_0 -> two40.LogConfig, Kafka_2_8_1 -> two40.LogConfig, Kafka_3_0_0 -> two40.LogConfig, - Kafka_3_1_0 -> two40.LogConfig + Kafka_3_1_0 -> two40.LogConfig, + Kafka_3_2_0 -> two40.LogConfig, + Kafka_3_2_1 -> two40.LogConfig ) def configNames(version: KafkaVersion): Seq[String] = { diff --git a/build.sbt b/build.sbt index a9a33b0bd..4612ff9e6 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ name := """cmak""" /* For packaging purposes, -SNAPSHOT MUST contain a digit */ -version := "3.0.0.6" +version := "3.0.0.6-1" scalaVersion := "2.12.10" diff --git a/sbt b/sbt index 60b4daf8c..454e10075 100755 --- a/sbt +++ b/sbt @@ -9,6 +9,7 @@ set -o pipefail declare -r sbt_release_version="1.3.8" declare -r sbt_unreleased_version="1.3.8" +declare -r latest_32="3.2.1" declare -r latest_31="3.1.0" declare -r latest_30="3.0.3" declare -r latest_213="2.13.8" @@ -400,6 +401,7 @@ are not special. -213 use $latest_213 -30 use $latest_30 -31 use $latest_31 + -31 use $latest_32 -scala-home use the scala build at the specified directory -scala-version use the specified version of scala -binary-version use the specified scala version when searching for dependencies @@ -475,6 +477,7 @@ process_args() { -213) setScalaVersion "$latest_213" && shift ;; -30) setScalaVersion "$latest_30" && shift ;; -31) setScalaVersion "$latest_31" && shift ;; + -32) setScalaVersion "$latest_32" && shift ;; -scala-version) require_arg version "$1" "$2" && setScalaVersion "$2" && shift 2 ;; -binary-version) require_arg version "$1" "$2" && setThisBuild scalaBinaryVersion "\"$2\"" && shift 2 ;; diff --git a/test/kafka/manager/TestKafkaManagerActor.scala b/test/kafka/manager/TestKafkaManagerActor.scala index f82ad92e5..a03498e0f 100644 --- a/test/kafka/manager/TestKafkaManagerActor.scala +++ b/test/kafka/manager/TestKafkaManagerActor.scala @@ -69,7 +69,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("add cluster") { - val cc = ClusterConfig("dev","3.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc = ClusterConfig("dev","3.2.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMAddCluster(cc)) { result: KMCommandResult => result.result.get Thread.sleep(1000) @@ -80,7 +80,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster zkhost") { - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -112,7 +112,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster version") { - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -139,7 +139,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { println(result) result.msg.contains("dev") } - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMAddCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(1000) @@ -156,7 +156,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster logkafka enabled") { - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -168,7 +168,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { test("update cluster tuning") { val newTuning = getClusterTuning(3, 101, 11, 10000, 10000, 1) - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(newTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None ) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => @@ -185,7 +185,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster security protocol") { - val cc2 = ClusterConfig("dev","3.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val cc2 = ClusterConfig("dev","3.2.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) diff --git a/test/kafka/manager/model/KafkaVersionTest.scala b/test/kafka/manager/model/KafkaVersionTest.scala index dc1a5fd1a..f3a92e63a 100644 --- a/test/kafka/manager/model/KafkaVersionTest.scala +++ b/test/kafka/manager/model/KafkaVersionTest.scala @@ -48,7 +48,9 @@ class KafkaVersionTest extends FunSuite { "2.8.0" -> Kafka_2_8_0, "2.8.1" -> Kafka_2_8_1, "3.0.0" -> Kafka_3_0_0, - "3.1.0" -> Kafka_3_1_0 + "3.1.0" -> Kafka_3_1_0, + "3.2.0" -> Kafka_3_2_0, + "3.2.1" -> Kafka_3_2_1 ) test("apply method: supported version.") { @@ -103,7 +105,9 @@ class KafkaVersionTest extends FunSuite { ("2.8.0","2.8.0"), ("2.8.1","2.8.1"), ("3.0.0","3.0.0"), - ("3.1.0","3.1.0") + ("3.1.0","3.1.0"), + ("3.2.0","3.2.0"), + ("3.2.1","3.2.1") ) assertResult(expected)(KafkaVersion.formSelectList) } diff --git a/test/kafka/manager/utils/TestClusterConfig.scala b/test/kafka/manager/utils/TestClusterConfig.scala index 0109b0ee0..eb3969c19 100644 --- a/test/kafka/manager/utils/TestClusterConfig.scala +++ b/test/kafka/manager/utils/TestClusterConfig.scala @@ -341,4 +341,20 @@ class TestClusterConfig extends FunSuite with Matchers { assert(cc == deserialize.get) } + test("serialize and deserialize 3.2.0") { + val cc = ClusterConfig("qa", "3.2.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val serialize: String = ClusterConfig.serialize(cc) + val deserialize = ClusterConfig.deserialize(serialize) + assert(deserialize.isSuccess === true) + assert(cc == deserialize.get) + } + + test("serialize and deserialize 3.2.1") { + val cc = ClusterConfig("qa", "3.2.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val serialize: String = ClusterConfig.serialize(cc) + val deserialize = ClusterConfig.deserialize(serialize) + assert(deserialize.isSuccess === true) + assert(cc == deserialize.get) + } + } From d660d1c4132352e77174452a7e254417bc2279da Mon Sep 17 00:00:00 2001 From: Benoit Debaenst Date: Fri, 2 Sep 2022 15:28:42 +0200 Subject: [PATCH 2/2] Add kafka versions --- app/controllers/Logkafka.scala | 12 ++++++++---- app/controllers/Topic.scala | 11 +++++++---- .../manager/actor/cluster/KafkaStateActor.scala | 2 +- app/kafka/manager/model/model.scala | 9 +++++++-- app/kafka/manager/utils/LogkafkaNewConfigs.scala | 1 + app/kafka/manager/utils/TopicConfigs.scala | 1 + sbt | 2 +- test/kafka/manager/model/KafkaVersionTest.scala | 2 ++ test/kafka/manager/utils/TestClusterConfig.scala | 8 ++++++++ 9 files changed, 36 insertions(+), 12 deletions(-) diff --git a/app/controllers/Logkafka.scala b/app/controllers/Logkafka.scala index 5eb73c8bc..f93305f50 100644 --- a/app/controllers/Logkafka.scala +++ b/app/controllers/Logkafka.scala @@ -125,6 +125,8 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana LogkafkaNewConfigs.configMaps(Kafka_3_0_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) val kafka_3_1_0_Default = CreateLogkafka("","", LogkafkaNewConfigs.configMaps(Kafka_3_1_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_3_1_1_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_3_1_1).map{case(k,v) => LKConfig(k,Some(v))}.toList) val kafka_3_2_0_Default = CreateLogkafka("","", LogkafkaNewConfigs.configMaps(Kafka_3_2_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) val kafka_3_2_1_Default = CreateLogkafka("","", @@ -203,8 +205,9 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana case Kafka_2_8_1 => (defaultCreateForm.fill(kafka_2_8_1_Default), clusterContext) case Kafka_3_0_0 => (defaultCreateForm.fill(kafka_3_0_0_Default), clusterContext) case Kafka_3_1_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) - case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) - case Kafka_3_2_1 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) + case Kafka_3_1_1 => (defaultCreateForm.fill(kafka_3_1_1_Default), clusterContext) + case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_2_0_Default), clusterContext) + case Kafka_3_2_1 => (defaultCreateForm.fill(kafka_3_2_1_Default), clusterContext) } } } @@ -323,8 +326,9 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana case Kafka_2_8_1 => LogkafkaNewConfigs.configNames(Kafka_2_8_1).map(n => (n,LKConfig(n,None))).toMap case Kafka_3_0_0 => LogkafkaNewConfigs.configNames(Kafka_3_0_0).map(n => (n,LKConfig(n,None))).toMap case Kafka_3_1_0 => LogkafkaNewConfigs.configNames(Kafka_3_1_0).map(n => (n,LKConfig(n,None))).toMap - case Kafka_3_2_0 => LogkafkaNewConfigs.configNames(Kafka_3_1_0).map(n => (n,LKConfig(n,None))).toMap - case Kafka_3_2_1 => LogkafkaNewConfigs.configNames(Kafka_3_1_0).map(n => (n,LKConfig(n,None))).toMap + case Kafka_3_1_1 => LogkafkaNewConfigs.configNames(Kafka_3_1_1).map(n => (n,LKConfig(n,None))).toMap + case Kafka_3_2_0 => LogkafkaNewConfigs.configNames(Kafka_3_2_0).map(n => (n,LKConfig(n,None))).toMap + case Kafka_3_2_1 => LogkafkaNewConfigs.configNames(Kafka_3_2_1).map(n => (n,LKConfig(n,None))).toMap } val identityOption = li.identityMap.get(log_path) if (identityOption.isDefined) { diff --git a/app/controllers/Topic.scala b/app/controllers/Topic.scala index 80bd88cf1..ab925b599 100644 --- a/app/controllers/Topic.scala +++ b/app/controllers/Topic.scala @@ -81,6 +81,7 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager val kafka_2_8_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_8_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_3_0_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_0_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_3_1_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) + val kafka_3_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_1_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_3_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_2_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_3_2_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_2_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) @@ -199,8 +200,9 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager case Kafka_2_8_1 => (defaultCreateForm.fill(kafka_2_8_1_Default), clusterContext) case Kafka_3_0_0 => (defaultCreateForm.fill(kafka_3_0_0_Default), clusterContext) case Kafka_3_1_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) - case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) - case Kafka_3_2_1 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) + case Kafka_3_1_1 => (defaultCreateForm.fill(kafka_3_1_1_Default), clusterContext) + case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_2_0_Default), clusterContext) + case Kafka_3_2_1 => (defaultCreateForm.fill(kafka_3_2_1_Default), clusterContext) } } } @@ -465,8 +467,9 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager case Kafka_2_8_1 => TopicConfigs.configNamesAndDoc(Kafka_2_8_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) } case Kafka_3_0_0 => TopicConfigs.configNamesAndDoc(Kafka_3_0_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } case Kafka_3_1_0 => TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } - case Kafka_3_2_0 => TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } - case Kafka_3_2_1 => TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } + case Kafka_3_1_1 => TopicConfigs.configNamesAndDoc(Kafka_3_1_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) } + case Kafka_3_2_0 => TopicConfigs.configNamesAndDoc(Kafka_3_2_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } + case Kafka_3_2_1 => TopicConfigs.configNamesAndDoc(Kafka_3_2_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) } } val updatedConfigMap = ti.config.toMap val updatedConfigList = defaultConfigs.map { diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index ae828f66a..682cc9f2c 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -186,7 +186,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath object KafkaManagedOffsetCache { - val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_2_2, Kafka_2_3_0, Kafka_2_2_1, Kafka_2_4_0, Kafka_2_4_1, Kafka_2_5_0, Kafka_2_5_1, Kafka_2_6_0, Kafka_2_7_0, Kafka_2_8_0, Kafka_2_8_1, Kafka_3_0_0, Kafka_3_1_0, Kafka_3_2_0, Kafka_3_2_1) + val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_2_2, Kafka_2_3_0, Kafka_2_2_1, Kafka_2_4_0, Kafka_2_4_1, Kafka_2_5_0, Kafka_2_5_1, Kafka_2_6_0, Kafka_2_7_0, Kafka_2_8_0, Kafka_2_8_1, Kafka_3_0_0, Kafka_3_1_0, Kafka_3_1_1, Kafka_3_2_0, Kafka_3_2_1) val ConsumerOffsetTopic = "__consumer_offsets" def isSupported(version: KafkaVersion) : Boolean = { diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index 7405484ca..585dfd0ea 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -156,6 +156,10 @@ case object Kafka_3_1_0 extends KafkaVersion { override def toString = "3.1.0" } +case object Kafka_3_1_1 extends KafkaVersion { + override def toString = "3.1.1" +} + case object Kafka_3_2_0 extends KafkaVersion { override def toString = "3.2.0" } @@ -203,8 +207,9 @@ object KafkaVersion { "2.8.1" -> Kafka_2_8_1, "3.0.0" -> Kafka_3_0_0, "3.1.0" -> Kafka_3_1_0, - "3.1.0" -> Kafka_3_2_0, - "3.1.0" -> Kafka_3_2_1 + "3.1.1" -> Kafka_3_1_1, + "3.2.0" -> Kafka_3_2_0, + "3.2.1" -> Kafka_3_2_1 ) val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)).sortWith((a, b) => sortVersion(a._1, b._1)) diff --git a/app/kafka/manager/utils/LogkafkaNewConfigs.scala b/app/kafka/manager/utils/LogkafkaNewConfigs.scala index 271449a6b..f65470422 100644 --- a/app/kafka/manager/utils/LogkafkaNewConfigs.scala +++ b/app/kafka/manager/utils/LogkafkaNewConfigs.scala @@ -54,6 +54,7 @@ object LogkafkaNewConfigs { Kafka_2_8_1 -> logkafka82.LogConfig, Kafka_3_0_0 -> logkafka82.LogConfig, Kafka_3_1_0 -> logkafka82.LogConfig, + Kafka_3_1_1 -> logkafka82.LogConfig, Kafka_3_2_0 -> logkafka82.LogConfig, Kafka_3_2_1 -> logkafka82.LogConfig ) diff --git a/app/kafka/manager/utils/TopicConfigs.scala b/app/kafka/manager/utils/TopicConfigs.scala index 53614bfa9..f1b89db0b 100644 --- a/app/kafka/manager/utils/TopicConfigs.scala +++ b/app/kafka/manager/utils/TopicConfigs.scala @@ -57,6 +57,7 @@ object TopicConfigs { Kafka_2_8_1 -> two40.LogConfig, Kafka_3_0_0 -> two40.LogConfig, Kafka_3_1_0 -> two40.LogConfig, + Kafka_3_1_1 -> two40.LogConfig, Kafka_3_2_0 -> two40.LogConfig, Kafka_3_2_1 -> two40.LogConfig ) diff --git a/sbt b/sbt index 454e10075..b91f3b7e4 100755 --- a/sbt +++ b/sbt @@ -10,7 +10,7 @@ declare -r sbt_release_version="1.3.8" declare -r sbt_unreleased_version="1.3.8" declare -r latest_32="3.2.1" -declare -r latest_31="3.1.0" +declare -r latest_31="3.1.1" declare -r latest_30="3.0.3" declare -r latest_213="2.13.8" declare -r latest_212="2.12.15" diff --git a/test/kafka/manager/model/KafkaVersionTest.scala b/test/kafka/manager/model/KafkaVersionTest.scala index f3a92e63a..38c649f09 100644 --- a/test/kafka/manager/model/KafkaVersionTest.scala +++ b/test/kafka/manager/model/KafkaVersionTest.scala @@ -49,6 +49,7 @@ class KafkaVersionTest extends FunSuite { "2.8.1" -> Kafka_2_8_1, "3.0.0" -> Kafka_3_0_0, "3.1.0" -> Kafka_3_1_0, + "3.1.1" -> Kafka_3_1_1, "3.2.0" -> Kafka_3_2_0, "3.2.1" -> Kafka_3_2_1 ) @@ -106,6 +107,7 @@ class KafkaVersionTest extends FunSuite { ("2.8.1","2.8.1"), ("3.0.0","3.0.0"), ("3.1.0","3.1.0"), + ("3.1.1","3.1.1"), ("3.2.0","3.2.0"), ("3.2.1","3.2.1") ) diff --git a/test/kafka/manager/utils/TestClusterConfig.scala b/test/kafka/manager/utils/TestClusterConfig.scala index eb3969c19..cc83043b2 100644 --- a/test/kafka/manager/utils/TestClusterConfig.scala +++ b/test/kafka/manager/utils/TestClusterConfig.scala @@ -341,6 +341,14 @@ class TestClusterConfig extends FunSuite with Matchers { assert(cc == deserialize.get) } + test("serialize and deserialize 3.1.1") { + val cc = ClusterConfig("qa", "3.1.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val serialize: String = ClusterConfig.serialize(cc) + val deserialize = ClusterConfig.deserialize(serialize) + assert(deserialize.isSuccess === true) + assert(cc == deserialize.get) + } + test("serialize and deserialize 3.2.0") { val cc = ClusterConfig("qa", "3.2.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) val serialize: String = ClusterConfig.serialize(cc)