Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions deploy/samples/demodb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ metadata:
name: ads-catalog-database
spec:
catalog: ADS_CATALOG
schema: ADS
url: jdbc:demodb://names=ads
dialect: Calcite

Expand Down Expand Up @@ -68,7 +69,7 @@ metadata:
name: ads-source-trigger-template
spec:
databases:
- ads-database
- ads-catalog-database
methods:
- Scan
yaml: |
Expand All @@ -78,7 +79,7 @@ spec:
name: {{name}}-trigger
spec:
schema: KAFKA
table: {{offline.table.name}}
table: {{path}}
jobProperties: {{job.properties}}
yaml: |
apiVersion: batch/v1
Expand Down
2 changes: 1 addition & 1 deletion deploy/samples/logicaldb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ kind: Database
metadata:
name: logical-offline
spec:
url: jdbc:logical://nearline=ads-database;offline=profile-database
url: jdbc:logical://nearline=ads-database;offline=ads-catalog-database
schema: LOGICAL_OFFLINE
dialect: Calcite
4 changes: 2 additions & 2 deletions deploy/samples/retl-job-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: retl-job-template
namespace: default
spec:
databases: ["profile-database"]
databases: ["ads-catalog-database"]
yaml: |
apiVersion: batch/v1
kind: Job
Expand All @@ -20,7 +20,7 @@ spec:
- name: TABLE_TRIGGER_NAME
value: "{{trigger}}"
- name: SOURCE_TABLE_NAME
value: "{{offline.table.name}}"
value: "{{path}}"
- name: ONLINE_TABLE_NAME
value: "{{job.properties.online.table.name}}"
command: ["bash", "-c", "echo {{name}}-trigger fired at `date`"]
Expand Down
2 changes: 1 addition & 1 deletion hoptimator
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ $BASEDIR/hoptimator-cli/build/install/hoptimator-cli/bin/hoptimator-cli \
sqlline.SqlLine \
-ac sqlline.HoptimatorAppConfig \
--verbose \
-u "jdbc:hoptimator://fun=mysql;hints=offline.table.name=ads_offline,job.properties.account=foo" -n "" -p "" -nn "Hoptimator" $@
-u "jdbc:hoptimator://fun=mysql;hints=job.properties.account=foo" -n "" -p "" -nn "Hoptimator" $@

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public List<String> specify() throws SQLException {
.with("catalog", source.catalog())
.with("schema", source.schema())
.with("table", source.table())
.with("path", source.pathString())
.with(source.options())
.with(JOB_PROPERTIES_PREFIX, getJobPropertiesFromOptions(source.options()))
.with(DeploymentService.parseHints(connection.connectionProperties()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ protected V1alpha1TableTrigger toK8sObject() throws SQLException {
.with("trigger", triggerName)
.with("job", jobName)
.with("schedule", trigger.cronSchedule())
.with("path", source != null ? source.pathString() : null)
.with("table", source != null ? source.table() : null)
.with("schema", source != null ? source.schema() : null)
.with("catalog", source != null ? source.catalog() : null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,52 @@ void toK8sObjectBindsCatalogAsTemplateVariable() throws SQLException {
"{{catalog}}.{{schema}}.{{table}} must render to OPENHOUSE.MYDB.MYTABLE — got: " + specs.get(0));
}

// ───────── toK8sObject() path derivation tests ─────────

@Test
void toK8sObjectDerivesPathFromSourceForThreePartPath() throws SQLException {
// {{path}} should default to source.pathString() when no explicit option is set.
V1alpha1JobTemplate jobTemplate = new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("myjob").namespace("test-ns"))
.spec(new V1alpha1JobTemplateSpec().yaml(
"offlineTable: {{path}}"));
jobTemplates.add(jobTemplate);

Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *",
Collections.emptyMap(),
new Source(null, Arrays.asList("OPENHOUSE", "MYDB", "MYTABLE"), Collections.emptyMap()),
null);

K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext);
List<String> specs = deployer.specify();

assertFalse(specs.isEmpty());
assertTrue(specs.get(0).contains("offlineTable: OPENHOUSE.MYDB.MYTABLE"),
"{{path}} must default to source.pathString() — got: " + specs.get(0));
}

@Test
void toK8sObjectDerivesPathFromSourceForTwoPartPath() throws SQLException {
// Same default applies for 2-part paths.
V1alpha1JobTemplate jobTemplate = new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("myjob").namespace("test-ns"))
.spec(new V1alpha1JobTemplateSpec().yaml(
"offlineTable: {{path}}"));
jobTemplates.add(jobTemplate);

Trigger trigger = new Trigger("MY_TRIGGER", new UserJob("test-ns", "MY_JOB"), "0 * * * *",
Collections.emptyMap(),
new Source(null, Arrays.asList("ADS", "AD_CLICKS"), Collections.emptyMap()),
null);

K8sTriggerDeployer deployer = makeDeployer(trigger, mockContext);
List<String> specs = deployer.specify();

assertFalse(specs.isEmpty());
assertTrue(specs.get(0).contains("offlineTable: ADS.AD_CLICKS"),
"{{path}} must default to source.pathString() — got: " + specs.get(0));
}

// ───────── toK8sObject() paused rendering test ─────────

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void k8sValidationScript() throws Exception {

@Test
public void k8sMetadataTables() throws Exception {
run("k8s-metadata.id", "hints=offline.table.name=ads_offline");
run("k8s-metadata.id");
}

@Test
Expand Down
74 changes: 41 additions & 33 deletions hoptimator-k8s/src/test/resources/k8s-metadata.id
Original file line number Diff line number Diff line change
Expand Up @@ -61,55 +61,63 @@ select name, failed from "k8s".pipelines order by name;
!ok

select name, failed from "k8s".pipeline_elements order by name;
+---------------------------------------------+--------+
| NAME | FAILED |
+---------------------------------------------+--------+
| FlinkSessionJob/ads-database-audience2 | false |
| FlinkSessionJob/ads-database-pages | false |
| TableTrigger/ads-database-pageviews-trigger | false |
+---------------------------------------------+--------+
(3 rows)
+----------------------------------------+--------+
| NAME | FAILED |
+----------------------------------------+--------+
| FlinkSessionJob/ads-database-audience2 | false |
| FlinkSessionJob/ads-database-pages | false |
+----------------------------------------+--------+
(2 rows)

!ok

select * from "k8s".pipeline_element_map order by element_name, pipeline_name;
+---------------------------------------------+---------------+
| ELEMENT_NAME | PIPELINE_NAME |
+---------------------------------------------+---------------+
| FlinkSessionJob/ads-database-audience2 | ads-audience2 |
| FlinkSessionJob/ads-database-pages | ads-pages |
| TableTrigger/ads-database-pageviews-trigger | ads-audience2 |
| TableTrigger/ads-database-pageviews-trigger | ads-pages |
+---------------------------------------------+---------------+
(4 rows)
+----------------------------------------+---------------+
| ELEMENT_NAME | PIPELINE_NAME |
+----------------------------------------+---------------+
| FlinkSessionJob/ads-database-audience2 | ads-audience2 |
| FlinkSessionJob/ads-database-pages | ads-pages |
+----------------------------------------+---------------+
(2 rows)

!ok

select pl.name as pipeline_name, pe.element_name, pe.failed as element_failed from "k8s".pipelines pl inner join (select t2.element_name, t1.failed, t2.pipeline_name from "k8s".pipeline_elements t1 inner join "k8s".pipeline_element_map t2 on t1.
name = t2.element_name) pe on pl.name = pe.pipeline_name order by pipeline_name, element_name;
+---------------+---------------------------------------------+----------------+
| PIPELINE_NAME | ELEMENT_NAME | ELEMENT_FAILED |
+---------------+---------------------------------------------+----------------+
| ads-audience2 | FlinkSessionJob/ads-database-audience2 | false |
| ads-audience2 | TableTrigger/ads-database-pageviews-trigger | false |
| ads-pages | FlinkSessionJob/ads-database-pages | false |
| ads-pages | TableTrigger/ads-database-pageviews-trigger | false |
+---------------+---------------------------------------------+----------------+
(4 rows)
+---------------+----------------------------------------+----------------+
| PIPELINE_NAME | ELEMENT_NAME | ELEMENT_FAILED |
+---------------+----------------------------------------+----------------+
| ads-audience2 | FlinkSessionJob/ads-database-audience2 | false |
| ads-pages | FlinkSessionJob/ads-database-pages | false |
+---------------+----------------------------------------+----------------+
(2 rows)

!ok

select name, "SCHEMA", "TABLE" from "k8s".TABLE_TRIGGERS;
+--------------------------------+--------+------------------+
| NAME | SCHEMA | TABLE |
+--------------------------------+--------+------------------+
| test-table-trigger | KAFKA | existing-topic-1 |
| ads-database-pageviews-trigger | KAFKA | ads_offline |
+--------------------------------+--------+------------------+
# Scanning a 3-part identifier (ads_catalog.ads.ad_clicks) routes through the
# ads-source-trigger-template (bound to ads-catalog-database), which renders a
# TableTrigger whose `table` is the dot-joined source path.
create or replace materialized view ads.clicks as select * from ads_catalog.ads.ad_clicks;
(0 rows modified)

!update

select name, "SCHEMA", "TABLE" from "k8s".TABLE_TRIGGERS order by name;
+---------------------------------------+--------+---------------------------+
| NAME | SCHEMA | TABLE |
+---------------------------------------+--------+---------------------------+
| ads-catalog-database-adclicks-trigger | KAFKA | ADS_CATALOG.ADS.AD_CLICKS |
| test-table-trigger | KAFKA | existing-topic-1 |
+---------------------------------------+--------+---------------------------+
(2 rows)

!ok

drop materialized view ads.clicks;
(0 rows modified)

!update

drop materialized view ads.pages;
(0 rows modified)

Expand Down
2 changes: 1 addition & 1 deletion hoptimator-k8s/src/test/resources/k8s-trigger-options.id
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ drop trigger testdotted;
!update

# Test trigger with quoted option keys (required when keys contain reserved words like 'table')
create trigger testquoted on ads.ad_clicks as 'my-app' in 'my-mp' with ('offline.table.name' 'my-namespace.my-table', 'job.properties.online.table.name' 'MyOnlineTable');
create trigger testquoted on ads.ad_clicks as 'my-app' in 'my-mp' with ('job.properties.online.table.name' 'MyOnlineTable');
(0 rows modified)

!update
Expand Down
31 changes: 16 additions & 15 deletions hoptimator-logical/src/test/resources/logical-offline-ddl.id
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,33 @@
# Test 1: CREATE TABLE on a logical table with an offline tier auto-creates
# a paused TableTrigger via LogicalTableDeployer.deployImplicitTrigger().
# ─────────────────────────────────────────────────────────────────────────────
create or replace table "LOGICAL_OFFLINE"."MEMBERS" ("ID" bigint, "NAME" varchar);
create or replace table "LOGICAL_OFFLINE"."AD_CLICKS" ("ID" bigint, "NAME" varchar);
(0 rows modified)

!update

# The trigger CRD must exist with the canonical "logical-<table>-offline-trigger" name,
# pointing at the offline tier's physical schema (PROFILE), and must start paused=true.
select name, schema, "TABLE", paused from "k8s".table_triggers where name = 'logical-members-offline-trigger';
+---------------------------------+---------+---------+--------+
| NAME | SCHEMA | TABLE | PAUSED |
+---------------------------------+---------+---------+--------+
| logical-members-offline-trigger | PROFILE | MEMBERS | true |
+---------------------------------+---------+---------+--------+
# pointing at the offline tier's physical catalog+schema (ADS_CATALOG.ADS, from
# ads-catalog-database). The trigger must start paused=true.
select name, catalog, schema, "TABLE", paused from "k8s".table_triggers where name = 'logical-adclicks-offline-trigger';
+----------------------------------+-------------+--------+-----------+--------+
| NAME | CATALOG | SCHEMA | TABLE | PAUSED |
+----------------------------------+-------------+--------+-----------+--------+
| logical-adclicks-offline-trigger | ADS_CATALOG | ADS | AD_CLICKS | true |
+----------------------------------+-------------+--------+-----------+--------+
(1 row)

!ok

# ─────────────────────────────────────────────────────────────────────────────
# Test 2: RESUME TRIGGER unpauses the implicit trigger.
# ─────────────────────────────────────────────────────────────────────────────
resume trigger "logical-members-offline-trigger";
resume trigger "logical-adclicks-offline-trigger";
(0 rows modified)

!update

select paused from "k8s".table_triggers where name = 'logical-members-offline-trigger';
select paused from "k8s".table_triggers where name = 'logical-adclicks-offline-trigger';
+--------+
| PAUSED |
+--------+
Expand All @@ -45,12 +46,12 @@ select paused from "k8s".table_triggers where name = 'logical-members-offline-tr
# (K8sTriggerDeployer.update() resolves paused from the existing CRD when the
# caller doesn't set PAUSED_OPTION.)
# ─────────────────────────────────────────────────────────────────────────────
create or replace table "LOGICAL_OFFLINE"."MEMBERS" ("ID" bigint, "NAME" varchar, "EXTRA" varchar);
create or replace table "LOGICAL_OFFLINE"."AD_CLICKS" ("ID" bigint, "NAME" varchar, "EXTRA" varchar);
(0 rows modified)

!update

select paused from "k8s".table_triggers where name = 'logical-members-offline-trigger';
select paused from "k8s".table_triggers where name = 'logical-adclicks-offline-trigger';
+--------+
| PAUSED |
+--------+
Expand All @@ -63,12 +64,12 @@ select paused from "k8s".table_triggers where name = 'logical-members-offline-tr
# ─────────────────────────────────────────────────────────────────────────────
# Test 4: PAUSE TRIGGER re-pauses it.
# ─────────────────────────────────────────────────────────────────────────────
pause trigger "logical-members-offline-trigger";
pause trigger "logical-adclicks-offline-trigger";
(0 rows modified)

!update

select paused from "k8s".table_triggers where name = 'logical-members-offline-trigger';
select paused from "k8s".table_triggers where name = 'logical-adclicks-offline-trigger';
+--------+
| PAUSED |
+--------+
Expand All @@ -78,7 +79,7 @@ select paused from "k8s".table_triggers where name = 'logical-members-offline-tr

!ok

drop table "LOGICAL_OFFLINE"."MEMBERS";
drop table "LOGICAL_OFFLINE"."AD_CLICKS";
(0 rows modified)

!update
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,9 @@ void parseHintsBackwardCompatibility() {

// Test values with dots and underscores (common in property names)
Map<String, String> dotted = DeploymentService.parseHints(new Properties() {{
put(HINT_OPTION, "kafka.source.properties.group.id=my_group,offline.table.name=ads_offline");
put(HINT_OPTION, "kafka.source.properties.group.id=my_group");
}});
assertEquals("my_group", dotted.get("kafka.source.properties.group.id"));
assertEquals("ads_offline", dotted.get("offline.table.name"));
}

/**
Expand Down
Loading