Skip to content

Commit

Permalink
✨ Add scrap
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Sep 7, 2024
1 parent 22b4dad commit 881732c
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 22 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"ext-iconv": "*",
"ext-openswoole": "*",
"amphp/amp": "^3.0",
"darkwood/flow": "^1.2",
"darkwood/flow": "dev-1.x-dev",
"doctrine/dbal": "^3",
"doctrine/doctrine-bundle": "^2.12",
"doctrine/doctrine-migrations-bundle": "^3.3",
Expand Down
44 changes: 23 additions & 21 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

169 changes: 169 additions & 0 deletions src/Command/ScrapCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
<?php

declare(strict_types=1);

namespace App\Command;

use App\Job\FlowExamples\ScrapUrlJob;
use App\Job\FlowExamples\ScrapUrlsJob;
use App\Model\UrlContent;
use Fiber;
use Flow\AsyncHandler\DeferAsyncHandler;
use Flow\Driver\FiberDriver;
use Flow\Flow\Flow;
use Flow\Ip;
use Flow\IpStrategy\FlattenIpStrategy;
use Flow\Job\YJob;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Contracts\HttpClient\HttpClientInterface;

use function count;
use function sprintf;

#[AsCommand(
    name: 'app:scrap',
    description: 'This allows scrap pages with flow',
)]
/**
 * Console command demonstrating three scraping pipelines chained in a single Flow:
 *
 *  1. ScrapUrlsJob followed by ScrapUrlJob (fanned out with FlattenIpStrategy),
 *  2. a recursive YJob that loads users and enriches them one by one,
 *  3. the same recursion expressed through DeferAsyncHandler callbacks.
 *
 * The per-user lookup used to be a named function declared inside execute().
 * PHP registers such a function globally every time the declaring statement
 * runs, so a second execute() call in the same process would abort with
 * "Cannot redeclare function". It is now a private method.
 */
class ScrapCommand extends Command
{
    /** Base URL of the JSON API queried by the YJob examples. */
    private const API_BASE_URL = 'https://jsonplaceholder.typicode.com';

    /**
     * @param HttpClientInterface $httpClient client used for every API request issued by the YJob examples
     */
    public function __construct(private readonly HttpClientInterface $httpClient)
    {
        parent::__construct();
    }

    /**
     * Builds the flow, feeds it a fixed list of URLs and waits for completion.
     *
     * @param InputInterface  $input  console input, only used to build the SymfonyStyle helper
     * @param OutputInterface $output console output receiving the progress lines
     *
     * @return int always Command::SUCCESS once the flow has been awaited
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $driver = new FiberDriver();

        // The generator closure and the YJob closures are intentionally NOT static:
        // they reach the HTTP helpers of this command through $this.
        $flow = Flow::do(function () use ($io) {
            yield new ScrapUrlsJob();
            yield static function (array $urlDatas) use ($io) {
                $io->writeln(sprintf('ScrapUrlsJob : Finished scrapping %d urls', count($urlDatas)));

                return $urlDatas;
            };
            yield [new ScrapUrlJob(), null, new FlattenIpStrategy()];
            yield static function (UrlContent $urlData) use ($io) {
                $io->writeln(sprintf('ScrapUrlJob : Finished scrapping %s', $urlData->url));
            };

            // Seed of the recursion: [index, users]; a null index means "users not loaded yet".
            yield static fn () => [null, []];
            yield new YJob(function ($rec) {
                return function ($data) use ($rec) {
                    [$i, $users] = $data;
                    if ($i === null) {
                        return $rec([0, $this->fetchUsers()]);
                    }
                    if ($i >= 0 && $i < count($users)) {
                        $users[$i] = $this->fetchUserData($users[$i]);

                        return $rec([$i + 1, $users]);
                    }

                    return $users;
                };
            });
            yield static function ($users) use ($io) {
                $io->writeln(sprintf('ScrapYJob : Finished scrapping %d', count($users)));
            };

            // Same recursion as above, driven by the defer/complete callbacks of DeferAsyncHandler.
            yield static fn () => [null, []];
            yield [new YJob(function ($rec) {
                return function ($args) use ($rec) {
                    [$data, $defer] = $args;

                    return $defer(function ($complete, $async) use ($data, $defer, $rec) {
                        [$i, $users] = $data;
                        if ($i === null) {
                            $users = $this->fetchUsers();

                            $async($rec([[0, $users], $defer]), function ($result) use ($complete) {
                                $complete($result);
                            });
                        } elseif ($i >= 0 && $i < count($users)) {
                            $users[$i] = $this->fetchUserData($users[$i]);

                            $async($rec([[$i + 1, $users], $defer]), function ($result) use ($complete) {
                                $complete($result);
                            });
                        } else {
                            $complete([$users, $defer]);
                        }
                    });
                };
            }), null, null, null, new DeferAsyncHandler()];
            yield static function ($users) use ($io) {
                $io->writeln(sprintf('ScrapYDeferJob : Finished scrapping %d', count($users)));
            };
        }, ['driver' => $driver]);

        $flow(new Ip([
            new UrlContent('https://www.google.fr'),
            new UrlContent('https://www.apple.com'),
            new UrlContent('https://www.microsoft.com'),
            new UrlContent('https://www.amazon.com'),
            new UrlContent('https://www.facebook.com'),
            new UrlContent('https://www.netflix.com'),
            new UrlContent('https://www.spotify.com'),
            new UrlContent('https://www.wikipedia.org'),
            new UrlContent('https://www.x.com'),
            new UrlContent('https://www.instagram.com'),
            new UrlContent('https://www.linkedin.com'),
            new UrlContent('https://www.reddit.com'),
            new UrlContent('https://www.ebay.com'),
            new UrlContent('https://www.cnn.com'),
            new UrlContent('https://www.bbc.co.uk'),
            new UrlContent('https://www.yahoo.com'),
            new UrlContent('https://www.bing.com'),
            new UrlContent('https://www.pinterest.com'),
            new UrlContent('https://www.tumblr.com'),
            new UrlContent('https://www.paypal.com'),
            new UrlContent('https://www.dropbox.com'),
            new UrlContent('https://www.adobe.com'),
            new UrlContent('https://www.salesforce.com'),
        ]));

        $flow->await();

        $io->success('Scraping is done.');

        return Command::SUCCESS;
    }

    /**
     * Requests the users list, suspends the current fiber once, then decodes the response.
     *
     * Must be called from inside a running Fiber because it calls Fiber::suspend().
     *
     * @return array<int|string, mixed> decoded body of GET /users
     */
    private function fetchUsers(): array
    {
        $response = $this->httpClient->request('GET', self::API_BASE_URL . '/users');
        Fiber::suspend();

        return $response->toArray();
    }

    /**
     * Adds the "todos" and "posts" of a user to its record.
     *
     * Both requests are issued before the fiber is suspended and are only decoded
     * after it resumes (presumably so they can progress while other fibers run;
     * this relies on the HTTP client returning lazy responses).
     *
     * Must be called from inside a running Fiber because it calls Fiber::suspend().
     *
     * @param array<string, mixed> $user user record; its "id" entry selects the API resources
     *
     * @return array<string, mixed> the same record with "todos" and "posts" entries set
     */
    private function fetchUserData(array $user): array
    {
        $userId = $user['id'];

        $responses = [
            'todos' => $this->httpClient->request('GET', sprintf('%s/users/%s/todos', self::API_BASE_URL, $userId)),
            'posts' => $this->httpClient->request('GET', sprintf('%s/users/%s/posts', self::API_BASE_URL, $userId)),
        ];

        Fiber::suspend();

        $user['todos'] = $responses['todos']->toArray();
        $user['posts'] = $responses['posts']->toArray();

        return $user;
    }
}
71 changes: 71 additions & 0 deletions src/IpStrategy/FlattenIpStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

declare(strict_types=1);

namespace Flow\IpStrategy;

use Flow\Event;
use Flow\Event\PoolEvent;
use Flow\Event\PullEvent;
use Flow\Event\PushEvent;
use Flow\Exception\LogicException;
use Flow\Ip;
use Flow\IpPool;
use Flow\IpStrategyInterface;

/**
 * IP strategy that flattens iterable payloads: every element of a pushed IP's
 * data is stored as its own IP and handed back one at a time on pull.
 *
 * @template T
 *
 * @implements IpStrategyInterface<T>
 */
class FlattenIpStrategy implements IpStrategyInterface
{
    /**
     * Buffer holding the IPs created from pushed payloads until they are pulled.
     *
     * NOTE(review): the CI PHPStan run (PHP 8.3) reported Flow\IpPool as an unknown
     * class here and at its instantiation in the constructor; confirm that the
     * installed darkwood/flow version ships it.
     *
     * @var IpPool<T>
     */
    private IpPool $ipPool;

    public function __construct()
    {
        $this->ipPool = new IpPool();
    }

    /**
     * Maps each flow event to the method of this class that handles it.
     *
     * NOTE(review): the CI PHPStan run reported Flow\Event::POOL as an undefined
     * constant; confirm it exists in the installed darkwood/flow version.
     */
    public static function getSubscribedEvents(): array
    {
        $handlers = [
            Event::PUSH => 'push',
            Event::PULL => 'pull',
            Event::POOL => 'pool',
        ];

        return $handlers;
    }

    /**
     * Splits the pushed IP's data into one buffered IP per element.
     *
     * @param PushEvent<T> $event
     *
     * @throws LogicException when the pushed IP's data is not iterable
     */
    public function push(PushEvent $event): void
    {
        $payload = $event->getIp()->data;

        if (!is_iterable($payload)) {
            throw new LogicException('Ip data must be iterable');
        }

        foreach ($payload as $item) {
            $this->ipPool->addIp(new Ip($item));
        }
    }

    /**
     * Moves the next buffered IP, if there is one, onto the pull event.
     *
     * @param PullEvent<T> $event
     */
    public function pull(PullEvent $event): void
    {
        $next = $this->ipPool->shiftIp();

        if ($next === null) {
            // Nothing buffered: leave the event untouched.
            return;
        }

        $event->addIp($next);
    }

    /**
     * Adds every IP currently held by the buffer to the pool event.
     */
    public function pool(PoolEvent $event): void
    {
        $buffered = $this->ipPool->getIps();

        $event->addIps($buffered);
    }
}
Loading

0 comments on commit 881732c

Please sign in to comment.