Skip to content
Snippets Groups Projects
Commit e235d496 authored by jsclose's avatar jsclose
Browse files

merged queue refactor crawler

parents 7f7884f9 daa3194e
No related branches found
No related tags found
1 merge request!8Origin/constraint solver
Showing with 288 additions and 99 deletions
...@@ -7,6 +7,27 @@ add_executable(QueueTest ...@@ -7,6 +7,27 @@ add_executable(QueueTest
shared/ProducerConsumerQueue.h shared/ProducerConsumerQueue.h
shared/ProducerConsumerQueue_test.cpp) shared/ProducerConsumerQueue_test.cpp)
add_executable(TryPopTest
shared/TryPopTest.cpp
shared/ProducerConsumerQueue.h
shared/ThreadClass.h
shared/url.h
crawler/crawler.cpp
crawler/UrlFrontier.cpp
crawler/Readers/StreamReader.h
crawler/Readers/HttpReader.cpp
crawler/Readers/HttpsReader.cpp
crawler/Readers/LocalReader.cpp
crawler/spider.cpp
util/util.cpp
shared/Document.cpp
parser/Parser.cpp
util/Stemmer.cpp
util/Tokenizer.cpp
util/stringProcessing.cpp
indexer/Indexer.cpp
)
add_executable(crawler-parser-Test add_executable(crawler-parser-Test
main.cpp main.cpp
shared/ProducerConsumerQueue.h shared/ProducerConsumerQueue.h
...@@ -215,6 +236,8 @@ add_executable(MasterReader-tests ...@@ -215,6 +236,8 @@ add_executable(MasterReader-tests
find_package(OpenSSL REQUIRED) find_package(OpenSSL REQUIRED)
target_link_libraries(TryPopTest OpenSSL::SSL)
target_link_libraries(ParserTest OpenSSL::SSL) target_link_libraries(ParserTest OpenSSL::SSL)
target_link_libraries(isolated-integration OpenSSL::SSL pthread) target_link_libraries(isolated-integration OpenSSL::SSL pthread)
......
No preview for this file type
#include "HttpReader.h" #include "HttpReader.h"
#include <sys/time.h>
std::runtime_error HTTPConnectionError( "Error connecting HTTP to url" ); std::runtime_error HTTPConnectionError( "Error connecting HTTP to url" );
...@@ -55,6 +56,13 @@ bool HttpReader::request ( ) ...@@ -55,6 +56,13 @@ bool HttpReader::request ( )
send( sock, getMessage.c_str( ), getMessage.length( ), 0 ); send( sock, getMessage.c_str( ), getMessage.length( ), 0 );
bool isSuccess = checkStatus( ); bool isSuccess = checkStatus( );
//set timeout option
struct timeval tv;
tv.tv_sec = 10;
tv.tv_usec = 0;
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv));
return isSuccess; return isSuccess;
} }
......
...@@ -40,6 +40,12 @@ bool HttpsReader::request ( ) ...@@ -40,6 +40,12 @@ bool HttpsReader::request ( )
assert( connectResult == 0 ); assert( connectResult == 0 );
// set timeout val before binding the ssl to the sock
struct timeval tv;
tv.tv_sec = 10;
tv.tv_usec = 0;
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv));
// Build an SSL layer and set it to read/write // Build an SSL layer and set it to read/write
// to the socket we've connected. // to the socket we've connected.
......
...@@ -16,21 +16,21 @@ ...@@ -16,21 +16,21 @@
// then adds both to the url map and the host map // then adds both to the url map and the host map
bool UrlFrontier::checkUrl( ParsedUrl *url ) bool UrlFrontier::checkUrl( ParsedUrl url )
{ {
if( Blacklist.find( url->getHost( ) ) != Blacklist.end( ) ) if( Blacklist.find( url.getHost( ) ) != Blacklist.end( ) )
return false; return false;
//Looks to see if the complete url already exists, if so return //Looks to see if the complete url already exists, if so return
if ( this->duplicateUrlMap->find( url->getCompleteUrl( )) != this->duplicateUrlMap->end( )) if ( this->duplicateUrlMap->find( url.getCompleteUrl( )) != this->duplicateUrlMap->end( ))
{ {
//update the anchor text //update the anchor text
if ( !url->getAnchorText( ).empty( ) || url->getAnchorText( ) != "") if ( !url.getAnchorText( ).empty( ) || url.getAnchorText( ) != "")
{ {
pthread_mutex_lock( &m ); pthread_mutex_lock( &m );
(*duplicateUrlMap)[ url->getCompleteUrl( ) ][ url->getAnchorText( ) ]++; (*duplicateUrlMap)[ url.getCompleteUrl( ) ][ url.getAnchorText( ) ]++;
pthread_mutex_unlock( &m ); pthread_mutex_unlock( &m );
} }
//add the new //add the new
...@@ -44,26 +44,26 @@ bool UrlFrontier::checkUrl( ParsedUrl *url ) ...@@ -44,26 +44,26 @@ bool UrlFrontier::checkUrl( ParsedUrl *url )
time( &now ); time( &now );
double difference = 0; double difference = 0;
//Has the domain been seen? //Has the domain been seen?
if ( this->domainMap->find( url->getHost( )) != this->domainMap->end( )) if ( this->domainMap->find( url.getHost( )) != this->domainMap->end( ))
{ {
//get the last time it was seen and find the time difference //get the last time it was seen and find the time difference
time_t lastSeen = this->domainMap->at( url->getHost( )); time_t lastSeen = this->domainMap->at( url.getHost( ));
difference = difftime( now, lastSeen ); difference = difftime( now, lastSeen );
if ( difference == 0 ) if ( difference == 0 )
difference = .01; difference = .01;
else else
difference = difference / 100; difference = difference / 100;
url->updateScore( difference ); url.updateScore( difference );
pthread_mutex_lock( &m ); pthread_mutex_lock( &m );
(*domainMap)[ url->getHost( ) ] = now; (*domainMap)[ url.getHost( ) ] = now;
pthread_mutex_unlock( &m ); pthread_mutex_unlock( &m );
} }
else else
{ {
pthread_mutex_lock( &m ); pthread_mutex_lock( &m );
this->domainMap->insert( std::make_pair( url->getHost( ), now )); //otherwise add to the map the current time this->domainMap->insert( std::make_pair( url.getHost( ), now )); //otherwise add to the map the current time
pthread_mutex_unlock( &m ); pthread_mutex_unlock( &m );
...@@ -72,7 +72,7 @@ bool UrlFrontier::checkUrl( ParsedUrl *url ) ...@@ -72,7 +72,7 @@ bool UrlFrontier::checkUrl( ParsedUrl *url )
//add url to the duplicate url map //add url to the duplicate url map
pthread_mutex_lock( &m ); pthread_mutex_lock( &m );
(*duplicateUrlMap)[ url->getCompleteUrl( ) ][ url->getAnchorText( ) ] = 1; (*duplicateUrlMap)[ url.getCompleteUrl( ) ][ url.getAnchorText( ) ] = 1;
pthread_mutex_unlock( &m ); pthread_mutex_unlock( &m );
return true; return true;
...@@ -80,10 +80,10 @@ bool UrlFrontier::checkUrl( ParsedUrl *url ) ...@@ -80,10 +80,10 @@ bool UrlFrontier::checkUrl( ParsedUrl *url )
} }
void UrlFrontier::Push( ParsedUrl *url ) void UrlFrontier::Push( ParsedUrl url )
{ {
//if the url has been seen? if so, dont add it //if the url has been seen? if so, dont add it
if ( url->isValid ) if ( url.isValid )
{ {
if ( checkUrl( url )) if ( checkUrl( url ))
...@@ -104,10 +104,37 @@ void UrlFrontier::Push( ParsedUrl *url ) ...@@ -104,10 +104,37 @@ void UrlFrontier::Push( ParsedUrl *url )
} }
} }
bool UrlFrontier::try_pop( ParsedUrl& result )
ParsedUrl *UrlFrontier::Pop()
{ {
gettimeofday(&now, NULL);
timeToWait.tv_sec = now.tv_sec + 5;
timeToWait.tv_nsec = (now.tv_usec+1000UL*100)*1000UL;
int retval;
pthread_mutex_lock(&m);
while(queue.empty()){
retval = pthread_cond_timedwait(&consumer_cv, &m, &timeToWait);
if(retval != 0){
fprintf(stderr, "pthread_cond_timedwait %s\n",
strerror(retval));
pthread_mutex_unlock(&m);
return false;
}
}
result = std::move(queue.top());
queue.pop();
pthread_mutex_unlock(&m);
return true;
}
ParsedUrl UrlFrontier::Pop()
{
pthread_mutex_lock( &m ); pthread_mutex_lock( &m );
...@@ -116,7 +143,7 @@ ParsedUrl *UrlFrontier::Pop() ...@@ -116,7 +143,7 @@ ParsedUrl *UrlFrontier::Pop()
pthread_cond_wait( &consumer_cv, &m ); pthread_cond_wait( &consumer_cv, &m );
} }
ParsedUrl *front = queue.top( ); ParsedUrl front = queue.top( );
queue.pop( ); queue.pop( );
pthread_mutex_unlock( &m ); pthread_mutex_unlock( &m );
...@@ -167,12 +194,11 @@ void UrlFrontier::writeDataToDisk() ...@@ -167,12 +194,11 @@ void UrlFrontier::writeDataToDisk()
while ( !queue.empty( )) while ( !queue.empty( ))
{ {
ParsedUrl *url = queue.top( ); ParsedUrl url = queue.top( );
queue.pop( ); queue.pop( );
string url_disk = url->getCompleteUrl() + "\n"; string url_disk = url.getCompleteUrl() + "\n";
write( file, url_disk.c_str( ), strlen( url_disk.c_str( ) )); write( file, url_disk.c_str( ), strlen( url_disk.c_str( ) ));
url = 0;
delete url;
} }
pthread_mutex_unlock( &m ); pthread_mutex_unlock( &m );
...@@ -196,7 +222,7 @@ void UrlFrontier::readDataFromDisk( ) ...@@ -196,7 +222,7 @@ void UrlFrontier::readDataFromDisk( )
if ( *files == '\n' ) if ( *files == '\n' )
{ {
ParsedUrl *url = new ParsedUrl( testFile ); ParsedUrl url(testFile);
cout << "Pushing: " << testFile << " to queue\n"; cout << "Pushing: " << testFile << " to queue\n";
Push( url ); Push( url );
testFile = ""; testFile = "";
...@@ -212,8 +238,8 @@ void UrlFrontier::readDataFromDisk( ) ...@@ -212,8 +238,8 @@ void UrlFrontier::readDataFromDisk( )
void UrlFrontier::readBlackList() void UrlFrontier::readBlackList()
{ {
string blackListFile = "/crawler/blacklist.txt" string blackListFile = "/crawler/blacklist.txt";
char *hosts = util::getFileMap( fileName ); char *hosts = util::getFileMap( blackListFile );
string toBlackList; string toBlackList;
while ( *hosts ) while ( *hosts )
...@@ -221,7 +247,7 @@ void UrlFrontier::readBlackList() ...@@ -221,7 +247,7 @@ void UrlFrontier::readBlackList()
if ( *hosts == '\n' ) if ( *hosts == '\n' )
{ {
Blacklist.insert(toBlackList) Blacklist.insert(toBlackList);
toBlackList = ""; toBlackList = "";
} }
else else
...@@ -231,7 +257,7 @@ void UrlFrontier::readBlackList() ...@@ -231,7 +257,7 @@ void UrlFrontier::readBlackList()
} }
} }
}
......
...@@ -17,15 +17,15 @@ typedef unordered_map<string , anchorToCountMap> urlMap; ...@@ -17,15 +17,15 @@ typedef unordered_map<string , anchorToCountMap> urlMap;
class ComparisonClass { class ComparisonClass {
public: public:
bool operator() (ParsedUrl *lhs , ParsedUrl *rhs) { bool operator() (ParsedUrl lhs , ParsedUrl rhs) {
//comparison code here //comparison code here
return lhs->getScore() > rhs->getScore(); return lhs.getScore() > rhs.getScore();
} }
}; };
class UrlFrontier class UrlFrontier : public ProducerConsumerQueue<ParsedUrl>
{ {
public: public:
...@@ -33,16 +33,18 @@ class UrlFrontier ...@@ -33,16 +33,18 @@ class UrlFrontier
readBlackList(); readBlackList();
}; };
void Push ( ParsedUrl * url ); void Push ( ParsedUrl url ) override;
bool checkUrl(ParsedUrl * url); bool try_pop( ParsedUrl& result ) override;
ParsedUrl Pop ( ) override ;
size_t Size() override;
bool checkUrl(ParsedUrl url);
void readBlackList( ); void readBlackList( );
void printAnchorTable( ); void printAnchorTable( );
set < string > Blacklist ; set < string > Blacklist ;
ParsedUrl * Pop ( );
size_t Size(); std::priority_queue<ParsedUrl , std::vector<ParsedUrl>, ComparisonClass> queue;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t consumer_cv = PTHREAD_COND_INITIALIZER;
std::priority_queue<ParsedUrl *, std::vector<ParsedUrl*>, ComparisonClass> queue;
//Writes the duplicate url map and priorty queue from disk //Writes the duplicate url map and priorty queue from disk
void writeDataToDisk( ); void writeDataToDisk( );
......
...@@ -13,11 +13,11 @@ using DocIndex = const unordered_map< string, vector< unsigned long > >; ...@@ -13,11 +13,11 @@ using DocIndex = const unordered_map< string, vector< unsigned long > >;
*/ */
void Crawler::SpawnSpiders ( size_t num_spiders , atomic_bool * alive) void Crawler::SpawnSpiders ( size_t num_spiders , atomic_bool * alive, int numdocs)
{ {
for ( size_t i = 0; i < num_spiders; i++ ) for ( size_t i = 0; i < num_spiders; i++ )
{ {
Spider *temp = new Spider( this->mode, this->urlFrontier, this->IndexerQueue , alive); Spider *temp = new Spider( this->mode, this->urlFrontier, this->IndexerQueue , alive, numdocs);
temp->StartThread( ); temp->StartThread( );
this->spiders.push_back( temp ); this->spiders.push_back( temp );
} }
......
...@@ -30,7 +30,7 @@ public: ...@@ -30,7 +30,7 @@ public:
{ }; { };
//spawns a number of works //spawns a number of works
void SpawnSpiders ( size_t num_spiders, atomic_bool* alive ); void SpawnSpiders ( size_t num_spiders, atomic_bool* alive, int numdocs );
......
...@@ -84,7 +84,7 @@ size_t Spider::hash ( const char *s ) ...@@ -84,7 +84,7 @@ size_t Spider::hash ( const char *s )
* *
*/ */
ParsedUrl * Spider::getUrl ( ) ParsedUrl Spider::getUrl ( )
{ {
return urlFrontier->Pop( ); return urlFrontier->Pop( );
} }
...@@ -105,45 +105,37 @@ void Spider::run ( ) ...@@ -105,45 +105,37 @@ void Spider::run ( )
{ {
std::cout << "Spider is crawling" << endl; std::cout << "Spider is crawling" << endl;
int cond = 0; int cond = 0;
ParsedUrl currentUrl;
while (*alive && cond < 100)
{
if(cond % 25 == 0)
{
cout << "Spider has crawled" << to_string(cond) << endl;
}
while (*alive && cond < docs_to_crawl)
if(urlFrontier->Size() > 0) {
{ bool not_empty = urlFrontier->try_pop(currentUrl);
ParsedUrl * currentUrl = getUrl( ); if(not_empty) {
size_t docID = hash( currentUrl->getCompleteUrl().c_str() ); size_t docID = hash(currentUrl.getCompleteUrl().c_str());
if ( shouldURLbeCrawled( docID ) ) if (shouldURLbeCrawled(docID)) {
{ StreamReader *reader = SR_factory(&currentUrl, this->mode);
StreamReader *reader = SR_factory( currentUrl, this->mode ); if (reader) {
if(reader) bool success = reader->request();
{ if (success) {
bool success = reader->request( ); cout << "Parsing " << currentUrl.getCompleteUrl();
if ( success ) DocIndex *dict = parser.execute(reader);
{ IndexerQueue->Push(dict);
cout << "Parsing " << currentUrl->getCompleteUrl();
DocIndex *dict = parser.execute( reader ); reader->closeReader();
IndexerQueue->Push( dict ); //delete dict;
reader->closeReader( ); cond++;
//delete dict;
cond++;
}
} }
}
delete reader; delete reader;
}
} }
}
} }
cout << "Spider has finished running " << endl; cout << "Spider has finished running " << endl;
return; return;
......
...@@ -22,22 +22,24 @@ class Spider : public ThreadClass ...@@ -22,22 +22,24 @@ class Spider : public ThreadClass
public: public:
Spider ( string mode_in, Spider ( string mode_in,
UrlFrontier *url_q_in, UrlFrontier *url_q_in,
ProducerConsumerQueue< DocIndex * > *doc_index_queue_in, ProducerConsumerQueue< DocIndex * > *doc_index_queue_in,
atomic_bool * bool_in atomic_bool * bool_in,
int numdocs
) )
: mode( mode_in ), : mode( mode_in ),
urlFrontier( url_q_in ), urlFrontier( url_q_in ),
parser( url_q_in ), parser( url_q_in ),
IndexerQueue( doc_index_queue_in ), IndexerQueue( doc_index_queue_in ),
alive( bool_in ) alive( bool_in ),
docs_to_crawl(numdocs)
{ {
}; };
//Takes a url off of the url frontier //Takes a url off of the url frontier
ParsedUrl * getUrl ( ); ParsedUrl getUrl ( );
virtual void run ( ); virtual void run ( );
...@@ -61,5 +63,6 @@ private: ...@@ -61,5 +63,6 @@ private:
string mode; string mode;
Parser parser; Parser parser;
atomic_bool* alive; atomic_bool* alive;
int docs_to_crawl;
}; };
\ No newline at end of file
...@@ -45,8 +45,8 @@ int main ( int argc, char *argv[] ) ...@@ -45,8 +45,8 @@ int main ( int argc, char *argv[] )
//string url2 = "https:"; //string url2 = "https:";
//string bad_url = "http-equiv=\"X-UA-Compatible\" content=\"IE=edge,chrome=1\" />"; //string bad_url = "http-equiv=\"X-UA-Compatible\" content=\"IE=edge,chrome=1\" />";
ParsedUrl * url = new ParsedUrl(url1); ParsedUrl url(url1);
ParsedUrl * url_1 = new ParsedUrl(url2); ParsedUrl url_1(url2);
urlFrontier->Push(url); urlFrontier->Push(url);
urlFrontier->Push(url_1); urlFrontier->Push(url_1);
......
...@@ -23,13 +23,37 @@ ...@@ -23,13 +23,37 @@
#include <chrono> #include <chrono>
#include <future> #include <future>
#include <ctime> #include <ctime>
using DocIndex = const unordered_map< string, vector< unsigned long > >; using DocIndex = const unordered_map< string, vector< unsigned long > >;
using namespace std; using namespace std;
atomic_bool *alive = new atomic_bool(true); atomic_bool *alive = new atomic_bool(true);
//atomic_bool has_shutdown = false;
/*
void wait_to_shutdown(Indexer& indexer, Crawler* crawler, UrlFrontier* urlFrontier, ProducerConsumerQueue< DocIndex * > *IndexerQueue)
{
cout << "Press anything to quit" << endl;
char c = 't';
while(c != 'Q' && !has_shutdown)
{
c = getchar();
}
if(has_shutdown) return;
crawler->passAnchorTextToIndex( );
indexer.Kill();
indexer.WaitForFinish( );
urlFrontier->writeDataToDisk();
delete urlFrontier;
delete IndexerQueue;
>>>>>>> QueueRefactor
cout << "Indexer has finished running " << endl;
}
*/
void signalHandler( int signum ) { void signalHandler( int signum ) {
cout << "Interrupt signal (" << signum << ") received.\n"; cout << "Interrupt signal (" << signum << ") received.\n";
...@@ -145,7 +169,7 @@ int main ( int argc, char *argv[] ) ...@@ -145,7 +169,7 @@ int main ( int argc, char *argv[] )
if ( *seeds == '\n' ) if ( *seeds == '\n' )
{ {
ParsedUrl * url = new ParsedUrl( testFile ); ParsedUrl url(testFile);
cout << "Pushing: " << testFile << " to queue\n"; cout << "Pushing: " << testFile << " to queue\n";
urlFrontier->Push( url ); urlFrontier->Push( url );
testFile = ""; testFile = "";
...@@ -157,8 +181,8 @@ int main ( int argc, char *argv[] ) ...@@ -157,8 +181,8 @@ int main ( int argc, char *argv[] )
if ( testFile != "" ) if ( testFile != "" )
{ {
cout << "Pushing: " << testFile << " to queue\n"; cout << "Pushing: " << testFile << " to queue\n";
ParsedUrl * url = new ParsedUrl( testFile ); ParsedUrl url1(testFile);
urlFrontier->Push( url ); urlFrontier->Push( url1 );
} }
} }
else else
...@@ -171,7 +195,10 @@ int main ( int argc, char *argv[] ) ...@@ -171,7 +195,10 @@ int main ( int argc, char *argv[] )
indexer.StartThread( ); indexer.StartThread( );
Crawler *crawler = new Crawler( mode, urlFrontier, IndexerQueue, AnchorQueue ); Crawler *crawler = new Crawler( mode, urlFrontier, IndexerQueue, AnchorQueue );
crawler->SpawnSpiders( numberOfSpiders , alive);
//atomic_bool *alive = new atomic_bool(true);
crawler->SpawnSpiders( numberOfSpiders , alive, DocsToCrawl);
string input; string input;
...@@ -179,8 +206,9 @@ int main ( int argc, char *argv[] ) ...@@ -179,8 +206,9 @@ int main ( int argc, char *argv[] )
if(DocsToCrawl > 0 ) if(DocsToCrawl > 0 )
{ {
cout << "Crawling 100,000 documents for each spider" << endl; cout << "Crawling: " << DocsToCrawl << " documents for each spider" << endl;
crawler->WaitOnAllSpiders( ); crawler->WaitOnAllSpiders( );
//has_shutdown = true;
crawler->passAnchorTextToIndex( ); crawler->passAnchorTextToIndex( );
indexer.Kill(); indexer.Kill();
indexer.WaitForFinish( ); indexer.WaitForFinish( );
......
...@@ -361,9 +361,9 @@ void Parser::pushToUrlQueue ( string url, ParsedUrl * currentUrl, string anchorT ...@@ -361,9 +361,9 @@ void Parser::pushToUrlQueue ( string url, ParsedUrl * currentUrl, string anchorT
{ {
try try
{ {
ParsedUrl *pUrl = new ParsedUrl( url ); ParsedUrl url_(url);
pUrl->setAnchorText( anchorText ); url_.setAnchorText( anchorText );
urlFrontier->Push( pUrl ); urlFrontier->Push( url_ );
if ( debug ) if ( debug )
{ {
cout << url << endl; cout << url << endl;
......
s = "http://www."
for i in range(0,10):
x = s + str(i) + ".com"
print(x)
...@@ -9,9 +9,9 @@ void ProducerConsumerQueue< T >::Push ( T obj ) ...@@ -9,9 +9,9 @@ void ProducerConsumerQueue< T >::Push ( T obj )
{ {
pthread_mutex_lock( &m ); pthread_mutex_lock( &m );
queue.push( obj ); queue_.push( obj );
if ( queue.size( ) == 1 ) if ( queue_.size( ) == 1 )
{ {
pthread_cond_broadcast( &consumer_cv ); pthread_cond_broadcast( &consumer_cv );
} }
...@@ -19,18 +19,48 @@ void ProducerConsumerQueue< T >::Push ( T obj ) ...@@ -19,18 +19,48 @@ void ProducerConsumerQueue< T >::Push ( T obj )
pthread_mutex_unlock( &m ); pthread_mutex_unlock( &m );
} }
template< class T >
bool ProducerConsumerQueue< T >::try_pop(T &result)
{
gettimeofday(&now, NULL);
timeToWait.tv_sec = now.tv_sec + 5;
timeToWait.tv_nsec = (now.tv_usec+1000UL*100)*1000UL;
int retval;
pthread_mutex_lock(&m);
while(queue_.empty()){
retval = pthread_cond_timedwait(&consumer_cv, &m, &timeToWait);
if(retval != 0){
fprintf(stderr, "pthread_cond_timedwait %s\n",
strerror(retval));
pthread_mutex_unlock(&m);
return false;
}
}
result = std::move(queue_.front());
queue_.pop();
pthread_mutex_unlock(&m);
return true;
}
template< class T > template< class T >
T ProducerConsumerQueue< T >::Pop ( ) T ProducerConsumerQueue< T >::Pop ( )
{ {
pthread_mutex_lock( &m ); pthread_mutex_lock( &m );
while ( queue.empty( ) == true ) while ( queue_.empty( ) == true )
{ {
pthread_cond_wait( &consumer_cv, &m ); pthread_cond_wait( &consumer_cv, &m );
} }
T front = queue.front( ); T front = queue_.front( );
queue.pop( ); queue_.pop( );
pthread_mutex_unlock( &m ); pthread_mutex_unlock( &m );
...@@ -41,7 +71,7 @@ template< class T > ...@@ -41,7 +71,7 @@ template< class T >
size_t ProducerConsumerQueue< T >::Size ( ) size_t ProducerConsumerQueue< T >::Size ( )
{ {
pthread_mutex_lock( &m ); pthread_mutex_lock( &m );
size_t size = queue.size( ); size_t size = queue_.size( );
pthread_mutex_unlock( &m ); pthread_mutex_unlock( &m );
return size; return size;
} }
\ No newline at end of file
...@@ -7,6 +7,9 @@ ...@@ -7,6 +7,9 @@
#include <queue> #include <queue>
#include <pthread.h> #include <pthread.h>
#include <chrono>
#include <sys/time.h>
//for now use STL queue, create better one later //for now use STL queue, create better one later
...@@ -14,10 +17,6 @@ ...@@ -14,10 +17,6 @@
template< class T > template< class T >
class ProducerConsumerQueue class ProducerConsumerQueue
{ {
private:
std::queue< T > queue;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t consumer_cv = PTHREAD_COND_INITIALIZER;
public: public:
...@@ -25,15 +24,20 @@ public: ...@@ -25,15 +24,20 @@ public:
{ } { }
void Push ( T obj ); virtual void Push ( T obj );
virtual bool try_pop(T& result);
T Pop ( ); virtual T Pop ( );
virtual size_t Size ( );
size_t Size ( );
//Right now these pass objects by value but //Right now these pass objects by value but
// probably should pass pointers in future // probably should pass pointers in future
protected:
std::queue< T > queue_;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t consumer_cv = PTHREAD_COND_INITIALIZER;
struct timespec timeToWait;
struct timeval now;
}; };
//Necessary because this class is templated //Necessary because this class is templated
......
//
// Created by Ben Bergkamp on 4/5/18.
//
#include "ProducerConsumerQueue.h"
#include "../crawler/UrlFrontier.h"
#include <iostream>
using namespace std;
int main()
{
ProducerConsumerQueue< int > queue;
queue.Push(2);
int x;
bool ret;
cout << "-----Testing Producer Consumer Queue-----\n";
cout << "Expecting: 1, 2, 0\n";
ret = queue.try_pop(x);
cout << "success: " << ret << endl;
cout << "val: " << x << endl;
ret = queue.try_pop(x);
cout << "success: " << ret << endl;
cout << "-----Now Testing Url Frontier-----\n";
cout << "Expecting: 1, http://www.espn.com, 0\n";
UrlFrontier fr;
ParsedUrl ps("http://www.espn.com");
ParsedUrl result;
fr.Push(ps);
ret = fr.try_pop(result);
cout << "success: " << ret << endl;
cout << "val: " << result.getCompleteUrl() << endl;
ret = queue.try_pop(x);
cout << "success: " << ret << endl;
}
\ No newline at end of file
http://www.alkdjhfalkd.com
\ No newline at end of file
http://www.0.com
1
http://www.1.com
akjdlhfad
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment