Zum Inhalt springen

Eingehende Daten zur späteren Verarbeitung im Jitterbit Design Studio beibehalten

Anwendungsfall

Ein häufiger Anwendungsfall bei Echtzeit-/ereignisgesteuerten Szenarien ist die Frage, was zu tun ist, wenn das Endpoint nicht verfügbar ist. Ein Beispiel ist das Szenario, in dem eine ausgehende Salesforce Nachricht von einer Jitterbit API empfangen und mit einer Datenbank verbunden wird. Wenn die Datenbank offline geht, gibt der Jitterbit Operation, der eine Verbindung zur Datenbank herstellt, einen Fehler aus und aktiviert den Operation „Bei Fehler“. Wenn nichts unternommen wird, um diese Art von Fehler zu behandeln, besteht die Gefahr eines Datenverlusts.

Eine Möglichkeit, dieses Szenario zu handhaben, besteht darin, die eingehende Payload beizubehalten, bis sie erfolgreich verarbeitet wurde. Wenn beispielsweise der Endpoint offline ist, werden Nachrichten beibehalten, bis die Datenbank wieder online ist. Die Nachrichten werden automatisch verarbeitet, sobald die Datenbank wieder online ist.

Beispiel

Im folgenden Beispiel wollte der Kunde nicht nur Nachrichtenpersistenz, sondern auch die Integrität der Auftragsänderungen garantieren. Das heißt, wenn auf Auftragsnummer 1000 zwei Änderungen an demselben Auftrag folgten, mussten alle Änderungen erfolgreich in der richtigen Reihenfolge verarbeitet werden. Dies ähnelt dem Prinzip der „Atomizität“ bei Datenbanktransaktionen. In diesem Beispiel wird beim Erstellen eines Auftrags für jeden Auftrag eine separate Datei erstellt. Jede Änderung der Verarbeitung oder des Status des Auftrags erstellt ebenfalls eine separate Datei.

Da der Kunde mehr als einen privaten Agenten hatte, mussten wir auch sicherstellen, dass jeder Agent über die Verarbeitung durch die anderen Agenten informiert war.

Die Lösung bestand darin, Nachrichtensperren zu verwenden, die in Dateien auf einem Dateisystem gespeichert waren, auf das alle Agenten zugreifen konnten. Beispielsweise befindet sich der private Agent 100 auf Host A und der private Agent 200 auf Host B. Das Dateisystem, das die Sperrdateien enthält, befindet sich auf Host C. Sowohl Host A als auch Host B müssen Zugriff auf das Dateisystem auf Host C haben.

Anhang

Dies ist notwendig, damit jeder Agent über Sperren informiert ist, die von allen anderen Agenten auf Bestellungen gesetzt wurden.

Die Methode, mit der ein privater Agent eindeutig identifiziert wird, besteht darin, die .conf-Datei zu bearbeiten und eine globale Variable zuzuweisen (siehe Verwenden von Globalen Variablen) auf einen festen Wert. Projektvariablen konnten nicht verwendet werden, da ihr Gültigkeitsbereich sich auf alle privaten Agenten erstreckt.

Anhang

In diesem Fall $servername Ist 100. Der $servername für einen anderen privaten Agenten wäre 200, und so weiter.

Nehmen wir zunächst an, dass die eingehenden Nachrichten in eine temporäre Dateiquelle namens „Dateien“ geschrieben werden. Nehmen wir außerdem an, dass es eine Quelle namens „Sperren“ gibt, bei der es sich um ein Verzeichnis im gemeinsam genutzten Dateisystem handelt.

Wir beginnen mit der Generierung einiger Testdaten:

/*
Will generate test order files (like 1000, 1001, 1002) and will generate additional orders (1001, 1001, 1001) randomly. Controlled by 'cnt'. so cnt = 10 will
generate 10 base orders plus some number of duplicates
*/

DeleteFiles("<TAG>Sources/Files</TAG>","*");
DeleteFiles("<TAG>Sources/Locks</TAG>","*");
cnt = 3; i = 0;seed=1000;
While(i<cnt,
 rcnt=Random(1,3);ri=0;
 While(ri<rcnt,
 WriteFile("<TAG>Targets/Files</TAG>","foobar",(seed)+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));
 sleep(1);
 ri+=1
 );
i+=1; seed +=1
);

FlushAllFiles("<TAG>Targets/Files</TAG>");

Hinweis

Verwendung FlushAllFiles() nach WriteFile() wird als bewährte Methode empfohlen.

Wie im Kommentar erwähnt, besteht der Zweck darin, einige eingehende Bestellungen (in diesem Fall 3) mit Zeitstempeln zu generieren.

Bestellliste:

1000.20160322152502

1000.20160322152503

1001.20160322152504

1001.20160322152505

1001.20160322152506

1002.20160322152507

1002.20160322152508

1002.20160322152509

Wir haben 2 Einträge für 1000, 3 Einträge für 1001 und 3 Einträge für 1002. Dies emuliert das Szenario, in dem Bestellung 1000 zusammen mit 1 Änderung an diese Bestellung und so weiter für die anderen beiden Bestellungen.

Anhang

In dieser Kette gibt es zwei Operationen. Die erste Operation ist die Vorverarbeitung der Bestellung und die zweite ist ein Platzhalter für die eigentliche Datenaktualisierung des Ziels, beispielsweise eine Datenbank- oder Webdienstaktualisierung.

Das im ersten Operation enthaltene Script ist unten aufgeführt. Es prüft die Quelle „Dateien“ auf Bestellungen und prüft dann die Quelle „Sperren“ auf etwaige Bestellsperren. Wenn Sperren gefunden werden, wird die Verarbeitung dieser Bestellung sowie aller Aktualisierungen dieser Bestellung übersprungen. Wenn keine Sperren gefunden werden, wird eine Sperrdatei erstellt und der Operation aufgerufen, der die Bestellung verarbeitet. Bei Erfolg werden sowohl die Datei als auch die Sperre gelöscht.

/*
1. Get list of order files with file naming convention of <filename>.<timestamp> where timestamp is yyyymmddhhmmss.
For example, 1001.20160220171811. This is done to aid in sorting the array.
2. Loop through the list of orders and check for locks. Locks have a file naming convention of <orderno>.<timestamp>.<agentnumber>
The agent number is set in the .conf file of the private agent. Assign a unique number to each private agent in the cluster.
The agent number will be useful to determine which agent set the lock. If the lock was set due to the agent failing during processing,
the user can elect to remove those locks as they were not due to the endpoint being down, or operation failure.
3. If a lock file is found for an order, then keep looping. If no lock is found for a file, the loop is stopped, go on to Step 2
4. If unlocked file found, lock file created.
5. Pass file name to synchronous operation and check operation status. If successful, then delete file and delete lock.
Optional: set up global 'datastatus' that will indicate pass/fail of insert into target and change
next statement to use it: If($datastatus || status ...
Configure the processing operation to have a short timeout (<5 min)
**/

unlock = true;
arr = FileList("<TAG>Sources/Files</TAG>","files","*");//reads list of files into array
  SortArray(arr);//sorts
WriteToOperationLog("List: "+arr);
cnt=Length(arr);i=0;//loops


// STEP 1: Find a file with no locks


While(i<cnt,

$filename=arr[i];//gets first file
 orderno=Split($filename,".");//gets order no
 WriteToOperationLog("File: "+$filename+" orderno: "+orderno[0]);
 locks = FileList("<TAG>Sources/Locks</TAG>","locks",orderno[0]+"*"); //checks of order no exists in list of locks
 nolock = If(Length(locks)>0,false,true); // if false then found lock file,skip, else can use this file
 WriteToOperationLog("Locks: "+locks+" nolock: "+nolock);
 If(nolock,
 i=cnt;

WriteToOperationLog("Found unlocked order, stop loop");

 //get file with this order no and sets incrementer to counter, stopping loop

);

 i++;

);


// STEP 2: If unlocked file found, process.


If(nolock,

 WriteToOperationLog("Selected File: "+$filename);
 lockfilename = orderno[0]+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS")+"."+$servername;
 WriteFile("<TAG>Targets/Locks</TAG>",$filename,lockfilename);// create lock file.
 FlushAllFiles("<TAG>Targets/Locks</TAG>");
 WriteToOperationLog("Lockfile: "+lockfilename);

 status = RunOperation("<TAG>Operations/3. Fake Op</TAG>");//pass filename to Operation that will process data. status is success/fail of the operation.

 // deletes order file if status is true, else file is not deleted. Also sets unlock to false, if the operation not successful, then lock is not deleted

If(status,

 DeleteFile("<TAG>Sources/Files</TAG>",$filename);
 WriteToOperationLog("delete file "+ $filename),
 unlock = false

 );

// unlockme = Split($filename,".");

// gets order number:

If(unlock, DeleteFiles("<TAG>Sources/Locks</TAG>",orderno[0]+"*");

 WriteToOperationLog("delete lock for order: "+orderno[0]+"*"));
 // deletes lock file after all the files for that order are processed,
 // but leaves lock if any of the RunOps are unsuccessful
)

Es gibt eine Reihe von Schlüsselfunktionen, die dies ermöglichen. Da die FileList() Funktion gibt ein Array aus, wir können die SortArray() Funktion, die uns die älteste Datei basierend auf dem Zeitstempel im Dateinamen liefert. Wir haben keine Funktion, die eine Datei nach dem Erstellungsdatum sortiert. Die WriteToOperationLog() Die Funktion dient im Allgemeinen zum Debuggen, sodass ihre Instanzen zu Produktionszwecken entfernt werden können.

Das Script „Fake Op“ verwendet die RaiseError() Funktion zum Simulieren eines Fehlers und Erzeugen einer Sperre:

WriteToOperationLog(Now());
RaiseError("foo")

Nach einem Durchlauf können wir die Quellen überprüfen:

Anhang

Es zeigt eine einzelne Sperre für Auftrag 1000, der von Agent 100 erteilt wurde, und dass alle Aufträge unbearbeitet sind.

Wenn wir das auskommentieren RaiseError() damit es erfolgreich verarbeitet wird, und führen Sie „Zu verarbeitende Dateien prüfen“ erneut aus ...

Anhang

Die Sperre für Auftrag #1000 bleibt bestehen und der erste Auftrag 1001 (1001.20160322152504) wird verarbeitet und gelöscht.

Führen Sie den Vorgang noch zwei weitere Male aus ...

Anhang

... und alle Bestellungen 1001 werden verarbeitet, aber die Sperre für Bestellung 1000 bleibt bestehen und keine der Dateien der Bestellung 1000 wird verarbeitet.

Aber was passiert, wenn das Problem mit dem Endpoint behoben ist? Wir müssen die Sperren irgendwie entfernen und es erneut versuchen. Der Ansatz besteht darin, die Sperren zu „altern“ und zu entfernen, sodass die Bestellungen automatisch erneut versucht werden können. Wenn der Endpoint immer noch nicht verfügbar ist, wird die Sperre zurückgesetzt.

Dieses Script kann regelmäßig ausgeführt werden, um alte Sperren zu löschen. Es liest die Quelle „Sperren“, vergleicht den Zeitstempel mit der aktuellen Zeit und löscht die Sperre, wenn sie älter als 5 Minuten ist.

locks = FileList("<TAG>Sources/Locks</TAG>","locks","*");

SortArray(locks);
locks;
cnt = length(locks); i=0;
While(i<cnt,
order = Split(locks[i],".");
orderno = order[0];
now = (CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));

// set interval for 5 min, 300 sec;

If(Double(now)-Double(order[1])>$deletelocktime,del = true;WriteToOperationLog("order lock file to delete"+orderno);DeleteFiles("<TAG>Sources/Locks</TAG>",orderno+"*"),
 WriteToOperationLog("Locks: "+locks+" below time limit")
 );
i+=1);

Beachten Sie, dass hier eine Projektvariable verwendet wird $deletelocktime, das die Anzahl der Sekunden enthält, um die Sperre zu altern.

Anhang