Saltar al contenido

Conservar datos entrantes para su posterior procesamiento en Jitterbit Design Studio

Caso de uso

Un caso de uso frecuente en escenarios en tiempo real o controlados por eventos es qué hacer cuando el extremo no está disponible. Un ejemplo es el escenario en el que una API de Jitterbit recibe un mensaje saliente de Salesforce y lo conecta a una base de datos. Si la base de datos se desconecta, la operación de Jitterbit que se conecta a la base de datos generará un error y activará la ruta de operación "En caso de error". Si no se hace nada para manejar este tipo de error, existe el riesgo de pérdida de datos.

Una forma de manejar esta situación es conservar la carga útil entrante hasta que se procese correctamente. Por ejemplo, si el extremo de la base de datos está fuera de línea, los mensajes se conservan hasta que la base de datos vuelva a estar en línea. Los mensajes se procesarán automáticamente en el momento en que la base de datos vuelva a estar en línea.

Ejemplo

En el siguiente ejemplo, el cliente no solo quería la persistencia de los mensajes, sino que también quería garantizar la integridad de los cambios en los pedidos. Es decir, si el número de pedido 1000 era seguido por 2 cambios en el mismo pedido, todos los cambios debían procesarse en el orden correcto con éxito. Esto es similar al principio de "atomicidad" para las transacciones de bases de datos. En este ejemplo, la creación de un pedido crea un archivo independiente para cada pedido. Cada cambio en el procesamiento o el estado del pedido también crea un archivo independiente.

Como el cliente tenía más de un agente privado, también teníamos que asegurarnos de que cada agente estuviera al tanto del procesamiento realizado por los demás agentes.

La solución fue utilizar bloqueos de mensajes guardados en archivos en un sistema de archivos al que pudieran acceder todos los agentes. Por ejemplo, el agente privado 100 está en el Host A y el agente privado 200 está en el Host B. El sistema de archivos que contiene los archivos de bloqueo está en el Host C. Tanto el Host A como el Host B deben tener acceso al sistema de archivos del Host C.

attachment

Esto es necesario para que cada agente esté al tanto de los bloqueos colocados en los pedidos por todos los demás agentes.

El método por el cual se identifica de manera única a un agente privado es editando el archivo .conf y asignando una variable global (consulte Uso de Variables globales) a un valor fijo. No se pudieron utilizar las variables del proyecto, ya que su alcance abarca a todos los agentes privados.

attachment

En este caso, el $servername es 100. El $servername Para otro agente privado sería 200, y así sucesivamente.

Para comenzar, supongamos que los mensajes entrantes se escriben en una fuente de archivo temporal denominada "Archivos". Supongamos también que existe una fuente denominada "Bloqueos", que es un directorio en el sistema de archivos compartido.

Comenzamos generando algunos datos de prueba:

/*
Will generate test order files (like 1000, 1001, 1002) and will generate additional orders (1001, 1001, 1001) randomly. Controlled by 'cnt'. so cnt = 10 will
generate 10 base orders plus some number of duplicates
*/

DeleteFiles("<TAG>Sources/Files</TAG>","*");
DeleteFiles("<TAG>Sources/Locks</TAG>","*");
cnt = 3; i = 0;seed=1000;
While(i<cnt,
 rcnt=Random(1,3);ri=0;
 While(ri<rcnt,
 WriteFile("<TAG>Targets/Files</TAG>","foobar",(seed)+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));
 sleep(1);
 ri+=1
 );
i+=1; seed +=1
);

FlushAllFiles("<TAG>Targets/Files</TAG>");

Nota

Uso FlushAllFiles() después WriteFile() Se recomienda como práctica recomendada.

Como se indica en el comentario, el objetivo es generar algunos pedidos entrantes (3 en este caso) con marcas de tiempo.

Lista de pedidos:

1000.20160322152502

1000.20160322152503

1001.20160322152504

1001.20160322152505

1001.20160322152506

1002.20160322152507

1002.20160322152508

1002.20160322152509

Tenemos 2 entradas para 1000, 3 entradas para 1001 y 3 entradas para 1002. Esto emula el escenario en el que se recibió el pedido 1000 junto con 1 cambia a ese orden, y así sucesivamente para los otros dos órdenes.

attachment

Hay dos operaciones en esta cadena. La primera operación es el preprocesamiento de la orden y la segunda es un marcador de posición para la actualización de datos reales en el destino, como una actualización de la base de datos o del servicio web.

El secuencia de comandos que se incluye en la primera operación se muestra a continuación. Comprueba la fuente "Archivos" en busca de pedidos y, a continuación, la fuente "Bloqueos" en busca de bloqueos de pedidos. Si se encuentran bloqueos, se omite el procesamiento de ese pedido y las actualizaciones de ese pedido. Si no se encuentran bloqueos, se crea un archivo de bloqueo y se llama a la operación que procesa el pedido. Si la operación es correcta, se eliminan tanto el archivo como el bloqueo.

/*
1. Get list of order files with file naming convention of <filename>.<timestamp> where timestamp is yyyymmddhhmmss.
For example, 1001.20160220171811. This is done to aid in sorting the array.
2. Loop through the list of orders and check for locks. Locks have a file naming convention of <orderno>.<timestamp>.<agentnumber>
The agent number is set in the .conf file of the private agent. Assign a unique number to each private agent in the cluster.
The agent number will be useful to determine which agent set the lock. If the lock was set due to the agent failing during processing,
the user can elect to remove those locks as they were not due to the endpoint being down, or operation failure.
3. If a lock file is found for an order, then keep looping. If no lock is found for a file, the loop is stopped, go on to Step 2
4. If unlocked file found, lock file created.
5. Pass file name to synchronous operation and check operation status. If successful, then delete file and delete lock.
Optional: set up global 'datastatus' that will indicate pass/fail of insert into target and change
next statement to use it: If($datastatus || status ...
Configure the processing operation to have a short timeout (<5 min)
**/

unlock = true;
arr = FileList("<TAG>Sources/Files</TAG>","files","*");//reads list of files into array
  SortArray(arr);//sorts
WriteToOperationLog("List: "+arr);
cnt=Length(arr);i=0;//loops


// STEP 1: Find a file with no locks


While(i<cnt,

$filename=arr[i];//gets first file
 orderno=Split($filename,".");//gets order no
 WriteToOperationLog("File: "+$filename+" orderno: "+orderno[0]);
 locks = FileList("<TAG>Sources/Locks</TAG>","locks",orderno[0]+"*"); //checks of order no exists in list of locks
 nolock = If(Length(locks)>0,false,true); // if false then found lock file,skip, else can use this file
 WriteToOperationLog("Locks: "+locks+" nolock: "+nolock);
 If(nolock,
 i=cnt;

WriteToOperationLog("Found unlocked order, stop loop");

 //get file with this order no and sets incrementer to counter, stopping loop

);

 i++;

);


// STEP 2: If unlocked file found, process.


If(nolock,

 WriteToOperationLog("Selected File: "+$filename);
 lockfilename = orderno[0]+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS")+"."+$servername;
 WriteFile("<TAG>Targets/Locks</TAG>",$filename,lockfilename);// create lock file.
 FlushAllFiles("<TAG>Targets/Locks</TAG>");
 WriteToOperationLog("Lockfile: "+lockfilename);

 status = RunOperation("<TAG>Operations/3. Fake Op</TAG>");//pass filename to Operation that will process data. status is success/fail of the operation.

 // deletes order file if status is true, else file is not deleted. Also sets unlock to false, if the operation not successful, then lock is not deleted

If(status,

 DeleteFile("<TAG>Sources/Files</TAG>",$filename);
 WriteToOperationLog("delete file "+ $filename),
 unlock = false

 );

// unlockme = Split($filename,".");

// gets order number:

If(unlock, DeleteFiles("<TAG>Sources/Locks</TAG>",orderno[0]+"*");

 WriteToOperationLog("delete lock for order: "+orderno[0]+"*"));
 // deletes lock file after all the files for that order are processed,
 // but leaves lock if any of the RunOps are unsuccessful
)

Existen varias funciones clave que hacen esto posible. Dado que el FileList() La función genera una matriz, podemos usar el SortArray() función que nos proporcione el archivo más antiguo en función de la marca de tiempo en el nombre del archivo. No tenemos una función que ordene un archivo por fecha de creación. WriteToOperationLog() La función es generalmente para depuración, por lo que sus instancias se pueden eliminar para fines de producción.

El secuencia de comandos de 'Fake Op' utiliza el RaiseError() Función para simular un fallo y generar un bloqueo:

WriteToOperationLog(Now());
RaiseError("foo")

Después de una ejecución, podemos comprobar las fuentes:

attachment

Muestra un solo bloqueo en el pedido 1000 realizado por el agente 100, y que todos los pedidos están sin procesar.

Si comentamos el RaiseError() para que se procese correctamente y vuelva a ejecutar 'Verificar archivos para procesar'...

attachment

El bloqueo del pedido #1000 permanece y el primer pedido 1001 (1001.20160322152504) se procesa y se elimina.

Ejecute el proceso dos veces más...

attachment

... y se procesan todos los pedidos 1001, pero el bloqueo permanece en el pedido 1000 y no se procesa ninguno de los archivos del pedido 1000.

Pero ¿qué pasa si se resuelve el problema con el extremo ? Necesitamos una forma de eliminar los bloqueos e intentarlo nuevamente. El enfoque es "hacer que los bloqueos se hagan viejos" y eliminarlos, lo que permite volver a intentar los pedidos automáticamente. Si el extremo sigue sin estar disponible, se restablece el bloqueo.

Este secuencia de comandos se puede ejecutar periódicamente para eliminar bloqueos antiguos. Lee la fuente de "Bloqueos", compara la marca de tiempo con la hora actual y, si tiene más de 5 minutos, elimina el bloqueo.

locks = FileList("<TAG>Sources/Locks</TAG>","locks","*");

SortArray(locks);
locks;
cnt = length(locks); i=0;
While(i<cnt,
order = Split(locks[i],".");
orderno = order[0];
now = (CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));

// set interval for 5 min, 300 sec;

If(Double(now)-Double(order[1])>$deletelocktime,del = true;WriteToOperationLog("order lock file to delete"+orderno);DeleteFiles("<TAG>Sources/Locks</TAG>",orderno+"*"),
 WriteToOperationLog("Locks: "+locks+" below time limit")
 );
i+=1);

Tenga en cuenta que esto utiliza una variable de proyecto, $deletelocktime, que contiene la cantidad de segundos que debe transcurrir hasta que se envejezca la cerradura.

attachment