Skip to content

Commit 9f5fbb7

Browse files
authored
Merge pull request #100 from IBMStreams/develop
merge develop to master #99
2 parents 5f7796d + afc2dcf commit 9f5fbb7

File tree

4 files changed

+49
-30
lines changed

4 files changed

+49
-30
lines changed

com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEOperatorWithInput.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public abstract class HBASEOperatorWithInput extends HBASEOperator {
2626
protected int colQualifierIndex = -1;
2727

2828
protected MetaType rowAttrType = null;
29+
protected int valueAttrIndex = -1;
30+
protected MetaType valueAttrType = null;
2931

3032
protected MetaType colQualifierType = null, colFamilyType = null;
3133

@@ -96,6 +98,12 @@ protected byte[] getColumnQualifier(Tuple tuple) throws Exception {
9698
}
9799
}
98100

101+
protected byte[] getValue(Tuple tuple) throws Exception {
102+
103+
return getBytes(tuple, valueAttrIndex, valueAttrType);
104+
}
105+
106+
99107
/**
100108
* For {rowAttrName,columnFamilyAttrName,columnQualifierAttrName}, if
101109
* specified, ensures the attribute exists, and stores the index in class

com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPut.java

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,6 @@ private enum PutMode {
124124
final static String VALUE_NAME = "valueAttrName";
125125
protected byte[][] qualifierArray = null;
126126
protected MetaType[] attrType = null;
127-
private int valueAttrIndex = -1;
128-
private MetaType valueAttrType = null;
129127
protected boolean bufferTransactions = false;
130128

131129
protected Object tableLock = new Object();
@@ -283,39 +281,43 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
283281
byte colQ[] = getColumnQualifier(tuple);
284282
byte value[] = getBytes(tuple, valueAttrIndex, valueAttrType);
285283
myPut.addColumn(colF, colQ, value);
286-
//// myPut.add(colF, colQ, value);
287284
break;
288285
case RECORD:
289286
Tuple values = tuple.getTuple(valueAttr);
290287
for (int i = 0; i < qualifierArray.length; i++) {
291288
myPut.addColumn(colF, qualifierArray[i],
292-
// myPut.add(colF, qualifierArray[i],
293289
getBytes(values, i, attrType[i]));
294290
}
295291
break;
296292
default:
297293
// It should be impossible to get here.
298294
throw new Exception("Unsupported Put type");
299295
}
300-
301-
if (checkAttr != null) {
302-
Tuple checkTuple = tuple.getTuple(checkAttrIndex);
303-
304-
// the row attribute and the check row attribute have to match, so
305-
// don't even look
306-
// in the check attribute for hte row.
296+
297+
if (successAttrName != null) {
307298
byte checkRow[] = getRow(tuple);
308-
byte checkColF[] = getBytes(checkTuple, checkColFIndex,
309-
checkColFType);
310-
byte checkColQ[] = getBytes(checkTuple, checkColQIndex,
311-
checkColQType);
312-
byte checkValue[] = getCheckValue(checkTuple);
313-
314-
success = myTable.checkAndPut(checkRow, checkColF, checkColQ,
315-
checkValue, myPut);
299+
if (checkAttr != null) {
300+
Tuple checkTuple = tuple.getTuple(checkAttrIndex);
301+
// the row attribute and the check row attribute have to match, so
302+
// don't even look
303+
// in the check attribute for the row.
304+
byte checkColF[] = getBytes(checkTuple, checkColFIndex, checkColFType);
305+
byte checkColQ[] = getBytes(checkTuple, checkColQIndex, checkColQType);
306+
byte checkValue[] = getCheckValue(checkTuple);
307+
308+
success = myTable.checkAndPut(checkRow, checkColF, checkColQ, checkValue, myPut);
309+
}else{
310+
// set the success value without checkTuple
311+
byte checkColQ[] = getColumnQualifier(tuple);
312+
byte checkColF[] = getColumnFamily(tuple);
313+
byte checkValue[] = getValue(tuple);
314+
success = myTable.checkAndPut(checkRow,checkColF, checkColQ, checkValue, myPut);
315+
}
316316
logger.debug(Messages.getString("HBASE_PUT_RESULT", success));
317+
317318
} else if (!bufferTransactions && batchSize == 0) {
318-
myTable.put(myPut);
319+
320+
myTable.put(myPut);
319321
} else if (bufferTransactions){
320322
safePut(myPut);
321323
} else {
@@ -331,7 +333,7 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
331333
if (!bufferTransactions){
332334
myTable.close();
333335
}
334-
}
336+
}
335337
// Checks to see if an output tuple is necessary, and if so,
336338
// submits it.
337339
submitOutputTuple(tuple, success);

com.ibm.streamsx.hbase/impl/java/src/com/ibm/streamsx/hbase/HBASEPutDelete.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public abstract class HBASEPutDelete extends HBASEOperatorWithInput implements
6161
static final String CHECK_ATTR_PARAM = "checkAttrName";
6262
protected int checkAttrIndex = -1;
6363
static final String SUCCESS_PARAM = "successAttr";
64-
private String successAttrName = null;
64+
public String successAttrName = null;
6565
private int successAttrIndex = -1;
6666
StreamingOutput<OutputTuple> outStream = null;
6767

@@ -85,7 +85,7 @@ public void setCheckAttr(String name) {
8585
protected static void successRequiresOutput(OperatorContextChecker checker) {
8686
OperatorContext context = checker.getOperatorContext();
8787
Set<String> params = context.getParameterNames();
88-
if (params.contains(SUCCESS_PARAM)) {
88+
if (params.contains(CHECK_ATTR_PARAM)) {
8989

9090
if (context.getStreamingOutputs().size() == 0) {
9191
checker.setInvalidContext(Messages.getString("HBASE_PUT_DEL_INVALID_OUT_PARAM", SUCCESS_PARAM ), null);
@@ -105,9 +105,9 @@ protected static void successRequiresOutput(OperatorContextChecker checker) {
105105
*/
106106
protected static void compileTimeChecks(OperatorContextChecker checker,
107107
String operatorName) {
108-
// If successAttr is set, then we must be using checkAttrParam
109108
successRequiresOutput(checker);
110-
checker.checkDependentParameters(SUCCESS_PARAM, CHECK_ATTR_PARAM);
109+
// If checkAttrParam is set, then we must be using successAttr
110+
checker.checkDependentParameters(CHECK_ATTR_PARAM, SUCCESS_PARAM);
111111
checkConsistentRegionSource(checker, operatorName);
112112
if (!checker.checkExcludedParameters(CHECK_ATTR_PARAM, BATCHSIZE_NAME)){
113113
checker.setInvalidContext(Messages.getString("HBASE_PUT_DEL_INVALID_PARAM", CHECK_ATTR_PARAM, BATCHSIZE_NAME), null);
@@ -210,15 +210,16 @@ public synchronized void initialize(OperatorContext context)
210210
}
211211

212212
if (successAttrName != null) {
213-
if (checkAttrIndex < 0) {
213+
// if (checkAttrIndex < 0) {
214214
// TODO do context check the right way.
215-
throw new Exception(SUCCESS_PARAM + " only valid if "
216-
+ CHECK_ATTR_PARAM + " exists");
217-
}
215+
// throw new Exception(SUCCESS_PARAM + " only valid if "
216+
// + CHECK_ATTR_PARAM + " exists");
217+
// }
218218
// TODO also check that success attribute is only used if there's an
219219
// output port
220220
StreamSchema outSchema = outStream.getStreamSchema();
221221
Attribute attr = outSchema.getAttribute(successAttrName);
222+
222223
if (attr == null) {
223224
throw new Exception(
224225
"passed in success attribute, but no attribute found");

com.ibm.streamsx.hbase/info.xml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,16 @@ Here is an example to connect to the HBase server with Kerberos Authentication:
161161

162162
* The parameters 'tableNme' and 'tableNameAttribute' are optional, but only one of them must be set to define the name of table.
163163

164+
++ What is new in version 3.3.0
165+
166+
* The HBABSPut parameter 'successAttr' is not depending to the parameter 'checkAttrName'
167+
* It returns in output parameter a boolean results after a successfully insert data into table.
168+
169+
170+
171+
164172
</info:description>
165-
<info:version>3.2.0</info:version>
173+
<info:version>3.3.0</info:version>
166174
<info:requiredProductVersion>4.0.0.0</info:requiredProductVersion>
167175
</info:identity>
168176
<info:dependencies/>

0 commit comments

Comments
 (0)