A stock market data pipeline built on Google Cloud Platform (GCP) that ingests and analyzes intraday stock data at 5-minute intervals. The project demonstrates data engineering practices including parallel processing, data validation, and near-real-time analytics.
- Real-time Processing: Fetches and processes stock data at 5-minute intervals
- Scalable Architecture: Built on GCP services for high availability and scalability
- Intelligent Rate Limiting: Smart API key rotation system
- Robust Error Handling: Comprehensive retry mechanisms and validation
- Advanced Analytics: Real-time technical indicators and market analysis
- Interactive Dashboard: Rich visualization powered by Streamlit
- Data Integrity: Multi-layer deduplication and validation processes
- High Performance: Processes ~4,000 data points per stock over 30 days
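The API key rotation listed under Intelligent Rate Limiting could work roughly as in the sketch below. This is not the project's implementation: the `KeyRotator` class, the per-key call budget, and the 60-second window are assumptions chosen for illustration, and the real limits depend on your Alpha Vantage plan.

```python
import itertools
import time


class KeyRotator:
    """Round-robin over API keys, skipping keys that have used up their budget.

    Hypothetical helper: the class name, limits and window are assumptions.
    """

    def __init__(self, keys, calls_per_window=5, window_seconds=60.0,
                 clock=time.monotonic):
        if not keys:
            raise ValueError("at least one API key is required")
        self._keys = list(keys)
        self._limit = calls_per_window
        self._window = window_seconds
        self._clock = clock
        # key -> timestamps of calls made inside the current window
        self._calls = {key: [] for key in self._keys}
        self._cycle = itertools.cycle(self._keys)

    def next_key(self):
        """Return a key with remaining budget, or None if all are exhausted."""
        now = self._clock()
        for _ in range(len(self._keys)):
            key = next(self._cycle)
            # Forget calls that have fallen out of the window.
            recent = [t for t in self._calls[key] if now - t < self._window]
            self._calls[key] = recent
            if len(recent) < self._limit:
                recent.append(now)
                return key
        return None
```

A caller would ask for `next_key()` before each request and wait (or skip the cycle) when it returns `None`. Injecting `clock` keeps the class testable without real sleeps.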
- Data Collection
  - Alpha Vantage API integration
  - Rate limit management
  - Initial data validation
- Message Queue
  - Google Pub/Sub implementation
  - Asynchronous message processing
  - Message persistence and retry logic
- Data Processing
  Raw Data -> Validation -> Transformation -> Technical Analysis -> Storage
  - Data cleaning and normalization
  - Technical indicator calculation
  - Real-time analytics processing
- Storage Layer
  - BigQuery: Structured data storage
  - Cloud Storage: Raw data archival
  - Dual-write consistency patterns
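The flow Raw Data -> Validation -> Transformation -> Technical Analysis -> Storage can be sketched as a chain of small functions. Everything here is illustrative: the function names and field names are assumptions, and the two plain lists stand in for Cloud Storage (raw archive) and BigQuery (structured rows) so the example runs without GCP credentials.

```python
def validate(rows):
    """Keep only rows that carry every required field."""
    required = {"timestamp", "close", "volume"}
    return [r for r in rows if required.issubset(r)]


def transform(rows):
    """Cast string fields to numbers and sort chronologically."""
    cleaned = [
        {
            "timestamp": r["timestamp"],
            "close": float(r["close"]),
            "volume": int(r["volume"]),
        }
        for r in rows
    ]
    return sorted(cleaned, key=lambda r: r["timestamp"])


def add_indicators(rows, window=3):
    """Attach a simple moving average of the close over `window` rows."""
    out = []
    for i, row in enumerate(rows):
        closes = [x["close"] for x in rows[max(0, i - window + 1): i + 1]]
        sma = sum(closes) / window if len(closes) == window else None
        out.append({**row, "sma": sma})
    return out


def run_pipeline(raw_rows, archive, warehouse):
    """Run all stages and write to both stores (a simple dual write)."""
    archive.extend(raw_rows)  # untouched input, kept for replays
    rows = raw_rows
    for stage in (validate, transform, add_indicators):
        rows = stage(rows)
    warehouse.extend(rows)  # cleaned, enriched rows for querying
    return rows
```

Archiving the raw input before any processing means a failed or buggy later stage can be replayed from the archive rather than re-fetched from the API.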
Symbol | Company | Sector | Update Frequency |
---|---|---|---|
AMZN | Amazon | Technology | 5 min |
TSLA | Tesla | Automotive | 5 min |
AAPL | Apple | Technology | 5 min |
GOOGL | Alphabet (Google) | Technology | 5 min |
MSFT | Microsoft | Technology | 5 min |
IBM | IBM | Technology | 5 min |
JPM | JPMorgan | Finance | 5 min |
PFE | Pfizer | Healthcare | 5 min |
XOM | ExxonMobil | Energy | 5 min |
KO | Coca-Cola | Consumer | 5 min |
- Python 3.9+
- Google Cloud Platform
- Docker & Docker Compose
- Alpha Vantage API
- Cloud Pub/Sub
- BigQuery
- Cloud Storage
- Cloud Functions (optional)
- Python 3.9+
- GCP account with billing enabled (download a service account key from GCP and place it in the keys/ directory)
- Alpha Vantage API key
- Docker
- Clone & Configure Environment
# Clone repository
git clone https://github.com/ansh-info/StockPulse.git
cd StockPulse
# Create virtual environment
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
- GCP Configuration
# Download your service account key and place it in the keys/ directory
# Set up service account
export GOOGLE_APPLICATION_CREDENTIALS="path/to/key.json"
# Configure gcloud CLI
gcloud auth activate-service-account --key-file="$GOOGLE_APPLICATION_CREDENTIALS"
gcloud config set project YOUR_PROJECT_ID
- Update Configuration
# config.py and .env
GCP_CONFIG = {
    "GCP_PROJECT_ID": "your-project-id",
    "GCP_BUCKET_NAME": "your-bucket-name",
    "GCP_TOPIC_NAME": "your-topic-name",
    "GCP_DATASET_NAME": "your-dataset-name"
}
ALPHA_VANTAGE_KEY = {
    "ALPHA_VANTAGE_KEY_1": "your-api-key-1"
}
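If the same settings are kept in `.env`, they can be read from environment variables at startup. The sketch below is one possible way to do that, not the project's actual `config.py`: the function names are hypothetical, and it assumes additional keys follow the `ALPHA_VANTAGE_KEY_<n>` naming shown above.

```python
import os

REQUIRED_KEYS = (
    "GCP_PROJECT_ID",
    "GCP_BUCKET_NAME",
    "GCP_TOPIC_NAME",
    "GCP_DATASET_NAME",
)


def load_gcp_config(environ=None):
    """Return the GCP settings, failing fast if any value is missing or empty."""
    environ = os.environ if environ is None else environ
    missing = [key for key in REQUIRED_KEYS if not environ.get(key)]
    if missing:
        raise KeyError("missing configuration: " + ", ".join(missing))
    return {key: environ[key] for key in REQUIRED_KEYS}


def load_alpha_vantage_keys(environ=None):
    """Collect ALPHA_VANTAGE_KEY_1, ALPHA_VANTAGE_KEY_2, ... in numeric order."""
    environ = os.environ if environ is None else environ
    prefix = "ALPHA_VANTAGE_KEY_"
    numbered = []
    for name, value in environ.items():
        suffix = name[len(prefix):]
        if name.startswith(prefix) and suffix.isdigit() and value:
            numbered.append((int(suffix), value))
    return [value for _, value in sorted(numbered)]
```

Failing at startup on a missing setting is usually easier to debug than a Pub/Sub or BigQuery error several steps later.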
# Build and run with Docker Compose
docker-compose up -d
# Check container status
docker-compose ps
# View logs
docker-compose logs -f
# Interact with gcloudsdk
docker exec -it gcloudsdk /bin/bash
# Interact with python container
docker exec -it python /bin/bash
- Initialize the Environment
source venv/bin/activate
export GOOGLE_APPLICATION_CREDENTIALS="path/to/key.json"
- Run Core Components
# Start the data loader (wait for the tables to be created)
python bigquery_loader.py
# Start the data pipeline (wait for the data to be fetched and published)
python stocks_pipeline.py
# Run the deduplication process (start after bigquery_loader completes)
python dedup_pipeline.py
# Launch the dashboard (run it from the app/ directory to get the white background)
streamlit run dashboard.py
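For orientation, a deduplication pass such as the `dedup_pipeline.py` step typically collapses rows that share a natural key. The sketch below assumes that key is `(symbol, timestamp)` and that the most recently seen row wins; both are assumptions, and the project's actual logic may differ.

```python
def deduplicate(rows):
    """Keep one row per (symbol, timestamp); later rows overwrite earlier ones."""
    latest = {}
    for row in rows:
        latest[(row["symbol"], row["timestamp"])] = row
    # Return a stable, chronological order per symbol.
    return sorted(latest.values(), key=lambda r: (r["symbol"], r["timestamp"]))
```

The same rule can be expressed in BigQuery SQL, but the in-memory version makes the intended semantics easy to test.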
- Real-time stock price visualization
- Technical analysis indicators:
  - Moving Averages (SMA, EMA)
  - RSI (Relative Strength Index)
  - MACD (Moving Average Convergence Divergence)
  - Volume analysis with VWAP
- Customizable timeframes
- Interactive candlestick charts
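The indicators listed above have compact definitions, sketched here in plain Python. This is not the dashboard's code. Two choices are assumptions worth noting: the EMA is seeded with the first value, and the RSI uses plain averages over the last `period` changes rather than Wilder's smoothing, so values can differ slightly from charting tools.

```python
def sma(values, window):
    """Simple moving average; None until `window` values are available."""
    out = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(sum(values[i + 1 - window: i + 1]) / window)
    return out


def ema(values, span):
    """Exponential moving average with alpha = 2 / (span + 1)."""
    alpha = 2.0 / (span + 1)
    out = []
    for value in values:
        out.append(value if not out else alpha * value + (1 - alpha) * out[-1])
    return out


def macd(closes, fast=12, slow=26):
    """MACD line: fast EMA minus slow EMA, element by element."""
    return [f - s for f, s in zip(ema(closes, fast), ema(closes, slow))]


def rsi(closes, period=14):
    """RSI over the last `period` price changes; None if there is too little data."""
    if len(closes) < period + 1:
        return None
    changes = [b - a for a, b in zip(closes[-period - 1:-1], closes[-period:])]
    gains = sum(c for c in changes if c > 0)
    losses = sum(-c for c in changes if c < 0)
    if losses == 0:
        return 100.0
    return 100.0 - 100.0 / (1.0 + gains / losses)


def vwap(prices, volumes):
    """Volume-weighted average price; None when no volume traded."""
    total_volume = sum(volumes)
    if total_volume == 0:
        return None
    return sum(p * v for p, v in zip(prices, volumes)) / total_volume
```

In practice these would more likely be computed with pandas rolling and `ewm` operations; the loop versions are shown only to make the formulas explicit.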
StockPulse/
│
├── LICENSE
├── README.md
├── app
│   ├── __init__.py
│   └── dashboard.py
├── docker-compose.yml
├── docs
│   └── docs.md
├── keys
│   ├── key.example.json
│   └── key.json
├── requirements.txt
├── src
│   ├── __init__.py
│   ├── __pycache__
│   │   └── __init__.cpython-39.pyc
│   ├── config
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-39.pyc
│   │   │   └── config.cpython-39.pyc
│   │   └── config.py
│   ├── ingestion
│   │   ├── __init__.py
│   │   └── stocks_pipeline.py
│   ├── loader
│   │   ├── __init__.py
│   │   └── bigquery_loader.py
│   └── preprocessing
│       ├── __init__.py
│       ├── data_preprocessor.py
│       ├── dedup_pipeline.py
│       └── preprocessing_pipeline.py
└── tests
    ├── __init__.py
    └── check_gcs_buckets.py
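@retry(
    retry_on_exception=retry_if_exception_type(Exception),  # retry on any exception
    wait_exponential_multiplier=1000,  # exponential backoff, scaled in milliseconds
    wait_exponential_max=10000,  # never wait longer than 10 seconds
    stop_max_attempt_number=3  # give up after the third attempt
)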
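To show what a decorator configured like the one above does, here is a standard-library sketch of retry with exponential backoff. It is an illustration, not the library the project uses: the name `retry_with_backoff` and the exact delay schedule (base delay doubling per attempt, capped) are assumptions and may not match the library's formula.

```python
import functools
import time


def retry_with_backoff(max_attempts=3, base_delay=1.0, max_delay=10.0,
                       sleep=time.sleep):
    """Retry the wrapped function on any exception, doubling the wait each time."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    # 1x, 2x, 4x ... the base delay, capped at max_delay seconds
                    sleep(min(base_delay * 2 ** (attempt - 1), max_delay))

        return wrapper

    return decorator
```

Catching every `Exception` mirrors the configuration shown above; in a real pipeline it is often safer to retry only on transient errors such as timeouts or rate-limit responses.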
- Timestamp format validation
- Price range checks
- Volume validation
- Data completeness verification
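The four checks above could be combined into a single function along these lines. This is a sketch, not the project's validator: the function name, the field names, the `max_price` bound, and the `YYYY-MM-DD HH:MM:SS` timestamp format are assumptions, and prices are assumed to be numeric already.

```python
from datetime import datetime

REQUIRED_FIELDS = ("timestamp", "open", "high", "low", "close", "volume")


def validate_bar(bar, max_price=1_000_000.0):
    """Return a list of problems found in one OHLCV bar (empty means valid)."""
    # Completeness: stop early, the other checks need every field.
    missing = [f for f in REQUIRED_FIELDS if bar.get(f) is None]
    if missing:
        return ["missing fields: " + ", ".join(missing)]

    problems = []

    # Timestamp format.
    try:
        datetime.strptime(bar["timestamp"], "%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        problems.append("bad timestamp format")

    # Price range, then internal consistency of high and low.
    prices = [bar[f] for f in ("open", "high", "low", "close")]
    body_low = min(bar["open"], bar["close"])
    body_high = max(bar["open"], bar["close"])
    if not all(0 < p <= max_price for p in prices):
        problems.append("price out of range")
    elif not (bar["low"] <= body_low and body_high <= bar["high"]):
        problems.append("inconsistent high/low")

    # Volume must be a non-negative integer.
    if not isinstance(bar["volume"], int) or bar["volume"] < 0:
        problems.append("invalid volume")

    return problems
```

Returning a list of problems instead of a boolean makes it easy to log why a row was rejected.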
- Machine Learning integration for price prediction
- Real-time alerting system
- Advanced technical indicators
- Performance optimization
- Enhanced monitoring and logging
- API endpoint for data access
Detailed documentation is available in the /docs directory:
- API Documentation
- Setup Guide
- Troubleshooting Guide
- Best Practices
This project is licensed under the MIT License - see the LICENSE file for details.
If you use this project in your research, please cite:
@software{StockPulse_2024,
author = {Ansh Kumar and Apoorva Gupta},
title = {StockPulse: GCP-powered platform for real-time stock market data processing and visualization},
year = {2024},
url = {https://github.com/ansh-info/StockPulse.git}
}