Ir para o conteúdo

Persistir Dados de Entrada para Processamento Posterior

Caso de Uso

Um caso de uso frequente com cenários em tempo real/orientados a eventos é o que fazer quando o destino do endpoint não está disponível. Um exemplo é o cenário em que uma mensagem de saída do Salesforce é recebida por uma API Jitterbit e conectada a um banco de dados. Se o banco de dados ficar off-line, a operação Jitterbit que se conecta ao banco de dados lançará um erro e ativará o caminho de operação 'On Failure'. Se nada for feito para lidar com esse tipo de erro, haverá risco de perda de dados.

Uma maneira de lidar com esse cenário é manter a payload recebida até que ela seja processada com êxito. Por exemplo, se o endpoint do banco de dados estiver offline, as mensagens serão mantidas até que o banco de dados volte a ficar online. As mensagens serão processadas automaticamente no momento em que o banco de dados estiver online novamente.

Exemplo

No exemplo a seguir, o cliente não queria apenas a persistência da mensagem, mas também queria garantir a integridade das alterações do pedido. Ou seja, se o pedido número 1000 foi seguido por 2 alterações no mesmo pedido, todas as alterações tiveram que ser processadas na ordem correta com sucesso. Isso é semelhante ao princípio de 'atomicidade' para transações de banco de dados. Neste exemplo, criar um pedido cria um arquivo separado para cada pedido. Cada alteração no processamento ou status do pedido também cria um arquivo separado.

Como o cliente tinha mais de um Agente Privado, também tínhamos que garantir que cada agente estivesse ciente do processamento realizado pelos outros agentes.

A solução foi usar bloqueios de mensagens mantidos em arquivos em um sistema de arquivos acessível a todos os agentes. Por exemplo, o Agente Privado 100 está no Host A e o Agente Privado 200 está no Host B. O sistema de arquivos que contém os arquivos bloqueados está no Host C. Tanto o Host A quanto o Host B devem ter acesso ao sistema de arquivos no Host C.

anexo

Isso é necessário para que cada agente esteja ciente dos bloqueios de pedidos feitos por todos os outros agentes.

O método pelo qual um Agente Privado é identificado exclusivamente é editando o arquivo .conf e atribuindo uma variável global (veja usando Global Variables) para um valor fixo. As variáveis do projeto não podem ser utilizadas, pois seu escopo é transversal a todos os Agentes Privados.

anexo

Neste caso, o $servername é 100. O $servername para outro Agente Privado seria 200, e assim por diante.

Para começar, suponha que as mensagens recebidas sejam gravadas em uma fonte de arquivo temporário chamada "Arquivos". Suponha também que exista uma fonte chamada "Locks", que é um diretório no sistema de arquivos compartilhados.

Começamos gerando alguns dados de teste:

/*
Will generate test order files (like 1000, 1001, 1002) and will generate additional orders (1001, 1001, 1001) randomly. Controlled by 'cnt'. so cnt = 10 will
generate 10 base orders plus some number of duplicates
*/

DeleteFiles("<TAG>Sources/Files</TAG>","*");
DeleteFiles("<TAG>Sources/Locks</TAG>","*");
cnt = 3; i = 0;seed=1000;
While(i<cnt,
 rcnt=Random(1,3);ri=0;
 While(ri<rcnt,
 WriteFile("<TAG>Targets/Files</TAG>","foobar",(seed)+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));
 sleep(1);
 ri+=1
 );
i+=1; seed +=1
);

FlushAllFiles("<TAG>Targets/Files</TAG>");

Nota

Usando FlushAllFiles() depois WriteFile() é recomendado como uma prática recomendada.

Conforme observado no comentário, o objetivo é gerar alguns pedidos de entrada (3 neste caso) com carimbos de data/hora.

Lista de Pedidos:

1000.20160322152502

1000.20160322152503

1001.20160322152504

1001.20160322152505

1001.20160322152506

1002.20160322152507

1002.20160322152508

1002.20160322152509

Temos 2 entradas para 1000, 3 entradas para 1001 e 3 entradas para 1002. Isso emula o cenário em que o pedido 1000 foi recebido junto com 1 alteração nesse pedido e assim por diante para os outros dois pedidos.

anexo

Existem duas operações nesta cadeia. A primeira operação é o pré-processamento do pedido e a segunda é um espaço reservado para a atualização de dados real para o destino, como um banco de dados ou atualização de serviço da web.

O script contido na primeira operação está abaixo. Ele verifica a origem de 'Arquivos' para pedidos e, em seguida, verifica a origem de 'Bloqueios' para quaisquer bloqueios de pedidos. Se algum bloqueio for encontrado, ele ignorará o processamento desse pedido, bem como qualquer atualização desse pedido. Se nenhum bloqueio for encontrado, um arquivo de bloqueio é criado e a operação que processa o pedido é chamada. Se for bem-sucedido, o arquivo e o bloqueio serão excluídos.

/*
1. Get list of order files with file naming convention of <filename>.<timestamp> where timestamp is yyyymmddhhmmss.
For example, 1001.20160220171811. This is done to aid in sorting the array.
2. Loop through the list of orders and check for locks. Locks have a file naming convention of <orderno>.<timestamp>.<agentnumber>
The agent number is set in the .conf file of the Private Agent. Assign a unique number to each Private Agent in the cluster.
The agent number will be useful to determine which agent set the lock. If the lock was set due to the agent failing during processing,
the user can elect to remove those locks as they were not due to the endpoint being down, or operation failure.
3. If a lock file is found for an order, then keep looping. If no lock is found for a file, the loop is stopped, go on to Step 2
4. If unlocked file found, lock file created.
5. Pass file name to synchronous operation and check operation status. If successful, then delete file and delete lock.
Optional: set up global 'datastatus' that will indicate pass/fail of insert into target and change
next statement to use it: If($datastatus || status ...
Configure the processing operation to have a short timeout (<5 min)
**/

unlock = true;
arr = FileList("<TAG>Sources/Files</TAG>","files","*");//reads list of files into array
  SortArray(arr);//sorts
WriteToOperationLog("List: "+arr);
cnt=Length(arr);i=0;//loops


// STEP 1: Find a file with no locks


While(i<cnt,

$filename=arr[i];//gets first file
 orderno=Split($filename,".");//gets order no
 WriteToOperationLog("File: "+$filename+" orderno: "+orderno[0]);
 locks = FileList("<TAG>Sources/Locks</TAG>","locks",orderno[0]+"*"); //checks of order no exists in list of locks
 nolock = If(Length(locks)>0,false,true); // if false then found lock file,skip, else can use this file
 WriteToOperationLog("Locks: "+locks+" nolock: "+nolock);
 If(nolock,
 i=cnt;

WriteToOperationLog("Found unlocked order, stop loop");

 //get file with this order no and sets incrementer to counter, stopping loop

);

 i++;

);


// STEP 2: If unlocked file found, process.


If(nolock,

 WriteToOperationLog("Selected File: "+$filename);
 lockfilename = orderno[0]+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS")+"."+$servername;
 WriteFile("<TAG>Targets/Locks</TAG>",$filename,lockfilename);// create lock file.
 FlushAllFiles("<TAG>Targets/Locks</TAG>");
 WriteToOperationLog("Lockfile: "+lockfilename);

 status = RunOperation("<TAG>Operations/3. Fake Op</TAG>");//pass filename to Operation that will process data. status is success/fail of the operation.

 // deletes order file if status is true, else file is not deleted. Also sets unlock to false, if the operation not successful, then lock is not deleted

If(status,

 DeleteFile("<TAG>Sources/Files</TAG>",$filename);
 WriteToOperationLog("delete file "+ $filename),
 unlock = false

 );

// unlockme = Split($filename,".");

// gets order number:

If(unlock, DeleteFiles("<TAG>Sources/Locks</TAG>",orderno[0]+"*");

 WriteToOperationLog("delete lock for order: "+orderno[0]+"*"));
 // deletes lock file after all the files for that order are processed,
 // but leaves lock if any of the RunOps are unsuccessful
)

Há uma série de funções-chave que tornam isso possível. Desde o FileList() função gera uma matriz, podemos usar o SortArray() função para nos fornecer o arquivo mais antigo com base no registro de data e hora no nome do arquivo. Não temos uma função que classifique um arquivo pela data de criação do arquivo. O WriteToOperationLog() geralmente é para depuração, portanto, suas instâncias podem ser removidas para fins de produção.

O script 'Fake Op' usa o RaiseError() função para simular uma falha e gerar um bloqueio:

WriteToOperationLog(Now());
RaiseError("foo")

Após uma execução, podemos verificar as fontes:

anexo

Ele mostra um único bloqueio no pedido 1000 feito pelo agente 100 e que todos os pedidos não foram processados.

Se comentarmos o RaiseError() para que processe com sucesso e execute novamente 'Check Files to Process' ...

anexo

O bloqueio no pedido #1000 permanece e o primeiro pedido 1001 (1001.20160322152504) é processado e excluído.

Execute o processo mais duas vezes...

anexo

... e todos os 1001 pedidos são processados, mas o bloqueio permanece no pedido 1000 e nenhum dos arquivos do pedido 1000 é processado.

Mas e se o problema com o endpoint for resolvido? Precisamos de uma maneira de remover as travas e tentar novamente. A abordagem é "envelhecer" os bloqueios e removê-los, permitindo uma nova tentativa dos pedidos automaticamente. Se o endpoint ainda estiver indisponível, o bloqueio será redefinido.

Este script pode ser executado periodicamente para limpar bloqueios antigos. Ele lê a fonte "Bloqueios", compara o carimbo de data/hora com o horário atual e, se tiver mais de 5 minutos, excluirá o bloqueio.

locks = FileList("<TAG>Sources/Locks</TAG>","locks","*");

SortArray(locks);
locks;
cnt = length(locks); i=0;
While(i<cnt,
order = Split(locks[i],".");
orderno = order[0];
now = (CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));

// set interval for 5 min, 300 sec;

If(Double(now)-Double(order[1])>$deletelocktime,del = true;WriteToOperationLog("order lock file to delete"+orderno);DeleteFiles("<TAG>Sources/Locks</TAG>",orderno+"*"),
 WriteToOperationLog("Locks: "+locks+" below time limit")
 );
i+=1);

Observe que isso usa uma variável de projeto, $deletelocktime, que contém o número de segundos para envelhecer o bloqueio.

anexo