0.00.0100005050NY50000YN1000100-2024/01/09 15:07:19.021-2024/01/09 15:07:19.021H4sIAAAAAAAAAAMAAAAAAAAAAAA=NMOM${mysql_host}MYSQLNative${mysql_dbname}3306${mysql_username}${mysql_password}EXTRA_OPTION_MYSQL.characterEncodingutf-8EXTRA_OPTION_MYSQL.defaultFetchSize5000EXTRA_OPTION_MYSQL.rewriteBatchedStatementstrueEXTRA_OPTION_MYSQL.useCompressiontrueEXTRA_OPTION_MYSQL.useCursorFetchtrueEXTRA_OPTION_MYSQL.useServerPrepStmtstrueFORCE_IDENTIFIERS_TO_LOWERCASENFORCE_IDENTIFIERS_TO_UPPERCASENIS_CLUSTEREDNPORT_NUMBER3306PRESERVE_RESERVED_WORD_CASEYQUOTE_ALL_FIELDSNSTREAM_RESULTSNSUPPORTS_BOOLEAN_DATA_TYPEYSUPPORTS_TIMESTAMP_DATA_TYPEYUSE_POOLINGNU8${U8DB_HOST}GENERICNative${U8DB_NAME}1433${sqlserver_username}${sqlserver_password}CUSTOM_DRIVER_CLASS${sqlserver_driverclassname}CUSTOM_URL${sqlserver_url}DATABASE_DIALECT_IDGeneric databaseFORCE_IDENTIFIERS_TO_LOWERCASENFORCE_IDENTIFIERS_TO_UPPERCASENIS_CLUSTEREDNPORT_NUMBER1433PRESERVE_RESERVED_WORD_CASEYQUOTE_ALL_FIELDSNSUPPORTS_BOOLEAN_DATA_TYPEYSUPPORTS_TIMESTAMP_DATA_TYPEYUSE_POOLINGN插入 / 更新复制记录到结果YJava 代码-id插入 / 更新Y表输入Java 代码-idY获取变量表输入YJava 代码-idUserDefinedJavaClassN1noneTRANSFORM_CLASSProcessorpublic boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
// Fetch the next input row; when none is returned, flag the output as done and return false.
Object[] incoming = getRow();
if (incoming == null) {
    setOutputDone();
    return false;
}

// Create the shared id generator on first use (workerId = 1, datacenterId = 1; adjust as needed).
if (snowflakeIdWorker == null) {
    snowflakeIdWorker = new SnowflakeIdWorker(1, 1);
}

// Resize the row to the output layout, then write the generated id, as text, into the "id" field.
Object[] outgoing = createOutputRow(incoming, data.outputRowMeta.size());
get(Fields.Out, "id").setValue(outgoing, String.valueOf(snowflakeIdWorker.nextId()));
putRow(data.outputRowMeta, outgoing);
return true;
}
// Class-level (static) id generator; starts out null and is assigned where it is first needed.
private static SnowflakeIdWorker snowflakeIdWorker;
// Inner class: Snowflake algorithm implementation.
// nextId() packs (timestamp - twepoch), datacenterId, workerId and a per-millisecond
// sequence into a single long using the bit widths and shifts declared below.
public static class SnowflakeIdWorker {
// Epoch offset in milliseconds, subtracted from the current time before it is shifted into the id.
private final long twepoch = 1288834974657L;
// Number of bits reserved for the worker id.
private final long workerIdBits = 5L;
// Number of bits reserved for the datacenter id.
private final long datacenterIdBits = 5L;
// Largest worker id that fits in workerIdBits (evaluates to 31).
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
// Largest datacenter id that fits in datacenterIdBits (evaluates to 31).
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
// Number of bits reserved for the sequence counter.
private final long sequenceBits = 12L;
// Left shift applied to the worker id (12).
private final long workerIdShift = sequenceBits;
// Left shift applied to the datacenter id (17).
private final long datacenterIdShift = sequenceBits + workerIdBits;
// Left shift applied to the timestamp delta (22).
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
// Mask that wraps the sequence counter back to 0 after 4095.
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
// Identity components supplied to the constructor.
private long workerId;
private long datacenterId;
// Counter used to tell apart ids generated within the same millisecond.
private long sequence = 0L;
// Timestamp used for the most recently generated id; -1 until the first id is produced.
private long lastTimestamp = -1L;
/**
 * Creates a generator bound to the given worker and datacenter ids.
 *
 * @param workerId     worker id, must lie in [0, maxWorkerId]
 * @param datacenterId datacenter id, must lie in [0, maxDatacenterId]
 * @throws IllegalArgumentException if either id is outside its allowed range
 */
public SnowflakeIdWorker(long workerId, long datacenterId) {
    boolean workerIdInRange = workerId >= 0 && workerId <= maxWorkerId;
    if (!workerIdInRange) {
        String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId);
        throw new IllegalArgumentException(message);
    }

    boolean datacenterIdInRange = datacenterId >= 0 && datacenterId <= maxDatacenterId;
    if (!datacenterIdInRange) {
        String message = String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId);
        throw new IllegalArgumentException(message);
    }

    this.workerId = workerId;
    this.datacenterId = datacenterId;
}
/**
 * Produces the next id by combining the time delta, datacenter id, worker id and sequence.
 *
 * @return the generated id
 * @throws RuntimeException if the current time is earlier than the time of the previous id
 */
public synchronized long nextId() {
    long now = timeGen();
    if (now < lastTimestamp) {
        long drift = lastTimestamp - now;
        throw new RuntimeException("Clock moved backwards. Refusing to generate id for " + drift + " milliseconds");
    }

    if (now != lastTimestamp) {
        // A new millisecond: the sequence starts over.
        sequence = 0L;
    } else {
        // Same millisecond as the previous id: advance the sequence, wrapping at the mask.
        sequence = (sequence + 1) & sequenceMask;
        if (sequence == 0) {
            // The sequence wrapped around, so wait for a later millisecond.
            now = tilNextMillis(lastTimestamp);
        }
    }
    lastTimestamp = now;

    long timePart = (now - twepoch) << timestampLeftShift;
    long datacenterPart = datacenterId << datacenterIdShift;
    long workerPart = workerId << workerIdShift;
    return timePart | datacenterPart | workerPart | sequence;
}
/**
 * Polls timeGen() until it returns a value strictly greater than lastTimestamp.
 *
 * @param lastTimestamp the timestamp that must be exceeded
 * @return the first observed timestamp greater than lastTimestamp
 */
private long tilNextMillis(long lastTimestamp) {
    long candidate;
    do {
        candidate = timeGen();
    } while (candidate <= lastTimestamp);
    return candidate;
}
/** Returns the value of System.currentTimeMillis(). */
private long timeGen() {
    final long nowMillis = System.currentTimeMillis();
    return nowMillis;
}
}idString-1-1N288128Y复制记录到结果RowsToResultY1none592128Y插入 / 更新InsertUpdateY1noneMOM1000Y
wms_task_out
interface_idinterface_id=task_datetask_date=dept_codedept_code=bill_typebill_type_code=interface_idinterface_idNinterface_codeinterface_codeNididNbill_type_codebill_typeNtask_typetask_typeNtask_datetask_dateNtask_codetask_codeNdept_codedept_codeNcreate_idsync_idNcreate_bysync_nameNcreate_timesync_timeN448128Y表输入TableInputY1noneU8DECLARE @USER_NAME VARCHAR(50),@USER_ID INT,@BEGIN_DATE DATE,@END_DATE DATE,@DEPT_CODE VARCHAR(50)
-- Resolve the runtime parameters; an empty value falls back to a default.
SET @USER_NAME = CASE '${USER_NAME}' WHEN '' THEN '管理员(kettle)' ELSE '${USER_NAME}' END
-- The leading 0 keeps the expression a valid number when the USER_ID variable is empty; 0 falls back to 1.
SET @USER_ID = CASE 0${USER_ID} WHEN 0 THEN 1 ELSE 0${USER_ID} END
-- CONVERT style 23 renders yyyy-mm-dd. The default window runs from today through tomorrow.
SET @BEGIN_DATE = CASE '${BEGIN_DATE}' WHEN '' THEN CONVERT(VARCHAR(100), GETDATE(), 23) ELSE '${BEGIN_DATE}' END
-- The end-date default is chosen by testing the END_DATE variable itself, so an empty END_DATE
-- always falls back to tomorrow and a supplied END_DATE is always honoured.
SET @END_DATE = CASE '${END_DATE}' WHEN '' THEN CONVERT(VARCHAR(100), DATEADD(DAY, 1, GETDATE()), 23) ELSE '${END_DATE}' END
SET @DEPT_CODE = '${DEPT_CODE}'

-- One output row per production order (mom_order) and per distinct (department, start date)
-- combination found among its qualifying detail lines.
SELECT
    T1.MoId AS interface_id
    ,T1.MoCode AS interface_code
    ,-1 AS task_type -- 1 = inbound, -1 = inbound return, -1 = outbound, 1 = outbound return
    ,1 AS bill_type -- 1 production order, 2 subcontract order, 3 subcontract arrival note, 4 purchase order, 5 purchase arrival note, 6 sales order, 7 sales delivery note
    -- The first combination of an order gets 'OT1-' + MoCode; later ones get a '-1', '-2', ... suffix.
    ,'OT1-' + T1.MoCode + CASE T2.RowID WHEN 1 THEN '' ELSE '-' + CONVERT(VARCHAR(20), T2.RowID - 1) END AS task_code
    ,T2.StartDate AS task_date
    ,T2.MDeptCode AS dept_code
    ,@USER_NAME AS sync_name
    ,@USER_ID AS sync_id
    ,GETDATE() AS sync_time
FROM mom_order T1
INNER JOIN (
    SELECT
        K1.MoId
        ,ISNULL(K1.MDeptCode, '') AS MDeptCode
        ,CONVERT(DATE, K2.StartDate) AS StartDate
        -- The ordering keys are exactly the GROUP BY keys within each MoId, so there are no ties
        -- and the ranks are consecutive starting at 1.
        ,RANK() OVER (
            PARTITION BY K1.MoId
            ORDER BY ISNULL(K1.MDeptCode, ''), CONVERT(DATE, K2.StartDate)
        ) AS RowID
    FROM mom_orderdetail K1
    INNER JOIN mom_morder K2
        ON K1.MoDId = K2.MoDId
    WHERE ISNULL(K1.QualifiedInQty, 0) = 0
        AND K1.Status = 3 -- 3 = approved, 4 = closed
        -- Department filter: no restriction when the parameter is empty or when the placeholder
        -- text was left unresolved (assumes the tool leaves unknown variables as-is; the literal
        -- is split in two so it is not itself substituted). Otherwise match the department.
        AND (
            @DEPT_CODE = ''
            OR @DEPT_CODE = '$' + '{DEPT_CODE}'
            OR @DEPT_CODE = ISNULL(K1.MDeptCode, '')
        )
    GROUP BY K1.MoId, ISNULL(K1.MDeptCode, ''), CONVERT(DATE, K2.StartDate)
) T2
    ON T1.MoId = T2.MoId
-- NOTE: an additional filter on mom_moallocate (rows with ISNULL(IssQty, 0) = 0) was sketched here
-- earlier but is not applied.
-- T2.StartDate is already a DATE, so it is compared directly; both bounds are inclusive.
WHERE T2.StartDate BETWEEN @BEGIN_DATE AND @END_DATE
0NYNNIntegernormalinterface_id90表输入interface_id####0;-####0.,noneNY0NNNzh_CNAsia/ShanghaiNStringnormalinterface_code30-1表输入interface_code.,noneNY0NNNzh_CNAsia/ShanghaiNIntegernormaltask_type90表输入task_type####0;-####0.,noneNY0NNNzh_CNAsia/ShanghaiNIntegernormalbill_type90表输入bill_type####0;-####0.,noneNY0NNNzh_CNAsia/ShanghaiNStringnormaltask_code65-1表输入task_code.,noneNY0NNNzh_CNAsia/ShanghaiNStringnormaltask_date10-1表输入task_date.,noneNY0NNNzh_CNAsia/ShanghaiNStringnormaldept_code12-1表输入dept_code.,noneNY0NNNzh_CNAsia/ShanghaiNStringnormalsync_name50-1表输入sync_name.,noneNY0NNNzh_CNAsia/ShanghaiNIntegernormalsync_id90表输入sync_id####0;-####0.,noneNY0NNNzh_CNAsia/ShanghaiNTimestampnormalsync_time3-1表输入sync_time.,noneNY0NNNzh_CNAsia/ShanghaiN128128Y获取变量GetVariableY1nonemysql_driverclassname${mysql_driverclassname}String-1-1nonemysql_password${mysql_password}String-1-1nonemysql_url${mysql_url}String-1-1nonemysql_username${mysql_username}String-1-1noneUSER_ID${USER_ID}String-1-1nonesqlserver_driverclassname${sqlserver_driverclassname}String-1-1nonesqlserver_password${sqlserver_password}String-1-1nonesqlserver_url${sqlserver_url}String-1-1nonesqlserver_username${sqlserver_username}String-1-1noneUSER_NAME${USER_NAME}String-1-1noneBEGIN_DATE${BEGIN_DATE}String-1-1noneEND_DATE${END_DATE}String-1-1noneDEPT_CODE${DEPT_CODE}String-1-1nonemysql_host${mysql_host}String-1-1nonemysql_dbname${mysql_dbname}String-1-1none136575YN