diff --git a/pom.xml b/pom.xml
index 4345210..a734d88 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,6 +11,12 @@
2.24.1
github
+ 3.3.2
+ 214
+ 1.1.5
+ UTF-8
+ 11
+ 2.18.0
@@ -48,6 +54,26 @@
commons-codec
1.17.1
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka.version}
+
+
+ io.airlift
+ log
+ ${airlift.version}
+
+
+ software.amazon.msk
+ aws-msk-iam-auth
+ ${aws-msk-iam-auth.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
@@ -95,4 +121,4 @@
-
\ No newline at end of file
+
diff --git a/src/main/assembly/plugin.conf b/src/main/assembly/plugin.conf
index fbb54de..50d9cc7 100644
--- a/src/main/assembly/plugin.conf
+++ b/src/main/assembly/plugin.conf
@@ -56,3 +56,32 @@ enable_compute_all_query_digest=false
# Filter conditions when importing audit information
filter=
+
+
+### kafka configuration
+
+# Set to True if you want to enable Kafka process
+kafka_enableMSK=false
+
+# Name of current Starrocks instance/cluster
+kafka_instanceName=
+
+# Name of the kafka topic that data will be written.
+kafka_topic=
+
+# String with all bootstrap servers related to the Kafka cluster.
+kafka_bootstrapServer=
+
+# Properties for kafka producer
+kafka_conf_keySerializer=org.apache.kafka.common.serialization.StringSerializer
+kafka_conf_valueSerializer=org.apache.kafka.common.serialization.StringSerializer
+kafka_conf_maxRequestSize=52428800
+kafka_conf_bufferMemory=36700160
+kafka_conf_maxBlockMs=180000
+kafka_conf_batchSize=102400
+kafka_conf_compressionType=snappy
+kafka_conf_lingerMs=20
+kafka_conf_securityProtocol=SASL_SSL
+kafka_conf_saslMechanism=AWS_MSK_IAM
+kafka_conf_saslJaasConfig=software.amazon.msk.auth.iam.IAMLoginModule required;
+kafka_conf_saslClientCallbackHandlerClass=software.amazon.msk.auth.iam.IAMClientCallbackHandler
diff --git a/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java b/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java
index 3007828..03ffe50 100644
--- a/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java
+++ b/src/main/java/com/starrocks/plugin/audit/AuditLoaderPlugin.java
@@ -52,6 +52,16 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import java.util.concurrent.Future;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.starrocks.plugin.audit.model.QueryEventData;
+import java.util.concurrent.ExecutionException;
+
/*
* This plugin will load audit log to specified starrocks table at specified interval
*/
@@ -70,6 +80,8 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
private volatile boolean isClosed = false;
private volatile boolean isInit = false;
+ public static boolean kafkaEnable = false;
+
/**
* 是否包含新字段 candidateMvs,如果旧版本没有该字段则值为空
*/
@@ -329,6 +341,23 @@ public static class AuditLoaderConf {
public static final String PROP_SECRET_KEY = "secret_key";
public String secretKey = "";
+ public static String PROP_KAFKA_ENABLE = "kafka_enableMSK";
+ public static String PROP_KAFKA_INSTANCE_NAME = "kafka_instanceName";
+ public static String PROP_KAFKA_TOPIC = "kafka_topic";
+ public static String PROP_KAFKA_BOOTSTRAP_SERVERS = "kafka_bootstrapServer";
+ public static String PROP_KAFKA_CONF_KEYSERIALIZER = "kafka_conf_keySerializer";
+ public static String PROP_KAFKA_CONF_VALUESERIALIZER = "kafka_conf_valueSerializer";
+ public static String PROP_KAFKA_CONF_MAXREQUESTSIZE = "kafka_conf_maxRequestSize";
+ public static String PROP_KAFKA_CONF_BUFFERMEMORY = "kafka_conf_bufferMemory";
+ public static String PROP_KAFKA_CONF_MAXBLOCKMS = "kafka_conf_maxBlockMs";
+ public static String PROP_KAFKA_CONF_BATCHSIZE = "kafka_conf_batchSize";
+ public static String PROP_KAFKA_CONF_COMPRESSIONTYPE = "kafka_conf_compressionType";
+ public static String PROP_KAFKA_CONF_LINGERMS = "kafka_conf_lingerMs";
+ public static String PROP_KAFKA_CONF_SECURITYPROTOCOL = "kafka_conf_securityProtocol";
+ public static String PROP_KAFKA_CONF_SASLMECHANISM = "kafka_conf_saslMechanism";
+ public static String PROP_KAFKA_CONF_SASLJAASCONFIG = "kafka_conf_saslJaasConfig";
+ public static String PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS = "kafka_conf_saslClientCallbackHandlerClass";
+
public void init(Map properties) throws PluginException {
try {
if (properties.containsKey(PROP_MAX_BATCH_SIZE)) {
@@ -373,6 +402,54 @@ public void init(Map properties) throws PluginException {
if (properties.containsKey(STREAM_LOAD_FILTER)) {
streamLoadFilter = properties.get(STREAM_LOAD_FILTER);
}
+ if (properties.containsKey(PROP_KAFKA_ENABLE)) {
+ kafkaEnable = Boolean.parseBoolean(properties.get(PROP_KAFKA_ENABLE));
+ }
+ if (properties.containsKey(PROP_KAFKA_INSTANCE_NAME)) {
+ PROP_KAFKA_INSTANCE_NAME = properties.get(PROP_KAFKA_INSTANCE_NAME);
+ }
+ if (properties.containsKey(PROP_KAFKA_TOPIC)) {
+ PROP_KAFKA_TOPIC = properties.get(PROP_KAFKA_TOPIC);
+ }
+ if (properties.containsKey(PROP_KAFKA_BOOTSTRAP_SERVERS)) {
+ PROP_KAFKA_BOOTSTRAP_SERVERS = properties.get(PROP_KAFKA_BOOTSTRAP_SERVERS);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_KEYSERIALIZER)) {
+ PROP_KAFKA_CONF_KEYSERIALIZER = properties.get(PROP_KAFKA_CONF_KEYSERIALIZER);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_VALUESERIALIZER)) {
+ PROP_KAFKA_CONF_VALUESERIALIZER = properties.get(PROP_KAFKA_CONF_VALUESERIALIZER);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_MAXREQUESTSIZE)) {
+ PROP_KAFKA_CONF_MAXREQUESTSIZE = properties.get(PROP_KAFKA_CONF_MAXREQUESTSIZE);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_BUFFERMEMORY)) {
+ PROP_KAFKA_CONF_BUFFERMEMORY = properties.get(PROP_KAFKA_CONF_BUFFERMEMORY);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_MAXBLOCKMS)) {
+ PROP_KAFKA_CONF_MAXBLOCKMS = properties.get(PROP_KAFKA_CONF_MAXBLOCKMS);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_BATCHSIZE)) {
+ PROP_KAFKA_CONF_BATCHSIZE = properties.get(PROP_KAFKA_CONF_BATCHSIZE);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_COMPRESSIONTYPE)) {
+ PROP_KAFKA_CONF_COMPRESSIONTYPE = properties.get(PROP_KAFKA_CONF_COMPRESSIONTYPE);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_LINGERMS)) {
+ PROP_KAFKA_CONF_LINGERMS = properties.get(PROP_KAFKA_CONF_LINGERMS);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_SECURITYPROTOCOL)) {
+ PROP_KAFKA_CONF_SECURITYPROTOCOL = properties.get(PROP_KAFKA_CONF_SECURITYPROTOCOL);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_SASLMECHANISM)) {
+ PROP_KAFKA_CONF_SASLMECHANISM = properties.get(PROP_KAFKA_CONF_SASLMECHANISM);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_SASLJAASCONFIG)) {
+ PROP_KAFKA_CONF_SASLJAASCONFIG = properties.get(PROP_KAFKA_CONF_SASLJAASCONFIG);
+ }
+ if (properties.containsKey(PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS)) {
+ PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS = properties.get(PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS);
+ }
} catch (Exception e) {
throw new PluginException(e.getMessage());
}
@@ -392,6 +469,9 @@ public void run() {
AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS);
if (event != null) {
assembleAudit(event);
+ if (kafkaEnable) {
+ sendToKafka(event);
+ }
}
loadIfNecessary(loader);
} catch (InterruptedException ie) {
@@ -410,4 +490,84 @@ public static synchronized String longToTimeString(long timeStamp) {
return DATETIME_FORMAT.format(new Date(timeStamp));
}
+ public void sendToKafka(AuditEvent event){
+
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers", AuditLoaderConf.PROP_KAFKA_BOOTSTRAP_SERVERS);
+ properties.setProperty("key.serializer", AuditLoaderConf.PROP_KAFKA_CONF_KEYSERIALIZER);
+ properties.setProperty("value.serializer", AuditLoaderConf.PROP_KAFKA_CONF_VALUESERIALIZER);
+ properties.setProperty("max.request.size", AuditLoaderConf.PROP_KAFKA_CONF_MAXREQUESTSIZE);
+ properties.setProperty("buffer.memory", AuditLoaderConf.PROP_KAFKA_CONF_BUFFERMEMORY);
+ properties.setProperty("max.block.ms", AuditLoaderConf.PROP_KAFKA_CONF_MAXBLOCKMS);
+ properties.setProperty("batch.size", AuditLoaderConf.PROP_KAFKA_CONF_BATCHSIZE);
+ properties.setProperty("compression.type", AuditLoaderConf.PROP_KAFKA_CONF_COMPRESSIONTYPE);
+ properties.setProperty("linger.ms", AuditLoaderConf.PROP_KAFKA_CONF_LINGERMS);
+ properties.setProperty("security.protocol",AuditLoaderConf.PROP_KAFKA_CONF_SECURITYPROTOCOL);
+ properties.setProperty("sasl.mechanism",AuditLoaderConf.PROP_KAFKA_CONF_SASLMECHANISM);
+ properties.setProperty("sasl.jaas.config",AuditLoaderConf.PROP_KAFKA_CONF_SASLJAASCONFIG);
+ properties.setProperty("sasl.client.callback.handler.class",AuditLoaderConf.PROP_KAFKA_CONF_SASLCLIENTCALLBACKHANDLERCLASS);
+
+ String queryType = getQueryType(event);
+ String eventAuditId = getQueryId(queryType,event);
+
+ QueryEventData eventAuditEG = new QueryEventData();
+ eventAuditEG.setId(eventAuditId);
+ eventAuditEG.setInstanceName(AuditLoaderConf.PROP_KAFKA_INSTANCE_NAME);
+ eventAuditEG.setTimestamp(longToTimeString(event.timestamp));
+ eventAuditEG.setQueryType(queryType);
+ eventAuditEG.setClientIp(event.clientIp);
+ eventAuditEG.setUser(event.user);
+ eventAuditEG.setAuthorizedUser(event.authorizedUser);
+ eventAuditEG.setResourceGroup(event.resourceGroup);
+ eventAuditEG.setCatalog(event.catalog);
+ eventAuditEG.setDb(event.db);
+ eventAuditEG.setState(event.state);
+ eventAuditEG.setErrorCode(event.errorCode);
+ eventAuditEG.setQueryTime(event.queryTime);
+ eventAuditEG.setScanBytes(event.scanBytes);
+ eventAuditEG.setScanRows(event.scanRows);
+ eventAuditEG.setReturnRows(event.returnRows);
+ eventAuditEG.setCpuCostNs(event.cpuCostNs);
+ eventAuditEG.setMemCostBytes(event.memCostBytes);
+ eventAuditEG.setStmtId(event.stmtId);
+ eventAuditEG.setIsQuery(event.isQuery ? true : false);
+ eventAuditEG.setFeIp(event.feIp);
+ eventAuditEG.setStmt(truncateByBytes(event.stmt));
+ if (conf.enableComputeAllQueryDigest && (event.digest == null || StringUtils.isBlank(event.digest))) {
+ event.digest = computeStatementDigest(event.stmt);
+ LOG.debug("compute stmt digest, queryId: {} digest: {}", event.queryId, event.digest);
+ }
+ eventAuditEG.setDigest(event.digest);
+ eventAuditEG.setPlanCpuCosts(event.planCpuCosts);
+ eventAuditEG.setPlanMemCosts(event.planMemCosts);
+ eventAuditEG.setCandidateMvs(event.candidateMvs);
+ eventAuditEG.setHitMVs(event.hitMVs);
+
+ ObjectMapper mapperEventAuditEG = new ObjectMapper();
+ mapperEventAuditEG.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+
+ try {
+ Producer producer = new KafkaProducer<>(properties);
+ Future res = producer.send(
+ new ProducerRecord<>(
+ AuditLoaderConf.PROP_KAFKA_TOPIC,
+ eventAuditId,
+ mapperEventAuditEG.writeValueAsString(eventAuditEG)));
+ try {
+ RecordMetadata metadata = res.get();
+ if (metadata.hasOffset()){
+ LOG.info("Query created event with id: " + eventAuditId + " in partition: "+ String.valueOf(metadata.partition()) + " with offset: " + metadata.offset());
+ } else {
+ LOG.error("Query created event with id: " + eventAuditId + " doesn't have offset. It wasn't sent to the topic. ");
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error(String.format("Query id: "+ eventAuditId + " Not written to kafka topic - Error of interrupted execution on sendToKafka method: %s", e.getMessage()));
+ }
+ producer.close();
+ } catch (Exception e) {
+ LOG.error(String.format("Error on sending to kafka: %s", e.getMessage()));
+ }
+
+ }
+
}
diff --git a/src/main/java/com/starrocks/plugin/audit/model/QueryEventData.java b/src/main/java/com/starrocks/plugin/audit/model/QueryEventData.java
new file mode 100644
index 0000000..b820ed4
--- /dev/null
+++ b/src/main/java/com/starrocks/plugin/audit/model/QueryEventData.java
@@ -0,0 +1,147 @@
+package com.starrocks.plugin.audit.model;
+
+public class QueryEventData {
+
+ public String id;
+ public String instanceName;
+ public String timestamp;
+ public String queryType;
+ public String clientIp;
+ public String user;
+ public String authorizedUser;
+ public String resourceGroup;
+ public String catalog;
+ public String db;
+ public String state;
+ public String errorCode;
+ public long queryTime;
+ public long scanBytes;
+ public long scanRows;
+ public long returnRows;
+ public long cpuCostNs;
+ public long memCostBytes;
+ public long stmtId;
+ public boolean isQuery;
+ public String feIp;
+ public String stmt;
+ public String digest;
+ public double planCpuCosts;
+ public double planMemCosts;
+ public String candidateMvs;
+ public String hitMVs;
+
+ public QueryEventData() {
+
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public void setInstanceName(String instanceName) {
+ this.instanceName = instanceName;
+ }
+
+ public void setTimestamp(String timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public void setQueryType(String queryType) {
+ this.queryType = queryType;
+ }
+
+ public void setClientIp(String clientIp) {
+ this.clientIp = clientIp;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public void setAuthorizedUser(String authorizedUser) {
+ this.authorizedUser = authorizedUser;
+ }
+
+ public void setResourceGroup(String resourceGroup) {
+ this.resourceGroup = resourceGroup;
+ }
+
+ public void setCatalog(String catalog) {
+ this.catalog = catalog;
+ }
+
+ public void setDb(String db) {
+ this.db = db;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ public void setErrorCode(String errorCode) {
+ this.errorCode = errorCode;
+ }
+
+ public void setQueryTime(long queryTime) {
+ this.queryTime = queryTime;
+ }
+
+ public void setScanBytes(long scanBytes) {
+ this.scanBytes = scanBytes;
+ }
+
+ public void setScanRows(long scanRows) {
+ this.scanRows = scanRows;
+ }
+
+ public void setReturnRows(long returnRows) {
+ this.returnRows = returnRows;
+ }
+
+ public void setCpuCostNs(long cpuCostNs) {
+ this.cpuCostNs = cpuCostNs;
+ }
+
+ public void setMemCostBytes(long memCostBytes) {
+ this.memCostBytes = memCostBytes;
+ }
+
+ public void setStmtId(long stmtId) {
+ this.stmtId = stmtId;
+ }
+
+ public void setIsQuery(boolean isQuery) {
+ this.isQuery = isQuery;
+ }
+
+ public void setFeIp(String feIp) {
+ this.feIp = feIp;
+ }
+
+ public void setStmt(String stmt) {
+ this.stmt = stmt;
+ }
+
+ public void setDigest(String digest) {
+ this.digest = digest;
+ }
+
+ public void setPlanCpuCosts(double planCpuCosts) {
+ this.planCpuCosts = planCpuCosts;
+ }
+
+ public void setPlanMemCosts(double planMemCosts) {
+ this.planMemCosts = planMemCosts;
+ }
+
+ public void setCandidateMvs(String candidateMvs) {
+ this.candidateMvs = candidateMvs;
+ }
+
+ public void setHitMVs(String hitMVs) {
+ this.hitMVs = hitMVs;
+ }
+
+
+
+}