Merge pull request scrapy#4434 from BroodingKangaroo/ISSUE-4250-add_batch_deliveries

Feed exports: add batch deliveries
kmike authored Jul 29, 2020
2 parents 5265853 + a6c1d79 commit 5e2d1bd
Showing 7 changed files with 616 additions and 102 deletions.
47 changes: 47 additions & 0 deletions docs/topics/feed-exports.rst
@@ -241,6 +241,7 @@ These are the settings used for configuring the feed exports:
* :setting:`FEED_STORAGE_FTP_ACTIVE`
* :setting:`FEED_STORAGE_S3_ACL`
* :setting:`FEED_EXPORTERS`
* :setting:`FEED_EXPORT_BATCH_ITEM_COUNT`

.. currentmodule:: scrapy.extensions.feedexport

@@ -292,6 +293,7 @@ as a fallback value if that key is not provided for a specific feed definition.
* ``fields``: falls back to :setting:`FEED_EXPORT_FIELDS`
* ``indent``: falls back to :setting:`FEED_EXPORT_INDENT`
* ``store_empty``: falls back to :setting:`FEED_STORE_EMPTY`
* ``batch_item_count``: falls back to :setting:`FEED_EXPORT_BATCH_ITEM_COUNT`

.. setting:: FEED_EXPORT_ENCODING

@@ -446,6 +448,51 @@ format in :setting:`FEED_EXPORTERS`. E.g., to disable the built-in CSV exporter
'csv': None,
}


.. setting:: FEED_EXPORT_BATCH_ITEM_COUNT

FEED_EXPORT_BATCH_ITEM_COUNT
-----------------------------

Default: ``0``

If assigned an integer greater than ``0``, Scrapy generates multiple output
files, storing up to the specified number of items in each of them.

When generating multiple output files, you must use at least one of the following
placeholders in the feed URI to indicate how the different output file names are
generated:

* ``%(batch_time)s`` - gets replaced by a timestamp when the feed is being created
(e.g. ``2020-03-28T14-45-08.237134``)

* ``%(batch_id)d`` - gets replaced by the sequence number of the batch.

Use :ref:`printf-style string formatting <python:old-string-formatting>` to
alter the number format. For example, to make the batch ID a 5-digit
number by introducing leading zeroes as needed, use ``%(batch_id)05d``
(e.g. ``3`` becomes ``00003``, ``123`` becomes ``00123``).

For instance, if your settings include::

FEED_EXPORT_BATCH_ITEM_COUNT = 100

And your :command:`crawl` command line is::

scrapy crawl spidername -o "dirname/%(batch_id)d-filename%(batch_time)s.json"

The command line above can generate a directory tree like::

->projectname
-->dirname
--->1-filename2020-03-28T14-45-08.237134.json
--->2-filename2020-03-28T14-45-09.148903.json
--->3-filename2020-03-28T14-45-10.046092.json

The first and second files contain exactly 100 items each. The last one
contains 100 items or fewer.
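
Per feed, the same limit can also be set through the ``batch_item_count`` key
in the :setting:`FEEDS` setting, overriding the global default. A minimal
sketch (the URI and values below are illustrative, not part of this commit)::

    FEEDS = {
        'items/%(batch_id)05d-items-%(batch_time)s.json': {
            'format': 'json',
            'batch_item_count': 100,
        },
    }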


.. _URIs: https://en.wikipedia.org/wiki/Uniform_Resource_Identifier
.. _Amazon S3: https://aws.amazon.com/s3/
.. _botocore: https://github.com/boto/botocore
160 changes: 115 additions & 45 deletions scrapy/extensions/feedexport.py
@@ -6,6 +6,7 @@

import logging
import os
import re
import sys
import warnings
from datetime import datetime
@@ -206,14 +207,16 @@ def _store_in_thread(self, file):


class _FeedSlot:
-    def __init__(self, file, exporter, storage, uri, format, store_empty):
+    def __init__(self, file, exporter, storage, uri, format, store_empty, batch_id, uri_template):
        self.file = file
        self.exporter = exporter
        self.storage = storage
        # feed params
-        self.uri = uri
+        self.batch_id = batch_id
        self.format = format
        self.store_empty = store_empty
+        self.uri_template = uri_template
+        self.uri = uri
# flags
self.itemcount = 0
self._exporting = False
@@ -270,63 +273,112 @@ def __init__(self, crawler):
for uri, feed in self.feeds.items():
if not self._storage_supported(uri):
raise NotConfigured
if not self._settings_are_valid():
raise NotConfigured
if not self._exporter_supported(feed['format']):
raise NotConfigured

def open_spider(self, spider):
for uri, feed in self.feeds.items():
-            uri = uri % self._get_uri_params(spider, feed['uri_params'])
-            storage = self._get_storage(uri)
-            file = storage.open(spider)
-            exporter = self._get_exporter(
-                file=file,
-                format=feed['format'],
-                fields_to_export=feed['fields'],
-                encoding=feed['encoding'],
-                indent=feed['indent'],
-            )
-            slot = _FeedSlot(file, exporter, storage, uri, feed['format'], feed['store_empty'])
-            self.slots.append(slot)
-            if slot.store_empty:
-                slot.start_exporting()
+            uri_params = self._get_uri_params(spider, feed['uri_params'])
+            self.slots.append(self._start_new_batch(
+                batch_id=1,
+                uri=uri % uri_params,
+                feed=feed,
+                spider=spider,
+                uri_template=uri,
+            ))

def close_spider(self, spider):
deferred_list = []
for slot in self.slots:
-            if not slot.itemcount and not slot.store_empty:
-                # We need to call slot.storage.store nonetheless to get the file
-                # properly closed.
-                d = defer.maybeDeferred(slot.storage.store, slot.file)
-                deferred_list.append(d)
-                continue
-            slot.finish_exporting()
-            logfmt = "%s %%(format)s feed (%%(itemcount)d items) in: %%(uri)s"
-            log_args = {'format': slot.format,
-                        'itemcount': slot.itemcount,
-                        'uri': slot.uri}
-            d = defer.maybeDeferred(slot.storage.store, slot.file)
-
-            # Use `largs=log_args` to copy log_args into function's scope
-            # instead of using `log_args` from the outer scope
-            d.addCallback(
-                lambda _, largs=log_args: logger.info(
-                    logfmt % "Stored", largs, extra={'spider': spider}
-                )
-            )
-            d.addErrback(
-                lambda f, largs=log_args: logger.error(
-                    logfmt % "Error storing", largs,
-                    exc_info=failure_to_exc_info(f), extra={'spider': spider}
-                )
-            )
+            d = self._close_slot(slot, spider)
            deferred_list.append(d)
return defer.DeferredList(deferred_list) if deferred_list else None

def _close_slot(self, slot, spider):
if not slot.itemcount and not slot.store_empty:
# We need to call slot.storage.store nonetheless to get the file
# properly closed.
return defer.maybeDeferred(slot.storage.store, slot.file)
slot.finish_exporting()
logfmt = "%s %%(format)s feed (%%(itemcount)d items) in: %%(uri)s"
log_args = {'format': slot.format,
'itemcount': slot.itemcount,
'uri': slot.uri}
d = defer.maybeDeferred(slot.storage.store, slot.file)

# Use `largs=log_args` to copy log_args into function's scope
# instead of using `log_args` from the outer scope
d.addCallback(
lambda _, largs=log_args: logger.info(
logfmt % "Stored", largs, extra={'spider': spider}
)
)
d.addErrback(
lambda f, largs=log_args: logger.error(
logfmt % "Error storing", largs,
exc_info=failure_to_exc_info(f), extra={'spider': spider}
)
)
return d

def _start_new_batch(self, batch_id, uri, feed, spider, uri_template):
"""
Redirect the output data stream to a new file.
Execute multiple times if FEED_EXPORT_BATCH_ITEM_COUNT setting or FEEDS.batch_item_count is specified
:param batch_id: sequence number of current batch
:param uri: uri of the new batch to start
:param feed: dict with parameters of feed
:param spider: user spider
:param uri_template: template of uri which contains %(batch_time)s or %(batch_id)d to create new uri
"""
storage = self._get_storage(uri)
file = storage.open(spider)
exporter = self._get_exporter(
file=file,
format=feed['format'],
fields_to_export=feed['fields'],
encoding=feed['encoding'],
indent=feed['indent'],
)
slot = _FeedSlot(
file=file,
exporter=exporter,
storage=storage,
uri=uri,
format=feed['format'],
store_empty=feed['store_empty'],
batch_id=batch_id,
uri_template=uri_template,
)
if slot.store_empty:
slot.start_exporting()
return slot

def item_scraped(self, item, spider):
slots = []
for slot in self.slots:
slot.start_exporting()
slot.exporter.export_item(item)
slot.itemcount += 1
            # start a new batch and close the old slot once its itemcount has
            # reached the feed's batch_item_count limit
if (
self.feeds[slot.uri_template]['batch_item_count']
and slot.itemcount >= self.feeds[slot.uri_template]['batch_item_count']
):
uri_params = self._get_uri_params(spider, self.feeds[slot.uri_template]['uri_params'], slot)
self._close_slot(slot, spider)
slots.append(self._start_new_batch(
batch_id=slot.batch_id + 1,
uri=slot.uri_template % uri_params,
feed=self.feeds[slot.uri_template],
spider=spider,
uri_template=slot.uri_template,
))
else:
slots.append(slot)
self.slots = slots

def _load_components(self, setting_prefix):
conf = without_none_values(self.settings.getwithbase(setting_prefix))
@@ -343,6 +395,22 @@ def _exporter_supported(self, format):
return True
logger.error("Unknown feed format: %(format)s", {'format': format})

def _settings_are_valid(self):
"""
If FEED_EXPORT_BATCH_ITEM_COUNT setting or FEEDS.batch_item_count is specified uri has to contain
%(batch_time)s or %(batch_id)d to distinguish different files of partial output
"""
for uri_template, values in self.feeds.items():
if values['batch_item_count'] and not re.search(r'%\(batch_time\)s|%\(batch_id\)', uri_template):
logger.error(
'%(batch_time)s or %(batch_id)d must be in the feed URI ({}) if FEED_EXPORT_BATCH_ITEM_COUNT '
'setting or FEEDS.batch_item_count is specified and greater than 0. For more info see: '
'https://docs.scrapy.org/en/latest/topics/feed-exports.html#feed-export-batch-item-count'
''.format(uri_template)
)
return False
return True

def _storage_supported(self, uri):
scheme = urlparse(uri).scheme
if scheme in self.storages:
@@ -368,12 +436,14 @@ def _get_exporter(self, file, format, *args, **kwargs):
def _get_storage(self, uri):
return self._get_instance(self.storages[urlparse(uri).scheme], uri)

-    def _get_uri_params(self, spider, uri_params):
+    def _get_uri_params(self, spider, uri_params, slot=None):
        params = {}
        for k in dir(spider):
            params[k] = getattr(spider, k)
-        ts = datetime.utcnow().replace(microsecond=0).isoformat().replace(':', '-')
-        params['time'] = ts
+        utc_now = datetime.utcnow()
+        params['time'] = utc_now.replace(microsecond=0).isoformat().replace(':', '-')
+        params['batch_time'] = utc_now.isoformat().replace(':', '-')
+        params['batch_id'] = slot.batch_id + 1 if slot is not None else 1
uripar_function = load_object(uri_params) if uri_params else lambda x, y: None
uripar_function(params, spider)
return params
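
As an aside (not part of this diff), the batch placeholders built here are
consumed by the printf-style ``uri % uri_params`` substitution seen in
``open_spider()`` and ``item_scraped()`` above; a runnable sketch with
hypothetical values::

    from datetime import datetime

    # Hypothetical stand-in for the dict that _get_uri_params() builds.
    utc_now = datetime.utcnow()
    params = {
        'batch_time': utc_now.isoformat().replace(':', '-'),
        'batch_id': 2,  # slot.batch_id + 1 for the batch being started
    }

    uri_template = 'dirname/%(batch_id)05d-filename%(batch_time)s.json'
    print(uri_template % params)
    # e.g. dirname/00002-filename2020-07-29T14-45-09.148903.json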
1 change: 1 addition & 0 deletions scrapy/settings/default_settings.py
@@ -147,6 +147,7 @@
's3': 'scrapy.extensions.feedexport.S3FeedStorage',
'stdout': 'scrapy.extensions.feedexport.StdoutFeedStorage',
}
FEED_EXPORT_BATCH_ITEM_COUNT = 0
FEED_EXPORTERS = {}
FEED_EXPORTERS_BASE = {
'json': 'scrapy.exporters.JsonItemExporter',
1 change: 1 addition & 0 deletions scrapy/utils/conf.py
@@ -115,6 +115,7 @@ def get_sources(use_closest=True):

def feed_complete_default_values_from_settings(feed, settings):
out = feed.copy()
out.setdefault("batch_item_count", settings.getint('FEED_EXPORT_BATCH_ITEM_COUNT'))
out.setdefault("encoding", settings["FEED_EXPORT_ENCODING"])
out.setdefault("fields", settings.getlist("FEED_EXPORT_FIELDS") or None)
out.setdefault("store_empty", settings.getbool("FEED_STORE_EMPTY"))