Skip to content

Latest commit

 

History

History
329 lines (251 loc) · 14.7 KB

7_java_api.md

File metadata and controls

329 lines (251 loc) · 14.7 KB

Documentation

Using Connector in Java

This section describes how to access the functionality of Connector when you write your program in Java. It is assumed that you already familiarized yourself with the previous sections and you understand how Connector works.

With Spark Cassandra Connector 1.1.x, Java API comes with significant changes and enhancements.

Prerequisites

In order to use Java API, you need to add the Java API module to the list of dependencies:

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector-java" % "1.1.0" 

The best way to use Connector Java API is to import statically all the methods in CassandraJavaUtil. This utility class is the main entry point for Connector Java API.

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

The code snippets below work with a sample keyspace ks and table people. From the CQLSH shell, create the keyspace, table, and data with these commands:

create keyspace if not exists ks with replication = {'class':'SimpleStrategy', 'replication_factor':1};

create table if not exists ks.people (
  id int primary key,
  name text,
  birth_date timestamp
);

create index on ks.people (name);

insert into ks.people (id, name, birth_date) values (10, 'Catherine', '1987-12-02');
insert into ks.people (id, name, birth_date) values (11, 'Isadora', '2004-09-08');
insert into ks.people (id, name, birth_date) values (12, 'Anna', '1970-10-02');

Accessing Cassandra tables in Java

CassandraJavaRDD is a CassandraRDD counterpart in Java. It allows to invoke easily Connector specific methods in order to enforce selection or projection on the database side. However, conversely to CassandraRDD, it extends JavaRDD which is much more suitable for the development of Spark applications in Java.

In order to create CassandraJavaRDD you need to invoke one of the cassandraTable methods of a special wrapper around SparkContext. The wrapper can be easily obtained with use of one of the overloaded javaFunctions method in CassandraJavaUtil.

Example:

JavaRDD<String> cassandraRowsRDD = javaFunctions(sc).cassandraTable("ks", "people")
        .map(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow cassandraRow) throws Exception {
                return cassandraRow.toString();
            }
        });
System.out.println("Data as CassandraRows: \n" + StringUtils.join(cassandraRowsRDD.toArray(), "\n"));

In the above example, cassandraTable method has been used to create CassandraJavaRDD view of the data in ks.people. The elements of the returned RDD are of CassandraRow type. If you want to produce an RDD of custom beans, you may use cassandraTable method, which accepts a custom RowReaderFactory - (see Working with user-defined case classes and tuples for more details).

Obtaining CassandraJavaRDD

Since version 1.1.x, Java API comes with several useful factory methods which can be used to create factories of row readers of the two major kinds: type converter based and column mapper based.

The type converter based row reader uses a single TypeConverter to map a single column from a row to some type. It doesn't matter how many columns are in projection because it always choose the first one. This kind of row reader is useful when one wants to select a single column from a table and map it directly to an RDD of values of such types as String, Integer, etc. For example, we may want to get an RDD of prices in order to calculate the average:

Example:

JavaRDD<Double> pricesRDD = javaFunctions(sc).cassandraTable("ks", "people", mapColumnTo(Double.class)).select("price");

In the above example we explicitly select a single column (see the next subsection for details) and map it directly to Double.

There are other overloaded versions of mapColumnTo methods which allow to map a column to one of the collection types, or with use of an explicitly specified type converter.

The column mapper based row reader takes all the selected columns and maps them to some object with use of a given ColumnMapper. The corresponding factories can be easily obtained by series of mapRowTo overloaded methods.

Example:

// firstly, we define a bean class
public static class Person implements Serializable {
    private Integer id;
    private String name;
    private Date birthDate;

    // Remember to declare no-args constructor
    public Person() { }

    public Person(Integer id, String name, Date birthDate) {
        this.id = id;
        this.name = name;
        this.birthDate = birthDate;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Date getBirthDate() { return birthDate; }
    public void setBirthDate(Date birthDate) { this.birthDate = birthDate; }

    // other methods, constructors, etc.
}
JavaRDD<Person> rdd = javaFunctions(sc).cassandraTable("ks", "people", mapRowTo(Person.class));

In this example, we created a CassandraJavaRDD of Person elements. While defining bean classes like Person, remember to define no-args constructor. Although, it is not required for it to be the only constructor of such a class.

By default, mapRowTo methods use JavaBeanColumnMapper with a default column name mapping logic. The column name translation can be customised by providing pairs for column name and attribute name which have to be overridden. There is also one overloaded mapRowTo methods which allows to specify a custom ColumnMapper. More details about column mapper can be found in Working with user-defined case classes and tuples and Customizing the mapping between Scala and Cassandra.

Since 1.2, it is possible to easily provide custom column name to property name translation by select method.

Example: Say we have a table people2 with columns id INT, last_name TEXT, date_of_birth TIMESTAMP and we want to map the rows of this table to objects of Person class.

CassandraJavaRDD<Person> rdd = javaFunctions(sc).cassandraTable("ks", "people2", mapRowTo(Person.class)).select(
        column("id"),
        column("last_name").as("name"),
        column("date_of_birth").as("birthDate"));

as method can be used for any type of projected value: normal column, TTL or write time:

javaFunctions(sc).cassandraTable("test", "table", mapRowTo(SomeClass.class)).select(
        column("no_alias"),
        column("simple").as("simpleProp"),
        ttl("simple").as("simplePropTTL"),
        writeTime("simple").as("simpleWriteTime"))

Obtaining CassandraJavaPairRDD

Since 1.1.0 one can directly obtain a CassandraJavaPairRDD, which is an extension of JavaPairRDD. This can be done easily by specifying two row reader factories (vs one row reader factory in the previous examples). The corresponding row readers are responsible for resolving key and value from each row. The same methods mapRowTo and mapColumnTo can be used to obtain the proper factories. However, one should keep in mind the following nuances:

Key row reader Value row reader Remarks
mapColumnTo mapColumnTo 1st column mapped to key, 2nd column mapped to value
mapColumnTo mapRowTo 1st column mapped to key, whole row mapped to value
mapRowTo mapColumnTo whole row mapped to key, 1st column mapped to value
mapRowTo mapRowTo whole row mapped to key, whole row mapped to value

Examples:

CassandraJavaPairRDD<Integer, String> rdd1 = javaFunctions(sc)
    .cassandraTable("ks", "people", mapColumnTo(Integer.class), mapColumnTo(String.class))
    .select("id", "name");

CassandraJavaPairRDD<Integer, Person> rdd2 = javaFunctions(sc)
    .cassandraTable("ks", "people", mapColumnTo(Integer.class), mapRowTo(Person.class))
    .select("id", "name", "birth_date");

Using selection and projection on the database side

Once CassandraJavaRDD is created, you may apply selection and projection on that RDD by invoking where and select methods on it respectively. Their semantic is the same as the semantic of their counterparts in CassandraRDD.

Note: See the description of filtering to understand the limitations of the where method.

Example:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("ks", "people")
        .select("id").map(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow cassandraRow) throws Exception {
                return cassandraRow.toString();
            }
        });
System.out.println("Data with only 'id' column fetched: \n" + StringUtils.join(rdd.toArray(), "\n"));

Example:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("ks", "people")
        .where("name=?", "Anna").map(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow cassandraRow) throws Exception {
                return cassandraRow.toString();
            }
        });
System.out.println("Data filtered by the where clause (name='Anna'): \n" + StringUtils.join(rdd.toArray(), "\n"));

Saving data to Cassandra

javaFunctions method can be also applied to any RDD in order to provide writerBuilder factory method. In Spark Cassandra Connector prior to 1.1.0 there are a number of overloaded saveToCassandra methods because of a lack of default values support for arguments and implicit conversions. Starting from version 1.1.0 they were replaced by a builder object RDDAndDStreamCommonJavaFunctions.WriterBuilder, which can be obtained by invoking writerBuilder method on the RDD wrapper. When the builder is eventually configured, one needs to call saveToCassandra method on it to run writing job.

In the following example, a JavaRDD of Person elements is saved to Cassandra table ks.people with a default mapping and configuration.

Example:

List<Person> people = Arrays.asList(
        new Person(1, "John", new Date()),
        new Person(2, "Troy", new Date()),
        new Person(3, "Andrew", new Date())
);
JavaRDD<Person> rdd = sc.parallelize(people);
javaFunctions(rdd).writerBuilder("ks", "people", mapToRow(Person.class)).saveToCassandra();

There are several mapToRow overloaded methods available to make it easier to get the proper RowWriterFactory instance (which is the required third argument of writerBuilder method). In its simplest form, it takes the class of RDD elements and uses a default JavaBeanColumnMapper to map those elements to Cassandra rows. Custom column name to attribute translations can be specified in order to override the default logic. If JavaBeanColumnMapper is not an option, a custom column mapper can be specified as well.

Working with tuples

Since 1.3 there new methods to work with Scala tuples.

To read a Cassandra table as an RDD of tuples, just use one of mapRowToTuple methods to create the appropriate RowReaderFactory instance. The arity of the tuple is determined by the number of parameters which are provided to the mentioned method.

Example:

CassandraJavaRDD<Tuple3<String, Integer, Double>> rdd = javaFunctions(sc)
        .cassandraTable("ks", tuples", mapRowToTuple(String.class, Integer.class, Double.class))
        .select("stringCol", "intCol", "doubleCol")

Remember to explicitly specify the columns to be selected because the values from the selected columns are resolved by the column position rather than its name.

There are also new methods mapTupleToRow to create RowWriterFactory instance for tuples. Those methods require all the tuple arguments types to be provided. The number of them determines the arity of tuples.

Example:

CassandraJavaUtil.javaFunctions(sc.makeRDD(Arrays.asList(tuple)))
        .writerBuilder("cassandra_java_util_spec", "test_table_4", mapTupleToRow(
                String.class,
                Integer.class,
                Double.class
        )).withColumnSelector(someColumns("stringCol", "intCol", "doubleCol"))
        .saveToCassandra()

Similarly to reading data as tuples, it is highly recommended to explicitly specify the columns which are to be populated.

Extensions for Spark Streaming

The main entry point for Spark Streaming in Java is JavaStreamingContext object. Like for JavaSparkContext, we can use javaFunctions method to access Connector specific functionality. For example, we can create an ordinary CassandraJavaRDD by invoking the same cassandraTable method as we do for SparkContext. There is nothing specific to streaming in this case - these methods are provided only for convenience and they use SparkContext wrapped by StreamingContext under the hood.

You may also save the data from JavaDStream to Cassandra. Again, you need to use javaFunctions method to create a special wrapper around JavaDStream and then invoke writerBuilder method and finally saveToCassandra on it. DStream is a sequence of RDDs and when you invoke saveToCassandra on the builder, it will follow saving to Cassandra all the RDDs in that DStream.

javaFunctions methods for Spark streaming related entities are provided in CassandraStreamingJavaUtil.

Summary of changes between versions 1.0 and 1.1

  • added the new functionality of the connector which has been introduced in v1.1
  • removed multiple overloaded cassandraTable methods from the Java wrappers of SparkContext or StreamingContext
  • introduced several static factory methods in CassandraJavaUtil for:
    • creating column based reader factories (mapColumnTo methods)
    • creating row based reader factories (mapRowTo methods)
    • creating writer factories (mapToRow methods)
    • creating type tags for arbitrary types and type parameters (typeTag methods)
    • resolving type converters for arbitrary types and type parameters (typeConverter methods)
  • removed class argument from Java RDD wrappers factory methods
  • deprecated saveToCassandra methods in Java RDD wrappers; the preferred way to save data to Cassandra is to use writerBuilder method, which returns RDDAndDStreamCommonJavaFunctions.WriterBuilder instance, which in turn has saveToCassandra method

Further Examples

A longer example (with source code) of the Connector Java API is on the DataStax tech blog: Accessing Cassandra from Spark in Java.

Scala 2.11

Java API 1.2.0 is not yet supported for Scala 2.11 because of SPARK-3266

Next - Spark Streaming with Cassandra