import csv
import random
import string
from datetime import datetime
from datetime import timedelta
from pathlib import Path


class StockDataGenerator:
    """Generate synthetic daily OHLCV rows for fake tickers and append them to a CSV file.

    Prices follow a bounded random walk; volumes vary around a per-ticker
    average and are scaled up on days with larger relative price moves.
    """

    def __init__(
        self,
        daily_price_fluctuation=.02,
        daily_volume_fluctuation=.05,
        date_format="%Y-%m-%d",
        num_companies=4000,
        start_date="2000-01-01",
        end_date="2024-01-01",
        csv_output_file="./output/tickers.csv",
        chunk_size=500000,
        volume_adjustment_factor=5
    ):
        """Store the generation settings.

        Args:
            daily_price_fluctuation: Maximum relative day-over-day price change.
            daily_volume_fluctuation: Maximum relative deviation of the daily
                volume from the ticker's average volume.
            date_format: ``strptime``/``strftime`` format used to parse
                ``start_date``/``end_date`` and to render output dates.
            num_companies: Number of unique fake tickers to generate.
            start_date: First calendar date (inclusive), in ``date_format``.
            end_date: Last calendar date (inclusive), in ``date_format``.
            csv_output_file: Default CSV path used when no output path is
                passed to the writing methods.
            chunk_size: Stored on the instance; not used by any method in
                this class.
            volume_adjustment_factor: Multiplier applied to the relative
                price change when scaling the daily volume.

        Raises:
            ValueError: If a date string does not match ``date_format``.
        """
        self.daily_price_fluctuation = daily_price_fluctuation
        self.daily_volume_fluctuation = daily_volume_fluctuation
        self.date_format = date_format
        self.num_companies = num_companies
        self.start_date = datetime.strptime(start_date, self.date_format)
        self.end_date = datetime.strptime(end_date, self.date_format)
        self.chunk_size = chunk_size
        self.csv_output_file = csv_output_file
        self.volume_adjustment_factor = volume_adjustment_factor

    def generate_raw_data(self, output_path=None, start_date="2000-01-01", end_date="2024-01-01"):
        """Generate data for every ticker and append it to the CSV file.

        Args:
            output_path: CSV path to append to; falls back to
                ``self.csv_output_file`` when falsy.
            start_date: Ignored. Kept so existing callers do not break; the
                date range always comes from the constructor arguments.
            end_date: Ignored, for the same reason as ``start_date``.
        """
        tickers = self._generate_fake_tickers()
        dates = self._generate_dates()
        total = len(tickers)
        for ticker_count, ticker in enumerate(tickers, start=1):
            daily_ticker_data = self._generate_stock_data(ticker, dates)
            self._write_to_csv(daily_ticker_data, output_path)
            print(f"Generated data for: {ticker} and is {ticker_count} of {total}")

    def _generate_dates(self):
        """Return the formatted weekdays from start_date to end_date, inclusive."""
        dates = []
        current_date = self.start_date
        while current_date <= self.end_date:
            if current_date.weekday() < 5:  # Skip weekends.
                dates.append(current_date.strftime(self.date_format))
            current_date += timedelta(days=1)
        return dates

    def _generate_fake_tickers(self):
        """Return ``self.num_companies`` unique random 4-letter uppercase tickers.

        Returns an empty list when ``num_companies`` is zero or negative.

        Raises:
            ValueError: If more tickers are requested than distinct 4-letter
                combinations exist, which could never be satisfied.
        """
        ticker_length = 4
        max_unique = len(string.ascii_uppercase) ** ticker_length
        if self.num_companies > max_unique:
            raise ValueError(
                f"num_companies={self.num_companies} exceeds the {max_unique} "
                f"unique {ticker_length}-letter tickers available"
            )

        companies = []
        seen = set()  # O(1) duplicate check; the list preserves draw order.
        while len(companies) < self.num_companies:
            ticker = "".join(random.choices(string.ascii_uppercase, k=ticker_length))
            if ticker not in seen:
                seen.add(ticker)
                companies.append(ticker)
        return companies

    def _generate_stock_data(self, symbol, dates):
        """Build one row dict per date for ``symbol``.

        Args:
            symbol: Ticker; emitted with a ``-fake`` suffix.
            dates: Sequence of already formatted date strings.

        Returns:
            A list of dicts with keys ``date``, ``symbol``, ``open``,
            ``high``, ``low``, ``close`` (prices as 3-decimal strings) and
            ``volume`` (int). Empty when ``dates`` is empty.
        """
        num_days = len(dates)
        starting_price = random.uniform(5, 500)
        avg_daily_volume = random.randint(50000, 2000000)

        price_swing = self.daily_price_fluctuation
        volume_swing = self.daily_volume_fluctuation

        # Day 0 uses the starting price and the average volume; both lists
        # stay index-aligned with ``dates`` from here on.
        prices = [starting_price]
        volumes = [avg_daily_volume]
        for _ in range(1, num_days):
            previous_price = prices[-1]

            # Simulate price using a random walk.
            price = previous_price * (1 + random.uniform(-price_swing, price_swing))
            prices.append(price)

            # Simulate volume with some daily variability.
            daily_volume = avg_daily_volume * (1 + random.uniform(-volume_swing, volume_swing))

            # Adjust volume based on price change magnitude.
            price_change = abs(price - previous_price) / previous_price
            daily_volume *= 1 + price_change * self.volume_adjustment_factor
            volumes.append(int(daily_volume))

        stock_data = []
        # zip stops at the shortest input, so an empty ``dates`` yields no rows.
        for date, close_price, volume in zip(dates, prices, volumes):
            open_price = close_price * random.uniform(0.95, 1.05)
            high_price = max(open_price, close_price * random.uniform(1.00, 1.10))
            low_price = min(open_price, close_price * random.uniform(0.90, 0.99))
            stock_data.append({
                "date": date,
                "symbol": f"{symbol}-fake",
                "open": f"{open_price:.3f}",
                "high": f"{high_price:.3f}",
                "low": f"{low_price:.3f}",
                "close": f"{close_price:.3f}",
                "volume": volume
            })
        return stock_data

    def _write_to_csv(self, data, output=None):
        """Append ``data`` rows to a CSV file, writing the header if the file is empty.

        Args:
            data: Iterable of row dicts as produced by ``_generate_stock_data``.
            output: Target path; falls back to ``self.csv_output_file`` when falsy.
        """
        output_file = Path(output or self.csv_output_file)
        # Create the target directory so a fresh checkout does not fail with
        # FileNotFoundError on the default "./output/" path.
        output_file.parent.mkdir(parents=True, exist_ok=True)

        fieldnames = ["date", "symbol", "open", "high", "low", "close", "volume"]
        # newline="" lets the csv module control row terminators (avoids
        # "\r\r\n" row endings on Windows).
        with open(output_file, "a", newline="") as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            if file.tell() == 0:  # Check if new file and write header.
                writer.writeheader()
            writer.writerows(data)


if __name__ == "__main__":
    generator = StockDataGenerator()
    generator.generate_raw_data()