Commit d8cc4e0c authored by jsclose's avatar jsclose
Browse files

integrated indexer producer consumer queue

parent 1e92c676
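The core change in this commit: a shared ProducerConsumerQueue< DocIndex* > (the "IndexerQueue") is created in main, handed to the Crawler, and passed down into each Spider so parsed document indexes can be handed off to the indexer; StreamReader::request() also now returns a bool so spiders can skip parsing failed fetches. The queue class itself is not part of this diff, so the following is only a rough sketch of the interface the code below assumes (a Push called from spider threads, plus a blocking Pop for a consumer); the repo's actual class may differ.

// Hypothetical sketch of the ProducerConsumerQueue interface this commit
// relies on; not taken from this diff. Push() enqueues under a mutex and
// wakes one waiter; Pop() blocks until an item is available.
#include <queue>
#include <mutex>
#include <condition_variable>

template < typename T >
class ProducerConsumerQueue
{
public:
	void Push( T item )
	{
		std::unique_lock< std::mutex > lock( m );
		q.push( std::move( item ) );
		cv.notify_one( );
	}

	T Pop( )
	{
		std::unique_lock< std::mutex > lock( m );
		cv.wait( lock, [ this ] { return !q.empty( ); } );
		T item = std::move( q.front( ) );
		q.pop( );
		return item;
	}

private:
	std::queue< T > q;
	std::mutex m;
	std::condition_variable cv;
};

Note that Spider::FuncToRun still has IndexerQueue->Push(dict) commented out, so the handoff is wired through the constructors but not yet live.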
@@ -3,43 +3,56 @@
 //

 #include "HttpReader.h"

-void HttpReader::request()
+std::runtime_error HTTPConnectionError("Error connecting HTTP to url");
+
+bool HttpReader::request()
 {
-	sock = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
-	assert( sock != -1 );
-
-	// Get the host address.
-	struct hostent *host = gethostbyname( url.Host );
-	assert( host );
-
-	struct sockaddr_in address;
-	memset( &address, 0, sizeof( address ) );
-	address.sin_family = AF_INET;
-	address.sin_port = htons( 80 );
-	memcpy( &address.sin_addr, host->h_addr, host->h_length );
-
-	// Connect to the host.
-	int connectResult = connect( sock, ( struct sockaddr * )&address,
-	                             sizeof( address ) );
-	assert( connectResult == 0 );
-
-	// Send a GET message for the desired page.
-	cout << "Socket Reader is pulling from the web" << endl;
-
-	string getMessage = "GET ";
-	getMessage += url.CompleteUrl;
-	getMessage += " HTTP/1.1\r\nHost: ";
-	getMessage += url.Host;
-	getMessage += "\r\nConnection: close\r\n\r\n";
-
-	cout << getMessage << endl;
-	send( sock, getMessage.c_str( ), getMessage.length( ), 0 );
+	try
+	{
+		sock = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
+		assert( sock != -1 );
+
+		// Get the host address.
+		struct hostent *host = gethostbyname( url.Host );
+		if ( host == nullptr )
+			throw HTTPConnectionError;
+		assert( host );
+
+		struct sockaddr_in address;
+		memset( &address, 0, sizeof( address ));
+		address.sin_family = AF_INET;
+		address.sin_port = htons( 80 );
+		memcpy( &address.sin_addr, host->h_addr, host->h_length );
+
+		// Connect to the host.
+		int connectResult = connect( sock, (struct sockaddr *) &address,
+		                             sizeof( address ));
+		assert( connectResult == 0 );
+
+		// Send a GET message for the desired page.
+		cout << "Socket Reader is pulling from the web" << endl;
+
+		string getMessage = "GET ";
+		getMessage += url.CompleteUrl;
+		getMessage += " HTTP/1.1\r\nHost: ";
+		getMessage += url.Host;
+		getMessage += "\r\nConnection: close\r\n\r\n";
+
+		cout << getMessage << endl;
+		send( sock, getMessage.c_str( ), getMessage.length( ), 0 );
+
+		bool isSuccess = checkStatus( );
+		return isSuccess;
+	}
+	catch ( std::exception &e )
+	{
+		cerr << "Error trying to connect to Host" << endl;
+		return false;
+	}
 }

 bool HttpReader::fillBuffer(char * buf, size_t buf_size)
@@ -10,7 +10,7 @@ class HttpReader : public StreamReader
 public:

 	HttpReader( ParsedUrl url_in ) : url( url_in ) { }

-	void request();
+	bool request();
 	bool fillBuffer(char * buf, size_t buf_size);
 	bool checkStatus();
 	string PageToString();
@@ -4,54 +4,71 @@
 #include "HttpsReader.h"

-void HttpsReader::request()
+std::runtime_error HTTPSconnectionError("Error connecting HTTPS to url");
+
+bool HttpsReader::request()
 {
-	struct hostent *host = gethostbyname( url.Host );
-	assert( host );
-
-	struct sockaddr_in address;
-	memset( &address, 0, sizeof( address ) );
-	address.sin_family = AF_INET;
-	address.sin_port = htons( 443 );
-	memcpy( &address.sin_addr, host->h_addr, host->h_length );
-
-	// Create a TCP/IP socket.
-	sock = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
-	assert( sock != -1 );
-
-	// Connect the socket to the host address.
-	int connectResult = connect( sock, ( struct sockaddr * )&address,
-	                             sizeof( address ) );
-	assert( connectResult == 0 );
-
-	// Build an SSL layer and set it to read/write
-	// to the socket we've connected.
-	ctx = SSL_CTX_new( SSLv23_method( ) );
-	assert( ctx );
-	ssl = SSL_new( ctx );
-	assert( ssl );
-
-	SSL_set_fd( ssl, sock );
-
-	// Establish an SSL connection.
-	int sslConnectResult = SSL_connect( ssl );
-	assert( sslConnectResult == 1 );
-
-	// Send a GET message for the desired page through the SSL.
-	string getMessage = "GET ";
-	getMessage += url.CompleteUrl;
-	getMessage += " HTTP/1.1\r\nHost: ";
-	getMessage += url.Host;
-	getMessage += "\r\nConnection: close\r\n\r\n";
-
-	cout << getMessage << endl;
-	SSL_write( ssl, getMessage.c_str( ), getMessage.length( ) );
+	try
+	{
+		struct hostent *host = gethostbyname( url.Host );
+		if ( host == nullptr )
+			throw HTTPSconnectionError;
+		assert( host );
+
+		struct sockaddr_in address;
+		memset( &address, 0, sizeof( address ));
+		address.sin_family = AF_INET;
+		address.sin_port = htons( 443 );
+		memcpy( &address.sin_addr, host->h_addr, host->h_length );
+
+		// Create a TCP/IP socket.
+		sock = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP );
+		assert( sock != -1 );
+
+		// Connect the socket to the host address.
+		int connectResult = connect( sock, (struct sockaddr *) &address,
+		                             sizeof( address ));
+		assert( connectResult == 0 );
+
+		// Build an SSL layer and set it to read/write
+		// to the socket we've connected.
+		ctx = SSL_CTX_new( SSLv23_method( ));
+		assert( ctx );
+		ssl = SSL_new( ctx );
+		assert( ssl );
+
+		SSL_set_fd( ssl, sock );
+
+		// Establish an SSL connection.
+		int sslConnectResult = SSL_connect( ssl );
+		if ( sslConnectResult != 1 )
+			throw HTTPSconnectionError;
+		assert( sslConnectResult == 1 );
+
+		// Send a GET message for the desired page through the SSL.
+		string getMessage = "GET ";
+		getMessage += url.CompleteUrl;
+		getMessage += " HTTP/1.1\r\nHost: ";
+		getMessage += url.Host;
+		getMessage += "\r\nConnection: close\r\n\r\n";
+
+		cout << getMessage << endl;
+		SSL_write( ssl, getMessage.c_str( ), getMessage.length( ));
+
+		bool isSuccess = checkStatus( );
+		return isSuccess;
+	}
+	catch ( std::exception &e )
+	{
+		cerr << "Error trying to connect to Host" << endl;
+		return false;
+	}
 }

 bool HttpsReader::fillBuffer(char * buf, size_t buf_size)
@@ -102,6 +119,7 @@ ParsedUrl HttpsReader::getUrl()
 void HttpsReader::closeReader()
 {
 	SSL_shutdown(ssl);
+	SSL_free(ssl);
 	SSL_CTX_free(ctx);
@@ -12,7 +12,7 @@ public:

 	HttpsReader( ParsedUrl url_in ) : url( url_in ) { }

-	void request();
+	bool request();
 	bool fillBuffer(char * buf, size_t buf_size);
 	string PageToString();
 	ParsedUrl getUrl();
@@ -5,10 +5,11 @@
 #include "LocalReader.h"

-void LocalReader::request()
+bool LocalReader::request()
 {
 	//FIXME
 	//open the file?
+	return true;
 }

 bool LocalReader::fillBuffer(char * buf, size_t buf_size){
@@ -12,7 +12,7 @@ public:

 	LocalReader( string url_in ) : fileName( url_in ) { }

-	void request();
+	bool request();
 	bool fillBuffer(char * buf, size_t buf_size);
 	bool checkStatus();
 	string PageToString();
@@ -23,7 +23,7 @@ class StreamReader
 {
 public:
 	StreamReader() {};

-	virtual void request() = 0;
+	virtual bool request() = 0;
 	virtual bool fillBuffer(char * buf, size_t buf_size) = 0;
 	virtual bool checkStatus() = 0;
 	virtual string PageToString() = 0;
@@ -8,7 +8,7 @@ void Crawler::SpawnSpiders( size_t num_spiders, unordered_map < string, int > *d
 {
 	for ( size_t i = 0; i < num_spiders; i++ )
 	{
-		Spider *temp = new Spider( this->mode, this->urlFrontier, docMapLookup, duplicateUrlMap );
+		Spider *temp = new Spider( this->mode, this->urlFrontier, docMapLookup, duplicateUrlMap, this->IndexerQueue );
 		temp->StartThread( );
 		this->spiders.push_back( temp );
 	}
@@ -11,13 +11,14 @@
 *
 */

 using namespace std;

+using DocIndex = const unordered_map< string, vector< unsigned long > >;

 class Crawler
 {
 public:

-	Crawler( string mode_in, ProducerConsumerQueue < ParsedUrl > *url_q_in )
-		: mode( mode_in ), urlFrontier( url_q_in )
+	Crawler( string mode_in, ProducerConsumerQueue < ParsedUrl > *url_q_in, ProducerConsumerQueue< DocIndex* > *doc_index_queue_in )
+		: IndexerQueue( doc_index_queue_in ), mode( mode_in ), urlFrontier( url_q_in )
 	{ };

 	//spawns a number of works

@@ -32,6 +33,7 @@ public:
 private:
 	vector < Spider * > spiders;
 	ProducerConsumerQueue < ParsedUrl > *urlFrontier;
+	ProducerConsumerQueue< DocIndex* > *IndexerQueue;
 	//CrawlerStatistics housekeeper;
 	string mode;
@@ -95,15 +95,22 @@ void Spider::FuncToRun()
 {
 	StreamReader *reader = SR_factory( currentUrl, this->mode );

-	DocIndex * dict = parser.execute (reader);
+	bool success = reader->request();
+	if(success)
+	{
+		DocIndex * dict = parser.execute (reader);
+		//IndexerQueue->Push(dict);

-	printDocIndex(dict);
-	reader->closeReader();
+		printDocIndex(dict);
+		reader->closeReader();
+		//delete dict;
+		cond++;
+	}

 	delete reader;
-	delete dict;
-	cond++;
 }
@@ -162,26 +169,7 @@ bool Spider::shouldURLbeCrawled( size_t docID )
 		this->duplicateUrlMap->insert(std::make_pair(docID, 1));
 		return true;
 	}

-	/*
-	//search for url in doc cache
-	auto locationOnDisk = this->docMapLookup->find( url.CompleteUrl );
-	//bool protectedByRobots = checkRobots( url );
-
-	//if it doesnt find anything for that url key
-	if ( locationOnDisk == this->docMapLookup->end( ))
-	{
-		return true;
-	}
-	else
-	{
-		//Just for testing
-		Document::PrintDocMap(url.CompleteUrl, locationOnDisk->second);
-	}
-	return false;
-	*/

 	return true;
 }

 /*
@@ -17,19 +17,31 @@
 using namespace std;

 using DocIndex = const unordered_map< string, vector< unsigned long > >;

 class Spider : public ThreadClass
 {
 public:

-	Spider( string mode_in, ProducerConsumerQueue < ParsedUrl > *url_q_in,
-	        unordered_map < string, int > *doc_map_lookup_in, unordered_map < size_t, int > *duplicate_url_map_in )
-		: mode( mode_in ), urlFrontier( url_q_in ), docMapLookup( doc_map_lookup_in ), parser( url_q_in ), duplicateUrlMap( duplicate_url_map_in )
+	Spider( string mode_in,
+	        ProducerConsumerQueue < ParsedUrl > *url_q_in,
+	        unordered_map < string, int > *doc_map_lookup_in,
+	        unordered_map < size_t, int > *duplicate_url_map_in,
+	        ProducerConsumerQueue < DocIndex* > *doc_index_queue_in )
+		: mode( mode_in ),
+		  urlFrontier( url_q_in ),
+		  docMapLookup( doc_map_lookup_in ),
+		  parser( url_q_in ),
+		  duplicateUrlMap( duplicate_url_map_in ),
+		  IndexerQueue( doc_index_queue_in )
 	{
 	};

 	//Takes a url off of the url frontier
 	ParsedUrl getUrl();

@@ -46,9 +58,10 @@ private:
 	int locationOnDisk;
 	ProducerConsumerQueue < ParsedUrl > *urlFrontier;
+	ProducerConsumerQueue< DocIndex* > *IndexerQueue;
 	unordered_map < size_t, int > *duplicateUrlMap;
 	string mode;
 	unordered_map < string, int > *docMapLookup;
 	Parser parser;
 };
\ No newline at end of file
@@ -24,6 +24,7 @@
 #define PATH_TO_INDEX = 'bin/index/wordIDX'
 #define PATH_TO_DOC_INDEX = 'bin/index/docIDX'

+using DocIndex = const unordered_map< string, vector< unsigned long > >;

 using namespace std;

@@ -93,6 +94,10 @@ int main( int argc, char *argv[] )
 	unordered_map < size_t, int > *duplicateUrlMap = new unordered_map < size_t, int >( );

 	ProducerConsumerQueue<ParsedUrl> *urlFrontier = new ProducerConsumerQueue<ParsedUrl>();
+	ProducerConsumerQueue< DocIndex* > *IndexerQueue = new ProducerConsumerQueue<DocIndex*>();

 	char *seeds;
 	if (mode == "local")

@@ -122,7 +127,7 @@ int main( int argc, char *argv[] )
 	}
 	unordered_map < string, int > *docMapLookUp = new unordered_map < string, int >( );

-	Crawler crawler( mode, urlFrontier );
+	Crawler crawler( mode, urlFrontier, IndexerQueue );

 	crawler.SpawnSpiders(numberOfSpiders , docMapLookUp, duplicateUrlMap);
@@ -31,9 +31,8 @@ const unordered_map< string, vector< unsigned long > > *Parser::execute ( Stream
 */
 void Parser::parse ( StreamReader* reader, Tokenizer *tokenizer )
 {
-	reader->request();
 	bool success = reader->checkStatus();
 	if(success) {
 		string html = reader->PageToString();
 		ParsedUrl currentUrl = reader->getUrl();

@@ -50,9 +49,12 @@ void Parser::parse ( StreamReader* reader, Tokenizer *tokenizer )
 	offsetURL = tokenizer->execute( url, offsetURL, Tokenizer::URL );

 	while ( htmlIt != html.end( ) )
 	{
 		// if open bracket
+		if( *htmlIt == '\0')
+			break;
+
 		if ( *htmlIt == '<' )
 		{
 			auto begCloseTag = findNext( "</", htmlIt );

@@ -95,7 +97,7 @@ void Parser::parse ( StreamReader* reader, Tokenizer *tokenizer )
 			}
 		}
 	}
 }

 /**
@@ -24,6 +24,9 @@ public:
 	*/
 	Tokenizer ( );

+	/**
+	 * Returns pointer to the docIndex dictionary
+	 *
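Only the producer half of the pipeline is integrated by this diff: each Spider is set up to Push one DocIndex* per crawled page, but no indexer-side consumer appears in the commit. A hypothetical consumer loop, assuming the Pop interface sketched earlier, might look like this; the function name and indexing step are illustrative, not from the repo.

// Hypothetical indexer-side consumer; not part of this commit.
// Blocks on Pop() until a spider pushes a parsed document index.
void indexerLoop( ProducerConsumerQueue< DocIndex* > *queue )
{
	while ( true )
	{
		DocIndex *dict = queue->Pop( );
		// ... write the word -> offsets postings to the on-disk index ...
		delete dict;    // ownership transfers to the consumer
	}
}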