finance-api/data/data_generator.py

128 lines
4.7 KiB
Python

import csv
import os
import random
import string
from datetime import datetime
from datetime import timedelta
class StockDataGenerator:
    """Generate synthetic daily stock data and append it to a CSV file.

    For each of ``num_companies`` random four-letter tickers, a price series
    is simulated as a multiplicative random walk over every weekday between
    ``start_date`` and ``end_date`` (inclusive), together with a traded
    volume that grows with the size of the day's price move. Rows are
    appended to the output CSV one ticker at a time.
    """

    # Column order of every CSV row written by ``_write_to_csv``.
    _CSV_FIELDNAMES = ("date", "symbol", "open", "high", "low", "close", "volume")
    # Number of uppercase ASCII letters in each fake ticker.
    _TICKER_LENGTH = 4

    def __init__(
        self,
        daily_price_fluctuation=.02,
        daily_volume_fluctuation=.05,
        date_format="%Y-%m-%d",
        num_companies=4000,
        start_date="2000-01-01",
        end_date="2024-01-01",
        csv_output_file="./output/tickers.csv",
        chunk_size=500000,
        volume_adjustment_factor=5
    ):
        """Store the simulation settings.

        Args:
            daily_price_fluctuation: Maximum fractional price move per day;
                each daily change is drawn uniformly from ``[-x, +x]``.
            daily_volume_fluctuation: Maximum fractional deviation of a day's
                base volume from the ticker's average volume.
            date_format: ``strptime``/``strftime`` format used both to parse
                ``start_date``/``end_date`` and to render dates in the output.
            num_companies: Number of distinct fake tickers to generate.
            start_date: First calendar day considered, in ``date_format``.
            end_date: Last calendar day considered (inclusive), in
                ``date_format``.
            csv_output_file: Default CSV path used when no explicit output
                path is passed to ``generate_raw_data``.
            chunk_size: Stored on the instance; no method of this class
                reads it.
            volume_adjustment_factor: Multiplier applied to the day's
                relative price change when scaling that day's volume.

        Raises:
            ValueError: If ``start_date`` or ``end_date`` does not match
                ``date_format`` (raised by ``datetime.strptime``).
        """
        self.daily_price_fluctuation = daily_price_fluctuation
        self.daily_volume_fluctuation = daily_volume_fluctuation
        self.date_format = date_format
        self.num_companies = num_companies
        self.start_date = datetime.strptime(start_date, self.date_format)
        self.end_date = datetime.strptime(end_date, self.date_format)
        self.chunk_size = chunk_size
        self.csv_output_file = csv_output_file
        self.volume_adjustment_factor = volume_adjustment_factor

    def generate_raw_data(self, output_path=None, start_date="2000-01-01", end_date="2024-01-01"):
        """Generate data for every fake ticker and append it to the CSV.

        Args:
            output_path: Destination CSV path. Falls back to
                ``self.csv_output_file`` when falsy.
            start_date: Unused; kept only so existing callers keep working.
                The date range comes from the values given to ``__init__``.
            end_date: Unused; see ``start_date``.
        """
        tickers = self._generate_fake_tickers()
        dates = self._generate_dates()
        total = len(tickers)
        for ticker_count, ticker in enumerate(tickers, start=1):
            daily_ticker_data = self._generate_stock_data(ticker, dates)
            self._write_to_csv(daily_ticker_data, output_path)
            print(f"Generated data for: {ticker} and is {ticker_count} of {total}")

    def _generate_dates(self):
        """Return every weekday from start to end (inclusive) as strings.

        Dates are rendered with ``self.date_format``. The result is empty
        when ``end_date`` precedes ``start_date``.
        """
        dates = []
        current_date = self.start_date
        while current_date <= self.end_date:
            # weekday() is 0-4 for Monday-Friday; Saturday/Sunday are skipped.
            if current_date.weekday() < 5:
                dates.append(current_date.strftime(self.date_format))
            current_date += timedelta(days=1)
        return dates

    def _generate_fake_tickers(self):
        """Return ``self.num_companies`` distinct random uppercase tickers.

        Tickers are returned in the order they were first drawn. A
        non-positive ``num_companies`` yields an empty list.

        Raises:
            ValueError: If more tickers are requested than distinct
                combinations exist, which could otherwise never terminate.
        """
        max_unique = len(string.ascii_uppercase) ** self._TICKER_LENGTH
        if self.num_companies > max_unique:
            raise ValueError(
                f"num_companies={self.num_companies} exceeds the {max_unique} "
                f"possible {self._TICKER_LENGTH}-letter tickers"
            )
        companies = []
        # Parallel set gives O(1) duplicate checks; the list keeps draw order.
        seen = set()
        while len(companies) < self.num_companies:
            ticker = "".join(
                random.choices(string.ascii_uppercase, k=self._TICKER_LENGTH)
            )
            if ticker not in seen:
                seen.add(ticker)
                companies.append(ticker)
        return companies

    def _generate_stock_data(self, symbol, dates):
        """Simulate one row of price/volume data per entry in ``dates``.

        Args:
            symbol: Ticker symbol; ``"-fake"`` is appended in the output.
            dates: Sequence of already-formatted date strings.

        Returns:
            A list of dicts with keys ``date``, ``symbol``, ``open``,
            ``high``, ``low``, ``close`` (prices as strings with three
            decimals) and ``volume`` (int). Empty when ``dates`` is empty.
        """
        num_days = len(dates)
        starting_price = random.uniform(5, 500)
        avg_daily_volume = random.randint(50000, 2000000)
        price_swing = self.daily_price_fluctuation
        volume_swing = self.daily_volume_fluctuation

        # Index 0 is the first day: starting price and the average volume.
        prices = [starting_price]
        volumes = [avg_daily_volume]
        for i in range(1, num_days):
            # Simulate price using a random walk.
            previous_price = prices[i - 1]
            price = previous_price * (1 + random.uniform(-price_swing, price_swing))
            prices.append(price)
            # Simulate volume with some daily variability.
            daily_volume = avg_daily_volume * (
                1 + random.uniform(-volume_swing, volume_swing)
            )
            # Larger relative price moves scale the day's volume up.
            price_change = abs(price - previous_price) / previous_price
            daily_volume *= 1 + price_change * self.volume_adjustment_factor
            volumes.append(int(daily_volume))

        stock_data = []
        # zip stops at the shortest input, so empty ``dates`` yields no rows.
        for date, close_price, volume in zip(dates, prices, volumes):
            open_price = close_price * random.uniform(0.95, 1.05)
            # max/min keep the open inside the [low, high] range.
            high_price = max(open_price, close_price * random.uniform(1.00, 1.10))
            low_price = min(open_price, close_price * random.uniform(0.90, 0.99))
            stock_data.append({
                "date": date,
                "symbol": f"{symbol}-fake",
                "open": f"{open_price:.3f}",
                "high": f"{high_price:.3f}",
                "low": f"{low_price:.3f}",
                "close": f"{close_price:.3f}",
                "volume": volume
            })
        return stock_data

    def _write_to_csv(self, data, output=None):
        """Append ``data`` rows to a CSV file, writing a header if it is new.

        Args:
            data: Iterable of row dicts keyed by the CSV column names.
            output: Destination path. Falls back to ``self.csv_output_file``
                when falsy.
        """
        output_file = output or self.csv_output_file
        # Create missing parent directories so the default "./output/..."
        # path works on a fresh checkout.
        parent_dir = os.path.dirname(output_file)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        # newline="" lets the csv module control line endings; without it
        # Windows text mode turns each "\r\n" into "\r\r\n".
        with open(output_file, "a", newline="", encoding="utf-8") as file:
            writer = csv.DictWriter(file, fieldnames=self._CSV_FIELDNAMES)
            if file.tell() == 0:
                # Append mode starts at end of file, so 0 means empty/new.
                writer.writeheader()
            writer.writerows(data)
if __name__ == "__main__":
    # Script entry point: build a generator with every default setting and
    # write the full synthetic dataset to its default CSV path.
    StockDataGenerator().generate_raw_data()