-
Notifications
You must be signed in to change notification settings - Fork 0
/
WeatherMachine.py
236 lines (201 loc) · 9.04 KB
/
WeatherMachine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import argparse
import re
from urllib.parse import quote_plus
import requests
DATA_HEADER_BASIC = {"F": ["Date", "Weather", "Temperature(F°)", "Feels Like", "Humidity", "UV Index", "Wind Direction",
"Wind Speed"],
"C": ["Date", "Weather", "Temperature(C°)", "Feels Like", "Humidity", "UV Index", "Wind Direction",
"Wind Speed"]}
DAILY_DATA_HEADER = {"F": ["Date", "Average Temperature(F°)", "Max Temperature", "Min Temperature", "UV Index"],
"C": ["Date", "Average Temperature(C°)", "Max Temperature", "Min Temperature", "UV Index"]} #, "", "", "", ""
HOURLY_DATA_HEADER = {"F": ["Time", "Weather", "Temperature(F°)", "Feels Like", "Humidity", "UV Index",
"Wind Chill","Wind Direction", "Wind Speed"],
"C": ["Time", "Weather", "Temperature(C°)", "Feels Like", "Humidity", "UV Index",
"Wind Chill", "Wind Direction", "Wind Speed"]}
def sendError(output):
output.insert(0, "ERROR")
def printOutput(line, output):
output.append(line)
return output
def dumpOutput(output):
if output:
for item in output:
print(item)
def getColumnWidth(header, data):
column_widths = [max(len(str(item)) for item in column) for column in zip(header, data)]
return column_widths
def printBorder(column_widths, output):
output = printOutput("*" + "*".join("-" * (width + 2) for width in column_widths) + "*", output)
return output
def printTable(headers, data, output):
# UNDERLINE = "\033[4m"
# RESET = "\033[0m"
columnWidths = [max(len(str(item)) for item in column) for column in zip(headers, *data)]
header_row = "|" + "|".join(f" {headers[i].ljust(columnWidths[i])} " for i in range(len(headers))) + "|"
separator_row = "|" + "|".join("_" * (width + 2) for width in columnWidths) + "|"
# if len(data) <= 1:
# UNDERLINE, RESET = '', '' # We don't need these if there's only one line of data
data_rows = [
"|" + "|".join(f" {str(row[i]).ljust(columnWidths[i])} " for i in range(len(row))) + "|"
# UNDERLINE + "|" + "|".join(f" {str(row[i]).ljust(columnWidths[i])} " for i in range(len(row))) + "|" + RESET
for row in data
]
# Print the table
output = printBorder(columnWidths, output)
output = printOutput(header_row, output)
output = printOutput(separator_row, output)
for row in data_rows:
output = printOutput(row, output)
output = printBorder(columnWidths, output)
return output
def getJsonFromUrl(url, output):
try:
response = requests.get(url)
response.raise_for_status() # This will raise an HTTPError for bad responses (4xx and 5xx)
data = response.json()
return data
except requests.exceptions.RequestException as e:
# Handle any errors that occurred during the request
sendError(output)
printOutput(f"An error occurred: {e}", output)
return None
def roundCoordinates(input_str):
def round_match(match):
number = float(match.group())
return f"{number:.2f}"
result_str = re.sub(r"\d+\.\d+", round_match, input_str)
result_str = re.sub(r"(\.\d*?[1-9])0+$", r"\1", result_str)
result_str = re.sub(r"\.(?=\D|$)", "", result_str)
return result_str
def timeFormat(time):
mid = len(time) // 2
# Split the string into two halves
firstHalf = time[:mid]
while len(firstHalf) < 2:
firstHalf = "0"+firstHalf
secondHalf = time[mid:]
while len(secondHalf) < 2:
secondHalf = "0"+secondHalf
# Join the halves with a colon
result = firstHalf + ':' + secondHalf
return result
def getPublicIP():
try:
response = requests.get("https://api.ipify.org?format=json")
ip = response.json().get("ip")
return ip
except requests.RequestException as e:
return None
def getLocation(ip):
try:
response = requests.get(
f"https://ipinfo.io/{ip}/json") # will get location, could be more precise. Unsure if effective on cellular networks
data = response.json()
location = data.get("loc", "Location not found")
return roundCoordinates(location)
except requests.RequestException as e:
return {"error": str(e)}
def formatDaily(weather, unit="F"):
data = [weather['date'], weather[f'avgtemp{unit}'],
weather[f'maxtemp{unit}'], weather[f'mintemp{unit}'], weather['uvIndex']]
return data
def formatOneDay(weather, unit):
if unit == "F":
speedUnits = "Miles"
else:
speedUnits = "Kmph"
data = [weather['localObsDateTime'], weather['weatherDesc'][0]['value'], weather[f'temp_{unit}'],
weather[f'FeelsLike{unit}'], weather['humidity'], weather['uvIndex'], weather['winddir16Point'],
weather[f'windspeed{speedUnits}']]
return data
def formatHourly(weather, unit):
if unit == "F":
speedUnits = "Miles"
else:
speedUnits = "Kmph"
data = [timeFormat(weather['time']), weather['weatherDesc'][0]['value'], weather[f'temp{unit}'],
weather[f'FeelsLike{unit}'], weather['humidity'], weather['uvIndex'], weather[f'WindChill{unit}'], weather['winddir16Point'],
weather[f'windspeed{speedUnits}']]
return data
def getData(args, output): # fetch a weather forecast from a city
data = getJsonFromUrl(f'https://wttr.in/{quote_plus(args.Location)}?format=j1', output)
if data is None or data['request'][0]['query'] == "Ban Not": # wttr api redirects to Ban Not if not found. documentation was unclear why
sendError(output)
printOutput(f'Location provided could not be found. Try another format as shown in https://wttr.in/:help', output)
return
if args.m:
units = "C"
else:
units = "F"
location = data['nearest_area'][0]['areaName'][0]['value']
output = printOutput(f"Weather in {location}", output)
if args.f:
buffer = []
for daily in data['weather']:
buffer.append(formatDaily(daily, units)) #.extend(["", "", "", ""])
output = printTable(DAILY_DATA_HEADER[units], buffer, output)
buffer = []
for hourly in daily['hourly']:
buffer.append(formatHourly(hourly, units))
output = printTable(HOURLY_DATA_HEADER[units], buffer, output)
buffer = []
elif not args.f and args.H:
buffer = []
buffer.append(formatOneDay(data['current_condition'][0], units))
output = printTable(DATA_HEADER_BASIC[units], buffer, output)
buffer = []
for hourly in data['weather'][0]['hourly']:
buffer.append(formatHourly(hourly, units))
output = printTable(HOURLY_DATA_HEADER[units], buffer, output)
else:
data = formatOneDay(data['current_condition'][0], units)
output = printTable(DATA_HEADER_BASIC[units], [data], output)
return output
def getArgs(parser, input):
args = parser.parse_args(input)
# Check if -L is not used and Location is empty
if not args.l and len(args.Location) == 0:
parser.error("Location is required unless the -L flag is used.")
# possibly flag an error if Location is given and -L is used. will default to text first currently.
# elif args.l and not len(args.Location) == 0:
# parser.error("Location is not used if the -L flag is used.")
elif len(args.Location) > 0:
args.Location = ' '.join(args.Location) # Combine words into a single string
elif args.l:
ip = getPublicIP()
if ip:
args.Location = getLocation(ip)
else:
parser.error("Device Location is not found. Couldn't get IP. Please check your settings")
return args
def getParser():
parser = argparse.ArgumentParser(description="Process a location string with optional flags.")
parser.add_argument("-f", action="store_true",
help="-f Multi-Day Forecast") # wanted to do 5-day forecast, accidentally picked an API that didn't have more than 3 days
parser.add_argument("-H", action="store_true", help="-H Daily Hourly Forecast")
parser.add_argument("-l", action="store_true", help="-l get Current Location")
parser.add_argument("-m", action="store_true", help="-m use Metric")
parser.add_argument("Location", type=str, nargs='*', help="Location string (required unless -l is used)")
return parser
def run(input):
output = []
output = getData(input, output)
dumpOutput(output)
return output
if __name__ == '__main__':
parser = getParser()
while True:
try:
# Get input from the user
input_str = input("Enter a Location and flags. Type 'Exit' to Exit: ")
output = []
args = getArgs(parser, input_str.split())
if args.Location == "Exit":
raise KeyboardInterrupt
getData(args, output)
dumpOutput(output)
except KeyboardInterrupt:
print("\nExiting...")
break
except SystemExit:
pass # To prevent argparse from closing the loop