diff --git a/deploy/samples/demodb.yaml b/deploy/samples/demodb.yaml index 66a6a9ab..9df388b4 100644 --- a/deploy/samples/demodb.yaml +++ b/deploy/samples/demodb.yaml @@ -15,6 +15,7 @@ metadata: name: ads-catalog-database spec: catalog: ADS_CATALOG + schema: ADS url: jdbc:demodb://names=ads dialect: Calcite @@ -68,7 +69,7 @@ metadata: name: ads-source-trigger-template spec: databases: - - ads-database + - ads-catalog-database methods: - Scan yaml: | @@ -78,7 +79,7 @@ spec: name: {{name}}-trigger spec: schema: KAFKA - table: {{offline.table.name}} + table: {{path}} jobProperties: {{job.properties}} yaml: | apiVersion: batch/v1 diff --git a/deploy/samples/logicaldb.yaml b/deploy/samples/logicaldb.yaml index 8afc423d..87aa3b65 100644 --- a/deploy/samples/logicaldb.yaml +++ b/deploy/samples/logicaldb.yaml @@ -20,6 +20,6 @@ kind: Database metadata: name: logical-offline spec: - url: jdbc:logical://nearline=ads-database;offline=profile-database + url: jdbc:logical://nearline=ads-database;offline=ads-catalog-database schema: LOGICAL_OFFLINE dialect: Calcite diff --git a/deploy/samples/retl-job-template.yaml b/deploy/samples/retl-job-template.yaml index 0643d805..912ab26b 100644 --- a/deploy/samples/retl-job-template.yaml +++ b/deploy/samples/retl-job-template.yaml @@ -4,7 +4,7 @@ metadata: name: retl-job-template namespace: default spec: - databases: ["profile-database"] + databases: ["ads-catalog-database"] yaml: | apiVersion: batch/v1 kind: Job @@ -20,7 +20,7 @@ spec: - name: TABLE_TRIGGER_NAME value: "{{trigger}}" - name: SOURCE_TABLE_NAME - value: "{{offline.table.name}}" + value: "{{path}}" - name: ONLINE_TABLE_NAME value: "{{job.properties.online.table.name}}" command: ["bash", "-c", "echo {{name}}-trigger fired at `date`"] diff --git a/hoptimator b/hoptimator index 7b2d3bbf..2c0a7dd6 100755 --- a/hoptimator +++ b/hoptimator @@ -8,5 +8,5 @@ $BASEDIR/hoptimator-cli/build/install/hoptimator-cli/bin/hoptimator-cli \ sqlline.SqlLine \ -ac sqlline.HoptimatorAppConfig \ 
--verbose \ - -u "jdbc:hoptimator://fun=mysql;hints=offline.table.name=ads_offline,job.properties.account=foo" -n "" -p "" -nn "Hoptimator" $@ + -u "jdbc:hoptimator://fun=mysql;hints=job.properties.account=foo" -n "" -p "" -nn "Hoptimator" $@ diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java index b2490e43..3565c454 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java @@ -44,6 +44,7 @@ public List specify() throws SQLException { .with("catalog", source.catalog()) .with("schema", source.schema()) .with("table", source.table()) + .with("path", source.pathString()) .with(source.options()) .with(JOB_PROPERTIES_PREFIX, getJobPropertiesFromOptions(source.options())) .with(DeploymentService.parseHints(connection.connectionProperties())); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java index d2316997..b095c4d4 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java @@ -110,6 +110,7 @@ protected V1alpha1TableTrigger toK8sObject() throws SQLException { .with("trigger", triggerName) .with("job", jobName) .with("schedule", trigger.cronSchedule()) + .with("path", source != null ? source.pathString() : null) .with("table", source != null ? source.table() : null) .with("schema", source != null ? source.schema() : null) .with("catalog", source != null ? 
source.catalog() : null) diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java index 9d0fa502..0dc62226 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployerTest.java @@ -595,6 +595,52 @@ void toK8sObjectBindsCatalogAsTemplateVariable() throws SQLException { "{{catalog}}.{{schema}}.{{table}} must render to OPENHOUSE.MYDB.MYTABLE — got: " + specs.get(0)); } + // ───────── toK8sObject() path derivation tests ───────── + + @Test + void toK8sObjectDerivesPathFromSourceForThreePartPath() throws SQLException { + // {{path}} should default to source.pathString() when no explicit option is set. + V1alpha1JobTemplate jobTemplate = new V1alpha1JobTemplate() + .metadata(new V1ObjectMeta().name("myjob").namespace("test-ns")) + .spec(new V1alpha1JobTemplateSpec().yaml( + "offlineTable: {{path}}")); + jobTemplates.add(jobTemplate); + + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *", + Collections.emptyMap(), + new Source(null, Arrays.asList("OPENHOUSE", "MYDB", "MYTABLE"), Collections.emptyMap()), + null); + + K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); + List<String> specs = deployer.specify(); + + assertFalse(specs.isEmpty()); + assertTrue(specs.get(0).contains("offlineTable: OPENHOUSE.MYDB.MYTABLE"), + "{{path}} must default to source.pathString() — got: " + specs.get(0)); + } + + @Test + void toK8sObjectDerivesPathFromSourceForTwoPartPath() throws SQLException { + // Same default applies for 2-part paths. 
+ V1alpha1JobTemplate jobTemplate = new V1alpha1JobTemplate() + .metadata(new V1ObjectMeta().name("myjob").namespace("test-ns")) + .spec(new V1alpha1JobTemplateSpec().yaml( + "offlineTable: {{path}}")); + jobTemplates.add(jobTemplate); + + Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *", + Collections.emptyMap(), + new Source(null, Arrays.asList("ADS", "AD_CLICKS"), Collections.emptyMap()), + null); + + K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext); + List<String> specs = deployer.specify(); + + assertFalse(specs.isEmpty()); + assertTrue(specs.get(0).contains("offlineTable: ADS.AD_CLICKS"), + "{{path}} must default to source.pathString() — got: " + specs.get(0)); + } + // ───────── toK8sObject() paused rendering test ───────── @Test diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java index b8a61406..d5f7403b 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java @@ -26,7 +26,7 @@ public void k8sValidationScript() throws Exception { @Test public void k8sMetadataTables() throws Exception { - run("k8s-metadata.id", "hints=offline.table.name=ads_offline"); + run("k8s-metadata.id"); } @Test diff --git a/hoptimator-k8s/src/test/resources/k8s-metadata.id b/hoptimator-k8s/src/test/resources/k8s-metadata.id index ef1d492c..b784947d 100644 --- a/hoptimator-k8s/src/test/resources/k8s-metadata.id +++ b/hoptimator-k8s/src/test/resources/k8s-metadata.id @@ -61,55 +61,63 @@ select name, failed from "k8s".pipelines order by name; !ok select name, failed from "k8s".pipeline_elements order by name; -+---------------------------------------------+--------+ -| NAME | FAILED | -+---------------------------------------------+--------+ -| FlinkSessionJob/ads-database-audience2 | false | -| 
FlinkSessionJob/ads-database-pages | false | -| TableTrigger/ads-database-pageviews-trigger | false | -+---------------------------------------------+--------+ -(3 rows) ++----------------------------------------+--------+ +| NAME | FAILED | ++----------------------------------------+--------+ +| FlinkSessionJob/ads-database-audience2 | false | +| FlinkSessionJob/ads-database-pages | false | ++----------------------------------------+--------+ +(2 rows) !ok select * from "k8s".pipeline_element_map order by element_name, pipeline_name; -+---------------------------------------------+---------------+ -| ELEMENT_NAME | PIPELINE_NAME | -+---------------------------------------------+---------------+ -| FlinkSessionJob/ads-database-audience2 | ads-audience2 | -| FlinkSessionJob/ads-database-pages | ads-pages | -| TableTrigger/ads-database-pageviews-trigger | ads-audience2 | -| TableTrigger/ads-database-pageviews-trigger | ads-pages | -+---------------------------------------------+---------------+ -(4 rows) ++----------------------------------------+---------------+ +| ELEMENT_NAME | PIPELINE_NAME | ++----------------------------------------+---------------+ +| FlinkSessionJob/ads-database-audience2 | ads-audience2 | +| FlinkSessionJob/ads-database-pages | ads-pages | ++----------------------------------------+---------------+ +(2 rows) !ok select pl.name as pipeline_name, pe.element_name, pe.failed as element_failed from "k8s".pipelines pl inner join (select t2.element_name, t1.failed, t2.pipeline_name from "k8s".pipeline_elements t1 inner join "k8s".pipeline_element_map t2 on t1. 
name = t2.element_name) pe on pl.name = pe.pipeline_name order by pipeline_name, element_name; -+---------------+---------------------------------------------+----------------+ -| PIPELINE_NAME | ELEMENT_NAME | ELEMENT_FAILED | -+---------------+---------------------------------------------+----------------+ -| ads-audience2 | FlinkSessionJob/ads-database-audience2 | false | -| ads-audience2 | TableTrigger/ads-database-pageviews-trigger | false | -| ads-pages | FlinkSessionJob/ads-database-pages | false | -| ads-pages | TableTrigger/ads-database-pageviews-trigger | false | -+---------------+---------------------------------------------+----------------+ -(4 rows) ++---------------+----------------------------------------+----------------+ +| PIPELINE_NAME | ELEMENT_NAME | ELEMENT_FAILED | ++---------------+----------------------------------------+----------------+ +| ads-audience2 | FlinkSessionJob/ads-database-audience2 | false | +| ads-pages | FlinkSessionJob/ads-database-pages | false | ++---------------+----------------------------------------+----------------+ +(2 rows) !ok -select name, "SCHEMA", "TABLE" from "k8s".TABLE_TRIGGERS; -+--------------------------------+--------+------------------+ -| NAME | SCHEMA | TABLE | -+--------------------------------+--------+------------------+ -| test-table-trigger | KAFKA | existing-topic-1 | -| ads-database-pageviews-trigger | KAFKA | ads_offline | -+--------------------------------+--------+------------------+ +# Scanning a 3-part identifier (ads_catalog.ads.ad_clicks) routes through the +# ads-source-trigger-template (bound to ads-catalog-database), which renders a +# TableTrigger whose `table` is the dot-joined source path. 
+create or replace materialized view ads.clicks as select * from ads_catalog.ads.ad_clicks; +(0 rows modified) + +!update + +select name, "SCHEMA", "TABLE" from "k8s".TABLE_TRIGGERS order by name; ++---------------------------------------+--------+---------------------------+ +| NAME | SCHEMA | TABLE | ++---------------------------------------+--------+---------------------------+ +| ads-catalog-database-adclicks-trigger | KAFKA | ADS_CATALOG.ADS.AD_CLICKS | +| test-table-trigger | KAFKA | existing-topic-1 | ++---------------------------------------+--------+---------------------------+ (2 rows) !ok +drop materialized view ads.clicks; +(0 rows modified) + +!update + drop materialized view ads.pages; (0 rows modified) diff --git a/hoptimator-k8s/src/test/resources/k8s-trigger-options.id b/hoptimator-k8s/src/test/resources/k8s-trigger-options.id index 5f7853bb..1ab8c4ff 100644 --- a/hoptimator-k8s/src/test/resources/k8s-trigger-options.id +++ b/hoptimator-k8s/src/test/resources/k8s-trigger-options.id @@ -44,7 +44,7 @@ drop trigger testdotted; !update # Test trigger with quoted option keys (required when keys contain reserved words like 'table') -create trigger testquoted on ads.ad_clicks as 'my-app' in 'my-mp' with ('offline.table.name' 'my-namespace.my-table', 'job.properties.online.table.name' 'MyOnlineTable'); +create trigger testquoted on ads.ad_clicks as 'my-app' in 'my-mp' with ('job.properties.online.table.name' 'MyOnlineTable'); (0 rows modified) !update diff --git a/hoptimator-logical/src/test/resources/logical-offline-ddl.id b/hoptimator-logical/src/test/resources/logical-offline-ddl.id index 9e154ba2..869032cc 100644 --- a/hoptimator-logical/src/test/resources/logical-offline-ddl.id +++ b/hoptimator-logical/src/test/resources/logical-offline-ddl.id @@ -5,19 +5,20 @@ # Test 1: CREATE TABLE on a logical table with an offline tier auto-creates # a paused TableTrigger via LogicalTableDeployer.deployImplicitTrigger(). 
# ───────────────────────────────────────────────────────────────────────────── -create or replace table "LOGICAL_OFFLINE"."MEMBERS" ("ID" bigint, "NAME" varchar); +create or replace table "LOGICAL_OFFLINE"."AD_CLICKS" ("ID" bigint, "NAME" varchar); (0 rows modified) !update # The trigger CRD must exist with the canonical "logical-<table>-offline-trigger" name, -# pointing at the offline tier's physical schema (PROFILE), and must start paused=true. -select name, schema, "TABLE", paused from "k8s".table_triggers where name = 'logical-members-offline-trigger'; -+---------------------------------+---------+---------+--------+ -| NAME | SCHEMA | TABLE | PAUSED | -+---------------------------------+---------+---------+--------+ -| logical-members-offline-trigger | PROFILE | MEMBERS | true | -+---------------------------------+---------+---------+--------+ +# pointing at the offline tier's physical catalog+schema (ADS_CATALOG.ADS, from +# ads-catalog-database). The trigger must start paused=true. +select name, catalog, schema, "TABLE", paused from "k8s".table_triggers where name = 'logical-adclicks-offline-trigger'; ++----------------------------------+-------------+--------+-----------+--------+ +| NAME | CATALOG | SCHEMA | TABLE | PAUSED | ++----------------------------------+-------------+--------+-----------+--------+ +| logical-adclicks-offline-trigger | ADS_CATALOG | ADS | AD_CLICKS | true | ++----------------------------------+-------------+--------+-----------+--------+ (1 row) !ok @@ -25,12 +26,12 @@ select name, schema, "TABLE", paused from "k8s".table_triggers where name = 'log # ───────────────────────────────────────────────────────────────────────────── # Test 2: RESUME TRIGGER unpauses the implicit trigger. 
# ───────────────────────────────────────────────────────────────────────────── -resume trigger "logical-members-offline-trigger"; +resume trigger "logical-adclicks-offline-trigger"; (0 rows modified) !update -select paused from "k8s".table_triggers where name = 'logical-members-offline-trigger'; +select paused from "k8s".table_triggers where name = 'logical-adclicks-offline-trigger'; +--------+ | PAUSED | +--------+ @@ -45,12 +46,12 @@ select paused from "k8s".table_triggers where name = 'logical-members-offline-tr # (K8sTriggerDeployer.update() resolves paused from the existing CRD when the # caller doesn't set PAUSED_OPTION.) # ───────────────────────────────────────────────────────────────────────────── -create or replace table "LOGICAL_OFFLINE"."MEMBERS" ("ID" bigint, "NAME" varchar, "EXTRA" varchar); +create or replace table "LOGICAL_OFFLINE"."AD_CLICKS" ("ID" bigint, "NAME" varchar, "EXTRA" varchar); (0 rows modified) !update -select paused from "k8s".table_triggers where name = 'logical-members-offline-trigger'; +select paused from "k8s".table_triggers where name = 'logical-adclicks-offline-trigger'; +--------+ | PAUSED | +--------+ @@ -63,12 +64,12 @@ select paused from "k8s".table_triggers where name = 'logical-members-offline-tr # ───────────────────────────────────────────────────────────────────────────── # Test 4: PAUSE TRIGGER re-pauses it. 
# ───────────────────────────────────────────────────────────────────────────── -pause trigger "logical-members-offline-trigger"; +pause trigger "logical-adclicks-offline-trigger"; (0 rows modified) !update -select paused from "k8s".table_triggers where name = 'logical-members-offline-trigger'; +select paused from "k8s".table_triggers where name = 'logical-adclicks-offline-trigger'; +--------+ | PAUSED | +--------+ @@ -78,7 +79,7 @@ select paused from "k8s".table_triggers where name = 'logical-members-offline-tr !ok -drop table "LOGICAL_OFFLINE"."MEMBERS"; +drop table "LOGICAL_OFFLINE"."AD_CLICKS"; (0 rows modified) !update \ No newline at end of file diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java index 554b71df..06c9e9cf 100644 --- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/DeploymentServiceTest.java @@ -162,10 +162,9 @@ void parseHintsBackwardCompatibility() { // Test values with dots and underscores (common in property names) Map dotted = DeploymentService.parseHints(new Properties() {{ - put(HINT_OPTION, "kafka.source.properties.group.id=my_group,offline.table.name=ads_offline"); + put(HINT_OPTION, "kafka.source.properties.group.id=my_group"); }}); assertEquals("my_group", dotted.get("kafka.source.properties.group.id")); - assertEquals("ads_offline", dotted.get("offline.table.name")); } /**