-
Notifications
You must be signed in to change notification settings - Fork 21
/
Migration.java
149 lines (131 loc) · 4.85 KB
/
Migration.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package io.smartcat.migration;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import io.smartcat.migration.exceptions.MigrationException;
import io.smartcat.migration.exceptions.SchemaAgreementException;
/**
 * Abstract migration class that implements session DI and exposes required methods for execution.
 *
 * <p>A migration is identified by its type and version; both are fixed at construction time and
 * are the only fields that take part in {@link #equals(Object)} and {@link #hashCode()}.
 */
public abstract class Migration {

    /** Resulting schema version of this migration; assigned once in the constructor. */
    private final int version;

    /** Kind of migration; assigned once in the constructor. */
    private final MigrationType type;

    /**
     * Active Cassandra session.
     */
    protected Session session;

    /**
     * Create new migration with provided type and version.
     *
     * @param type Migration type (SCHEMA or DATA)
     * @param version Migration version
     */
    public Migration(final MigrationType type, final int version) {
        this.type = type;
        this.version = version;
    }

    /**
     * Enables session injection into migration class.
     *
     * @param session Session object
     */
    public void setSession(final Session session) {
        this.session = session;
    }

    /**
     * Returns migration type (schema or data).
     *
     * @return Migration type
     */
    public MigrationType getType() {
        return this.type;
    }

    /**
     * Returns resulting database schema version of this migration.
     *
     * @return Resulting db schema version
     */
    public int getVersion() {
        return this.version;
    }

    /**
     * Returns migration description (for history purposes).
     *
     * @return migration description.
     */
    public abstract String getDescription();

    /**
     * Executes migration implementation.
     *
     * @throws MigrationException exception
     */
    public abstract void execute() throws MigrationException;

    /**
     * Executes the provided statement and checks if the schema migration has been propagated
     * to all nodes in the cluster. Use this method when executing schema migrations.
     *
     * <p>Agreement is first read from the statement's own result via
     * {@link #checkSchemaAgreement(ResultSet)}; only if that reports no agreement is the cluster
     * asked directly via {@link #checkClusterSchemaAgreement()}.
     *
     * @param statement Statement to be executed
     * @throws SchemaAgreementException if neither check reports schema agreement
     * @throws NullPointerException if no session has been injected through
     *         {@link #setSession(Session)} before this call
     */
    protected void executeWithSchemaAgreement(Statement statement)
            throws SchemaAgreementException {
        // Fail with an explanatory message rather than a bare NPE from the dereference below.
        if (this.session == null) {
            throw new NullPointerException(
                    "Session is not set; call setSession(...) before executing statements");
        }

        final ResultSet result = this.session.execute(statement);

        // The second check is only evaluated when the first one reports no agreement.
        if (checkSchemaAgreement(result) || checkClusterSchemaAgreement()) {
            return;
        }

        throw new SchemaAgreementException(
                "Failed to propagate schema update to all nodes (schema agreement error)");
    }

    /**
     * Whether the cluster had reached schema agreement after the execution of this query.
     *
     * <p>After a successful schema-altering query (ex: creating a table), the driver will check if the cluster's
     * nodes agree on the new schema version. If not, it will keep retrying for a given delay (configurable via
     * {@link com.datastax.driver.core.Cluster.Builder#withMaxSchemaAgreementWaitSeconds(int)}).
     *
     * <p>If this method returns {@code false}, clients can call
     * {@link com.datastax.driver.core.Metadata#checkSchemaAgreement()} later to perform the check manually.
     *
     * <p>Note that the schema agreement check is only performed for schema-altering queries. For other query
     * types, this method will always return {@code true}.
     *
     * @param resultSet Statement execution ResultSet
     * @return whether the cluster reached schema agreement, or {@code true} for a non schema-altering statement.
     */
    protected boolean checkSchemaAgreement(ResultSet resultSet) {
        return resultSet.getExecutionInfo().isSchemaInAgreement();
    }

    /**
     * Checks whether hosts that are currently up agree on the schema definition.
     *
     * <p>This method performs a one-time check only, without any form of retry; therefore
     * {@link com.datastax.driver.core.Cluster.Builder#withMaxSchemaAgreementWaitSeconds(int)}
     * does not apply in this case.
     *
     * @return {@code true} if all hosts agree on the schema; {@code false} if
     *         they don't agree, or if the check could not be performed
     *         (for example, if the control connection is down).
     */
    protected boolean checkClusterSchemaAgreement() {
        return this.session.getCluster().getMetadata().checkSchemaAgreement();
    }

    /**
     * Computes a hash from the migration type and version only.
     *
     * @return hash code consistent with {@link #equals(Object)}
     */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((type == null) ? 0 : type.hashCode());
        result = prime * result + version;
        return result;
    }

    /**
     * Two migrations are equal when they are instances of exactly the same class and have the
     * same type and version. The session is not part of the comparison.
     *
     * @param obj object to compare with
     * @return {@code true} if {@code obj} is an equal migration
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final Migration other = (Migration) obj;
        return type == other.type && version == other.version;
    }
}