This repository has been archived by the owner on Apr 5, 2022. It is now read-only.

Adding SQL support for JDBC incremental sink to HDFS #1894

Open
wants to merge 5 commits into master

Conversation

sridharpaladugu

Adding SQL support to load data incrementally into HDFS. This also supports queries that join tables on a column. For example, given the following tables in MySQL:

CREATE TABLE user (
userid varchar(25) NOT NULL,
firstName varchar(50) NOT NULL,
lastName varchar(50) NOT NULL,
email varchar(125) NOT NULL,
PRIMARY KEY (userid)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

CREATE TABLE tweet (
userid varchar(25) NOT NULL,
msgid int(11) NOT NULL AUTO_INCREMENT,
message varchar(2096) NOT NULL,
timestamp datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (msgid),
KEY userid_idx (userid),
CONSTRAINT useridFK FOREIGN KEY (userid) REFERENCES user (userid) ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE=InnoDB AUTO_INCREMENT=4705 DEFAULT CHARSET=latin1;

I use the job definition below to load the data into HDFS/HAWQ:

job create loadTweetsJobTest --definition "jdbchdfs --driverClassName='com.mysql.jdbc.Driver' --url='jdbc:mysql://localhost/tweets?autoReconnect=true&useSSL=false' --username='root' --password='omsai' --sql='select msgid, firstname, lastname, message, timestamp from tweets.tweet join tweets.user on tweets.tweet.userid = tweets.user.userid' --checkColumn='msgid' --restartable=true"
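
The --checkColumn option is what drives the incremental behaviour. A minimal sketch of that idea, assuming plain JDBC (this is not the Spring XD jdbchdfs code; the class and method names, credentials, and hard-coded values are illustrative only): wrap the user-supplied SQL and restrict it to rows whose check column is greater than the highest value already exported.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Illustrative sketch only: the idea behind --sql + --checkColumn is to remember
// the highest check-column value already exported and restrict the next run to
// rows above it. This is not the actual jdbchdfs implementation.
public class IncrementalLoadSketch {

    static void loadIncrement(Connection conn, String userSql,
                              String checkColumn, long lastMaxExported) throws Exception {
        // Wrap the user query (which may contain joins) and bound it by the
        // last exported check-column value.
        String incrementalSql = "SELECT * FROM (" + userSql + ") src "
                + "WHERE src." + checkColumn + " > " + lastMaxExported;
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(incrementalSql)) {
            while (rs.next()) {
                // The real job writes these rows as CSV to HDFS; here we only
                // show the shape of the query.
                System.out.println(rs.getLong(checkColumn));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        String userSql = "select msgid, firstname, lastname, message, timestamp "
                + "from tweets.tweet join tweets.user on tweets.tweet.userid = tweets.user.userid";
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost/tweets", "root", "secret")) {
            // Pretend the previous run already exported rows up to msgid 4700.
            loadIncrement(conn, userSql, "msgid", 4700L);
        }
    }
}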

HAWQ table:

CREATE EXTERNAL TABLE tweets.extn_tweet
(
msgid integer,
firstname text,
lastname text,
message text,
ts timestamp without time zone
)
LOCATION (
'pxf://172.16.65.129:50070/xd/loadTweetsJob/*.csv?profile=HdfsTextSimple'
)
FORMAT 'text' (delimiter ',' null '\N' escape '')
ENCODING 'UTF8';
ALTER TABLE tweets.extn_tweet
OWNER TO gpadmin;

@mminella mminella self-assigned this Feb 19, 2016
@mminella
Contributor

I did a quick read over this. A couple of concerns:

  • I don't see any unit tests for this new functionality.
  • I must be missing something. The incremental load functionality works by the partitioner setting a partition clause that limits the lower bound of the check column. However, the NamedColumnJdbcItemReaderFactory, which is used to create the reader in this job, ignores the partitionClause when sql is provided, which would mean that the incremental load functionality isn't applied. Am I missing something? (One way the two could be combined is sketched after this list.)
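
A hedged sketch of one way the reader could honor both inputs; this is hypothetical, not the PR's actual NamedColumnJdbcItemReaderFactory code, and its field and method names are assumptions. The idea is to wrap the user-supplied SQL as a derived table and append the partitioner's clause, so providing sql does not silently drop the partition restriction.

// Hypothetical sketch, not the PR's actual code: combine a user-provided SQL
// statement with the clause produced by the partitioner instead of ignoring it.
public class SqlWithPartitionClauseSketch {

    private final String sql;             // user-supplied --sql (may contain joins)
    private final String partitionClause; // e.g. "WHERE msgid > 4700", set by the partitioner

    public SqlWithPartitionClauseSketch(String sql, String partitionClause) {
        this.sql = sql;
        this.partitionClause = partitionClause;
    }

    public String buildQuery() {
        if (partitionClause == null || partitionClause.trim().isEmpty()) {
            // No restriction from the partitioner: run the user query as-is.
            return sql;
        }
        // Wrap the user query so the clause applies even when it contains joins.
        return "SELECT * FROM (" + sql + ") src " + partitionClause;
    }
}

With the join query from the job definition above, buildQuery() would yield something like SELECT * FROM (select msgid, firstname, ...) src WHERE msgid > 4700.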

@sridharpaladugu
Author

Added a test case.

The SQL incremental load works using the check column only. If the check column is not specified, there is no way to tell that the load should be incremental, so it falls back to the legacy code path where we simply run the SQL and load the data; in that case every subsequent job invocation produces a file containing duplicate data (this is where I have doubts too). If the check column is present, the incremental load works fine.

@mminella
Contributor

The test case you added demonstrates the partitioning capabilities, but not the incremental load part. Can you please address that part as well?
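
A hedged outline of the kind of test being asked for, reduced to plain JDBC against an in-memory H2 database rather than the Spring XD job and HDFS; the class, table, and values below are illustrative assumptions, not the PR's actual test. It loads everything on a first "run", inserts a new row, and verifies that a query bounded by the previous maximum check-column value returns only the new row.

import static org.junit.Assert.assertEquals;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.junit.Test;

// Hypothetical outline only: shows the incremental-load assertion in isolation,
// without the Spring XD job infrastructure or HDFS.
public class IncrementalQuerySketchTest {

    @Test
    public void secondRunOnlySeesNewRows() throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:tweets");
             Statement stmt = conn.createStatement()) {

            stmt.execute("CREATE TABLE tweet (msgid INT PRIMARY KEY, message VARCHAR(2096))");
            stmt.execute("INSERT INTO tweet VALUES (1, 'first'), (2, 'second')");

            // First run: no lower bound, everything is exported.
            assertEquals(2, countRows(stmt, "SELECT msgid FROM tweet"));

            // New data arrives between runs.
            stmt.execute("INSERT INTO tweet VALUES (3, 'third')");

            // Second run: restrict to rows above the last exported msgid (2).
            assertEquals(1, countRows(stmt, "SELECT msgid FROM tweet WHERE msgid > 2"));
        }
    }

    private int countRows(Statement stmt, String sql) throws Exception {
        int count = 0;
        try (ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                count++;
            }
        }
        return count;
    }
}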
