Skip to content

Commit 205f752

Browse files
committed
AJ-989:add test case about HAStreamTable
1 parent 4d8318c commit 205f752

5 files changed

Lines changed: 345 additions & 15 deletions

File tree

test/com/xxdb/Prepare.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ public static void clear_env() throws IOException {
2626
"for(i in a){\n" +
2727
"\ttry{stopPublishTable(i.subscriber.split(\":\")[0],int(i.subscriber.split(\":\")[1]),i.tableName,i.actions)}catch(ex){}\n" +
2828
"}");
29+
conn.run("streamT = exec name from getStreamTables() where shared=true\n" +
30+
"for(i in streamT){\n" +
31+
"\ttry{\n" +
32+
"\t\tdropStreamTable(i)\n" +
33+
"\t\t}catch(ex){}\n" +
34+
"\t}");
2935
conn.run("res = getStreamingSQLStatus()\n" +
3036
" for(sqlStream in res){\n" +
3137
" try{unsubscribeStreamingSQL(, sqlStream.queryId)}catch(ex){print ex}\n" +
@@ -594,6 +600,19 @@ public static void wait_data(String table_name, int data_row) throws IOException
594600
BasicInt row_num;
595601
for(int i=0;i<200;i++){
596602
row_num = (BasicInt)conn.run("(exec count(*) from "+table_name+")[0]");
603+
// System.out.println(row_num.getInt());
604+
if(row_num.getInt() == data_row){
605+
break;
606+
}
607+
Thread.sleep(300);
608+
i++;
609+
}
610+
}
611+
612+
public static void wait_data(String table_name, int data_row, DBConnection conn) throws IOException, InterruptedException {
613+
BasicInt row_num;
614+
for(int i=0;i<200;i++){
615+
row_num = (BasicInt)conn.run("(exec count(*) from "+table_name+")[0]");
597616
// System.out.println(row_num.getInt());
598617
if(row_num.getInt() == data_row){
599618
break;

test/com/xxdb/streaming/client/cep/EventClientTest.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,7 @@ public void test_EventClient_subscribe_haStreamTable() throws IOException, Inte
929929
BasicTable re = (BasicTable)conn.run("select * from outputTable");
930930
Assert.assertEquals(1,re.rows());
931931
}
932-
//@Test
932+
@Test
933933
public void test_EventClient_subscribe_haStreamTable_leader() throws IOException, InterruptedException {
934934
BasicString StreamLeaderTmp = (BasicString)conn.run(String.format("getStreamingLeader(%d)", GROUP_ID));
935935
String StreamLeader = StreamLeaderTmp.getString();
@@ -939,13 +939,13 @@ public void test_EventClient_subscribe_haStreamTable_leader() throws IOExceptio
939939
int StreamLeaderPort = StreamLeaderPortTmp.getInt();
940940
System.out.println(StreamLeaderHost);
941941
System.out.println(StreamLeaderPort);
942-
DBConnection conn1 = new DBConnection();
943-
conn1.connect(StreamLeaderHost, StreamLeaderPort, "admin", "123456");
942+
DBConnection conn_leader = new DBConnection();
943+
conn_leader.connect(StreamLeaderHost, StreamLeaderPort, "admin", "123456");
944944
String script = "try{\ndropStreamTable(`inputTable_1)\n}catch(ex){\n}\n"+
945945
"table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]);\n"+
946946
"haStreamTable("+GROUP_ID+", table, `inputTable_1, 100000);\n"+
947947
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n";
948-
conn1.run(script);
948+
conn_leader.run(script);
949949
EventSchema scheme = new EventSchema();
950950
scheme.setEventType("MarketData");
951951
scheme.setFieldNames(Arrays.asList("timestamp", "comment1"));
@@ -955,16 +955,28 @@ public void test_EventClient_subscribe_haStreamTable_leader() throws IOExceptio
955955
eventSchemas.add(scheme);
956956
List<String> eventTimeFields = Arrays.asList(new String[]{"timestamp"});
957957
List<String> commonFields = Arrays.asList(new String[]{"comment1"});
958-
sender = new EventSender(conn, "inputTable_1", eventSchemas, eventTimeFields, commonFields);
958+
sender = new EventSender(conn_leader, "inputTable_1", eventSchemas, eventTimeFields, commonFields);
959959
client = new EventClient(eventSchemas, eventTimeFields, commonFields);
960960

961961
List<Entity> attributes = new ArrayList<>();
962962
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
963963
attributes.add(new BasicString("123456"));
964+
965+
final DBConnection finalConn1 = conn_leader;
966+
EventMessageHandler handler = new EventMessageHandler() {
967+
@Override
968+
public void doEvent(String eventType, List<Entity> attribute) {
969+
try {
970+
finalConn1.run("tableInsert{outputTable}", attribute);
971+
} catch (IOException e) {
972+
throw new RuntimeException(e);
973+
}
974+
}
975+
};
964976
client.subscribe(StreamLeaderHost, StreamLeaderPort, "inputTable_1", "test1", handler, -1, true, "admin", "123456");
965977
sender.sendEvent("MarketData", attributes);
966978
sleep(1000);
967-
BasicTable re = (BasicTable)conn1.run("select * from outputTable");
979+
BasicTable re = (BasicTable)conn_leader.run("select * from outputTable");
968980
Assert.assertEquals(1,re.rows());
969981
Assert.assertEquals("2024.03.22T10:45:03.100",re.getColumn(0).get(0).getString());
970982
Assert.assertEquals("123456",re.getColumn(1).get(0).getString());
@@ -984,13 +996,14 @@ public void test_EventClient_subscribe_haStreamTable_follower() throws IOExcept
984996
int StreamFollowerPort = StreamFollowerPortTmp.getInt();
985997
System.out.println(StreamFollowerHost);
986998
System.out.println(StreamFollowerPort);
987-
DBConnection conn1 = new DBConnection();
988-
conn1.connect(StreamFollowerHost, StreamFollowerPort, "admin", "123456");
999+
DBConnection conn_follower = new DBConnection();
1000+
conn_follower.connect(StreamFollowerHost, StreamFollowerPort, "admin", "123456");
9891001
String script = "try{\ndropStreamTable(`inputTable_1)\n}catch(ex){\n}\n"+
9901002
"table = table(1000000:0, `timestamp`eventType`event`comment1, [TIMESTAMP,STRING,BLOB,STRING]);\n"+
991-
"haStreamTable("+GROUP_ID+", table, `inputTable_1, 100000);\n"+
1003+
"haStreamTable("+GROUP_ID+", table, `inputTable_1, 100000);" +
1004+
"sleep(1000);\n"+
9921005
"share table(100:0, `timestamp`comment1, [TIMESTAMP,STRING]) as outputTable;\n";
993-
conn1.run(script);
1006+
conn_follower.run(script);
9941007
EventSchema scheme = new EventSchema();
9951008
scheme.setEventType("MarketData");
9961009
scheme.setFieldNames(Arrays.asList("timestamp", "comment1"));
@@ -1000,16 +1013,30 @@ public void test_EventClient_subscribe_haStreamTable_follower() throws IOExcept
10001013
eventSchemas.add(scheme);
10011014
List<String> eventTimeFields = Arrays.asList(new String[]{"timestamp"});
10021015
List<String> commonFields = Arrays.asList(new String[]{"comment1"});
1003-
sender = new EventSender(conn, "inputTable_1", eventSchemas, eventTimeFields, commonFields);
1016+
sender = new EventSender(conn_follower, "inputTable_1", eventSchemas, eventTimeFields, commonFields);
10041017
client = new EventClient(eventSchemas, eventTimeFields, commonFields);
10051018

10061019
List<Entity> attributes = new ArrayList<>();
10071020
attributes.add(new BasicTimestamp(LocalDateTime.of(2024,3,22,10,45,3,100000000)));
10081021
attributes.add(new BasicString("123456"));
1009-
client.subscribe(StreamFollowerHost, StreamFollowerPort, "inputTable_1", "test1", handler, -1, true, "user1", "123456");
1022+
final DBConnection finalConn1 = conn_follower;
1023+
EventMessageHandler handler = new EventMessageHandler() {
1024+
@Override
1025+
public void doEvent(String eventType, List<Entity> attribute) {
1026+
System.out.println("eventType: ");
1027+
System.out.println("eventType: " + eventType);
1028+
System.out.println(attribute.toString());
1029+
try {
1030+
finalConn1.run("tableInsert{outputTable}", attribute);
1031+
} catch (IOException e) {
1032+
throw new RuntimeException(e);
1033+
}
1034+
}
1035+
};
1036+
client.subscribe(StreamFollowerHost, StreamFollowerPort, "inputTable_1", "test1", handler, -1, true, "admin", "123456");
10101037
sender.sendEvent("MarketData", attributes);
10111038
sleep(1000);
1012-
BasicTable re = (BasicTable)conn1.run("select * from outputTable");
1039+
BasicTable re = (BasicTable)conn_follower.run("select * from outputTable");
10131040
Assert.assertEquals(1,re.rows());
10141041
Assert.assertEquals("2024.03.22T10:45:03.100",re.getColumn(0).get(0).getString());
10151042
Assert.assertEquals("123456",re.getColumn(1).get(0).getString());

test/com/xxdb/streaming/reverse/PollingClientReverseTest.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2094,4 +2094,111 @@ public void test_PollingClient_subscribe_orca_table_not_orca_node() throws Excep
20942094
}
20952095
pollingClient.unsubscribe(HOST, port1, "orca.orca_table.output","orca");
20962096
}
2097+
2098+
@Test(timeout = 120000)
2099+
public void Test_threadedClient_subscribe_haStreamTable_on_leader() throws IOException, InterruptedException {
2100+
BasicString StreamLeaderTmp = (BasicString)conn.run(String.format("getStreamingLeader(%d)", GROUP_ID));
2101+
String StreamLeader = StreamLeaderTmp.getString();
2102+
BasicString StreamLeaderHostTmp = (BasicString)conn.run(String.format("(exec host from rpc(getControllerAlias(), getClusterPerf) where name=\"%s\")[0]", StreamLeader));
2103+
String StreamLeaderHost = StreamLeaderHostTmp.getString();
2104+
BasicInt StreamLeaderPortTmp = (BasicInt)conn.run(String.format("(exec port from rpc(getControllerAlias(), getClusterPerf) where mode = 0 and name=\"%s\")[0]", StreamLeader));
2105+
int StreamLeaderPort = StreamLeaderPortTmp.getInt();
2106+
System.out.println(StreamLeaderHost);
2107+
System.out.println(StreamLeaderPort);
2108+
DBConnection conn_leader = new DBConnection();
2109+
conn_leader.connect(StreamLeaderHost, StreamLeaderPort, "admin", "123456");
2110+
String script = "try{\ndropStreamTable(`Trades)\n}catch(ex){\n}\n" +
2111+
"table = table(1000000:0, `tag`ts`data,[INT,TIMESTAMP,DOUBLE]);\n"+
2112+
"haStreamTable("+GROUP_ID+", table, `Trades, 100000);\n"+
2113+
" share streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE]) as Receive;";
2114+
conn_leader.run(script);
2115+
final DBConnection finalConn1 = conn_leader;
2116+
TopicPoller poller = pollingClient.subscribe(StreamLeaderHost, StreamLeaderPort, "Trades", "haStreamTable", 0);
2117+
conn_leader.run("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
2118+
List<IMessage> messages = poller.poll(2000,10000);
2119+
2120+
for(int i=0;i<messages.size();i++){
2121+
try {
2122+
IMessage msg = messages.get(i);
2123+
List<Entity> args = new ArrayList<>();
2124+
args.add(msg.getEntity(0));
2125+
args.add(msg.getEntity(1));
2126+
args.add(msg.getEntity(2));
2127+
finalConn1.run("tableInsert{Receive}", args);
2128+
} catch (Exception e) {
2129+
e.printStackTrace();
2130+
}
2131+
}
2132+
2133+
BasicTable re = (BasicTable) conn_leader.run("select * from Receive");
2134+
BasicTable tra = (BasicTable) conn_leader.run("select * from Trades");
2135+
assertEquals(10000, re.rows());
2136+
for (int i = 0; i < re.rows(); i++) {
2137+
assertEquals(re.getColumn(0).get(i), tra.getColumn(0).get(i));
2138+
assertEquals(re.getColumn(1).get(i), tra.getColumn(1).get(i));
2139+
assertEquals(re.getColumn(2).get(i).getString(), tra.getColumn(2).get(i).getString());
2140+
}
2141+
pollingClient.unsubscribe(StreamLeaderHost, StreamLeaderPort, "Trades", "haStreamTable");
2142+
}
2143+
2144+
@Test(timeout = 120000)
2145+
public void Test_threadedClient_subscribe_haStreamTable_on_follower() throws IOException, InterruptedException {
2146+
BasicString StreamLeaderTmp = (BasicString)conn.run(String.format("getStreamingLeader(%d)", GROUP_ID));
2147+
String StreamLeader = StreamLeaderTmp.getString();
2148+
BasicString StreamLeaderHostTmp = (BasicString)conn.run(String.format("(exec host from rpc(getControllerAlias(), getClusterPerf) where name=\"%s\")[0]", StreamLeader));
2149+
String StreamLeaderHost = StreamLeaderHostTmp.getString();
2150+
BasicInt StreamLeaderPortTmp = (BasicInt)conn.run(String.format("(exec port from rpc(getControllerAlias(), getClusterPerf) where mode = 0 and name=\"%s\")[0]", StreamLeader));
2151+
int StreamLeaderPort = StreamLeaderPortTmp.getInt();
2152+
System.out.println(StreamLeaderHost);
2153+
System.out.println(StreamLeaderPort);
2154+
DBConnection conn_leader = new DBConnection();
2155+
conn_leader.connect(StreamLeaderHost, StreamLeaderPort, "admin", "123456");
2156+
2157+
String script0 ="leader = getStreamingLeader("+GROUP_ID+");\n" +
2158+
"groupSitesStr = (exec sites from getStreamingRaftGroups() where id =="+GROUP_ID+")[0];\n"+
2159+
"groupSites = split(groupSitesStr, \",\");\n"+
2160+
"followerInfo = exec top 1 * from rpc(getControllerAlias(), getClusterPerf) where site in groupSites and name!=leader;";
2161+
conn.run(script0);
2162+
BasicString StreamFollowerHostTmp = (BasicString)conn.run("(exec host from followerInfo)[0]");
2163+
String StreamFollowerHost = StreamFollowerHostTmp.getString();
2164+
BasicInt StreamFollowerPortTmp = (BasicInt)conn.run("(exec port from followerInfo)[0]");
2165+
int StreamFollowerPort = StreamFollowerPortTmp.getInt();
2166+
System.out.println(StreamFollowerHost);
2167+
System.out.println(StreamFollowerPort);
2168+
DBConnection conn_follower = new DBConnection();
2169+
conn_follower.connect(StreamFollowerHost, StreamFollowerPort, "admin", "123456");
2170+
String script = "try{\ndropStreamTable(`Trades)\n}catch(ex){\n}\n" +
2171+
"table = table(1000000:0, `tag`ts`data,[INT,TIMESTAMP,DOUBLE]);\n"+
2172+
"haStreamTable("+GROUP_ID+", table, `Trades, 100000);\n"+
2173+
" share streamTable(1000000:0,`tag`ts`data,[INT,TIMESTAMP,DOUBLE]) as Receive;";
2174+
conn_follower.run(script);
2175+
final DBConnection finalConn1 = conn_follower;
2176+
2177+
TopicPoller poller = pollingClient.subscribe(StreamFollowerHost, StreamFollowerPort, "Trades", "haStreamTable", 0);
2178+
conn_leader.run("n=10000;t=table(1..n as tag,now()+1..n as ts,rand(100.0,n) as data);" + "Trades.append!(t)");
2179+
List<IMessage> messages = poller.poll(2000,10000);
2180+
2181+
for(int i=0;i<messages.size();i++){
2182+
try {
2183+
IMessage msg = messages.get(i);
2184+
List<Entity> args = new ArrayList<>();
2185+
args.add(msg.getEntity(0));
2186+
args.add(msg.getEntity(1));
2187+
args.add(msg.getEntity(2));
2188+
finalConn1.run("tableInsert{Receive}", args);
2189+
} catch (Exception e) {
2190+
e.printStackTrace();
2191+
}
2192+
}
2193+
2194+
BasicTable re = (BasicTable) conn_follower.run("select * from Receive");
2195+
BasicTable tra = (BasicTable) conn_leader.run("select * from Trades");
2196+
assertEquals(10000, re.rows());
2197+
for (int i = 0; i < re.rows(); i++) {
2198+
assertEquals(re.getColumn(0).get(i), tra.getColumn(0).get(i));
2199+
assertEquals(re.getColumn(1).get(i), tra.getColumn(1).get(i));
2200+
assertEquals(re.getColumn(2).get(i).getString(), tra.getColumn(2).get(i).getString());
2201+
}
2202+
pollingClient.unsubscribe(StreamFollowerHost, StreamFollowerPort, "Trades", "haStreamTable");
2203+
}
20972204
}

0 commit comments

Comments
 (0)