Ir para o conteúdo

Persistir dados de entrada para processamento posterior no Jitterbit Design Studio

Caso de uso

Um caso de uso frequente com cenários em tempo real/orientados por eventos é o que fazer quando o destino do endpoint não está disponível. Um exemplo é o cenário em que uma mensagem de saída do Salesforce é recebida por uma API Jitterbit e interligada a um banco de dados. Se o banco de dados ficar offline, a operação Jitterbit que se conecta ao banco de dados lançará um erro e ativará o caminho da operação 'On Failure'. Se nada for feito para lidar com esse tipo de erro, haverá risco de perda de dados.

Uma maneira de lidar com esse cenário é persistir a payload de entrada até que ela seja processada com sucesso. Por exemplo, se o endpoint do banco de dados estiver offline, as mensagens serão persistidas até que o banco de dados esteja online novamente. As mensagens serão processadas automaticamente no momento em que o banco de dados estiver online novamente.

Exemplo

No exemplo a seguir, o cliente não queria apenas persistência de mensagem, mas também queria garantir a integridade das alterações do pedido. Ou seja, se o número do pedido 1000 fosse seguido por 2 alterações no mesmo pedido, todas as alterações teriam que ser processadas na ordem correta com sucesso. Isso é semelhante ao princípio de "atomicidade" para transações de banco de dados. Neste exemplo, criar um pedido cria um arquivo separado para cada pedido. Cada alteração no processamento ou status do pedido também cria um arquivo separado.

Como o cliente tinha mais de um agente privado, também tivemos que garantir que cada agente estivesse ciente do processamento realizado pelos outros agentes.

A solução foi usar bloqueios de mensagem mantidos em arquivos em um sistema de arquivos que fosse acessível a todos os agentes. Por exemplo, o agente privado 100 está no Host A e o agente privado 200 está no Host B. O sistema de arquivos que contém os arquivos de bloqueio está no Host C. Tanto o Host A quanto o Host B devem ter acesso ao sistema de arquivos no Host C.

anexo

Isso é necessário para que cada agente esteja ciente dos bloqueios colocados em ordens por todos os outros agentes.

O método pelo qual um agente privado é identificado exclusivamente é editando o arquivo .conf e atribuindo uma variável global (veja usando Variáveis globais) para um valor fixo. Variáveis de projeto não puderam ser usadas, pois seu escopo abrange todos os agentes privados.

anexo

Neste caso, o $servername é 100. O $servername para outro agente privado seria 200, e assim por diante.

Para começar, suponha que as mensagens recebidas sejam gravadas em uma fonte de arquivo temporária chamada "Files". Suponha também que haja uma fonte chamada "Locks", que é um diretório no sistema de arquivos compartilhado.

Começamos gerando alguns dados de teste:

/*
Will generate test order files (like 1000, 1001, 1002) and will generate additional orders (1001, 1001, 1001) randomly. Controlled by 'cnt'. so cnt = 10 will
generate 10 base orders plus some number of duplicates
*/

DeleteFiles("<TAG>Sources/Files</TAG>","*");
DeleteFiles("<TAG>Sources/Locks</TAG>","*");
cnt = 3; i = 0;seed=1000;
While(i<cnt,
 rcnt=Random(1,3);ri=0;
 While(ri<rcnt,
 WriteFile("<TAG>Targets/Files</TAG>","foobar",(seed)+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));
 sleep(1);
 ri+=1
 );
i+=1; seed +=1
);

FlushAllFiles("<TAG>Targets/Files</TAG>");

Nota

Usando FlushAllFiles() depois WriteFile() é recomendado como uma prática recomendada.

Conforme observado no comentário, o objetivo é gerar alguns pedidos de entrada (3 neste caso) com carimbos de data/hora.

Lista de pedidos:

1000.20160322152502

1000.20160322152503

1001.20160322152504

1001.20160322152505

1001.20160322152506

1002.20160322152507

1002.20160322152508

1002.20160322152509

Temos 2 entradas para 1000, 3 entradas para 1001 e 3 entradas para 1002. Isso emula o cenário em que o pedido 1000 foi recebido junto com 1 mude para essa ordem, e assim por diante para as outras duas ordens.

anexo

Há duas operações nessa cadeia. A primeira operação é o pré-processamento do pedido e a segunda é um espaço reservado para a atualização de dados real para o alvo, como uma atualização de banco de dados ou serviço web.

O script contido na primeira operação está abaixo. Ele verifica a fonte 'Files' para pedidos, então verifica a fonte 'Locks' para quaisquer bloqueios de pedidos. Se quaisquer bloqueios forem encontrados, então ele pula o processamento desse pedido, bem como quaisquer atualizações para esse pedido. Se não houver bloqueios encontrados, então um arquivo de bloqueio é criado e a operação que processa o pedido é chamada. Se bem-sucedido, tanto o arquivo quanto o bloqueio são excluídos.

/*
1. Get list of order files with file naming convention of <filename>.<timestamp> where timestamp is yyyymmddhhmmss.
For example, 1001.20160220171811. This is done to aid in sorting the array.
2. Loop through the list of orders and check for locks. Locks have a file naming convention of <orderno>.<timestamp>.<agentnumber>
The agent number is set in the .conf file of the private agent. Assign a unique number to each private agent in the cluster.
The agent number will be useful to determine which agent set the lock. If the lock was set due to the agent failing during processing,
the user can elect to remove those locks as they were not due to the endpoint being down, or operation failure.
3. If a lock file is found for an order, then keep looping. If no lock is found for a file, the loop is stopped, go on to Step 2
4. If unlocked file found, lock file created.
5. Pass file name to synchronous operation and check operation status. If successful, then delete file and delete lock.
Optional: set up global 'datastatus' that will indicate pass/fail of insert into target and change
next statement to use it: If($datastatus || status ...
Configure the processing operation to have a short timeout (<5 min)
**/

unlock = true;
arr = FileList("<TAG>Sources/Files</TAG>","files","*");//reads list of files into array
  SortArray(arr);//sorts
WriteToOperationLog("List: "+arr);
cnt=Length(arr);i=0;//loops


// STEP 1: Find a file with no locks


While(i<cnt,

$filename=arr[i];//gets first file
 orderno=Split($filename,".");//gets order no
 WriteToOperationLog("File: "+$filename+" orderno: "+orderno[0]);
 locks = FileList("<TAG>Sources/Locks</TAG>","locks",orderno[0]+"*"); //checks of order no exists in list of locks
 nolock = If(Length(locks)>0,false,true); // if false then found lock file,skip, else can use this file
 WriteToOperationLog("Locks: "+locks+" nolock: "+nolock);
 If(nolock,
 i=cnt;

WriteToOperationLog("Found unlocked order, stop loop");

 //get file with this order no and sets incrementer to counter, stopping loop

);

 i++;

);


// STEP 2: If unlocked file found, process.


If(nolock,

 WriteToOperationLog("Selected File: "+$filename);
 lockfilename = orderno[0]+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS")+"."+$servername;
 WriteFile("<TAG>Targets/Locks</TAG>",$filename,lockfilename);// create lock file.
 FlushAllFiles("<TAG>Targets/Locks</TAG>");
 WriteToOperationLog("Lockfile: "+lockfilename);

 status = RunOperation("<TAG>Operations/3. Fake Op</TAG>");//pass filename to Operation that will process data. status is success/fail of the operation.

 // deletes order file if status is true, else file is not deleted. Also sets unlock to false, if the operation not successful, then lock is not deleted

If(status,

 DeleteFile("<TAG>Sources/Files</TAG>",$filename);
 WriteToOperationLog("delete file "+ $filename),
 unlock = false

 );

// unlockme = Split($filename,".");

// gets order number:

If(unlock, DeleteFiles("<TAG>Sources/Locks</TAG>",orderno[0]+"*");

 WriteToOperationLog("delete lock for order: "+orderno[0]+"*"));
 // deletes lock file after all the files for that order are processed,
 // but leaves lock if any of the RunOps are unsuccessful
)

Há uma série de funções-chave que tornam isso possível. Desde o FileList() função gera uma matriz, podemos usar o SortArray() função para nos dar o arquivo mais antigo com base no timestamp no nome do arquivo. Não temos uma função que classifique um arquivo pela data de criação do arquivo. A WriteToOperationLog() A função geralmente é para depuração, portanto suas instâncias podem ser removidas para fins de produção.

O script 'Fake Op' usa o RaiseError() função para simular uma falha e gerar um bloqueio:

WriteToOperationLog(Now());
RaiseError("foo")

Após uma execução, podemos verificar as fontes:

anexo

Ele mostra um único bloqueio no pedido 1000 feito pelo agente 100, e que todos os pedidos não foram processados.

Se comentarmos o RaiseError() para que ele processe com sucesso e execute novamente 'Verificar arquivos para processar' ...

anexo

O bloqueio na ordem #1000 permanece, e a primeira ordem 1001 (1001.20160322152504) é processada e excluída.

Execute o processo mais duas vezes...

anexo

... e todos os 1001 pedidos são processados, mas o bloqueio permanece no pedido 1000 e nenhum dos arquivos do pedido 1000 é processado.

Mas e se o problema com o endpoint for resolvido? Precisamos de uma maneira de remover os bloqueios e tentar novamente. A abordagem é "envelhecer" os bloqueios e removê-los, permitindo uma nova tentativa dos pedidos automaticamente. Se o endpoint ainda estiver indisponível, o bloqueio será redefinido.

Este script pode ser executado periodicamente para limpar bloqueios antigos. Ele lê a fonte "Locks", compara o timestamp com o horário atual e, se for mais antigo que 5 minutos, excluirá o bloqueio.

locks = FileList("<TAG>Sources/Locks</TAG>","locks","*");

SortArray(locks);
locks;
cnt = length(locks); i=0;
While(i<cnt,
order = Split(locks[i],".");
orderno = order[0];
now = (CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));

// set interval for 5 min, 300 sec;

If(Double(now)-Double(order[1])>$deletelocktime,del = true;WriteToOperationLog("order lock file to delete"+orderno);DeleteFiles("<TAG>Sources/Locks</TAG>",orderno+"*"),
 WriteToOperationLog("Locks: "+locks+" below time limit")
 );
i+=1);

Observe que isso usa uma variável de projeto, $deletelocktime, que contém o número de segundos para envelhecer o bloqueio.

anexo