
Wire Queue, basic implementation of simple queues


horst

Wire Queue

Wire Queue is a module that allows easy creation and usage of Queues in ProcessWire.

It is based upon a basic parent module (WireQueue) that should have one or multiple StorageHandler modules installed too. This beta release contains a simple plain text storage module, WireQueueTextfile, and a simple Sqlite3-DB storage module, WireQueueSqlite3.

The base module creates the needed:

  • FIELDS (wire_queue_type, wire_queue_state, wire_queue_storage_options)
  • TEMPLATES (wire-queue-container, wire-queue-storage, wire-queue-tools)
  • PAGES (wire-queues = container for all Queuepages, wire-queue-storages = container for StoragetypePages)
  • ROLE (wire-queue-admin)

Each storage module creates one page under wire-queue-storages.

New queues can be created in the backend by adding a new page under "Wire Queues". After creating the page, you have to select a storage type from the list of available ones and publish the page. After that, buttons are available to start, pause, and close the queue.

Putting and getting data to and from the queue is done via API calls. First you have to get the page that holds the queue object.

// get and validate the queue handle
if($queue = $pages->get(SELECTOR_TO_DESIRED_PAGE)->wireQueue()) {
    // wrap your data into an array and pass it into the queue
    $success = $queue->addItem($data);
    ...
}
// get and validate the queue handle
if($queue = $pages->get(SELECTOR_TO_DESIRED_PAGE)->wireQueue()) {
    $data = $queue->getItem();
    ...
}

That's basically all you want from a queue. Following are a few conveniences:

  • $queue->getPage()->title gives you the title of the queue, ($queue->getPage() returns the page object)
  • $queue->className() shows the StorageType of the queue
  • $queue->getState() and $queue->getStateStr() return the current state of a queue:
    1 = new / empty
    2 = enabled / running
    3 = paused
    4 = closed / archived
  • $queue->itemCount() gives the total number of all pending items in the queue

Here is code that gives an overview of all Queues in the system:

$container = $pages->get('template=wire-queue-container');
$bodyContent = "<h1>$container->title</h1>";
$bodyContent .= "<p>There are currently {$container->numChildren} Queues defined:</p>";
$bodyContent .= "<ul>";
foreach($container->children() as $p) {
    if(! ($queue = $p->wireQueue())) continue;
    $bodyContent .= "<li>{$queue->getPage()->title}<ul>";
    if(!$queue->ready2use()) {
        $bodyContent .= "<li>{$queue->className}</li>";
        $bodyContent .= "<li>This Storagetype is not ready to use! The System seems not to provide all requirements.</li>";
        $bodyContent .= "</ul></li>";
        continue;
    }
    $bodyContent .= "<li>{$queue->className}</li>";
    $bodyContent .= "<li>{$queue->getStateStr()} ({$queue->getState()})</li>";
    $bodyContent .= "<li>Currently are {$queue->itemCount()} items pending!</li>";
    $bodyContent .= "</ul></li>";
}
$bodyContent .= "</ul>";

Following is a screenshot of the backend

wire-queue-01.gif

The module is available in the modules directory: http://modules.processwire.com/modules/wire-queue/
Or you get it from Github: https://github.com/horst-n/WireQueue.
The Sqlite3 storage handler not only lets you push and pull data items to and from it, it can also keep track of the current state of a record. If you use multiple / different workers for pulling and processing the data, you can store an ID for them too.
 
This is what the DB table looks like:
pw_wirequeue_sqlite3_db-screen.jpg
 
The Wire Queue Sqlite3 storage handler provides the methods

  • addItem($arrayData)      // same as WireQueueTextfile
  • getItem($worker = null)   // same as WireQueueTextfile, (but the textfile storage cannot support $worker!)
  • updateItemState($id, $state, $worker = null)  // this you can use for further processing, if you want

addItem($arrayData)
 
$arrayData is an array (it may be associative) containing all your data for one record. The method stores it in the data field under the next free id in the Sqlite DB file and sets the state to 0. The worker field is empty for newly added records.

Following is a basic working example for pushing data records into a queue:

// you have created a queue in PW, the ID of the page is 1420 for example
// here is some skeleton code for an importer that uses this queue

// get and validate the queue handle
if(! ($queue = $pages->get('id=1420')->wireQueue())) exit(); // we could not get the Queuepage

// now start to scan / read data for your imports, wrap each record into an array and put it into the queue
foreach($pages->find(YOURSELECTOR) as $p) {
    $data = array($p->field1, $p->field2);
    $queue->addItem($data);
}


getItem($worker = null)
 
$worker is an integer that you can define to suit your needs. If you don't use multiple workers, or don't want to identify them, just omit it. The method pulls one pending record from the queue, changes its state from 0 to 1, and returns an associative array with the keys id and data: array('id' => integer, 'data' => array)
You will need the id if you want to use the queue further to keep track of processing steps.
You must pull your stored $data from $array['data'] and use the id for storing the state later on.
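
A minimal sketch of unpacking that return value. The helper name unpackQueueItem() is hypothetical and not part of WireQueue; it only assumes the array shape described above, and that a queue without pending records hands back a non-array value such as false or null.

```php
<?php
/**
 * Hypothetical helper (not part of WireQueue): checks the value returned by
 * getItem() and splits it into the record id and the payload.
 *
 * @param mixed $item expected shape: array('id' => int, 'data' => array)
 * @return array|null array($id, $data), or null if nothing usable came back
 */
function unpackQueueItem($item) {
    if (!is_array($item) || !isset($item['id'], $item['data'])) {
        return null; // no pending record, or an unexpected value
    }
    return array((int) $item['id'], $item['data']);
}

// Simulated return values, standing in for $queue->getItem()
$unpacked = unpackQueueItem(array('id' => 7, 'data' => array('a@example.com', 'Alice')));
$nothing  = unpackQueueItem(false);
```

With a real queue handle the call would look like unpackQueueItem($queue->getItem()), and the id is what you later pass on when storing a new state.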


updateItemState($id, $state, $worker = null)
 
$id identifies the record a worker has processed. For $state you can define and use any integer you like, except 0 and 1. If you also want to store altered data, and not only different states, you can use updateItem($id, $state, $worker = null, $data = null) instead of updateItemState().
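
Because the meaning of every state other than 0 and 1 is up to you, it can help to name your states in one place. The following is only a sketch: the constant names and the helper isCustomQueueState() are made up for illustration and do not exist in WireQueue.

```php
<?php
// Hypothetical, application-defined states. According to the description
// above, only 0 (pending) and 1 (fetched) are used by the handler itself.
define('QUEUE_STATE_DONE', 2);
define('QUEUE_STATE_FAILED', 3);

/**
 * Returns true if $state may be used as a custom state,
 * i.e. it is an integer other than the reserved 0 and 1.
 */
function isCustomQueueState($state) {
    return is_int($state) && $state !== 0 && $state !== 1;
}

// Map the boolean result of some processing step to one of the custom states
$result = true;
$state  = (true === $result) ? QUEUE_STATE_DONE : QUEUE_STATE_FAILED;
```

A worker could check isCustomQueueState($state) before calling updateItemState(), so that a mistake does not silently reset a record to pending.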


Here is a working example, with a bit of pseudo code, of how workers can get a pending record, process it further, and store back the result of the process:

// you have created a queue in PW, the ID of the page is 1420 for example
// here is some skeleton code for an importer that uses this queue

// get and validate the queue handle
if(! ($queue = $pages->get('id=1420')->wireQueue())) exit(); // we could not get the Queuepage

// we do not use different workers in this example, so we do not pass a worker id
$tmp = $queue->getItem();   // get a record from the queue
if(!$tmp) exit();           // no pending record available (or the queue is not running)
$record_id = $tmp['id'];    // store the record id
$data = $tmp['data'];       // get the data array

// process the $data ... 
// and get the result of the process, in this example true or false
$result = processMyRecord($data);

// new records have state = 0, fetched records have state = 1,
// so you may define state 2 for successfully processed records, and higher ones for failed records
$state = true === $result ? 2 : 3;
$queue->updateItemState($record_id, $state);


getItem4FurtherProcessing($state, $worker = null)
 
The $state you pass to the method is the state you want to get a record for. If there is one pending, its state is incremented by 1 and the id and data are passed back to you in an associative array: array('id' => integer, 'data' => array).

---------

Here is a pseudo code example of how (multiple) worker scripts may batch process queue records with the Sqlite storage handler:

// on the server in this example, every script will timeout / die after 180 seconds
// we start a timer
$time = Debug::timer();

// we use different instances of workers that pull and process records from the queue, 
// so additionally to the processings states, we also want to store the worker ids
$worker = 2000;

// now start to process items by pulling one after the other from the queue
while(150 > Debug::timer($time)) {
    $tmp = $queue->getItem($worker);   // get a record from the queue
    if(!$tmp) continue;                // maybe there are currently no pending records available
    $record_id = $tmp['id'];           // store the record id
    $data = $tmp['data'];              // get the data array
    $result = processMyRecord($data);  // process the data and get back the result
    $state = true === $result ? 2 : 3; // define an integer for the new state, according to the result of processing the data
    $queue->updateItemState($record_id, $state, $worker);
}

// we are close to the timeout for the script, so finish it by redirecting to itself to get a fresh run
$session->redirect('./');
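
The same loop can be sketched without ProcessWire, which makes it easy to try out. This is not the module's code: runWorkerBatch() and its three callables are hypothetical stand-ins for getItem(), processMyRecord() and updateItemState(), and the short sleep for an empty queue is an addition, so that the loop does not spin at full speed while nothing is pending.

```php
<?php
/**
 * Pulls and processes records until the time budget is used up.
 * The three callables are placeholders for the queue calls shown above.
 *
 * @return int number of processed records
 */
function runWorkerBatch(callable $fetch, callable $process, callable $store, $budgetSeconds, $idleSleepMicros = 200000) {
    $start = microtime(true);
    $processed = 0;
    while ((microtime(true) - $start) < $budgetSeconds) {
        $item = $fetch();
        if (!is_array($item)) {
            usleep($idleSleepMicros); // nothing pending: wait a moment, then retry
            continue;
        }
        $ok = $process($item['data']);
        $store($item['id'], true === $ok ? 2 : 3); // 2 = done, 3 = failed
        $processed++;
    }
    return $processed;
}

// Simulation with an in-memory list instead of a real queue
$pending = array(
    array('id' => 1, 'data' => array('ok')),
    array('id' => 2, 'data' => array('fail')),
);
$states = array();
$count = runWorkerBatch(
    function () use (&$pending) { return $pending ? array_shift($pending) : false; },
    function (array $data) { return $data[0] === 'ok'; },
    function ($id, $state) use (&$states) { $states[$id] = $state; },
    0.3,    // time budget in seconds, kept short for the demo
    50000   // sleep 50 ms while the queue is empty
);
```

With a real queue, the first callable would wrap $queue->getItem($worker) and the third one $queue->updateItemState($id, $state, $worker).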

Just to clarify: the further processing does not have to be done from within ProcessWire, nor does it have to be done in PHP. You can use PW only to collect the data, for example with the Sqlite storage handler. To further process the data records, you can use any software that can communicate with a Sqlite DB file.
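
To illustrate the idea of working on the Sqlite file directly, here is a rough, self-contained sketch using PHP's SQLite3 class. Everything about the schema is an assumption: the table name queue, the column types, and storing data as JSON are made up for the demo, and only the field names id, data, state and worker come from this thread. The function claimNext() is hypothetical too.

```php
<?php
// Demo database in memory; a real setup would open the queue's DB file instead.
$db = new SQLite3(':memory:');
// Hypothetical schema built around the field names mentioned in this thread
$db->exec('CREATE TABLE queue (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, state INTEGER DEFAULT 0, worker INTEGER)');

$insert = $db->prepare('INSERT INTO queue (data) VALUES (:data)');
foreach (array(array('a@example.com'), array('b@example.com')) as $record) {
    $insert->bindValue(':data', json_encode($record), SQLITE3_TEXT);
    $insert->execute();
    $insert->reset();
}

/** Claims the oldest pending row (state 0 -> 1) inside one transaction. */
function claimNext(SQLite3 $db, $worker) {
    $db->exec('BEGIN IMMEDIATE'); // take the write lock before reading
    $row = $db->querySingle('SELECT id, data FROM queue WHERE state = 0 ORDER BY id LIMIT 1', true);
    if (!$row) {
        $db->exec('COMMIT');
        return null; // nothing pending
    }
    $update = $db->prepare('UPDATE queue SET state = 1, worker = :worker WHERE id = :id');
    $update->bindValue(':worker', $worker, SQLITE3_INTEGER);
    $update->bindValue(':id', $row['id'], SQLITE3_INTEGER);
    $update->execute();
    $db->exec('COMMIT');
    return array('id' => $row['id'], 'data' => json_decode($row['data'], true));
}

$first  = claimNext($db, 2000);
$second = claimNext($db, 2000);
$third  = claimNext($db, 2000); // null, both records are already claimed
```

Any other language with a Sqlite binding could run the same two statements; the transaction is there so that two workers do not claim the same row.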

If you need different fields in a Sqlite table, just copy the WireQueueSqlite3 module, change the file name and class name so that they don't duplicate the distributed storage handler, and, at least, change the field definitions for the DB table. But you can change everything you want.

If you want or need to use other storage types and set up a new WireQueueSomething module, please feel free to submit / commit it. :)


hi horst,

thank you for your module. could be that i need it one day, though i have to say it was all new to me and your explanation in the other thread was helpful for me: https://processwire.com/talk/topic/11927-wirequeue-needs-testing/?p=111287 :)

thank you also for the screencasts that helped me a lot understanding what it is all about.

the further processing must not be done from within Processwire...

i think you mean "does not have to be done" as "must not be done" = "darf nicht innerhalb pw gemacht werden" (german for "is not allowed to be done within pw"), right?

could you please explain why you used TXT/SQLite for data storage? why not use a mysql table from within pw? thank you :)


@BernhardB: thanks for the language explanation, I have corrected it above in my post! :)

I haven't used MySQL tables, because my main motivation was to have a storage that does not belong to the PW-DB. I think this is useful with regard to backup / restore processes, as one does not have masses of temporary records in it. File-based storage (TXT) is portable and easy to use. SQLite is also portable, but has DB support, which lets you simply push, pull and update records. For example, think of sending newsletters to 5k recipients: SQLite is very convenient for handling this.

Another use case that I have had: a website collected information over a period of two weeks, and the (post) processing should be done afterwards. Here we simply implemented a download link for the customer on the edit page, so that he could store the text file (CSV style) locally on his computer for further processing with Excel.

So, I believe there will also be use cases where a MySQL table is a better storage. For all storage types that need initial config params, I have implemented a simple Textarea field in the queue page, which can be used to hold those settings. (One can use it for key = value pairs, each in its own row, like in some other modules. But it is up to the developer whether he likes to use that simple approach or implements something else.) The Textarea field is collapsed by default for admins, and hidden for others. (The queue pages require at least the role "wire-queue-admin" or superuser for everything other than viewing the state or count.)
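
As a sketch of the "key = value pairs, each in its own row" idea, a storage handler could read such a text like this. The function name parseStorageOptions() and the option names in the sample are invented for illustration; how a handler actually gets at the field value is left out here.

```php
<?php
/**
 * Hypothetical parser for "key = value" rows, one pair per line.
 * Blank lines, lines without "=" and lines with an empty key are skipped.
 *
 * @param string $text raw content of the options textarea
 * @return array option name => option value (both trimmed strings)
 */
function parseStorageOptions($text) {
    $options = array();
    foreach (preg_split('/\R/', (string) $text) as $line) {
        if (strpos($line, '=') === false) continue;
        list($key, $value) = explode('=', $line, 2); // split at the first "=" only
        $key = trim($key);
        if ($key === '') continue;
        $options[$key] = trim($value);
    }
    return $options;
}

$options = parseStorageOptions("host = 127.0.0.1\nport=6379\n\nprefix = wq=test");
```

Splitting at the first "=" only means that values may contain further "=" characters, as in the prefix row of the sample.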

So, hopefully someone will make a mysql storage handler and commit it here or at github. :)

(or another DB)

Edited by horst

Hey Horst, 

Looks really impressive, but I'm wondering how this module differs from the queuing done in IftRunner combined with Page Actions. I'm comparing both at the moment for a rather large project we are working on. One thing I see is that you have different queues whereas IftRunner only has one queue to store them all. Any other differences that you are aware of?


Hi Arjen,

I haven't used IftRunner and don't know it. So, sorry, I cannot say anything with regard to a comparison. If you already know IftRunner and it supports what you need, I would stick with it. WireQueue is new and not (widely) tested currently. In fact, it is currently only tested by me in a few use cases. Before using it in a huge project, you definitely should do thorough testing.

With a quick look through the repository, it seems that queuing in IftRunner is done internally, meaning it is mapped to page actions: you do not have to populate it manually, but you also cannot use it externally, without page actions. But I'm not really sure about this, as it was my first read there.


...  Pity you haven't used it yet. IftRunner with (Page) Actions are really simple as a concept, but quite powerful. Antti wrote about their use case a while ago.

Thanks for the link to Antti's writeup.

------------------

Following is a description of what the SQLite handler was used for in the past, before I built WireQueue around it.

One use case was for an award-winning cinema portal where they send up to 2x 13k emails in different newsletters every week. I was asked to embed the WireMailSMTP module there, and I suggested that we should use SQLite files for it.

This has been running in production for half a year now without any problems. At the beginning we had a testing phase of 2 weeks where we slightly modified and tweaked some small parts. Many different programs and services were running on the server where the website is hosted, so that, for unknown reasons, the SQLite DB sometimes stopped working unexpectedly. But SQLite has a sort of transaction feature (Sqlite journal files), so no recipient got lost. On every run we retrieve 500 pending recipients.

The only thing that could happen there was this: when SQLite crashed immediately after a mail was sent successfully, but before it could update the record in the DB file AND in the journal file, this single recipient would be picked up again with the next batch, which results in sending him the newsletter twice. This (only) happened a few times per week, (3-10) on 10k mails, = 0.1 percent. And as this may be a bit specific to that particular server, as I haven't encountered it in other use cases, it seems to be much more secure to use SQLite with its transaction / rollback functionality than MySQL with MyISAM! I don't know about the differences when using MySQL and InnoDB. But we / you can build storage handlers for every DB we / you like. :)

So, what I have done with building WireQueue is implementing it for easy usage and monitoring in PW's backend.

The previously used field definition of the SQLite table has changed from userid, email, sent, tries to data, state, worker, but everything else of the SQLite part is identical in WireQueue.

Hope this helps a bit.

Edited by horst
added info about use cases for SQLite

  • 3 weeks later...

With the UI, you cannot! Archiving means what it says. :)

If you want to stop using it, with the possibility to reuse it later, please use pause.

I'm not sure about this, as it isn't meant to be done by design, but you may try to change the field value via the API:

// get the desired queue page by its id
$queue = $pages->get("id=1234");

// get the fieldname that stores the state
$fieldname = WireQueue::WIRE_QUEUE_FIELD2;

// change value from 4 (archived) to 3 (paused), or 2 (active)
$queue->$fieldname = 3; // or 2
$queue->save();

BUT: Without warranty. I'm not sure if this has any side effects, it's not tested!


$pages->get(SELECTOR_TO_DESIRED_PAGE)->WireQueue() 

You have typos in your first two examples: it should be wireQueue and not WireQueue

ok no problem, but i think there should at least be a warning that this is an irreversible action :)


$pages->get(SELECTOR_TO_DESIRED_PAGE)->WireQueue() 

You have typos in your first two examples: it should be wireQueue and not WireQueue

ok no problem, but i think there should at least be a warning that this is an irreversible action :)

Thanks for pointing to the typos. I corrected it.


hi horst,

i need a little more help, sorry. i don't really get the concept. for example when i have a queue to resize images, how would i setup this? when i have a queue "resizeimages" i could add all resize requests to this queue and then run the resize for every item. so far so good, but what if i have different users with images from different pages? user A adds 100 images to queue "resizeimages" and user B adds 100 images to the same queue. the queue now has 200 images in it if that happens at the same time. how does the queue know which images belong to which user?

the "worker" is empty for new entries so i can't use this. or should i do an additem() and right after that an updateitemstate(...worker...)?

or am i completely on the wrong track with what i want to do?

thank you

edit: ok i don't think that would work because there would still be all items in the queue and there's no kind of "selector" to get only the items from one worker, right? so would i have to create a queue for every page where i want to resize images? i didn't try it so far, but i think that should be possible via standard api creating new pages and setting the appropriate values?


Sorry, I cannot follow. A queue only stores the data you put into it, nothing more. If you need references to whatever, put it into your data. That's the only way to have your needed data together. But this has nothing to do with the queue. Once again, the queue is nothing else than a storage, a DB table, with exactly 1 field for your data. Therefore, (I think you do this already), you have to wrap your data into an array. If you need more data-information as you currently have in it, just put it into the data array too. :)


hmmm, ok i see you really didn't get me :) i was writing a more detailed answer but while i was writing i found a better solution for my usecase. i think i wanted to use the queue in a way it is not built for (but not the way you understood me, but thats no factor any more) ;)


hi horst, me again ;)

just came across https://processwire.com/api/variables/log/ and as i read the docs i thought it is quite similar to txt-queues. i think one could easily create simple queues this way and would be interested in your opinion.

creating a new queue would be as simple as $log->save('examplequeue', 'mydata');

fetching lines from the end of the file is also easy (http://kongondo.github.io/ProcessWireAPIGen/master/source-class-WireLog.html#196-229) and could maybe be modified by ryan with an option to get lines from the beginning of the file and not only the end. this functionality is already built into the filelog class http://kongondo.github.io/ProcessWireAPIGen/master/source-class-FileLog.html#164-216

only thing missing would be a method to delete rows from the beginning or end of the file, but that would also be really easy.

edit: don't know about the "workers", didn't get the concept...


edit: don't know about the "workers", didn't get the concept...

Workers are separate processes, which run alongside your webserver stack on the host system. They can process things they get out of the queue without being reliant on the Request->Process->Response cycle of the webserver.

E.g. you want to have a way for users to request a big report on your website. As the name "big report" suggests, it takes some time to generate the PDF for it (say ~1 min). You wouldn't want your user to sit around and watch the nice browser spinner, waiting for the next page to pop up. You can now write the request to a queue and instantly return a message stating that the report will be generated and emailed to the user when ready.

To generate that report you may know the option of using a cron to trigger website logic, without an actual user requesting a website, but that's also flawed with things like timeouts and such, also it's only starting in intervals. That's where workers come in.

These are php (and other script) processes, which often run as services like e.g. your apache server does. These scripts contain some way to not exit early (e.g. infinite while loop) – which is the biggest difference to website request – and periodically check their queue if there's work to do. If your report request is in it, they generate the pdf, email it and if done delete the queue item. If there are other reports to do they'll move on to them, otherwise they sleep until another queue item is available. 


  • 3 months later...

@horst

Thanks for this module, I've been hammering it pretty hard for the past month and have just created a pull-request over on github for your consideration. I've been having a few reliability problems with the textfile driver's itemCount() method. I also wanted a fast way to purge a queue without having to call getItem() repeatedly.

The itemCount() method sometimes returns an incorrect count (anywhere from 0 to the true number of items in the queue). This happens even when there are multiple thousands of jobs queued up and it makes it unreliable as part of a conditional statement. The explanation is given in the commit that fixes the issue as part of the pull request.

Thanks for looking!


Many thanks @netcarver.

I must admit that I have written the Textfile driver only so that this module has a basic driver. Unlike the SQLite driver, it has not had much testing. I'm glad to hear that you have done this and that you contribute to the module. I have added you as a collaborator, so that you can change / add other improvements / enhancements as you like. ^-^;)


  • 3 weeks later...

Thanks for adding me as a collaborator, @horst.

I've pushed a new dev branch to the repository that contains:

  • Bugfixes to the textfile driver's itemCount() method and small cleanups for edge cases.
  • New methods isEmpty(), purgeItems() and getItems() added to the base storage class and fully implemented in the textfile driver. The SQLite driver currently has stub implementations of these that need filling out.
  • The addition of a Redis driver for super-fast in-memory queues (implements the full base class interface.)

If anyone wants to give this a try before horst gets around to merging into the master branch, feel free to grab the zip (repository here.)


  • 4 weeks later...

Horst and I would like to announce the release of v1.0.0 of the WireQueue suite of modules.

There are now drivers for textfiles, redis and SQLite and the interface has been extended to include the isEmpty(), purgeItems() and getItems() methods.

Enjoy!


  • 2 months later...

I think I may have found a small problem with WireQueueTextfile, although it could be due to this PHP bug?

The array is being serialized in the addItem() function, but in my case an element's value is sometimes a string containing newline characters "\r\n". I believe WireQueueTextfile saves each array to one line, but these characters are causing the serialized string to be split onto multiple lines, which I'm assuming causes problems for WireQueue.

Would it make sense to somehow sanitize these values (outside of WireQueue) before adding them to the queue?


Could you try replacing addItem() and getItem() with the following...

    public function addItem($arrayData) {
        if(!$this->_addItem()) return false;
        if(2 != $this->getState()) return false;
        if(!$fp = @fopen($this->getFilename(), 'ab')) return false;
        if(flock($fp, LOCK_EX)) {
            $arrayData = serialize($arrayData);
            $data = str_replace("\r\n", '\r\n', $arrayData) . "\n";
            $res = fwrite($fp, $data);
            fflush($fp);
            flock($fp, LOCK_UN);
            fclose($fp);
            return $res == strlen($data);
        }
        fclose($fp);
        return false;
    }

    public function getItem($worker = null) {
        if(!$this->_getItem()) return false;
        if(2 != $this->getState()) return false;
        if(!$fp = @fopen($this->getFilename(), 'rb+')) return false;
        if(flock($fp, LOCK_EX)) {
            $line = trim(fgets($fp));
            if(!$line) {
                flock($fp, LOCK_UN);
                fclose($fp);
                if(0 == $this->itemCount()) return null;
                return false;
            }
            // we have the first entry, now write all following data into a buffer
            $fpTmp = @fopen('php://temp/maxmemory:' . intval(1024 * 1024 * 5), 'rb+');
            while(!feof($fp)) fwrite($fpTmp, fread($fp, 4096));
            fseek($fp, 0, SEEK_SET);
            ftruncate($fp, 0);
            fseek($fpTmp, 0, SEEK_SET);
            // write back buffer into file
            while(!feof($fpTmp)) fwrite($fp, fread($fpTmp, 4096));
            fclose($fpTmp);
            fflush($fp);
            flock($fp, LOCK_UN);
            fclose($fp);
        }
        return unserialize(str_replace('\r\n', "\r\n", $line));
    }

Untested - YMMV!


No luck with that unfortunately, still lots of fragmented lines. Do the instances of '\r\n'  need to be wrapped in double quotes? I can't work out the logic here!

I should also note that these strings can include \r\n but are also sometimes littered with instances of \r or \n individually.
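
One way around the problem with single \r or \n characters, sketched here as a suggestion and not as what WireQueueTextfile does, is to encode the serialized record so that it cannot contain any line break at all. Base64 output consists only of letters, digits, "+", "/" and "=", so each record stays on exactly one line. The function names below are made up for this example.

```php
<?php
/** Encodes one record as a single line that cannot contain \r or \n. */
function encodeQueueLine(array $record) {
    return base64_encode(serialize($record)) . "\n";
}

/** Reverses encodeQueueLine(); returns false for empty or damaged lines. */
function decodeQueueLine($line) {
    $raw = base64_decode(trim($line), true); // strict mode rejects invalid characters
    if ($raw === false || $raw === '') return false;
    // plain arrays and scalars only, objects are not instantiated
    return unserialize($raw, array('allowed_classes' => false));
}

$record   = array('subject' => "Line one\r\nLine two", 'note' => "lone \r and lone \n");
$line     = encodeQueueLine($record);
$restored = decodeQueueLine($line);
```

The price is a file that is about a third larger and no longer human readable, and existing queue files written in the old format would not decode with it.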
