Skip to content

Commit 3211f7b

Browse files
authored
Merge pull request #115 from IBMStreams/develop
streamsx.hbase merger develop to master
2 parents fd2c52d + 9421620 commit 3211f7b

File tree

6 files changed

+91
-12
lines changed

6 files changed

+91
-12
lines changed

com.ibm.streamsx.hbase/.classpath

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
<classpathentry kind="lib" path="opt/downloaded/hbase-client-1.4.9.jar"/>
1919
<classpathentry kind="lib" path="opt/downloaded/hbase-common-1.4.9.jar"/>
2020
<classpathentry kind="lib" path="opt/downloaded/hbase-protocol-1.4.9.jar"/>
21+
<classpathentry kind="lib" path="opt/downloaded/htrace-core-3.1.0-incubating.jar"/>
2122
<classpathentry kind="lib" path="opt/downloaded/htrace-core4-4.2.0-incubating.jar"/>
23+
<classpathentry kind="lib" path="opt/downloaded/jackson-core-asl-1.9.13.jar"/>
24+
<classpathentry kind="lib" path="opt/downloaded/jackson-mapper-asl-1.9.13.jar"/>
2225
<classpathentry kind="lib" path="opt/downloaded/javaee-api-7.0.jar"/>
2326
<classpathentry kind="lib" path="opt/downloaded/log4j-1.2.15.jar"/>
2427
<classpathentry kind="lib" path="opt/downloaded/metrics-core-2.2.0.jar"/>
@@ -27,6 +30,6 @@
2730
<classpathentry kind="lib" path="opt/downloaded/servlet-api-2.5.jar"/>
2831
<classpathentry kind="lib" path="opt/downloaded/slf4j-api-1.7.10.jar"/>
2932
<classpathentry kind="lib" path="opt/downloaded/slf4j-log4j12-1.7.10.jar"/>
30-
<classpathentry kind="lib" path="opt/downloaded/zookeeper-3.4.13.jar"/>
33+
<classpathentry kind="lib" path="opt/downloaded/zookeeper-3.4.6.jar"/>
3134
<classpathentry kind="output" path="bin"/>
3235
</classpath>

com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEOperatorWithInput.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,23 @@ public abstract class HBASEOperatorWithInput extends HBASEOperator {
2424
protected int rowAttrIndex = -1;
2525
protected int colFamilyIndex = -1;
2626
protected int colQualifierIndex = -1;
27+
protected int timestampAttributeIndex = -1;
2728

2829
protected MetaType rowAttrType = null;
2930
protected int valueAttrIndex = -1;
3031
protected MetaType valueAttrType = null;
3132

32-
protected MetaType colQualifierType = null, colFamilyType = null;
33+
protected MetaType colQualifierType = null;
34+
protected MetaType colFamilyType = null;
35+
protected MetaType timestampType =null;
3336

3437
static final String COL_FAM_PARAM_NAME = "columnFamilyAttrName";
3538
static final String COL_QUAL_PARAM_NAME = "columnQualifierAttrName";
3639
static final String TABLE_PARAM_NAME = "tableName";
3740
static final String ROW_PARAM_NAME = "rowAttrName";
41+
static final String TIMESTAMP = "Timestamp";
42+
static final String TIMESTAMP_ATTR = "TimestampAttrName";
43+
3844
byte colFamBytes[] = null;
3945
byte colQualBytes[] = null;
4046

@@ -98,12 +104,20 @@ protected byte[] getColumnQualifier(Tuple tuple) throws Exception {
98104
}
99105
}
100106

107+
protected long getTimeStamp(Tuple tuple) throws Exception {
108+
if (timestampAttributeIndex > 0)
109+
{
110+
return tuple.getLong(timestampAttributeIndex);
111+
}else{
112+
return 0;
113+
}
114+
}
115+
101116
protected byte[] getValue(Tuple tuple) throws Exception {
102117

103118
return getBytes(tuple, valueAttrIndex, valueAttrType);
104119
}
105120

106-
107121
/**
108122
* For {rowAttrName,columnFamilyAttrName,columnQualifierAttrName}, if
109123
* specified, ensures the attribute exists, and stores the index in class

com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPut.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.ArrayList;
88
import java.util.List;
99

10+
1011
import org.apache.hadoop.hbase.TableNotFoundException;
1112
import org.apache.hadoop.hbase.client.BufferedMutator;
1213
import org.apache.hadoop.hbase.client.Put;
@@ -21,6 +22,7 @@
2122
import com.ibm.streams.operator.StreamingInput;
2223
import com.ibm.streams.operator.StreamingOutput;
2324
import com.ibm.streams.operator.Tuple;
25+
import com.ibm.streams.operator.TupleAttribute;
2426
import com.ibm.streams.operator.Type.MetaType;
2527
import com.ibm.streams.operator.compile.OperatorContextChecker;
2628
import com.ibm.streams.operator.meta.TupleType;
@@ -126,6 +128,8 @@ private enum PutMode {
126128
private PutMode putMode = null;
127129
protected String valueAttr = null;
128130
final static String VALUE_NAME = "valueAttrName";
131+
protected long timeStamp = 0;
132+
protected String timestampAttribute = null;
129133
protected byte[][] qualifierArray = null;
130134
protected MetaType[] attrType = null;
131135
protected boolean bufferTransactions = false;
@@ -155,6 +159,17 @@ public void setValueAttr(String val) {
155159
valueAttr = val;
156160
}
157161

162+
@Parameter(name = TIMESTAMP, optional = true, description = "This parameter specifies the timestamp in milliseconds (INT64). The timestamp allows for versioning of the cells. Everytime HBaes make a PUT on a table it set the timestamp. By default this is the current time in milliseconds, but you can set your own timestamp as well with this parametr. Cannot be used with TimestampAttrName")
163+
public void setTimestamp(long ts) {
164+
timeStamp = ts;
165+
}
166+
167+
@Parameter(name = TIMESTAMP_ATTR, optional = true, description = "Name of the attribute on the input tuple containing the timestamp in milliseconds. Cannot be used with Timestamp.")
168+
public void setTimestampAtrr(String ts) {
169+
timestampAttribute = ts;
170+
}
171+
172+
158173
/**
159174
* Do any necessary compile time checks. It calls the checker of the super
160175
* class.
@@ -184,7 +199,9 @@ public static void checkForBatchSizeParam(OperatorContextChecker checker) {
184199
} else if (context.getParameterNames().contains(BATCHSIZE_NAME)) {
185200
System.err.println("The " + BATCHSIZE_NAME + " has been deprecated and should not be used. Use the " +BUFFER_PARAM + " parameter instead.");
186201
}
187-
202+
if ((!checker.checkExcludedParameters(TIMESTAMP,TIMESTAMP_ATTR))){
203+
checker.setInvalidContext("The " + TIMESTAMP + " can not be used with " + TIMESTAMP_ATTR , null);
204+
}
188205
}
189206

190207
Logger logger = Logger.getLogger(this.getClass());
@@ -247,6 +264,17 @@ public synchronized void initialize(OperatorContext context)
247264
valueAttrType = attr.getType().getMetaType();
248265
putMode = PutMode.ENTRY;
249266
}
267+
268+
if (timestampAttribute != null) {
269+
timestampAttributeIndex = checkAndGetIndex(schema, timestampAttribute);
270+
timestampType = schema.getAttribute(timestampAttributeIndex).getType().getMetaType();
271+
if (timestampType != MetaType.INT64){
272+
String message = Messages.getString("HBASE_OP_INVALID_ATTR", timestampAttribute, timestampType);
273+
Logger.getLogger(this.getClass()).trace(message);
274+
System.err.print(message);
275+
}
276+
}
277+
250278
}
251279

252280
/**
@@ -284,7 +312,18 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
284312
case ENTRY:
285313
byte colQ[] = getColumnQualifier(tuple);
286314
byte value[] = getBytes(tuple, valueAttrIndex, valueAttrType);
287-
myPut.addColumn(colF, colQ, value);
315+
if (timestampAttribute != null) {
316+
timeStamp = getTimeStamp(tuple);
317+
}
318+
319+
if (timeStamp > 0){
320+
myPut.addColumn(colF, colQ, timeStamp, value);
321+
// System.out.println("timeStamp = " + timeStamp);
322+
}
323+
else {
324+
myPut.addColumn(colF, colQ, value);
325+
}
326+
288327
break;
289328
case RECORD:
290329
Tuple values = tuple.getTuple(valueAttr);
@@ -308,8 +347,7 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
308347
byte checkColF[] = getBytes(checkTuple, checkColFIndex, checkColFType);
309348
byte checkColQ[] = getBytes(checkTuple, checkColQIndex, checkColQType);
310349
byte checkValue[] = getCheckValue(checkTuple);
311-
312-
success = myTable.checkAndPut(checkRow, checkColF, checkColQ, checkValue, myPut);
350+
success = myTable.checkAndPut(checkRow, checkColF, checkColQ, checkValue, myPut);
313351
}else{
314352
// set the success value without checkTuple
315353
byte checkColQ[] = getColumnQualifier(tuple);
@@ -341,6 +379,7 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
341379
// Checks to see if an output tuple is necessary, and if so,
342380
// submits it.
343381
submitOutputTuple(tuple, success);
382+
success = false;
344383
} catch (Exception e) {
345384
logger.error(e.getMessage());
346385
submitErrorMessagee(e.getMessage(), tuple);
@@ -455,4 +494,4 @@ public void processPunctuation(StreamingInput<Tuple> stream,
455494

456495

457496

458-
}
497+
}

com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPutDelete.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ public abstract class HBASEPutDelete extends HBASEOperatorWithInput implements
4343
protected int checkColFIndex = -1;
4444
protected int checkColQIndex = -1;
4545
protected int checkValueIndex = -1;
46+
protected int checkTimestampIndex = -1;
4647

4748
protected MetaType checkColFType = null, checkColQType = null,
48-
checkValueType = null;
49+
checkValueType = null, checkTimestampType = null;
4950

5051
Logger logger = Logger.getLogger(this.getClass());
5152

@@ -132,6 +133,13 @@ protected void establishCheckAttrMatching(Attribute checkAttr)
132133
checkValueType = checkSchema.getAttribute(checkValueIndex)
133134
.getType().getMetaType();
134135
}
136+
137+
if (checkSchema.getAttribute("Timestamp") != null) {
138+
checkTimestampIndex = checkAndGetIndex(checkSchema, "Timestamp");
139+
checkTimestampType = checkSchema.getAttribute(checkTimestampIndex)
140+
.getType().getMetaType();
141+
}
142+
135143
}
136144

137145
byte[] getCheckValue(Tuple tuple) throws Exception {
@@ -140,6 +148,14 @@ byte[] getCheckValue(Tuple tuple) throws Exception {
140148
} else
141149
return null;
142150
}
151+
152+
byte[] getCheckTimeStamp(Tuple tuple) throws Exception {
153+
if (checkTimestampIndex > 0) {
154+
return getBytes(tuple, checkTimestampIndex, checkTimestampType);
155+
} else
156+
return null;
157+
}
158+
143159

144160
/**
145161
* This checks that

com.ibm.streamsx.hbase/info.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,16 @@ Here is an example to connect to the HBase server with Kerberos Authentication:
184184
The first attribute in the optional error output port must be a 'rstring'.
185185
The second attribute in the error output port is optional and must be a 'TUPLE'.
186186

187+
++ What is new in version 3.7.0
188+
189+
* The HBASEPut operator provides 2 new parameters to add time stamp:
190+
**Timestamp** This parameter specifies the timestamp in milliseconds (INT64). The timestamp allows for versioning of the cells. Every time HBaes make a PUT on a table it set the timestamp. By default this is the current time in milliseconds, but you can set your own timestamp as well with this parameter. Cannot be used with TimestampAttrName")
191+
**TimestampAttrName** Name of the attribute on the input tuple containing the timestamp in milliseconds. Cannot be used with Timestamp.
192+
193+
The jar library zookeeper-3.4.13.jar has been replaced with **zookeeper-3.4.6.jar** .
187194

188195
</info:description>
189-
<info:version>3.6.0</info:version>
196+
<info:version>3.7.0</info:version>
190197
<info:requiredProductVersion>4.0.0.0</info:requiredProductVersion>
191198
</info:identity>
192199
<info:dependencies/>

com.ibm.streamsx.hbase/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<groupId>com.ibm.streamsx.hbase</groupId>
1616
<artifactId>streamsx.hbase</artifactId>
1717
<packaging>jar</packaging>
18-
<version>3.6.0</version>
18+
<version>3.7.0</version>
1919
<name>com.ibm.streamsx.hbase</name>
2020
<repositories>
2121
<repository>
@@ -337,7 +337,7 @@
337337
<dependency>
338338
<groupId>org.apache.zookeeper</groupId>
339339
<artifactId>zookeeper</artifactId>
340-
<version>3.4.13</version>
340+
<version>3.4.6</version>
341341
<exclusions>
342342
<exclusion>
343343
<groupId>*</groupId>

0 commit comments

Comments
 (0)