Spark UDF 代碼示例:
/**
 * Spark UDF that selects one of eight per-region values for a row.
 *
 * <p>The region description is looked up in {@code regionMap}; the mapped value and the
 * country code are passed to {@code RegionAndCountryEnum.map}, and the resulting name
 * decides which of the remaining arguments is returned.
 *
 * <p>Returns:
 * <ul>
 *   <li>the empty string when {@code rgnDesc} is null/blank, {@code cntryCd} is null, or
 *       {@code regionMap} has no entry for {@code rgnDesc};</li>
 *   <li>the argument ({@code nam}, {@code emea}, {@code latam}, {@code russia},
 *       {@code tokyo}, {@code hongkong}, {@code sydney}, {@code seasia}) whose enum
 *       constant name equals the resolved name;</li>
 *   <li>{@code null} when the resolved name matches none of those constants.</li>
 * </ul>
 */
public static UserDefinedFunction SNAP_TIME = udf((String rgnDesc, String cntryCd, String nam, String emea, String latam,
        String russia, String tokyo, String hongkong, String sydney, String seasia) -> {
    // Guard clause: without a usable region description, country code and region mapping
    // there is nothing to resolve.
    if (rgnDesc == null || rgnDesc.trim().isEmpty() || cntryCd == null || regionMap.get(rgnDesc) == null) {
        return "";
    }

    final String resolved = RegionAndCountryEnum.map(String.valueOf(regionMap.get(rgnDesc)), cntryCd);

    // NOTE(review): assumes RegionAndCountryEnum.map never returns null - confirm.
    if (resolved.equals(RegionAndCountryEnum.NAM.name())) {
        return nam;
    } else if (resolved.equals(RegionAndCountryEnum.EMEA.name())) {
        return emea;
    } else if (resolved.equals(RegionAndCountryEnum.LATAM.name())) {
        return latam;
    } else if (resolved.equals(RegionAndCountryEnum.RUSSIA.name())) {
        return russia;
    } else if (resolved.equals(RegionAndCountryEnum.TOKYO.name())) {
        return tokyo;
    } else if (resolved.equals(RegionAndCountryEnum.HONG_KONG.name())) {
        return hongkong;
    } else if (resolved.equals(RegionAndCountryEnum.SYDNEY.name())) {
        return sydney;
    } else if (resolved.equals(RegionAndCountryEnum.SEASIA.name())) {
        return seasia;
    }
    return null;
}, DataTypes.StringType);
// Usage fragment: calls numQualityTest (defined outside this snippet; presumably a UDF - confirm)
// with the QUALITY_TEST column and with CUSIP_RECORD_CD, where coalesce substitutes the literal 0
// for null values.
numQualityTest.apply(col("QUALITY_TEST"), coalesce(col("CUSIP_RECORD_CD"), lit(0)))
/**
 * Resolves a vendor row from the broadcast lookup map by trying three identifiers in order.
 *
 * <p>Keys have the form {@code <kind>^<id>^<vendor>^<snap>^<ccyFlag>^<ccy>} where
 * {@code kind} is {@code cusip}, {@code smcp} or {@code isin}. The identifiers are tried
 * in that order; an identifier that is null or empty is skipped. For the vendor
 * {@code BB_BVAL_BVAL} (compared after trimming) both the currency flag and the currency
 * code are replaced by the literal text {@code "null"}; otherwise the flag is {@code "1"}.
 *
 * @param cusipCd         CUSIP identifier, may be null or empty
 * @param smcpId          SMCP identifier, may be null or empty
 * @param isInCd          ISIN identifier, may be null or empty
 * @param alVndrNm        vendor name; a null value is treated as "not BB_BVAL_BVAL"
 * @param s2SnapCd        snap code used as part of the key
 * @param ccyCd           currency code used as part of the key
 * @param vendorBroadCast broadcast handle holding the key-to-value lookup map
 * @return the first non-null mapped value, or the literal string {@code "NULL"} when no
 *         identifier produces a hit
 */
public static String udfFuc(String cusipCd, String smcpId, String isInCd, String alVndrNm, String s2SnapCd, String ccyCd, Broadcast<Map<String, String>> vendorBroadCast) {
    Map<String, String> map = vendorBroadCast.value();

    String ccyCdFlag = "1";
    // Null check first: the vendor comes from a nullable column, and calling trim() on
    // null would fail the whole Spark task with a NullPointerException.
    if (alVndrNm != null && "BB_BVAL_BVAL".equals(alVndrNm.trim())) {
        ccyCdFlag = "null";
        ccyCd = "null";
    }

    // Everything after the identifier is shared by the three key kinds.
    String keySuffix = "^" + alVndrNm + "^" + s2SnapCd + "^" + ccyCdFlag + "^" + ccyCd;
    String[] kinds = {"cusip", "smcp", "isin"};
    String[] ids = {cusipCd, smcpId, isInCd};

    for (int i = 0; i < kinds.length; i++) {
        if (StringUtils.isEmpty(ids[i])) {
            continue;
        }
        // Single lookup per key instead of get() != null followed by a second get().
        String hit = map.get(kinds[i] + "^" + ids[i] + keySuffix);
        if (hit != null) {
            return hit;
        }
    }
    return "NULL";
}
// Broadcast the vendor lookup map so the UDF below reads it through the broadcast handle
// rather than capturing the map itself in the closure.
Broadcast<Map<String, String>> vendorBroadCast = new JavaSparkContext(datasetFactory.getSparkSession().sparkContext()).broadcast(pmcKeysMapNew);

// Six-argument string UDF that delegates to SprecialUdf.udfFuc with the broadcast map.
UserDefinedFunction MATCH_FLAG = udf((String cusipCd, String smcpId, String isInCd, String alVndrNm, String s2SnapCd, String ccyCd) ->
        SprecialUdf.udfFuc(cusipCd, smcpId, isInCd, alVndrNm, s2SnapCd, ccyCd, vendorBroadCast),
        DataTypes.StringType);

// Add the VENDOR_ROW column; SOURCES is passed as the vendor name and SNAP_TIME as the snap code.
// withColumn yields a Dataset<Row>, so the variable is declared with that type instead of a raw Dataset.
Dataset<Row> invenAndRuleDs = ruleAndSmcDs.withColumn("VENDOR_ROW",
        MATCH_FLAG.apply(col("CUSIP_CD"), col("SMCP_ID"), col("ISIN_CD"), col("SOURCES"), col("SNAP_TIME"), col("ccy_cd")));
Spark mapPartitions 代碼示例:
// Serialize every row to a JSON string, then turn each string into a two-column row
// (derived key, original JSON message) using jsonsetSchema.
Dataset<String> json = dataset.toJSON();
Dataset<Row> Jsonset = json.mapPartitions(
        (Iterator<String> it) -> {
            // The Gson target type is the same for every record, so build it once per
            // partition instead of once per record.
            java.lang.reflect.Type mapType = new TypeToken<HashMap<String, String>>() {
            }.getType();
            List<Row> out = new ArrayList<>();
            while (it.hasNext()) {
                String message = String.valueOf(it.next());
                Map<String, String> map = gson.fromJson(message, mapType);

                // String.valueOf turns a missing field into the text "null".
                String tmId = String.valueOf(map.get("TMID"));
                // FEED_ID is parsed as a double and truncated to int; a missing or
                // non-numeric FEED_ID makes Double.valueOf throw NumberFormatException.
                int feedId = Double.valueOf(String.valueOf(map.get("FEED_ID"))).intValue();
                String valuationDate = String.valueOf(map.get("VDATE"));
                String serviceName = String.valueOf(map.get("SERVICENAME"));
                String subArea = String.valueOf(map.get("SUBAREA"));
                String underlying = String.valueOf(map.get("UNDERLYING"));
                String assetClass = String.valueOf(map.get("ASSETCLASS"));
                String serviceFrequency = String.valueOf(map.get("SERVICEFREQUENCY"));

                // NOTE(review): the fields are concatenated with no delimiter except the
                // single "_" before serviceFrequency, so different field combinations can
                // produce the same key. Possibly separators were lost from this snippet -
                // confirm the intended key format before relying on it.
                String key = tmId + feedId + valuationDate + serviceName + subArea + underlying
                        + assetClass + "_" + serviceFrequency + 2;

                out.add(new GenericRowWithSchema(new Object[]{key, message}, jsonsetSchema));
            }
            return out.iterator();
        }, RowEncoder.apply(jsonsetSchema)
);
// For every partition: open one Redisson client, add each JSON message to the Redis set
// named by its derived key, and shut the client down when the partition is finished.
// An earlier Jedis pipeline variant was left commented out (and garbled) in the original
// snippet; it is omitted here.
mdhTotemCollector.toJSON().foreachPartition(
        (Iterator<String> t) -> {
            // NOTE(review): host, port and password are hard-coded in source; they should
            // come from configuration or a secret store.
            Config config = new Config();
            config.useSingleServer().setPassword("idm@2021").setAddress("10.50.68.174" + ":" + 14761);
            RedissonClient redisson = Redisson.create(config);
            try {
                Gson gson = new GsonBuilder().create();
                // Same target type for every record; build it once per partition.
                java.lang.reflect.Type mapType = new TypeToken<HashMap<String, String>>() {
                }.getType();
                while (t.hasNext()) {
                    String message = String.valueOf(t.next());
                    Map<String, String> map = gson.fromJson(message, mapType);

                    // String.valueOf turns a missing field into the text "null".
                    String tmId = String.valueOf(map.get("TMID"));
                    // A missing or non-numeric FEED_ID makes Double.valueOf throw
                    // NumberFormatException.
                    int feedId = Double.valueOf(String.valueOf(map.get("FEED_ID"))).intValue();
                    String valuationDate = String.valueOf(map.get("VDATE"));
                    String serviceName = String.valueOf(map.get("SERVICENAME"));
                    String subArea = String.valueOf(map.get("SUBAREA"));
                    String underlying = String.valueOf(map.get("UNDERLYING"));
                    String assetClass = String.valueOf(map.get("ASSETCLASS"));
                    String serviceFrequency = String.valueOf(map.get("SERVICEFREQUENCY"));

                    // NOTE(review): fields are concatenated without a delimiter except the
                    // single "_" before serviceFrequency - confirm the intended key format.
                    String key = tmId + feedId + valuationDate + serviceName + subArea + underlying
                            + assetClass + "_" + serviceFrequency + 8;

                    RSet<String> set = redisson.getSet(key);
                    set.add(message);
                }
            } finally {
                // Always release the client's connections and threads, even when a record
                // fails to parse or a Redis call throws.
                redisson.shutdown();
            }
        }
);